I need help wrapping my brain around building an (efficient) Markov chain in Spark (via Python). I've written it as best I can, but the code I came up with doesn't scale. Basically, for the different stages of the map I wrote custom functions, and they work fine for sequences of a few thousand elements, but once we get into the 20,000+ range (and I have some up to 800 thousand) things slow to a crawl.
For those of you who are not familiar with Markov models, here is the gist of it.
This is my data. I have the actual data (no header) in an RDD at the moment.
ID, SEQ
500, HNL, LNH, MLH, HML
The sequence is read as pairs of consecutive elements, so
(HNL, LNH), (LNH,MLH), etc..
And I need to get to the point where I return a dictionary (for each row of data), which I then serialize and store in an in-memory database.
{500:
    {HNLLNH: 0.333,
     LNHMLH: 0.333,
     MLHHML: 0.333,
     LNHHNL: 0.000,
     etc..}
}
So, in effect, each element is combined with the next one (HNL, LNH becomes "HNLLNH"), then for every possible transition (pair of consecutive elements) we count how often it appears and divide by the total number of transitions (3 in this case) to get its frequency of occurrence.
There are 3 transitions above, and one of them is HNLLNH, so for HNLLNH: 1/3 = 0.333.
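To make that concrete, here is a minimal pure-Python sketch of the same calculation on the sample row (collections.Counter and the variable names are just mine, for illustration):

from collections import Counter

seq = ["HNL", "LNH", "MLH", "HML"]             # the SEQ part of row 500
pairs = [a + b for a, b in zip(seq, seq[1:])]  # ['HNLLNH', 'LNHMLH', 'MLHHML']
freqs = {k: v / len(pairs) for k, v in Counter(pairs).items()}
# freqs == {'HNLLNH': 0.333..., 'LNHMLH': 0.333..., 'MLHHML': 0.333...}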
As an aside, and I'm not sure it's relevant, the values at each position in a sequence element are limited: 1st position (H/M/L), 2nd position (M/L), 3rd position (H/M/L).
My current approach: using the (already split) RDD, I pair each element with the one that follows it, i.e. [1] with [2], [2] with [3], [3] with [4], etc., which gives me a new list of joined values..
[HNLLNH],[LNHMLH],[MHLHML], etc..
Then, for each of those joined values, I count how many times it occurs and divide by len(list) to get its frequency. (Each joined value is also split back into its two halves, so the dictionary key ends up as a 2-tuple.)
That works, but it gets very slow once a row contains 100+ elements.
Rows of 10-20 thousand elements already take far too long, and as mentioned some rows run to 500-800,000 elements.
So, is there a more efficient way to do this using the native pyspark API (map/reduce/agg/etc..) rather than my custom functions?
Thanks in advance. My code is below. Keep in mind I'm still fairly new to all of this (both Python and Spark), so please bear with me; any help is much appreciated.
def f(x):
    # x is (cust_id, [seq elements]); join every element with the one after it
    # and return a single comma-separated string: "cust_id,HNLLNH,LNHMLH,..."
    cust_id = x[0]
    trans = ','.join(x[1])
    y = trans.split(",")
    s = ''
    for i in range(len(y) - 1):
        s = s + str(y[i] + str(y[i + 1])) + ","
    return str(cust_id + ',' + s[:-1])
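# For reference, on the sample row f() turns
#   ("500", ["HNL", "LNH", "MLH", "HML"])
# into the single string
#   "500,HNLLNH,LNHMLH,MLHHML"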
def g(x):
    # x is the string produced by f(); rebuild the (from_state, to_state) pairs
    # and compute each pair's relative frequency for this customer
    cust_id = str(x.split(",")[0])
    trans = x.split(",")[1:]
    temp_list = []
    middle = int((len(trans[0]) + 1) / 2)
    for i in trans:
        temp_list.append((''.join(i)[:middle], ''.join(i)[middle:]))
    state_trans = {}
    for i in temp_list:
        # list.count() inside the loop makes this step quadratic in the
        # number of transitions per row
        state_trans[i] = temp_list.count(i) / len(temp_list)
    my_dict = {}
    my_dict[cust_id] = state_trans
    return my_dict
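# g() then takes "500,HNLLNH,LNHMLH,MLHHML" and returns
#   {"500": {("HNL", "LNH"): 0.333..., ("LNH", "MLH"): 0.333..., ("MLH", "HML"): 0.333...}}
# so the keys come out as (from_state, to_state) tuples rather than the
# concatenated strings shown in the example output above. Note the division
# only yields fractions on Python 3 (or with `from __future__ import division`).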
def gen_tsm_dict_spark(lines):
    # lines is an RDD of raw strings "id,e1,e2,e3,..."
    cust_trans = lines.map(lambda s: (s.split(",")[0], s.split(",")[1:]))
    with_seq = cust_trans.map(f)       # "id,e1e2,e2e3,..."
    full_tsm_dict = with_seq.map(g)    # {id: {(from, to): freq, ...}}
    return full_tsm_dict
def main():
    # pull every per-customer dict back to the driver and store it
    result = gen_tsm_dict_spark(my_rdd)
    for x in result.collect():
        for k, v in x.items():
            db_insert(k, v)
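For completeness, this is roughly how I drive it end-to-end on a toy sample; the SparkContext setup and the db_insert stub below are just placeholders standing in for my real environment and in-memory database:

from pyspark import SparkContext

def db_insert(cust_id, trans_dict):
    # stand-in for the real in-memory database write
    print(cust_id, trans_dict)

sc = SparkContext(appName="markov-transitions")
my_rdd = sc.parallelize([
    "500,HNL,LNH,MLH,HML",
    "501,MLH,HML,HNL,LNH,MLH",
])

for x in gen_tsm_dict_spark(my_rdd).collect():
    for k, v in x.items():
        db_insert(k, v)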