Triggering Airflow from a web request

I want to know whether Airflow tasks can be run in response to an incoming HTTP request. I'm not interested in Airflow's scheduling side; I just want to use it as a substitute for Celery.

So an example flow would look something like this:

  • The user submits a form requesting a report.
  • The backend receives the request and notifies the user that the request has been received.
  • The backend then triggers Airflow, starting the job immediately.
  • Airflow runs a DAG of related tasks: for example, pull data from Redshift, pull data from MySQL, perform some operations on the two result sets, combine them, upload the result to Amazon S3, and send an email. (A sketch of such a DAG follows this list.)
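
For concreteness, here is a rough sketch of what I have in mind (operator choices, connection ids, addresses, and task bodies are just illustrative placeholders, not a working implementation):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.email_operator import EmailOperator
    from airflow.operators.python_operator import PythonOperator

    def pull_from_redshift(**context):
        pass  # e.g. query Redshift through a PostgresHook

    def pull_from_mysql(**context):
        pass  # e.g. query MySQL through a MySqlHook

    def combine_and_upload(**context):
        pass  # merge the two result sets and upload to S3 via an S3Hook

    dag = DAG("report_pipeline", start_date=datetime(2018, 1, 1),
              schedule_interval=None)  # no schedule: runs only when triggered

    redshift = PythonOperator(task_id="pull_redshift",
                              python_callable=pull_from_redshift,
                              provide_context=True, dag=dag)
    mysql = PythonOperator(task_id="pull_mysql",
                           python_callable=pull_from_mysql,
                           provide_context=True, dag=dag)
    combine = PythonOperator(task_id="combine_and_upload",
                             python_callable=combine_and_upload,
                             provide_context=True, dag=dag)
    notify = EmailOperator(task_id="notify_user", to="user@example.com",
                           subject="Your report is ready",
                           html_content="Report uploaded to S3.", dag=dag)

    combine.set_upstream([redshift, mysql])
    notify.set_upstream(combine)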

From everything I've read online, you can run airflow ... jobs by typing airflow ... at a command prompt. I was wondering whether there is a Python API that can do exactly the same thing.

Thanks.

+19
5 answers

The Airflow REST API Plugin will help you here. After following its installation instructions, you just need to hit the following URL: http://{HOST}:{PORT}/admin/rest_api/api/v1.0/trigger_dag?dag_id={dag_id}&run_id={run_id}&conf={url_encoded_json_parameters}, replacing dag_id with the id of your DAG, either omitting run_id or supplying a unique identifier, and passing URL-encoded JSON for conf (with whatever parameters the triggered DAG needs).

Here is an example JavaScript function that uses jQuery to call the Airflow API:

    function triggerDag(dagId, dagParameters) {
        var urlEncodedParameters = encodeURIComponent(dagParameters);
        var dagRunUrl = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag?dag_id="
            + dagId + "&conf=" + urlEncodedParameters;
        $.ajax({
            url: dagRunUrl,
            dataType: "json",
            success: function(msg) {
                console.log('Successfully started the dag');
            },
            error: function(e) {
                console.log('Failed to start the dag');
            }
        });
    }
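
For a Python caller, a rough equivalent of the jQuery call above might look like this (host, port, and DAG id are placeholders, and it assumes the plugin is installed as described):

    import json

    import requests

    def trigger_dag(dag_id, parameters):
        # Same endpoint the jQuery example hits; requests URL-encodes
        # the query parameters for us.
        url = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag"
        response = requests.get(url, params={
            "dag_id": dag_id,
            "conf": json.dumps(parameters),
        })
        response.raise_for_status()
        return response.json()  # assuming the plugin returns JSON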
+17

A newer option is Airflow's experimental but built-in API endpoint, available in the more recent builds of 1.7 and in 1.8. It lets you run a REST service on your Airflow server that listens on a port and accepts CLI-style jobs.

I have only limited experience with it, but I have tried it successfully. According to the docs:

/api/experimental/dags/<DAG_ID>/dag_runs creates a dag_run for this dag identifier (POST).

This schedules an immediate run for whatever DAG you want to trigger. It still goes through the scheduler, though: the scheduler waits for a heartbeat, sees that the dag run exists, and hands tasks to a worker. That is exactly the same behavior as the CLI, so I still believe it fits your use case.
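
As a rough sketch, calling that endpoint from Python could look like the following (host, port, and dag id are placeholders; conf is passed as a JSON-encoded string):

    import json

    import requests

    # Placeholders: adjust host, port, and dag id for your deployment.
    url = "http://airflow:8080/api/experimental/dags/process_data/dag_runs"
    payload = {"conf": json.dumps({"param": "value"})}  # conf as a JSON string

    response = requests.post(url, json=payload)
    print(response.status_code, response.text)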

The documentation for setting it up is available here: https://airflow.apache.org/api.html.

GitHub also has some simple client examples under airflow/api/clients.

+11

You should look at the Airflow HttpSensor for your needs. You can use it to trigger a DAG.
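
A minimal sketch of what that might look like, assuming Airflow 1.10's import path and a hypothetical HTTP connection named backend_api:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.http_sensor import HttpSensor

    with DAG("poll_for_report_request", start_date=datetime(2018, 1, 1),
             schedule_interval=None) as dag:
        wait_for_request = HttpSensor(
            task_id="wait_for_request",
            http_conn_id="backend_api",      # hypothetical Airflow connection
            endpoint="reports/pending",      # hypothetical endpoint to poll
            response_check=lambda response: "ready" in response.text,
            poke_interval=30,                # seconds between polls
        )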

+6

For this purpose, you can use the Airflow REST API.

The following request will trigger the DAG:

    curl -X POST \
      http://<HOST>:8080/api/experimental/dags/process_data/dag_runs \
      -H 'Cache-Control: no-cache' \
      -H 'Content-Type: application/json' \
      -d '{"conf":"{\"START_DATE\":\"2018-06-01 03:00:00\", \"STOP_DATE\":\"2018-06-01 23:00:00\"}"}'

The following request retrieves the list of DAG runs for a specific DAG id:

    curl -i -H "Accept: application/json" -H "Content-Type: application/json" \
      -X GET http://<HOST>:8080/api/experimental/dags/process_data/dag_runs

For the GET API to work, set the rbac flag to True in airflow.cfg.
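
For reference, a sketch of that setting in airflow.cfg; to the best of my knowledge the rbac option lives in the [webserver] section and is available from Airflow 1.10:

    [webserver]
    # Enables the RBAC UI and the authenticated API (Airflow 1.10+)
    rbac = True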

The full list of available endpoints is in the Airflow API documentation: https://airflow.apache.org/api.html#endpoints

+2

Airflow has a REST API (currently experimental) - available here: https://airflow.apache.org/api.html#endpoints

If you do not want to install plugins, as suggested in the other answers, here is how you can do it directly using the API:

    # Excerpt from Airflow's bundled JSON client; self._api_base_url and
    # self._request are provided by the surrounding Client class.
    from urllib.parse import urljoin

    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
        endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
        url = urljoin(self._api_base_url, endpoint)
        data = self._request(url, method='POST', json={
            "run_id": run_id,
            "conf": conf,
            "execution_date": execution_date,
        })
        return data['message']

Additional examples of working with the Airflow API from Python are available here: https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py
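
For reference, a hedged sketch of using that bundled client (host and dag id are placeholders; the import path matches the linked source):

    from airflow.api.client.json_client import Client

    # api_base_url points at the webserver; auth is passed through to requests.
    client = Client(api_base_url="http://airflow:8080", auth=None)
    message = client.trigger_dag(dag_id="process_data",
                                 conf='{"START_DATE": "2018-06-01"}')
    print(message)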

0
