How many copies of my variables does Spark make?

I have a PySpark application that has to process about 5 GB of compressed data (text lines). I use a small server with 12 cores (24 threads) and 72 GB of RAM. My PySpark program consists of only two map operations and uses 3 very large regular expressions (about 3 GB each, already compiled), which are loaded with pickle. Spark runs in standalone mode with the worker and the master on the same machine.

My question is: is each variable replicated once per executor core? I ask because the job consumes all available memory and then eats heavily into swap. Or does Spark perhaps load all partitions into RAM at once? The RDD contains about 10 million lines that have to be searched with the 3 regular expressions, and it is split into about 1000 partitions. I cannot get the job to finish: after a few minutes the memory is full and Spark starts using swap, becoming extremely slow. I noticed that the situation is the same even without the regular expressions.
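
As far as I understand, in PySpark any driver-side object referenced inside a map function is pickled into the task closure and shipped with every task, so each concurrently running core materializes its own copy. A minimal toy illustration of that mechanism (hypothetical names, not my real data):

from pyspark import SparkContext
sc = SparkContext()

big_lookup = {i: str(i) for i in range(10)}  # imagine this dict were 3 GB

# 'big_lookup' is captured by the lambda's closure, so it is serialized and
# shipped with every task; each busy core holds its own copy in memory.
rdd = sc.parallelize(range(100), 8)
print(rdd.map(lambda x: big_lookup[x % 10]).take(5))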

This is my code; it strips all the useless fields from Twitter tweets and scans the tweet texts and user descriptions for certain words:

import json
import re
import twitter_util as twu
import pickle

from pyspark import SparkContext
sc = SparkContext()

prefix = '/home/lucadiliello'

source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'

# Paths to the pickled regex and company-name dictionaries
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'

# Load the pre-compiled regex and the two dictionaries;
# pickle files must be opened in binary mode
comp_regex = pickle.load(open(companies_names_regex, 'rb'))
comp_dict = pickle.load(open(companies_names_dict, 'rb'))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal, 'rb'))
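# Note: these three objects are referenced in the functions below, so they
# are captured in the task closures that Spark ships to the Python workers.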

# Load the source as an RDD and parse each line as JSON
tx = sc.textFile(source).map(lambda a: json.loads(a))


def get_device(input_text):
    # Strip HTML tags from Twitter's 'source' field,
    # e.g. '<a href="...">Twitter Web Client</a>' -> 'Twitter Web Client'
    output_text = re.sub('<[^>]*>', '', input_text)
    return output_text

def filter_data(a):
    # Keep only the fields we need; a missing key drops the whole tweet
    res = {}
    try:
        res['mentions'] = a['entities']['user_mentions']
        res['hashtags'] = a['entities']['hashtags']
        res['created_at'] = a['created_at'] 
        res['id'] = a['id'] 

        res['lang'] = a['lang']
        if 'place' in a and a['place'] is not None:      
            res['place'] = {} 
            res['place']['country_code'] = a['place']['country_code'] 
            res['place']['place_type'] = a['place']['place_type'] 
            res['place']['name'] = a['place']['name'] 
            res['place']['full_name'] = a['place']['full_name']

        res['source'] = get_device(a['source'])
        res['text'] = a['text'] 
        res['timestamp_ms'] = a['timestamp_ms'] 

        res['user'] = {} 
        res['user']['created_at'] = a['user']['created_at'] 
        res['user']['description'] = a['user']['description'] 
        res['user']['followers_count'] = a['user']['followers_count'] 
        res['user']['friends_count'] = a['user']['friends_count']
        res['user']['screen_name'] = a['user']['screen_name']
        res['user']['lang'] = a['user']['lang']
        res['user']['name'] = a['user']['name']
        res['user']['location'] = a['user']['location']
        res['user']['statuses_count'] = a['user']['statuses_count']
        res['user']['verified'] = a['user']['verified']
        res['user']['url'] = a['user']['url']
    except KeyError:
        return []  # returning an empty list makes flatMap drop this tweet

    return [res]


results = tx.flatMap(filter_data)


def setting_tweet(tweet):
    # Replace the flat 'text' and user 'description' strings with structured
    # sub-dicts holding mentions, links and matched company names
    text = tweet['text'] if tweet['text'] is not None else ''
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else ''
    del tweet['text']
    del tweet['user']['description']

    tweet['text'] = {}
    tweet['user']['description'] = {}
    del tweet['mentions']

    #tweet
    tweet['text']['original_text'] = text
    tweet['text']['mentions'] = twu.find_retweet(text)
    tweet['text']['links'] = []
    for j in twu.find_links(text):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['text']['links'].append(tmp)
        except ValueError:
            pass

    tweet['text']['companies'] = []
    for x in comp_regex.findall(text.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['text']['companies'].append(tmp)

    # descr
    tweet['user']['description']['original_text'] = descr
    tweet['user']['description']['mentions'] = twu.find_retweet(descr)
    tweet['user']['description']['links'] = []
    for j in twu.find_links(descr):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['user']['description']['links'].append(tmp)
        except ValueError:
            pass

    tweet['user']['description']['companies'] = []
    for x in comp_regex.findall(descr.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['user']['description']['companies'].append(tmp)

    return tweet


res = results.map(setting_tweet)

res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec")
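
If the per-task closure copies are indeed the cause, the standard remedy I know of is broadcast variables: the object is shipped to each worker once and tasks read it through .value instead of receiving their own serialized copy with every task. A sketch of how my company lookup could be rewritten (untested, helper name invented here):

comp_regex_bc = sc.broadcast(comp_regex)
comp_dict_bc = sc.broadcast(comp_dict)
comp_dict_legal_bc = sc.broadcast(comp_dict_legal)

def find_companies(text):
    # Inside a task, the shared objects are reached through .value
    companies = []
    for x in comp_regex_bc.value.findall(text.lower()):
        companies.append({
            'id': comp_dict_bc.value[x.lower()],
            'name': x,
            'legalName': comp_dict_legal_bc.value[x.lower()],
        })
    return companies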

Source: https://habr.com/ru/post/1017236/

