Algorithm / coding help for a Markov model in PySpark

I need help wrapping my brain around building an (efficient) Markov chain in Spark (via Python). I've written it as best I can, but the code I came up with doesn't scale. Basically, for the various stages of the map I wrote custom functions, and they work fine for sequences of a few thousand, but when we get into the 20,000+ range (and I have some up to 800 thousand) things slow to a crawl.

For those of you not familiar with Markov models, here is the gist of it.

This is my data. The actual data (no header) sits in an RDD at this point.

ID, SEQ
500, HNL, LNH, MLH, HML

We look at the sequence as tuples of adjacent elements, so

(HNL, LNH), (LNH,MLH), etc..

And I need to get to the point where I return a dictionary (one per row of data), which I then serialize and store in an in-memory database.

{500:
    {'HNLLNH': 0.333,
     'LNHMLH': 0.333,
     'MLHHML': 0.333,
     'LNHHNL': 0.000,
     etc..}
}

So, in essence, each element of the sequence is combined with the next one (HNL,LNH becomes 'HNLLNH'), then for all possible transitions (combinations of elements) we count how often each occurs, divide by the total number of transitions (3 in this case), and get its frequency of occurrence.

There were 3 transitions above, and one of them was HNLLNH, so for HNLLNH the frequency is 1/3 = 0.333.
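To make the arithmetic concrete, here is a minimal pure-Python sketch (no Spark; just zip and collections.Counter, with illustrative variable names) that reproduces those 0.333 values for the sample row:

from collections import Counter

row = "500, HNL, LNH, MLH, HML"
cust_id, *states = row.split(", ")            # '500', ['HNL', 'LNH', 'MLH', 'HML']

# pair each state with the one that follows it: (HNL, LNH), (LNH, MLH), (MLH, HML)
transitions = [a + b for a, b in zip(states, states[1:])]

counts = Counter(transitions)                 # each transition occurs once here
total = len(transitions)                      # 3
frequencies = {t: c / total for t, c in counts.items()}

print({cust_id: frequencies})
# {'500': {'HNLLNH': 0.333..., 'LNHMLH': 0.333..., 'MLHHML': 0.333...}}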

As an aside, and I'm not sure it's relevant, but the values allowed at each position in a sequence element are limited: 1st position (H/M/L), 2nd position (M/L), 3rd position (H/M/L).
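Since the alphabet is fixed, the full set of possible transitions can be enumerated up front (the first answer below does exactly this to fill in the 0.000 entries). A minimal sketch, taking the per-position values exactly as listed above:

from itertools import product

# every possible 3-letter state, using the per-position values listed above
states = ["".join(s) for s in product("HML", "ML", "HML")]       # 18 states

# every possible state-to-state transition, initialised to 0.0
all_transitions = {a + b: 0.0 for a in states for b in states}   # 324 transitions
print(len(states), len(all_transitions))                         # 18 324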

At the moment I take the SEQ column from the RDD and, with a custom map function, combine element [1] with [2], [2] with [3], [3] with [4], and so on, which gives me the list of transitions:

[HNLLNH],[LNHMLH],[MLHHML], etc..

Then, in a second custom function, I count how many times each combined value occurs, divide by len(list), and return the result as a dictionary (that's the second map function, g, in the code below).

I'm fairly sure these custom functions are where things bog down.

Most of my sequences are in the 10-20 thousand element range, but some run to 500-800 thousand.

Is there a way to do this more efficiently with the pyspark API (map/reduce/agg/etc..) instead of my custom functions?

Thanks in advance. My current code is below; I'm still fairly new to Spark (and to Python on Spark), so apologies if anything is obviously clumsy.

def f(x):
    # Custom RDD map function.
    # Combines each pair of adjacent states into a single
    # transition state, e.g. ('500', ['HNL', 'LNH', 'MLH'])
    # -> '500,HNLLNH,LNHMLH'

    cust_id = x[0]
    y = x[1]
    s = ''
    for i in range(len(y) - 1):
        s = s + y[i] + y[i + 1] + ','
    return cust_id + ',' + s[:-1]

def g(x):
    # Custom RDD map function.
    # Calculates the transition state probabilities
    # by adding up state-transition occurrences
    # and dividing by the total number of transitions.
    cust_id = str(x.split(",")[0])
    trans = x.split(",")[1:]
    temp_list = []
    middle = int((len(trans[0]) + 1) / 2)
    for i in trans:
        # split each combined transition back into (from_state, to_state)
        temp_list.append((i[:middle], i[middle:]))

    state_trans = {}
    for i in temp_list:
        # list.count() re-scans the whole list for every element
        state_trans[i] = temp_list.count(i) / len(temp_list)

    my_dict = {}
    my_dict[cust_id] = state_trans
    return my_dict


def gen_tsm_dict_spark(lines):
    # Takes an RDD of strings with format CUST_ID(or PROFILE_ID),SEQ,SEQ,SEQ,...
    # Returns an RDD of dicts with CUST_ID and the tsm per customer,
    # i.e. {cust_id: {('NLN', 'LNN'): 0.33, ('HPN', 'NPN'): 0.66}}

    # creates a tuple ([cust/profile_id], [SEQ, SEQ, SEQ])
    cust_trans = lines.map(lambda s: (s.split(",")[0], s.split(",")[1:]))

    # one '500,HNLLNH,LNHMLH,...' string per customer
    with_seq = cust_trans.map(f)

    # one {cust_id: {transition: frequency, ...}} dict per customer
    full_tsm_dict = with_seq.map(g)

    return full_tsm_dict


def main():
    result = gen_tsm_dict_spark(my_rdd)

    # Insert into DB
    for x in result.collect():
        for k, v in x.items():
            db_insert(k, v)

You can try something like this. It leans heavily on toolz, but if you'd rather avoid the external dependency you can easily replace it with plain Python.

from __future__ import division
from collections import Counter
from itertools import product
from toolz.curried import sliding_window, map, pipe, concat
from toolz.dicttoolz import merge

# Generate all possible transitions 
defaults = sc.broadcast(dict(map(
    lambda x: ("".join(concat(x)), 0.0), 
    product(product("HNL", "NL", "HNL"), repeat=2))))

rdd = sc.parallelize(["500, HNL, LNH, NLH, HNL", "600, HNN, NNN, NNN, HNN, LNH"])

def process(line):
    """
    >>> process("000, HHH, LLL, NNN")
    ('000', {'LLLNNN': 0.5, 'HHHLLL': 0.5})
    """
    bits = line.split(", ")
    transactions = bits[1:]
    n = len(transactions) - 1
    frequencies = pipe(
        sliding_window(2, transactions), # Get all transitions
        map(lambda p: "".join(p)), # Joins strings
        Counter, # Count 
        lambda cnt: {k: v / n for (k, v) in cnt.items()} # Get frequencies
    )
    return bits[0], frequencies

def store_partition(iter):
    for (k, v) in iter:
        db_insert(k, merge([defaults.value, v]))

rdd.map(process).foreachPartition(store_partition)

Note that the complete dictionaries (with the zero-probability defaults merged in) are only built at insert time, which keeps the amount of data held and shuffled per record small.
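To see what store_partition actually inserts, here is a small stand-alone sketch with plain dicts standing in for defaults.value (the keys below are just a three-entry subset of the 324 possible transitions):

from toolz.dicttoolz import merge

# tiny stand-in for defaults.value (the real broadcast dict has 324 keys)
defaults_value = {"HNLLNH": 0.0, "LNHNLH": 0.0, "NLHHNL": 0.0}
observed = {"HNLLNH": 0.5, "LNHNLH": 0.5}     # frequencies from process()

full = merge([defaults_value, observed])
# observed frequencies override the 0.0 defaults; unseen transitions stay at 0.0
print(full)                                    # {'HNLLNH': 0.5, 'LNHNLH': 0.5, 'NLHHNL': 0.0}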


I don't have much experience with Pyspark, but here is how I would try to tackle it in Pyspark.

Suppose your data looks like this:

ID, SEQ

500, [HNL, LNH, MLH, HML ...]

First generate the transition pairs (HNL, LNH), (LNH, MLH), ... per customer, then sum the counts across all customers with reduceByKey:

inputRDD.map(lambda kv: get_frequencies(kv[1])).flatMap(lambda x: x) \
        .reduceByKey(lambda v1, v2: v1 + v2)


def get_frequencies(states_list):
    """
    :param states_list: a list of customer states.
    :return: a list of ((state, next_state), count) tuples.
    """
    rest = []
    tuples_list = []

    for idx in range(0, len(states_list)):
        if idx + 1 < len(states_list):
            tuples_list.append((states_list[idx], states_list[idx + 1]))

    unique = set(tuples_list)

    for value in unique:
        rest.append((value, tuples_list.count(value)))
    return rest

((HNL, LNH), 98), ((LNH, MLH), 458), ......

You can also mix Dataframes and RDDs here: do the aggregation with Dataframes if you prefer, then convert back to an RDD and write to the DB with mapPartitions.
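As a rough sketch of that last point (foreachPartition is the action counterpart of mapPartitions; db_insert and get_frequencies are the helpers already shown in this thread), the DB writes can be done one partition at a time:

def insert_partition(partition):
    # iterate once over this partition's records and insert each into the DB;
    # db_insert is assumed to be the same helper used in the question above,
    # and a single DB connection could be opened here and reused for the
    # whole partition
    for key, count in partition:
        db_insert(key, count)

transition_counts = inputRDD.map(lambda kv: get_frequencies(kv[1])) \
                            .flatMap(lambda x: x) \
                            .reduceByKey(lambda v1, v2: v1 + v2)

transition_counts.foreachPartition(insert_partition)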
