MapReduce on a MongoDB collection comes back empty

I am trying to bring many large datasets into one collection, but I am having trouble writing the MapReduce function to get there.

Here's what my data looks like (17 documents are shown here; in reality I have 4+ million):

{"user": 1, "day": 1, "type": "a", "sum": 10} {"user": 1, "day": 2, "type": "a", "sum": 32} {"user": 1, "day": 1, "type": "b", "sum": 11} {"user": 2, "day": 4, "type": "b", "sum": 2} {"user": 1, "day": 2, "type": "b", "sum": 1} {"user": 1, "day": 3, "type": "b", "sum": 9} {"user": 1, "day": 4, "type": "b", "sum": 12} {"user": 2, "day": 2, "type": "a", "sum": 3} {"user": 3, "day": 2, "type": "b", "sum": 81} {"user": 1, "day": 4, "type": "a", "sum": 22} {"user": 1, "day": 5, "type": "a", "sum": 39} {"user": 2, "day": 5, "type": "a", "sum": 8} {"user": 2, "day": 3, "type": "b", "sum": 1} {"user": 3, "day": 3, "type": "b", "sum": 99} {"user": 2, "day": 3, "type": "a", "sum": 5} {"user": 1, "day": 3, "type": "a", "sum": 41} {"user": 3, "day": 4, "type": "b", "sum": 106} ... 

I am trying to make it look like this at the end (one array per type, where each element is the sum at the index given by the day; if that day does not exist for that type, it is just 0):

 {"user": 1, "type_a_sums": [10, 32, 41, 22, 39], "type_b_sums": [11, 1, 9, 12, 0]} {"user": 2, "type_a_sums": [0, 3, 5, 0, 8], "type_b_sums": [0, 0, 1, 2, 0]} {"user": 3, "type_a_sums": [0, 0, 0, 0, 0], "type_b_sums": [0, 81, 99, 106, 0]} ... 

This is the MapReduce I tried:

    var mapsum = function() {
        var output = {user: this.user,
                      type_a_sums: [0, 0, 0, 0, 0],
                      type_b_sums: [0, 0, 0, 0, 0],
                      tempType: this.type, tempSum: this.sum, tempDay: this.day};
        if (this.type == "a") {
            output.type_a_sums[this.day - 1] = this.sum;
        }
        if (this.type == "b") {
            output.type_b_sums[this.day - 1] = this.sum;
        }
        emit(this.user, output);
    };

    var r = function(key, values) {
        var outs = {user: 0,
                    type_a_sums: [0, 0, 0, 0, 0],
                    type_b_sums: [0, 0, 0, 0, 0],
                    tempType: -1, tempSum: -1, tempDay: -1};
        values.forEach(function(v) {
            outs.user = v.user;
            if (v.tempType == "a") {
                outs.type_a_sums[v.tempDay - 1] = v.tempSum;
            }
            if (v.tempType == "b") {
                outs.type_b_sums[v.tempDay - 1] = v.tempSum;
            }
        });
        return outs;
    };

    res = db.sums.mapReduce(mapsum, r, {out: 'joined_sums'});

This gives me the result I want on a small sample, but when I run it over all 4 million documents, I get a ton of outputs that look like this:

 {"user": 1, "type_a_sums": [0, 0, 0, 0, 0], "type_b_sums": [0, 0, 0, 0, 0]} {"user": 2, "type_a_sums": [0, 3, 5, 0, 8], "type_b_sums": [0, 0, 1, 2, 0]} {"user": 3, "type_a_sums": [0, 0, 0, 0, 0], "type_b_sums": [0, 0, 0, 0, 0]} 

Most of the users that should have sums in their arrays are instead populated with nothing but 0s, which are the dummy placeholder values from the outs object in the reduce function before it gets filled in with the actual sums.

What is really strange: if I run the exact same functions on the same collection but query for only one user, res = db.sums.mapReduce(mapsum, r, {query: {user: 1}, out: 'joined_sums'}), a user that I know should have sums in its arrays but previously came out as all 0s, I actually get the result I wanted for that user. Run it again over all 4 million and it goes back to 0s everywhere. It looks like all the work that was done gets overwritten with the dummy placeholder arrays.

Do I have too much data? Can it just not get through it in time? Or am I hitting some kind of limit that I don't know about?

1 answer

Thanks for including a lot of details. There are several issues here.

Let's start from the top.

I am trying to make it look like this at the end

{"user": 2, "type_a_sums": [0, 3, 5, 0, 8], "type_b_sums": [0, 0, 1, 2, 0]}

In fact, it will look like this:

 { _id: { "user": 2 }, value: { "type_a_sums": [0, 3, 5, 0, 8], "type_b_sums": [0, 0, 1, 2, 0] } 

Note that _id is like your "group by" columns, and value is like your "sum" columns.

So, problem #1 is that you are using user as your key, but also as part of your value. That is redundant: reduce will only ever combine values that share the same key, so you also do not need this line: outs.user = v.user;
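
In other words, a minimal sketch of that change (reusing the field names from the question's map, everything else unchanged), where the emitted value no longer carries the user:

    // Inside the map function (sketch only): user stays as the key, not part of the value.
    var output = {type_a_sums: [0, 0, 0, 0, 0],
                  type_b_sums: [0, 0, 0, 0, 0],
                  tempType: this.type, tempSum: this.sum, tempDay: this.day};
    // ...fill in the right slot as before, then:
    emit(this.user, output);   // reduce receives the user as `key`, not inside `values`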

You also have problem #2: an incorrect reduce.

I think this is because reduce() is called more than once per key.

reduce() is designed to be called multiple times; that is how the job scales across servers. One server may run the reduce a couple of times, and those partial results may then be combined and reduced again, possibly on another server.

Here's another way to look at it: reduce takes an array of value objects and reduces them to a single value object.

There are some consequences here:

  • If I do reduce([a, b]) , it should be the same as reduce([b, a]) .
  • If I do reduce([a, reduce([b, c])]) , it should be the same as reduce([reduce([a, b]), c])

So no matter in what order I run them, or how many times a value gets reduced, the output is always the same.
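
To make that concrete, here is a hypothetical merge helper (not part of your original code) that does have this property, because element-wise addition is commutative and associative and an all-zero array is its identity:

    // Hypothetical helper, for illustration only: element-wise sum of two 5-slot arrays.
    var mergeArrays = function(x, y) {
        var out = [0, 0, 0, 0, 0];
        for (var i = 0; i < 5; i++) { out[i] = x[i] + y[i]; }
        return out;
    };

    // Order and grouping don't matter:
    // mergeArrays(a, b)                 gives the same result as mergeArrays(b, a)
    // mergeArrays(a, mergeArrays(b, c)) gives the same result as mergeArrays(mergeArrays(a, b), c)

A reduce built on a merge like this is safe to run any number of times, in any order, on any subset of the values.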

If you look at your code, this is not what happens. Just take a look at type_a_sums . What happens if I get the following two values to reduce?

 reduce([ [0,0,1,0,0], [0,2,0,0,0] ]) => ??? 

It seems clear that the output should be [0,2,1,0,0] . If so, then you do not need all of those temp_X fields. Instead, you need to focus on emitting the correct arrays and then merging those arrays correctly.
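
For example, here is a rough sketch of how the map and reduce could look without the temp fields. It assumes days always run from 1 to 5, and the += merge assumes each (user, day, type) combination appears at most once in the source data (if duplicates exist, their sums are simply added together):

    var mapsum = function() {
        // Emit only the arrays; the user is the key.
        var value = {type_a_sums: [0, 0, 0, 0, 0], type_b_sums: [0, 0, 0, 0, 0]};
        if (this.type == "a") { value.type_a_sums[this.day - 1] = this.sum; }
        if (this.type == "b") { value.type_b_sums[this.day - 1] = this.sum; }
        emit(this.user, value);
    };

    var r = function(key, values) {
        // Merge element-wise; safe no matter how many times this runs,
        // because addition is commutative and associative and 0 is its identity.
        var merged = {type_a_sums: [0, 0, 0, 0, 0], type_b_sums: [0, 0, 0, 0, 0]};
        values.forEach(function(v) {
            for (var i = 0; i < 5; i++) {
                merged.type_a_sums[i] += v.type_a_sums[i];
                merged.type_b_sums[i] += v.type_b_sums[i];
            }
        });
        return merged;
    };

    res = db.sums.mapReduce(mapsum, r, {out: 'joined_sums'});

The output documents will still have the mapReduce shape described above, i.e. the user in _id and the two arrays under value.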

