How can I tell whether an AsyncResult will never be ready?

Scenario: I have set up a process pool. However, if a subprocess is killed while its task is executing, the AsyncResult object is never marked as finished. I would have expected it to become ready and unsuccessful.

To reproduce this:

>>> import multiprocessing
>>> import time
>>> p = multiprocessing.Pool(processes=1)
>>> result = p.apply_async(time.sleep, args=(1000,))
>>> result.ready()
False

In another shell, find the process ID and kill it.

>>> result.ready()
False
>>> result.wait(5) # Waits 5 seconds even though subprocess is dead

This is a problem because I have a thread waiting for the job to complete, and it usually has a rather long timeout. How can I make a result.wait(timeout) call return without waiting for the timeout to elapse? Also, how can I tell that the task was abandoned, rather than that it is still running and we simply reached the timeout?


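The Pebble library handles this case: if a worker process dies unexpectedly, the future raises ProcessExpired. It also supports timeouts. For example: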

import time

from concurrent.futures import TimeoutError
from pebble import ProcessPool, ProcessExpired

with ProcessPool() as pool:
    future = pool.schedule(time.sleep, args=(1000,), timeout=100)

    try:
        results = future.result()
        print(results)
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except ProcessExpired as error:
        print("%s. Exit code: %d" % (error, error.exitcode))
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)  # Python traceback of remote process



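Calling result.wait() blocks until the timeout expires unless the pool signals that the result is ready. If the worker is killed with kill -9 [pid], that signal never arrives.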

It would be simpler, then, to "poll" ready() manually. The problem, as you noted, is that when the job is killed, ready() stays False.

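To work around this, you can check whether the worker's pid is still alive. ApplyResult does not carry the pid, so you need some other way to obtain it. You could do something like this: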

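import os
import time

def test(identifier):
    pid = os.getpid()

    # Record which job this worker is running, in a file named after its pid.
    # Assumes the "pids" directory already exists.
    with open(os.path.join("pids", str(pid)), "w") as f:
        f.write(str(identifier))

    # do stuff
    time.sleep(1000)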

Then, when scheduling a job (assuming jobs = []):

job = (identifier, pool.apply_async(test, (identifier,)))
jobs.append(job)

The identifier is needed because otherwise there is no simple way to tell which ApplyResult corresponds to which pid.

Then you can check whether each job (pid) is still alive:

def is_alive(pid):
    return os.path.exists("/proc/" + str(pid))

for pid in os.listdir("pids"):
    if is_alive(pid):
        ...
    else:
        ...

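Read the contents of each pid-named file. Then look up that identifier in jobs to find which ApplyResult belongs to which pid, and check whether the job is dead or ready(); if neither, it is still running.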
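The lookup described above could be sketched roughly as follows. This is only an illustration, not the answerer's code: classify_jobs, its parameters, and the "done"/"dead"/"running" labels are hypothetical names, and it assumes each pid file holds the identifier written by test().

```python
import os

def is_alive(pid):
    # Linux-specific: a live process has an entry under /proc.
    return os.path.exists("/proc/" + str(pid))

def classify_jobs(jobs, pid_dir="pids", alive=is_alive):
    """Map each identifier that has a pid file to 'done', 'dead' or 'running'.

    jobs is a list of (identifier, result) pairs, where result has a
    ready() method (for example the object returned by apply_async).
    """
    # Identifiers are stored in the files as strings, so key by str.
    results = {str(identifier): result for identifier, result in jobs}
    states = {}
    for pid in os.listdir(pid_dir):
        with open(os.path.join(pid_dir, pid)) as f:
            identifier = f.read()
        result = results.get(identifier)
        if result is None:
            continue  # pid file for a job we are not tracking
        if result.ready():
            states[identifier] = "done"
        elif not alive(pid):
            states[identifier] = "dead"      # worker gone, result never set
        else:
            states[identifier] = "running"
    return states
```

A waiting thread could call classify_jobs(jobs) every few seconds instead of a single long result.wait(timeout), and give up on any job reported as "dead".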


Alternatively, you can create a pipe and fork the child process yourself.

import os
import time

r, w = os.pipe()

def child():
    global r, w

    data = ...
    time.sleep(100)

    os.close(r)
    w = os.fdopen(w, "w")
    w.write(data)
    w.close()

The parent then simply waits for the data:

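def parent(child_pid):
    global r, w

    os.close(w)

    r = os.fdopen(r)
    data = r.read()  # returns at EOF, i.e. once the child exits or is killed
    r.close()

    # os.waitpid() returns a (pid, status) tuple, not a bare status
    _, status = os.waitpid(child_pid, 0)
    if status == 0:
        pass  # Everything is fine
    elif os.WIFSIGNALED(status) and os.WTERMSIG(status) == 9:
        pass  # kill -9 [pid]

    # Process data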

You can then use status and data to decide whether the child completed normally or was killed.

To start everything:

if __name__ == "__main__":
    child_pid = os.fork()
    if child_pid:
        parent(child_pid)
    else:
        child()

This assumes you are on Unix; otherwise it will not apply. Apologies if any of it is not valid Python 2.7.
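For reference, the fork-and-pipe idea can be condensed into one self-contained helper. This is a hedged sketch rather than the answerer's code: run_in_child and its return labels are hypothetical, it is written for Python 3 on Unix, and it assumes the child's result is a short string.

```python
import os
import signal

def run_in_child(work):
    """Fork, run work() in the child, return (data, exit_kind) in the parent.

    exit_kind is "ok", "killed:<signal number>" or "exit:<code>".
    """
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: send the result through the pipe, then leave immediately
        # without running the parent's cleanup handlers.
        os.close(r)
        code = 1
        try:
            os.write(w, work().encode())  # fine for small payloads
            code = 0
        finally:
            os._exit(code)

    # Parent: close the write end so read() sees EOF when the child is gone.
    os.close(w)
    with os.fdopen(r, "rb") as reader:
        data = reader.read().decode()  # returns even if the child was killed
    _, status = os.waitpid(pid, 0)
    if os.WIFSIGNALED(status):
        return data, "killed:%d" % os.WTERMSIG(status)
    code = os.WEXITSTATUS(status)
    return data, ("ok" if code == 0 else "exit:%d" % code)
```

Because read() returns as soon as the child's end of the pipe is closed, the parent notices a killed child right away instead of sitting out a timeout, and the wait status tells it whether the child finished or was signalled.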


Source: https://habr.com/ru/post/1668463/

