I have a PySpark application that has to process about 5 GB of compressed data (one JSON record per line). I am using a small server with 12 cores (24 threads) and 72 GB of RAM. My PySpark program consists of only two map operations, helped by three very large regular expressions (about 3 GB each, already compiled) that are loaded with pickle. Spark runs in standalone mode, with the worker and the master on the same machine.
My question is: does each variable get replicated for every executor core? The job uses all the available memory and then a lot of swap space. Or does it perhaps load all the partitions into RAM? The RDD contains about 10 million lines to scan with the 3 regular expressions, and it has about 1000 partitions. I cannot finish the job because after a few minutes the memory is full and Spark starts using swap, becoming very slow. I noticed that without the regular expressions the situation is the same.
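(For reference, the partition count above is just what PySpark reports for the input RDD; a minimal check, assuming the same input path as in the code below, looks like this:)

rdd = sc.textFile('/home/lucadiliello/data/tweets')  # same source as the job below (assumption)
print(rdd.getNumPartitions())                        # partition count, roughly 1000 in my case
# rdd = rdd.repartition(200)                         # repartitioning would be one knob to limit concurrent tasks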
This is my code: it removes all the useless fields from the tweets and scans the tweet texts and user descriptions for certain words:
import json
import re
import twitter_util as twu
import pickle
from pyspark import SparkContext
sc = SparkContext()
prefix = '/home/lucadiliello'
source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'
comp_regex = pickle.load(open(companies_names_regex, 'rb'))
comp_dict = pickle.load(open(companies_names_dict, 'rb'))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal, 'rb'))
tx = sc.textFile(source).map(lambda a: json.loads(a))
def get_device(input_text):
    output_text = re.sub('<[^>]*>', '', input_text)
    return output_text

def filter_data(a):
    res = {}
    try:
        res['mentions'] = a['entities']['user_mentions']
        res['hashtags'] = a['entities']['hashtags']
        res['created_at'] = a['created_at']
        res['id'] = a['id']
        res['lang'] = a['lang']
        if 'place' in a and a['place'] is not None:
            res['place'] = {}
            res['place']['country_code'] = a['place']['country_code']
            res['place']['place_type'] = a['place']['place_type']
            res['place']['name'] = a['place']['name']
            res['place']['full_name'] = a['place']['full_name']
        res['source'] = get_device(a['source'])
        res['text'] = a['text']
        res['timestamp_ms'] = a['timestamp_ms']
        res['user'] = {}
        res['user']['created_at'] = a['user']['created_at']
        res['user']['description'] = a['user']['description']
        res['user']['followers_count'] = a['user']['followers_count']
        res['user']['friends_count'] = a['user']['friends_count']
        res['user']['screen_name'] = a['user']['screen_name']
        res['user']['lang'] = a['user']['lang']
        res['user']['name'] = a['user']['name']
        res['user']['location'] = a['user']['location']
        res['user']['statuses_count'] = a['user']['statuses_count']
        res['user']['verified'] = a['user']['verified']
        res['user']['url'] = a['user']['url']
    except KeyError:
        return []
    return [res]
results = tx.flatMap(filter_data)
def setting_tweet(tweet):
    text = tweet['text'] if tweet['text'] is not None else ''
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else ''
    del tweet['text']
    del tweet['user']['description']
    tweet['text'] = {}
    tweet['user']['description'] = {}
    del tweet['mentions']

    tweet['text']['original_text'] = text
    tweet['text']['mentions'] = twu.find_retweet(text)

    tweet['text']['links'] = []
    for j in twu.find_links(text):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['text']['links'].append(tmp)
        except ValueError:
            pass

    tweet['text']['companies'] = []
    for x in comp_regex.findall(text.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['text']['companies'].append(tmp)

    tweet['user']['description']['original_text'] = descr
    tweet['user']['description']['mentions'] = twu.find_retweet(descr)

    tweet['user']['description']['links'] = []
    for j in twu.find_links(descr):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['user']['description']['links'].append(tmp)
        except ValueError:
            pass

    tweet['user']['description']['companies'] = []
    for x in comp_regex.findall(descr.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['user']['description']['companies'].append(tmp)

    return tweet
res = results.map(setting_tweet)
res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec")
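If the answer is that every core really does get its own copy of the three objects, I suppose the alternative would be to ship them as broadcast variables instead of capturing them in the function closures. A rough, untested sketch of what I mean (the names ending in _bc are only illustrative):

comp_regex_bc = sc.broadcast(comp_regex)            # shipped to each executor once
comp_dict_bc = sc.broadcast(comp_dict)
comp_dict_legal_bc = sc.broadcast(comp_dict_legal)

def find_companies(some_text):
    # inside a task, the shared object is accessed through .value
    found = []
    for x in comp_regex_bc.value.findall(some_text.lower()):
        found.append({'id': comp_dict_bc.value[x.lower()],
                      'name': x,
                      'legalName': comp_dict_legal_bc.value[x.lower()]})
    return found

But I do not know whether this would actually change the per-core memory behaviour, which is what I am asking about.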
UPDATE 1: All the RAM (72 GB) and all the swap (72 GB) end up being used.

UPDATE 2: [...] it uses up to 10 GB out of 144! (72 GB RAM + 72 GB swap)