Simple Data Subsetting Using MapReduce
This example shows how to extract a subset of a large data set.
There are two aspects of subsetting, or performing a query. One is selecting a subset of the variables (columns) in the data set. The other is selecting a subset of the observations, or rows.
In this example, the selection of variables takes place in the definition of the datastore. (The map function could perform a further sub-selection of variables, but that is not within the scope of this example). In this example, the role of the map function is to perform the selection of observations. The role of the reduce function is to concatenate the subsetted records extracted by each call to the map function. This approach assumes that the data set can fit in memory after the Map phase.
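The two kinds of subsetting can be sketched on a small in-memory table before moving to a datastore. The table below is made up purely for illustration (its values are not taken from airlinesmall.csv); only the variable names mirror the data set.

```matlab
% Toy table; the values are invented for illustration only.
flights = table([1994; 1996; 2001], [10; 6; 3], [57; 75; 60], [53; 241; 62], ...
    'VariableNames', {'Year', 'Month', 'CRSElapsedTime', 'ActualElapsedTime'});

% Variable (column) selection: keep two of the four variables.
cols = flights(:, {'Year', 'ActualElapsedTime'});

% Observation (row) selection: keep rows that satisfy a logical condition.
rows = flights(flights.Year > 1994, :);

% Both kinds of selection in one indexing expression.
both = flights(flights.Year > 1994, {'Year', 'ActualElapsedTime'});
disp(both)
```

In the example that follows, the column selection is handled by the datastore and the row selection by the map function, so neither step needs the whole data set in memory at once.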
Prepare Data
Create a datastore using the airlinesmall.csv data set. This 12-megabyte data set contains 29 columns of flight information for several airline carriers, including arrival and departure times. This example uses 15 of the 29 variables available in the data.
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = ds.VariableNames([1 2 5 9 12 13 15 16 17 ...
    18 20 21 25 26 27]);
ds.SelectedVariableNames
ans = 1x15 cell
{'Year'} {'Month'} {'DepTime'} {'UniqueCarrier'} {'ActualElapsedTime'} {'CRSElapsedTime'} {'ArrDelay'} {'DepDelay'} {'Origin'} {'Dest'} {'TaxiIn'} {'TaxiOut'} {'CarrierDelay'} {'WeatherDelay'} {'NASDelay'}
The datastore treats 'NA' values as missing, and replaces the missing values with NaN values by default. Additionally, the SelectedVariableNames property allows you to work with only the specified variables of interest, which you can verify using preview.
preview(ds)
ans=8×15 table
Year Month DepTime UniqueCarrier ActualElapsedTime CRSElapsedTime ArrDelay DepDelay Origin Dest TaxiIn TaxiOut CarrierDelay WeatherDelay NASDelay
____ _____ _______ _____________ _________________ ______________ ________ ________ _______ _______ ______ _______ ____________ ____________ ________
1987 10 642 {'PS'} 53 57 8 12 {'LAX'} {'SJC'} NaN NaN NaN NaN NaN
1987 10 1021 {'PS'} 63 56 8 1 {'SJC'} {'BUR'} NaN NaN NaN NaN NaN
1987 10 2055 {'PS'} 83 82 21 20 {'SAN'} {'SMF'} NaN NaN NaN NaN NaN
1987 10 1332 {'PS'} 59 58 13 12 {'BUR'} {'SJC'} NaN NaN NaN NaN NaN
1987 10 629 {'PS'} 77 72 4 -1 {'SMF'} {'LAX'} NaN NaN NaN NaN NaN
1987 10 1446 {'PS'} 61 65 59 63 {'LAX'} {'SJC'} NaN NaN NaN NaN NaN
1987 10 928 {'PS'} 84 79 3 -2 {'SAN'} {'SFO'} NaN NaN NaN NaN NaN
1987 10 859 {'PS'} 155 143 11 -1 {'SEA'} {'LAX'} NaN NaN NaN NaN NaN
Run MapReduce
The mapreduce function requires a map function and a reduce function as inputs. The mapper receives blocks of data and outputs intermediate results. The reducer reads the intermediate results and produces a final result.
In this example, the mapper receives a table with the variables described by the SelectedVariableNames property in the datastore. Then, the mapper extracts flights that had a high amount of delay after pushback from the gate. Specifically, it identifies flights with a duration exceeding 2.5 times the length of the scheduled duration. The mapper ignores flights prior to 1995, because some of the variables of interest for this example were not collected before that year.
Display the map function file.
function subsettingMapper(data, ~, intermKVStore)
% Select flights from 1995 and later that had exceptionally long
% elapsed flight times (including both time on the tarmac and time in
% the air).
idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)...
    > 1.50 * data.CRSElapsedTime;
intermVal = data(idx,:);

add(intermKVStore,'Null',intermVal);
end
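The mapper compares the extra elapsed time with 1.50 times the scheduled time, which is the same rule as "actual duration exceeds 2.5 times the scheduled duration". A minimal sketch of that equivalence, using hypothetical durations chosen to sit on either side of the boundary:

```matlab
scheduled = 60;                 % hypothetical scheduled duration, in minutes
actual    = [140; 150; 151];    % hypothetical actual durations, in minutes

% Form used in the mapper: extra time exceeds 1.5 times the schedule.
idxMapper = (actual - scheduled) > 1.50 * scheduled;

% Equivalent form: actual duration exceeds 2.5 times the schedule.
idxEquivalent = actual > 2.5 * scheduled;

% The two columns are identical; the comparison is strict, so a flight at
% exactly 2.5 times the scheduled duration is not selected.
disp([idxMapper, idxEquivalent])
```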
The reducer receives the subsetted observations obtained from the mapper and simply concatenates them into a single table. The reducer returns one key (which is relatively meaningless) and one value (the concatenated table).
Display the reduce function file.
function subsettingReducer(~, intermValList, outKVStore)
% get all intermediate results from the list
outVal = {};

while hasnext(intermValList)
    outVal = [outVal; getnext(intermValList)];
end
% Note that this approach assumes the concatenated intermediate values (the
% subset of the whole data) fit in memory.
add(outKVStore, 'Null', outVal);
end
Use mapreduce to apply the map and reduce functions to the datastore, ds.
result = mapreduce(ds, @subsettingMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
mapreduce returns an output datastore, result, with files in the current folder.
Display Results
Look for patterns in the first 10 variables that were pulled from the data set. These variables identify the airline and the origin and destination airports, as well as some basic delay information.
r = readall(result);
tbl = r.Value{1};
tbl(:,1:10)
ans=37×10 table
Year Month DepTime UniqueCarrier ActualElapsedTime CRSElapsedTime ArrDelay DepDelay Origin Dest
____ _____ _______ _____________ _________________ ______________ ________ ________ _______ _______
1995 6 1601 {'US'} 162 58 118 14 {'BWI'} {'PIT'}
1996 6 1834 {'CO'} 241 75 220 54 {'IAD'} {'EWR'}
1997 1 730 {'DL'} 110 43 137 70 {'ATL'} {'GSP'}
1997 4 1715 {'UA'} 152 57 243 148 {'IND'} {'ORD'}
1997 9 2232 {'NW'} 143 50 115 22 {'DTW'} {'CMH'}
1997 10 1419 {'CO'} 196 58 157 19 {'DFW'} {'IAH'}
1998 3 2156 {'DL'} 139 49 146 56 {'TYS'} {'ATL'}
1998 10 1803 {'NW'} 291 81 213 3 {'MSP'} {'ORD'}
2000 5 830 {'WN'} 140 55 85 0 {'DAL'} {'HOU'}
2000 8 1630 {'CO'} 357 123 244 10 {'EWR'} {'CLT'}
2002 6 1759 {'US'} 260 67 192 -1 {'LGA'} {'BOS'}
2003 3 1214 {'XE'} 214 84 124 -6 {'GPT'} {'IAH'}
2003 3 604 {'XE'} 175 60 114 -1 {'LFT'} {'IAH'}
2003 4 1556 {'MQ'} 142 52 182 92 {'PIA'} {'ORD'}
2003 5 1954 {'US'} 127 48 78 -1 {'RDU'} {'CLT'}
2003 7 1250 {'FL'} 261 95 166 0 {'ATL'} {'IAD'}
⋮
Looking at the first record, a U.S. Air flight departed the gate 14 minutes after its scheduled departure time and arrived 118 minutes late. The flight experienced a delay of 104 minutes after pushback from the gate, which is the difference between ActualElapsedTime and CRSElapsedTime.
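The arithmetic for the first record can be checked directly. This short sketch only restates the numbers shown in the first row of the table; the variable names are chosen for illustration.

```matlab
% Values of the first record in the displayed table.
arrDelay      = 118;   % ArrDelay, minutes
depDelay      = 14;    % DepDelay, minutes
actualElapsed = 162;   % ActualElapsedTime, minutes
crsElapsed    = 58;    % CRSElapsedTime, minutes

% Delay accumulated after pushback from the gate.
delayAfterPushback = actualElapsed - crsElapsed;

% The same figure follows from the arrival and departure delays.
delayFromDelays = arrDelay - depDelay;

% The record satisfies the mapper's selection rule.
selected = delayAfterPushback > 1.50 * crsElapsed;

fprintf('%d minutes after pushback, selected = %d\n', delayAfterPushback, selected);
```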
There is one anomalous record. In February of 2006, a JetBlue flight had a departure time of 3:24 a.m. and an elapsed flight time of 1650 minutes, but an arrival delay of only 415 minutes. This might be a data entry error.
Otherwise, there are no clear-cut patterns concerning when and where these exceptionally delayed flights occur. No airline, time of year, time of day, or single airport dominates. Some intuitive patterns, such as O'Hare (ORD) in the winter months, are certainly present.
Delay Patterns
Beginning in 1995, the airline system performance data began including measurements of how much delay took place in the taxi phases of a flight. Then, in 2003, the data also began to include certain causes of delay.
Examine these two groups of variables, the taxi times and the delay causes, in closer detail.
tbl(:,[1,7,8,11:end])
ans=37×8 table
Year ArrDelay DepDelay TaxiIn TaxiOut CarrierDelay WeatherDelay NASDelay
____ ________ ________ ______ _______ ____________ ____________ ________
1995 118 14 7 101 NaN NaN NaN
1996 220 54 12 180 NaN NaN NaN
1997 137 70 2 12 NaN NaN NaN
1997 243 148 4 38 NaN NaN NaN
1997 115 22 4 98 NaN NaN NaN
1997 157 19 6 95 NaN NaN NaN
1998 146 56 9 47 NaN NaN NaN
1998 213 3 11 205 NaN NaN NaN
2000 85 0 5 51 NaN NaN NaN
2000 244 10 4 273 NaN NaN NaN
2002 192 -1 6 217 NaN NaN NaN
2003 124 -6 13 131 NaN NaN NaN
2003 114 -1 8 106 NaN NaN NaN
2003 182 92 9 106 NaN NaN NaN
2003 78 -1 5 90 NaN NaN NaN
2003 166 0 11 170 0 0 166
⋮
For these exceptionally delayed flights, the great majority of delay occurs during taxi out, on the tarmac. Moreover, the major cause of the delay is NASDelay. NAS delays are holds imposed by the national aviation authorities on departures headed for an airport that is forecast to be unable to handle all scheduled arrivals at the time the flight is scheduled to arrive. NAS delay programs in effect at any given time are posted at https://nasstatus.faa.gov/.
Preferably, when NAS delays are imposed, boarding of the aircraft is simply delayed. Such a delay would show up as a departure delay. However, for most of the flights selected for this example, the delays took place largely after departure from the gate, leading to a taxi delay.
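One rough way to look at the taxi-out observation is to compare TaxiOut with the extra elapsed time for the 16 rows that are displayed above. The sketch below copies those values by hand; the one-half threshold is an arbitrary assumption made for illustration, and TaxiOut also includes ordinary taxi time, so this is only an indication and not a precise attribution of delay.

```matlab
% Values copied from the 16 displayed rows of the result tables.
actualElapsed = [162 241 110 152 143 196 139 291 140 357 260 214 175 142 127 261]';
crsElapsed    = [ 58  75  43  57  50  58  49  81  55 123  67  84  60  52  48  95]';
taxiOut       = [101 180  12  38  98  95  47 205  51 273 217 131 106 106  90 170]';

% Extra elapsed time beyond the schedule, in minutes.
extraElapsed = actualElapsed - crsElapsed;

% Illustrative threshold (an assumption, not part of the example): flag rows
% where the taxi-out time is more than half of the extra elapsed time.
taxiDominated = taxiOut > 0.5 * extraElapsed;

fprintf('%d of %d displayed flights flagged\n', sum(taxiDominated), numel(taxiDominated));
```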
Rerun MapReduce
The previous map function had the subsetting criteria hard-wired in the function file. A new map function would have to be written for any new query, such as flights departing San Francisco on a given day.
A generic mapper can be more adaptive by separating out the subsetting criteria from the map function definition and using an anonymous function to configure the mapper for each query. This generic mapper uses a fourth input argument that supplies the desired query variable.
Display the generic map function file.
function subsettingMapperGeneric(data, ~, intermKVStore, subsetter)
intermKey = 'Null';

intermVal = data(subsetter(data), :);

add(intermKVStore,intermKey,intermVal);
end
Create an anonymous function that performs the same selection of rows that is hard-coded in subsettingMapper.
inFlightDelay150percent = ...
    @(data) data.Year > 1994 & ...
    (data.ActualElapsedTime-data.CRSElapsedTime) > 1.50*data.CRSElapsedTime;
Since the mapreduce function requires the map and reduce functions to accept exactly three inputs, use another anonymous function to specify the fourth input to the mapper, subsettingMapperGeneric. Subsequently, you can use this anonymous function to call subsettingMapperGeneric using only three arguments (the fourth is implicit).
configuredMapper = ...
    @(data, info, intermKVStore) subsettingMapperGeneric(data, info, ...
    intermKVStore, inFlightDelay150percent);
Use mapreduce to apply the generic map function to the input datastore.
result2 = mapreduce(ds, configuredMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
mapreduce returns an output datastore, result2, with files in the current folder.
Verify Results
Confirm that the generic mapper gets the same result as with the hard-wired subsetting logic.
r2 = readall(result2);
tbl2 = r2.Value{1};

if isequaln(tbl, tbl2)
    disp('Same results with the configurable mapper.')
else
    disp('Oops, back to the drawing board.')
end
Same results with the configurable mapper.
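With the generic mapper, a different query needs only a different anonymous function. The sketch below defines a hypothetical subsetter, fromSFOInOctober (a name introduced here, not part of the example), and tries it on a small made-up block of data so that it runs without the datastore.

```matlab
% Toy block of data; the values are invented for illustration only.
block = table([1996; 1996; 1999], [10; 10; 3], {'SFO'; 'LAX'; 'SFO'}, ...
    'VariableNames', {'Year', 'Month', 'Origin'});

% Hypothetical query: flights departing San Francisco in October.
fromSFOInOctober = @(data) strcmp(data.Origin, 'SFO') & data.Month == 10;

% The subsetter returns a logical index with one element per row.
idx = fromSFOInOctober(block);
disp(block(idx, :))
```

To run such a query over the datastore, the subsetter would be bound to subsettingMapperGeneric in the same way as inFlightDelay150percent. A query for a specific day would presumably also require adding a day variable, such as DayofMonth, to SelectedVariableNames, because it is not among the 15 variables selected in this example.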
See Also
mapreduce | tabularTextDatastore