Python geoip2 library not working in pySpark map function

I am using the Python geoip2 library with PySpark to get the geographic location of some IP addresses. My code looks like this:

    geoDBpath = 'somePath/geoDB/GeoLite2-City.mmdb'
    geoPath = os.path.join(geoDBpath)
    sc.addFile(geoPath)
    reader = geoip2.database.Reader(SparkFiles.get(geoPath))

    def ip2city(ip):
        try:
            city = reader.city(ip).city.name
        except:
            city = 'not found'
        return city

I tried

 print ip2city("128.101.101.101") 

It works. But when I tried to do this in rdd.map:

    rdd = sc.parallelize([ip1, ip2, ip3, ip3, ...])
    print rdd.map(lambda x: ip2city(x))

it reported the following error:

    Traceback (most recent call last):
      File "/home/worker/software/spark/python/pyspark/rdd.py", line 1299, in take
        res = self.context.runJob(self, takeUpToNumLeft, p)
      File "/home/worker/software/spark/python/pyspark/context.py", line 916, in runJob
        port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
      File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
    TypeError: Required argument 'fileno' (pos 1) not found
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Can someone tell me how to make the ip2city function work in rdd.map()? Thanks!

1 answer

It seems that the problem with your code comes from the reader object: it cannot be correctly serialized as part of the closure and sent to the workers. To deal with this, you have to create the instance on the workers themselves. One way to handle this is to use mapPartitions:

    from pyspark import SparkFiles

    geoDBpath = 'GeoLite2-City.mmdb'
    sc.addFile(geoDBpath)

    def partitionIp2city(iter):
        from geoip2 import database

        def ip2city(ip):
            try:
                city = reader.city(ip).city.name
            except:
                city = 'not found'
            return city

        reader = database.Reader(SparkFiles.get(geoDBpath))
        return [ip2city(ip) for ip in iter]

    rdd = sc.parallelize(['128.101.101.101', '85.25.43.84'])
    rdd.mapPartitions(partitionIp2city).collect()
    ## ['Minneapolis', None]
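For completeness, another common pattern is to open the reader lazily inside the mapped function itself, so that nothing unpicklable is captured in the closure and a plain rdd.map() works. This is only a sketch of that idea, not part of the original answer: the helper name ip2city_lazy and the module-level cache are illustrative, and it assumes the .mmdb file has been shipped with sc.addFile() (as above) and that geoip2 is installed on the workers.

    from pyspark import SparkFiles

    _reader = None  # cached so the database is opened at most once per task

    def ip2city_lazy(ip):
        # Open the Reader on first use inside the worker, not on the driver,
        # so the function that gets pickled never contains a Reader object.
        global _reader
        if _reader is None:
            from geoip2 import database
            _reader = database.Reader(SparkFiles.get('GeoLite2-City.mmdb'))
        try:
            return _reader.city(ip).city.name
        except Exception:
            return 'not found'

    # rdd.map(ip2city_lazy).collect()

Either way, the key point is the same: the Reader has to be created on the executors rather than built on the driver and captured in the closure, which is what triggers the pickling error above.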