How to do per-node initialization in Spark?

I want to do GeoIP lookups on my data in Spark. For this, I use the MaxMind GeoIP database.

What I want to do is initialize the GeoIP database object once per partition, and then use it to look up the city associated with an IP address.

Does Spark have an initialization phase for each node, or should I check whether the instance variable is undefined and, if so, initialize it before continuing? For instance, something like this (this is Python, but I want a Scala solution):

 class IPLookup(object):
     database = None

     def getCity(self, ip):
         # Lazily initialize the shared database on first use.
         if not self.database:
             self.database = self.initialise(geoipPath)
         ...

Of course, this approach requires Spark to serialize the whole object, which the docs warn against.
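
(For reference, a minimal Scala sketch of the same check-and-initialize idea: a lazy val inside a singleton object is initialized once per executor JVM on first access and is never serialized with a task closure. GeoIPLookupDB and geoipPath are hypothetical placeholders mirroring the Python above.)

 object IPLookup {
   // Initialized once per JVM (i.e., once per executor) on first access;
   // singleton objects are not shipped inside task closures.
   // geoipPath must point to a file that exists on every worker.
   lazy val database: GeoIPLookupDB = new GeoIPLookupDB(geoipPath)

   def getCity(ip: String): String = database.resolve(ip)
 }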

3 answers

This seems like a good use case for a broadcast variable. Have you looked at the documentation for that feature, and if you have, does it fail to meet your requirements in some way?
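
For concreteness, a minimal sketch of that suggestion, reusing the hypothetical GeoIPLookupDB reader from elsewhere on this page (sc is the SparkContext). Note that the broadcast value must be serializable; if the real reader is not, broadcast the raw database bytes instead and construct the reader on the executors.

 // Load the database once on the driver and ship one copy to each executor.
 val geoDb = sc.broadcast(new GeoIPLookupDB(geoipPath))

 // Tasks read the shared copy via .value; nothing is re-initialized per task.
 val cities = logsRDD.map(elem => (geoDb.value.resolve(elem.ip), elem))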


In Spark, per-partition operations can be performed using:

 def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)

This will execute the function f once per partition, over that partition's iterator of elements. The idea is that the cost of setting up a resource (for example, a database connection) is amortized across the many elements in the iterator.

Example:

 val logsRDD = ???
 logsRDD.mapPartitions { iter =>
   // Created once per partition, then reused for every element.
   val geoIp = new GeoIPLookupDB(...)
   // This is a local map over the iterator - do not confuse with rdd.map.
   iter.map(elem => (geoIp.resolve(elem.ip), elem))
 }

As @bearrito mentioned, you can load your GeoDB on the driver and then broadcast it. Another option is to provide an external service to use for lookups. That could be an in-memory cache such as Redis / Memcached / Tachyon, or a regular datastore.
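
A hypothetical sketch of the external-service option, assuming city names have been preloaded into a Redis hash keyed by IP; the Jedis client and the "geoip:city" key are illustrative assumptions, not part of this answer.

 import redis.clients.jedis.Jedis

 val cities = logsRDD.mapPartitions { iter =>
   // One connection per partition rather than per element.
   val redis = new Jedis("redis-host", 6379)
   try {
     // Materialize the partition before closing the connection:
     // iter.map alone is lazy and would otherwise run after close().
     iter.map(elem => (redis.hget("geoip:city", elem.ip), elem)).toList.iterator
   } finally {
     redis.close()
   }
 }

Materializing with toList trades memory for safe connection cleanup; for large partitions, a pooled client that outlives the task may be preferable.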


Source: https://habr.com/ru/post/1207426/

