I create a wrapper for Luigi tasks, and I ran into a problem Registerthat is actually an ABC metaclass and not when I create a dynamic one type.
The following code, more or less, is what I use to develop a dynamic class.
class TaskWrapper(object):
'''Luigi Spark Factory from the provided JobClass
Args:
JobClass(ScrubbedClass): The job to wrap
options: Options as passed into the JobClass
'''
def __new__(self, JobClass, **options):
valid_classes = (
ScrubbedClass01,
)
if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
raise TypeError('Job is not the correct class: {}'.format(JobClass))
luigi_identifier = 'Task'
job_name = JobClass.__name__
job_name = job_name.replace('Pail', '')
if not job_name.endswith(luigi_identifier):
job_name += luigi_identifier
LuigiTask = type(job_name, (PySparkTask, ), {})
for k, v in options.items():
setattr(LuigiTask, k, luigi.Parameter())
def main(self, sc, *args):
job = JobClass(**options)
return job._run()
LuigiTask.main = main
return LuigiTask
When I run my call function, I get PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed.
Call Function:
def create_task(JobClass, **options):
LuigiTask = TaskWrapper(JobClass, **options)
parameters = {
d: options.get(d)
for d in dir(LuigiTask)
if not d.startswith('_')
if isinstance(getattr(LuigiTask, d), luigi.Parameter)
if d in options
}
task = LuigiTask(**parameters)
return task
source
share