Workers die early due to uneven distribution of work in Luigi (2.6.1)

We are trying to run a simple pipeline distributed across a Docker cluster. The Luigi workers are deployed as replicated Docker containers. They start successfully, but a few seconds after requesting work from the Luigi central scheduler they begin to die because no work is assigned to them; in the end all tasks are assigned to a single worker.

We had to set keep_alive = True in our workers' luigi.cfg so that they would not die, but keeping workers around after the pipeline has finished seems like a bad idea. Is there a way to control the distribution of work?
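
For reference, the worker-side luigi.cfg we ended up with looks roughly like this (a sketch; the scheduler host and port are placeholders for our setup):

[core]
default-scheduler-host=luigi-scheduler
default-scheduler-port=8082

[worker]
keep_alive=True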

Our test pipeline:

import logging
import subprocess
import time

import luigi
from luigi import LocalTarget

# Route messages through Luigi's own logger so they show up in the worker log
logger = logging.getLogger('luigi-interface')


class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return 'docker run --rm --name example_{number} hello-world'.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()

Answer: your issue is the result of yielding a single task at a time; instead, yield all of them at once, as follows:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i, self.sleep_time))
    yield reqs  # yield the complete list so the scheduler registers every task as pending at once
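
Equivalently (a sketch, not part of the original answer), requires() can simply return the list, since Luigi accepts any iterable of tasks here:

def requires(self):
    return [RunExampleTask(i, self.sleep_time) for i in range(self.tasks)]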

Source: https://habr.com/ru/post/1017079/

