How to make Riak MapReduce queries faster?

How can we make our MapReduce queries faster?

We have built an application on a five-node Riak DB cluster. Our data model consists of three buckets: matches, leagues and teams.

Matches contain links to leagues and teams:

Model

    var match = {
        id: matchId,
        leagueId: meta.leagueId,
        homeTeamId: meta.homeTeamId,
        awayTeamId: meta.awayTeamId,
        startTime: m.match.startTime,
        firstHalfStartTime: m.match.firstHalfStartTime,
        secondHalfStartTime: m.match.secondHalfStartTime,
        score: {
            goals: {
                a: 1*safeGet(m.match, 'score.goals.a'),
                b: 1*safeGet(m.match, 'score.goals.b')
            },
            corners: {
                a: 1*safeGet(m.match, 'score.corners.a'),
                b: 1*safeGet(m.match, 'score.corners.b')
            }
        }
    };

    var options = {
        index: {
            leagueId: match.leagueId,
            teamId: [match.homeTeamId, match.awayTeamId],
            startTime: match.startTime || match.firstHalfStartTime || match.secondHalfStartTime
        },
        links: [
            { bucket: 'leagues', key: match.leagueId, tag: 'league' },
            { bucket: 'teams', key: match.homeTeamId, tag: 'home' },
            { bucket: 'teams', key: match.awayTeamId, tag: 'away' }
        ]
    };

    match.model = 'match';
    modelCache.save('matches', match.id, match, options, callback);

Queries

We are writing a query that returns results from several buckets. One way is to query each bucket separately. Another way is to use links to combine the results in a single query.

Both versions of the query we tried take over a second, no matter how small our buckets are. The first version uses two map phases, which we modeled after this post (Practical Map-Reduce: Forwarding and Collecting).

    #!/bin/bash
    curl -X POST \
    -H "content-type: application/json" \
    -d @- \
    http://localhost:8091/mapred \
    <<EOF
    {
        "inputs":{
            "bucket":"matches",
            "index":"startTime_bin",
            "start":"2012-10-22T23:00:00",
            "end":"2012-10-24T23:35:00"
        },
        "query": [
            {"map":{"language": "javascript", "source":"
                function(value, keydata, arg){
                    var match = Riak.mapValuesJson(value)[0];
                    var links = value.values[0].metadata.Links;
                    var result = links.map(function(l) {
                        return [l[0], l[1], match];
                    });
                    return result;
                }
            "}
            },
            {"map":{"language": "javascript", "source": "
                function(value, keydata, arg) {
                    var doc = Riak.mapValuesJson(value)[0];
                    return [doc, keydata];
                }
            "}
            },
            {"reduce":{
                "language": "javascript",
                "source":"
                    function(values) {
                        var merged = {};
                        values.forEach(function(v) {
                            if(!merged[v.id]) {
                                merged[v.id] = v;
                            }
                        });
                        var results = [];
                        for(key in merged) {
                            results.push(merged[key]);
                        }
                        return results;
                    }
                "
            }
            }
        ]
    }
    EOF

In the second version, we make four separate Map-Reduce queries to get objects from three buckets:

    async.series([
        //First get all matches
        function(callback) {
            db.mapreduce
                .add(inputs)
                .map(function (val, key, arg) {
                    var data = Riak.mapValuesJson(val)[0];
                    if(arg.leagueId && arg.leagueId != data.leagueId) {
                        return [];
                    }
                    var d = new Date();
                    var date = data.startTime || data.firstHalfStartTime || data.secondHalfStartTime;
                    d.setFullYear(date.substring(0, 4));
                    d.setMonth(date.substring(5, 7) - 1);
                    d.setDate(date.substring(8, 10));
                    d.setHours(date.substring(11, 13));
                    d.setMinutes(date.substring(14, 16));
                    d.setSeconds(date.substring(17, 19));
                    d.setMilliseconds(0);
                    startTimestamp = d.getTime();
                    var short = {
                        id: data.id,
                        l: data.leagueId,
                        h: data.homeTeamId,
                        a: data.awayTeamId,
                        t: startTimestamp,
                        s: data.score,
                        c: startTimestamp
                    };
                    return [short];
                }, {leagueId: query.leagueId, page: query.page}).reduce(function (val, key) {
                    return val;
                }).run(function (err, matches) {
                    matches.forEach(function(match) {
                        result.match[match.id] = match;
                        //Should maybe filter this
                        leagueIds.push(match.l);
                        teamIds.push(match.h);
                        teamIds.push(match.a);
                    });
                    callback();
                });
        },
        //Then get all leagues, teams and lines in parallel
        function(callback) {
            async.parallel([
                //Leagues
                function(callback) {
                    db.getMany('leagues', leagueIds, function(err, leagues) {
                        if (err) { callback(err); return; }
                        leagues.forEach(function(league) {
                            visibleLeagueIds[league.id] = true;
                            result.league[league.id] = {
                                r: league.regionId,
                                n: league.name,
                                s: league.name
                            };
                        });
                        callback();
                    });
                },
                //Teams
                function(callback) {
                    db.getMany('teams', teamIds, function(err, teams) {
                        if (err) { callback(err); return; }
                        teams.forEach(function(team) {
                            result.team[team.id] = {
                                n: team.name,
                                h: team.name,
                                s: team.stats
                            };
                        });
                        callback();
                    });
                }
            ], callback);
        }
    ], function(err) {
        if (err) { callback(err); return; }
        _.each(regionModel.getAll(), function(region) {
            result.region[region.id] = {
                id: region.id,
                c: 'https://d1goqbu19rcwi8.cloudfront.net/icons/silk-flags/' + region.icon + '.png',
                n: region.name
            };
        });
        var response = {
            success: true,
            result: {
                modelRecords: result,
                paging: {
                    page: query.page,
                    pageSize: 50,
                    total: result.match.length
                },
                time: moment().diff(a)/1000.00,
                visibleLeagueIds: visibleLeagueIds
            }
        };
        callback(null, JSON.stringify(response, null, '\t'));
    });

How can we make these queries faster?

Additional information:

We use riak-js and node.js to run our queries.

1 answer

One way to make this at least a little faster is to deploy the JavaScript mapreduce functions on the server instead of passing them in as part of the job (see the description of the js_source_dir configuration parameter). This is usually recommended if you have JavaScript functions that you run repeatedly.
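As a rough sketch of what the client side might then look like: assuming the three functions from the first query were saved in a file under the directory named by js_source_dir and exposed on a global object, the job could refer to them through the `name` field instead of `source`. The object and function names `MatchFns.mapLinks`, `MatchFns.mapDoc` and `MatchFns.reduceUnique` are hypothetical, chosen only for illustration.

```javascript
// Build one phase that refers to a pre-deployed JavaScript function by name.
// The names passed in are hypothetical; they must match whatever is actually
// defined in the files under js_source_dir on every node.
function namedPhase(type, name, keep) {
  var phase = {};
  phase[type] = { language: 'javascript', name: name, keep: keep };
  return phase;
}

// Same inputs as the first query in the question, but without inline source.
function buildNamedJob(start, end) {
  return {
    inputs: { bucket: 'matches', index: 'startTime_bin', start: start, end: end },
    query: [
      namedPhase('map', 'MatchFns.mapLinks', false),
      namedPhase('map', 'MatchFns.mapDoc', false),
      namedPhase('reduce', 'MatchFns.reduceUnique', true)
    ]
  };
}

var namedJob = buildNamedJob('2012-10-22T23:00:00', '2012-10-24T23:35:00');
console.log(JSON.stringify(namedJob, null, 2));
```

The resulting JSON can be posted to the /mapred endpoint in place of the inline-source job.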

Since running JavaScript mapreduce functions carries some overhead compared to native functions implemented in Erlang, using non-JavaScript functions where possible may also help.

The two map phase functions in your first query appear to be designed to work around the limitation that a normal link phase (which I believe is more efficient) does not pass on the record being processed (the match record). The first function emits all the links and passes the match data along as additional data in JSON form, while the second passes on the match data together with the linked record in JSON form.

I have written a simple Erlang function that emits all the links as well as the ID of the record passed in. It can be used together with the native Erlang function riak_kv_mapreduce:map_object_value to replace the two map phase functions in your first example, removing some of the JavaScript usage. As with the existing solution, I would expect you to receive a number of duplicates, since several matches may link to the same league or team.

    -module(riak_mapreduce_example).

    -export([map_link/3]).

    %% @spec map_link(riak_object:riak_object(), term(), term()) ->
    %%                   [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
    %% @doc map phase function for adding linked records to result set
    map_link({error, notfound}, _, _) ->
        [];
    map_link(RiakObject, Props, _) ->
        Bucket = riak_object:bucket(RiakObject),
        Key = riak_object:key(RiakObject),
        Meta = riak_object:get_metadata(RiakObject),
        Current = [{{Bucket, Key}, Props}],
        Links = case dict:find(<<"Links">>, Meta) of
            {ok, List} ->
                [{{B, K}, Props} || {{B, K}, _Tag} <- List];
            error ->
                []
        end,
        lists:append([Current, Links]).

The results can either be sent back to the client for aggregation or passed into a reduce phase function, as in the example you provided.

The example function needs to be compiled and installed on all nodes, and this may require a restart.
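To illustrate the client-side option, here is a minimal sketch of a job that chains the map_link function with riak_kv_mapreduce:map_object_value, plus a helper that removes the duplicates on the client. It assumes that the values come back as JSON strings and that every record carries `id` and `model` fields; the question only shows `model` being set on matches, so that part is an assumption. The helper names `buildErlangJob` and `dedupeRecords` are made up for this example.

```javascript
// Two Erlang map phases: map_link emits the match and its linked records,
// map_object_value then returns the stored value of each of them.
function buildErlangJob(start, end) {
  return {
    inputs: { bucket: 'matches', index: 'startTime_bin', start: start, end: end },
    query: [
      { map: { language: 'erlang', module: 'riak_mapreduce_example',
               function: 'map_link', keep: false } },
      { map: { language: 'erlang', module: 'riak_kv_mapreduce',
               function: 'map_object_value', keep: true } }
    ]
  };
}

// Client-side aggregation: parse each value (assumed to be a JSON string,
// already-parsed objects are accepted too) and keep the first record seen
// for each model/id pair.
function dedupeRecords(rawValues) {
  var seen = {};
  var unique = [];
  rawValues.forEach(function (raw) {
    var doc = typeof raw === 'string' ? JSON.parse(raw) : raw;
    var key = doc.model + ':' + doc.id;
    if (!seen[key]) {
      seen[key] = true;
      unique.push(doc);
    }
  });
  return unique;
}

// Usage with made-up sample values: two matches linking to the same league.
var sample = [
  '{"id":"m1","model":"match"}',
  '{"id":"l1","model":"league"}',
  '{"id":"m2","model":"match"}',
  '{"id":"l1","model":"league"}'
];
console.log(dedupeRecords(sample).length);
```

Deduplicating on the client keeps the whole job free of JavaScript phases, at the cost of sending the duplicate records over the wire.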

Another way to improve performance (which may not be a good option for you) could be to change the data model so that performance-critical queries do not need mapreduce at all.
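Purely as an illustration of one such change, and not something taken from the question: the few league and team fields that the listing needs could be copied into each match record when it is saved, so that reading the matches is enough to render them. The function name `denormalizeMatch` is hypothetical; the field names are the ones used in the question's code.

```javascript
// Copy the league and team fields needed for display into the match record.
// This trades extra writes (copies must be refreshed when a league or team
// is renamed) for reads that no longer need to follow links.
function denormalizeMatch(match, league, homeTeam, awayTeam) {
  return {
    id: match.id,
    model: 'match',
    startTime: match.startTime,
    score: match.score,
    league: { id: league.id, name: league.name, regionId: league.regionId },
    home: { id: homeTeam.id, name: homeTeam.name },
    away: { id: awayTeam.id, name: awayTeam.name }
  };
}

// Usage with made-up sample records.
var flat = denormalizeMatch(
  { id: 'm1', startTime: '2012-10-23T18:00:00', score: { goals: { a: 1, b: 0 } } },
  { id: 'l1', name: 'Sample League', regionId: 'r1' },
  { id: 't1', name: 'Home FC' },
  { id: 't2', name: 'Away FC' }
);
console.log(JSON.stringify(flat, null, 2));
```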


Source: https://habr.com/ru/post/1441946/