How to run luigi task using spark-submit and pyspark

I have a luigi Python task that uses some pyspark libraries. Now I would like to submit this task to mesos using spark-submit. What should I do to launch it? Below is my code skeleton:

import luigi
from luigi.contrib.spark import SparkSubmitTask
from pyspark.sql import functions as F
from pyspark import SparkContext

class myClass(SparkSubmitTask):
    # date = luigi.DateParameter()

    def __init__(self, date):
        super(myClass, self).__init__()
        self.date = date  # date is datetime.date.today().isoformat()

    def output(self):
        pass

    def requires(self):
        # upstream task(s) this task depends on
        pass

    def run(self):
        # Some functions here use pyspark libs
        pass

if __name__ == "__main__":
    luigi.run()

Without luigi, I submit this task with the following command line:

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py

Now the problem is: how can I spark-submit a luigi task that is itself launched with a luigi command line, for example:

luigi --module my_module myClass --local-scheduler --date 2016-01

Another question: if my_module.py has a required task that must complete first, do I need to do anything extra for that, or just set it up the same way as in the current command line?

I really appreciate any hints or suggestions about this. Thank you so much.


Luigi has a PySparkTask for exactly this. You can find it here:

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py

If you want to use it, you can subclass it in your luigi task module, for example:

import luigi
from luigi.contrib.spark import PySparkTask

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
def executor_memory(self):
        return '1G'

    @property
def driver_memory(self):
        return '1G'

    def main(self, sc, *args):
        my_module.run(sc)

def app_options(self):
        return [self.date]
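
For this to work, my_module.py needs a run(sc) entry point that accepts the SparkContext created by PySparkTask, as main() above calls my_module.run(sc). A minimal sketch of that file, where the word-count logic is only a placeholder for your actual pyspark code:

# my_module.py -- hypothetical entry point called from MyPySparkTask.main
def run(sc):
    # Placeholder job: replace with your real pyspark logic.
    words = sc.parallelize(['spark', 'luigi', 'spark'])
    counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
    print(counts.collect())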

Then you can run it with:

luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01

You can also put these values in your client.cfg, where they become the defaults for all PySparkTasks:

[spark]
master: mesos://host:port
deploy-mode: cluster
total-executor-cores: 1
driver-cores: 1
executor-memory: 1G
driver-memory: 1G
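
Once those defaults live in client.cfg, the per-task property overrides become unnecessary; a trimmed-down sketch of the same task, keeping only the task-specific pieces, would be:

import luigi
from luigi.contrib.spark import PySparkTask

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    def main(self, sc, *args):
        my_module.run(sc)

    def app_options(self):
        return [self.date]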
