How to use mapreduce to mass update data warehouse objects that satisfy a request?

I want to use the mapreduce library to update all objects that satisfy a request. There are several complications:

  • A request that detects that entities update checks if the value of a specific property "property1" is contained in a long list of values ​​(~ 10000 entries) from the csv file
  • For each object that satisfies the request, it is necessary to change another property "property2" so that it is equal to the value in the second column and the same line of the csv file

I know how to load a csv file into Blobstore and read each line using a Blobstore input reader. I am also aware of a data warehouse data reader that receives objects using a query.

My question is, how can I create a Mapper class that reads input from a Blobstore, retrieves data warehouse objects and updates them as efficiently as possible?

+6
source share
2 answers

Given that the list of possible values ​​for property1 is long, using a filter for a query does not seem like a good option (because you will need to use an IN filter that actually runs one query per value )

An alternative to using MR would be to load your CSV into memory using a map (from property1 to property2 ), and then run an MR task that iterates through all the objects, and if their property property1 is part of the keys on the map, change it using the associated value .

As @Ryan B says, you don't need to use MR for this if you just want to use batch puts, since you can use Iterable to put using the DatastoreService.

+3
source

You can use the DatastoreInputReader in the map function to find out if property1 is really in csv: Reading from csv will be very slow every time, what you can do is use memcache to provide this information after it is read only once from the Datastore's own model. To populate the data warehouse model, I would recommend using property1 as the user identifier of each row, so the query is straightforward. You would only update the Datastore for those values ​​that actually change and use the mutation pool to make it executive (op.db.Put ()). I leave you a pseudo-code (sorry ... I only have in python) about how the different parts will look, I also recommend that you read this article about Mapreduce in the Google App Engine: http://sookocheff.com/posts/2014 -04-15-app-engine-mapreduce-api-part-1-the-basics /

 #to get the to_dict method from google.appengine.ext import ndb from mapreduce import operation as op from mapreduce.lib import pipeline from mapreduce import mapreduce_pipeline class TouchPipeline(pipeline.Pipeline): """ Pipeline to update the field of entities that have certain condition """ def run(self, *args, **kwargs): """ run """ mapper_params = { "entity_kind": "yourDatastoreKind", } yield mapreduce_pipeline.MapperPipeline( "Update entities that have certain condition", handler_spec="datastore_map", input_reader_spec="mapreduce.input_readers.DatastoreInputReader", params=mapper_params, shards=64) class csvrow(ndb.Model): #you dont store property 1 because you are going to use its value as key substitutefield=ndb.StringProperty() def create_csv_datastore(): # instead of running this, make a 10,000 row function with each csv value, # or read it from the blobstore, iterate and update the values accordingly for i in range(10000): #here we are using our own key as id of this row and just storing the other column that #eventually will be subtitute if it matches csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put() def queryfromcsv(property1): csvrow=ndb.Key('csvrow', property1).get() if csvrow: return csvrow.substitutefield else: return property1 def property1InCSV(property1): data = memcache.get(property1) if data is not None: return data else: data = self.queryfromcsv(property1) memcache.add(property1, data, 60) return data def datastore_map(entity_type): datastorepropertytocheck = entity_type.property1 newvalue = property1InCSV(datastorepropertytocheck) if newvalue!=datastoreproperty: entity_type.property11 = newvalue #use the mutation pool yield op.db.Put(entity) 
+2
source

Source: https://habr.com/ru/post/981033/


All Articles