How to enable dynamic requirements in Luigi?

I built a task pipeline in Luigi. Since this pipeline will be used in different contexts, it may need more tasks at the beginning or end, or even completely different dependencies between tasks.

That's when I thought, “Hey, why not declare the dependencies between tasks in my configuration file?”, so I added something like this to my config.py:

PIPELINE_DEPENDENCIES = {
    "TaskA": [],
    "TaskB": ["TaskA"],
    "TaskC": ["TaskA"],
    "TaskD": ["TaskB", "TaskC"],
}
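Since this mapping is meant to describe a pipeline, it has to be acyclic. A minimal sketch of a sanity check, assuming Python 3.9+ (for the standard-library graphlib module) and a hypothetical helper named execution_order that is not part of Luigi:

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

PIPELINE_DEPENDENCIES = {
    "TaskA": [],
    "TaskB": ["TaskA"],
    "TaskC": ["TaskA"],
    "TaskD": ["TaskB", "TaskC"],
}


def execution_order(dependencies):
    """Return task names so that each task comes after its requirements.

    Hypothetical helper, not a Luigi API. Raises ValueError on a cycle.
    """
    try:
        # TopologicalSorter expects {node: iterable of predecessors},
        # which is the shape the mapping already has.
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes that form the cycle.
        raise ValueError(f"cyclic dependencies: {exc.args[1]}") from exc


order = execution_order(PIPELINE_DEPENDENCIES)
print(order)
```

For the mapping above, TaskA comes first and TaskD last; the relative order of TaskB and TaskC is not something I would rely on.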

I was annoyed by parameters piling up across all the tasks, so at some point I introduced a single parameter, task_config, that every Task has and that stores all the information or data needed for run(). So I put PIPELINE_DEPENDENCIES in there as well.

Finally, I want every Task I define to inherit from both the regular luigi.Task and a mixin class that implements a dynamic requires(), which looks something like this:

import luigi


class TaskRequirementsFromConfigMixin(object):
    task_config = luigi.DictParameter()

    def requires(self):
        required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
        requirements = [
            self._get_task_cls_from_str(required_task)(task_config=self.task_config)
            for required_task in required_tasks
        ]
        return requirements

    def _get_task_cls_from_str(self, cls_str):
        ...

Unfortunately, this does not work, since running the pipeline gives me the following:

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 were left pending, among these:
    * 4 was not granted run permission by the scheduler:
        - 1 TaskA(...)
        - 1 TaskB(...)
        - 1 TaskC(...)
        - 1 TaskD(...)

Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler

===== Luigi Execution Summary =====

and many lines like

DEBUG: Not all parameter values are hashable so instance isn't coming from the cache

Although I'm not sure whether this is relevant.

So: 1. What is my mistake? Is it fixable? 2. Is there another way to achieve this?
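Regarding point 1, one thing that can be checked without running Luigi: iterating over task_config["PIPELINE_DEPENDENCIES"] yields every key of the mapping, not the entry for the current task. The sketch below is plain Python with hypothetical helper names and no Luigi API. It compares the loop as written in requires() with a lookup by task name. If this is the cause, every task would require all four tasks, itself included, which would be a circular dependency. I have not confirmed that this explains the scheduler message.

```python
PIPELINE_DEPENDENCIES = {
    "TaskA": [],
    "TaskB": ["TaskA"],
    "TaskC": ["TaskA"],
    "TaskD": ["TaskB", "TaskC"],
}


def required_names_as_written(task_name, dependencies):
    # Mirrors the loop in requires(): iterating a dict yields its keys,
    # so task_name is never used and every task name comes back.
    return [name for name in dependencies]


def required_names_per_task(task_name, dependencies):
    # Assumed fix: look up only the entry that belongs to this task.
    return list(dependencies[task_name])


print(required_names_as_written("TaskB", PIPELINE_DEPENDENCIES))
# ['TaskA', 'TaskB', 'TaskC', 'TaskD']
print(required_names_per_task("TaskB", PIPELINE_DEPENDENCIES))
# ['TaskA']
```

In the mixin, the second variant would correspond to indexing the mapping with type(self).__name__, assuming the keys always match the task class names.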


Source: https://habr.com/ru/post/1671285/

