How to make our MapReduce queries faster?
We created the application using five clusters of the Riak DB node. Our data model consists of three buckets: matches, leagues and teams.
Matches contain links to leagues and teams:
Model
var match = { id: matchId, leagueId: meta.leagueId, homeTeamId: meta.homeTeamId, awayTeamId: meta.awayTeamId, startTime: m.match.startTime, firstHalfStartTime: m.match.firstHalfStartTime, secondHalfStartTime: m.match.secondHalfStartTime, score: { goals: { a: 1*safeGet(m.match, 'score.goals.a'), b: 1*safeGet(m.match, 'score.goals.b') }, corners: { a: 1*safeGet(m.match, 'score.corners.a'), b: 1*safeGet(m.match, 'score.corners.b') } } }; var options = { index: { leagueId: match.leagueId, teamId: [match.homeTeamId, match.awayTeamId], startTime: match.startTime || match.firstHalfStartTime || match.secondHalfStartTime }, links: [ { bucket: 'leagues', key: match.leagueId, tag: 'league' }, { bucket: 'teams', key: match.homeTeamId, tag: 'home' }, { bucket: 'teams', key: match.awayTeamId, tag: 'away' } ] }; match.model = 'match'; modelCache.save('matches', match.id, match, options, callback);
Inquiries
We are writing a query that returns results from several buckets, one way is to query each bucket separately. Another way is to use links to combine results from a single query.
The two versions of the query we tried took second place, no matter how small the size of our bucket. The first version uses two phases of the map, which we modeled after this message ( Practical map-Zoom out: forwarding and collection ).
#!/bin/bash curl -X POST \ -H "content-type: application/json" \ -d @- \ http://localhost:8091/mapred \ <<EOF { "inputs":{ "bucket":"matches", "index":"startTime_bin", "start":"2012-10-22T23:00:00", "end":"2012-10-24T23:35:00" }, "query": [ {"map":{"language": "javascript", "source":" function(value, keydata, arg){ var match = Riak.mapValuesJson(value)[0]; var links = value.values[0].metadata.Links; var result = links.map(function(l) { return [l[0], l[1], match]; }); return result; } "} }, {"map":{"language": "javascript", "source": " function(value, keydata, arg) { var doc = Riak.mapValuesJson(value)[0]; return [doc, keydata]; } "} }, {"reduce":{ "language": "javascript", "source":" function(values) { var merged = {}; values.forEach(function(v) { if(!merged[v.id]) { merged[v.id] = v; } }); var results = []; for(key in merged) { results.push(merged[key]); } return results; } " } } ] } EOF
In the second version, we make four separate Map-Reduce queries to get objects from three buckets:
async.series([ //First get all matches function(callback) { db.mapreduce .add(inputs) .map(function (val, key, arg) { var data = Riak.mapValuesJson(val)[0]; if(arg.leagueId && arg.leagueId != data.leagueId) { return []; } var d = new Date(); var date = data.startTime || data.firstHalfStartTime || data.secondHalfStartTime; d.setFullYear(date.substring(0, 4)); d.setMonth(date.substring(5, 7) - 1); d.setDate(date.substring(8, 10)); d.setHours(date.substring(11, 13)); d.setMinutes(date.substring(14, 16)); d.setSeconds(date.substring(17, 19)); d.setMilliseconds(0); startTimestamp = d.getTime(); var short = { id: data.id, l: data.leagueId, h: data.homeTeamId, a: data.awayTeamId, t: startTimestamp, s: data.score, c: startTimestamp }; return [short]; }, {leagueId: query.leagueId, page: query.page}).reduce(function (val, key) { return val; }).run(function (err, matches) { matches.forEach(function(match) { result.match[match.id] = match; //Should maybe filter this leagueIds.push(match.l); teamIds.push(match.h); teamIds.push(match.a); }); callback(); }); }, //Then get all leagues, teams and lines in parallel function(callback) { async.parallel([ //Leagues function(callback) { db.getMany('leagues', leagueIds, function(err, leagues) { if (err) { callback(err); return; } leagues.forEach(function(league) { visibleLeagueIds[league.id] = true; result.league[league.id] = { r: league.regionId, n: league.name, s: league.name }; }); callback(); }); }, //Teams function(callback) { db.getMany('teams', teamIds, function(err, teams) { if (err) { callback(err); return; } teams.forEach(function(team) { result.team[team.id] = { n: team.name, h: team.name, s: team.stats }; }); callback(); }); } ], callback); } ], function(err) { if (err) { callback(err); return; } _.each(regionModel.getAll(), function(region) { result.region[region.id] = { id: region.id, c: 'https://d1goqbu19rcwi8.cloudfront.net/icons/silk-flags/' + region.icon + '.png', n: region.name }; }); var response = { success: true, result: { modelRecords: result, paging: { page: query.page, pageSize: 50, total: result.match.length }, time: moment().diff(a)/1000.00, visibleLeagueIds: visibleLeagueIds } }; callback(null, JSON.stringify(response, null, '\t')); });
How to make these requests faster?
Additional Information:
We use riak-js and node.js to fulfill our requests.