How to use the future using the executor map method from dask.distrubuted (Python library)?

I am running a dask.distributed cluster .

My task includes chains of calculations, where the last step is parallel processing of the list created in the previous steps using the method Executor.map. The length of the list is not known in advance because it is generated from intermediate results in the calculation.

The code is as follows:

from distributed import Executor, progress


def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    gen_list1 = get_list_1()
    gen_f1 = e.map(generate_1, gen_list1)
    futures.append(gen_f1)

    gen_list2 = get_list_2()
    gen_f2 = e.map(generate_2, gen_list2)
    futures.append(gen_f2)

    m_list = e.submit(create_m_list)  # m_list is created from gen_list1 and gen_list2
                                      # some results of processing are stored in the database
                                      # and create_m_list doesn't need additional arguments
    futures.append(m_list)

    m_result = e.map(process_m_list, m_list)
    futures.append(m_result)

    return futures

if __name__ == '__main__':
    r = process()
    progress(r)

However, I get the error message TypeError: zip argument #1 must support iteration:

File "F:/wl/under_development/database/jobs.py", line 366, in start_job
  match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
  iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration

gen_list1and gen_list2are calculated independently, but m_listis created from gen_list1and gen_list2and therefore depends on them.

.result() m_list, process, gen_list1 gen_list2.

._result m_list, " zip # 1 ". dask.delayed (m_result = e.map(process_m_list, delayed(m_list))).

dask.distributed , , . , SO, Google, , .

Python

Python 2.7.11 |Anaconda custom (64-bit)| (default, Feb 16 2016, 09:58:36) [MSC v.1500 64 bit (AMD64)] on win32

+4
1

, , :

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)

, . map. Dask , , . .result() :

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())

.result() m_list, gen_list1 gen_list2.

. - , . , create_m_list, , create_m_list.

m_list = e.submit(create_m_list)                   # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, gen_list_2())

L = m_list.result()                                # block on m_list until done
m_result = e.map(process_m_list, L)                # submit more tasks

return [f1, f2, m_result]
+1

Source: https://habr.com/ru/post/1652364/


All Articles