Celery dynamic tasks / hiding Celery implementation behind an interface

I am trying to figure out how to implement my asynchronous jobs with Celery without binding them to the Celery implementation.

Suppose I have an interface that accepts objects to schedule, for example callables (or an object that wraps a callable):

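    from zope.interface import Interface  # assumed; the question does not say which Interface is meant

    class ITaskManager(Interface):
        def schedule(task):
            """Eventually run task."""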

I could implement it with the threading module:

    from threading import Thread

    class ThreadingTaskManager(object):
        def schedule(self, task):
            Thread(target=task).start()  # or similar

But it seems that this cannot be done with Celery, right?

1 answer

One solution, albeit a rather ugly one, is to define a single Celery task that dynamically imports the module containing the task, with the module name passed as an argument:

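    import importlib

    # `celery` is assumed to be the application's Celery instance.
    @celery.task
    def taskrunner(taskname):
        # Import the module by name inside the worker and call its run().
        taskModule = importlib.import_module(taskname)
        taskModule.run()

    class CeleryTaskManager(object):
        def schedule(self, task):
            # Only the module name is sent; the function itself is not serialized.
            taskrunner.delay(task.__module__)

    from mytask import run
    CeleryTaskManager().schedule(run)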
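A variant of the same idea is to send the callable's import path instead of a bare module name, so the target does not have to be called `run`. The following is only a sketch using the standard library, not a Celery API: `task_path`, `resolve_task`, `run_by_path`, `thread_submit`, and `NamedTaskManager` are hypothetical names, and `run_by_path` stands in for the body of the single generic task. It assumes every scheduled callable is a module-level function that the worker process can import.

```python
import importlib
import threading


def task_path(func):
    """Return a 'module:qualname' string for a module-level callable."""
    if "<locals>" in func.__qualname__ or func.__name__ == "<lambda>":
        raise ValueError("only module-level functions can be scheduled by name")
    return "%s:%s" % (func.__module__, func.__qualname__)


def resolve_task(path):
    """Import the module named in 'module:qualname' and return the callable."""
    module_name, _, qualname = path.partition(":")
    obj = importlib.import_module(module_name)
    for attr in qualname.split("."):
        obj = getattr(obj, attr)
    return obj


def run_by_path(path, *args, **kwargs):
    """Hypothetical body of the one generic task (assumption: with Celery,
    this is the function that would be registered as the task)."""
    return resolve_task(path)(*args, **kwargs)


def thread_submit(path):
    """Example backend: run the named task in a background thread."""
    thread = threading.Thread(target=run_by_path, args=(path,))
    thread.start()
    return thread


class NamedTaskManager(object):
    """Schedules callables by import path; the backend is swappable."""

    def __init__(self, submit):
        # submit(path) hands the string to some backend (thread, queue, ...).
        self._submit = submit

    def schedule(self, task):
        return self._submit(task_path(task))
```

Here `NamedTaskManager(thread_submit)` behaves like the threading version from the question. With Celery, `submit` would presumably be the `delay` method of the task that wraps `run_by_path`, so calling code only ever sees `schedule(task)`.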

Source: https://habr.com/ru/post/1436960/

