I am running a dask.distributed cluster.
My job consists of chains of calculations, where the last step is to process, in parallel via Executor.map, a list built in the previous steps. The length of the list is not known in advance, because it is generated from intermediate results of the calculation.
The code is as follows:
```python
from distributed import Executor, progress

def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []

    gen_list1 = get_list_1()
    gen_f1 = e.map(generate_1, gen_list1)
    futures.append(gen_f1)

    gen_list2 = get_list_2()
    gen_f2 = e.map(generate_2, gen_list2)
    futures.append(gen_f2)

    m_list = e.submit(create_m_list)   # m_list is created from gen_list1 and gen_list2
    futures.append(m_list)
    m_result = e.map(process_m_list, m_list)
    futures.append(m_result)

    return futures

if __name__ == '__main__':
    r = process()
    progress(r)
```
However, I get the error message TypeError: zip argument #1 must support iteration:
```
File "F:/wl/under_development/database/jobs.py", line 366, in start_job
  match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
  iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration
```
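As far as I understand, map zips over its iterable arguments, and a Future is not iterable, which would explain the error. The same TypeError can be reproduced with plain Python (FakeFuture is just a stand-in I made up for the object returned by e.submit):

```python
# Executor.map effectively does zip(*iterables) over its arguments;
# a single non-iterable object triggers the same TypeError.
class FakeFuture(object):
    """Stand-in for the Future returned by e.submit: not iterable."""

try:
    zip(FakeFuture())
    error_message = None
except TypeError as exc:
    # the message says argument #1 must support iteration
    error_message = str(exc)
```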
gen_list1 and gen_list2 are calculated independently, but m_list is created from gen_list1 and gen_list2 and therefore depends on them.
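To make that dependency concrete, create_m_list effectively needs both intermediate lists as input (the body here is a made-up placeholder, not my real code):

```python
def create_m_list(list1, list2):
    # placeholder body: combine the intermediate results; the length
    # of the combined list is only known once both inputs exist
    return [(x, y) for x in list1 for y in list2]

pairs = create_m_list([1, 2], ['a', 'b'])  # 2 * 2 = 4 combinations
```

If I understand correctly, passing the futures to e.submit(create_m_list, gen_f1, gen_f2) would let the scheduler track this dependency, but that still leaves m_result depending on a single Future rather than a list.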
I tried calling .result() on m_list, but that suspends the function process until all jobs of gen_list1 and gen_list2 are finished.
I also tried ._result on m_list, which still raises the same "zip argument #1 must support iteration" error, and dask.delayed (m_result = e.map(process_m_list, delayed(m_list))), which did not work either.
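Calling .result() on m_list does hand e.map a real list, but it blocks. The behaviour, sketched with the stdlib concurrent.futures executor as a stand-in for distributed.Executor (all function bodies are toy placeholders):

```python
from concurrent.futures import ThreadPoolExecutor

def generate_1(x):
    return x * 2           # placeholder for the first generation step

def generate_2(x):
    return x + 10          # placeholder for the second generation step

def create_m_list(l1, l2):
    # combine intermediate results; length is unknown in advance
    return [a + b for a in l1 for b in l2]

def process_m_list(m):
    return m * m           # placeholder for the final per-item step

with ThreadPoolExecutor() as e:
    f1 = list(e.map(generate_1, [1, 2]))        # -> [2, 4]
    f2 = list(e.map(generate_2, [1, 2]))        # -> [11, 12]
    m_future = e.submit(create_m_list, f1, f2)
    # .result() blocks here until the list exists -- this is the
    # synchronization point I would like to avoid in process()
    m_list = m_future.result()
    result = list(e.map(process_m_list, m_list))
```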
I am new to dask.distributed and not sure whether this is the right approach; searching SO and Google turned up nothing. Any help would be appreciated.
Python version:

```
Python 2.7.11 |Anaconda custom (64-bit)| (default, Feb 16 2016, 09:58:36) [MSC v.1500 64 bit (AMD64)] on win32
```