How do you automate PySpark jobs on EMR using boto3 (or otherwise)?

I am creating a job that parses large volumes of server log data and then loads the result into a Redshift database.

My workflow is as follows:

  • Get log data from S3
  • Use either Spark DataFrames or Spark SQL to parse the data and write it back to S3
  • Load the data from S3 into Redshift

What I am trying to work out is how to automate this, so that my process spins up an EMR cluster, bootstraps it with the correct programs, and runs my Python script, which will contain the code for parsing and writing.

Does anyone have examples, tutorials, or experiences that they could share with me to help me learn how to do this?

1 answer

Take a look at the boto3 EMR docs to create a cluster. You essentially need to call run_job_flow and pass it steps that run the program you need.

    import boto3

    client = boto3.client('emr', region_name='us-east-1')

    S3_BUCKET = 'MyS3Bucket'
    S3_KEY = 'spark/main.py'
    S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)

    # upload file to an S3 bucket
    s3 = boto3.resource('s3')
    s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)

    response = client.run_job_flow(
        Name="My Spark Cluster",
        ReleaseLabel='emr-4.6.0',
        Instances={
            'MasterInstanceType': 'm4.xlarge',
            'SlaveInstanceType': 'm4.xlarge',
            'InstanceCount': 4,
            # keep the cluster running after the last step finishes
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
        },
        Applications=[
            {
                'Name': 'Spark'
            }
        ],
        BootstrapActions=[
            {
                'Name': 'Maximize Spark Default Config',
                'ScriptBootstrapAction': {
                    'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
                }
            },
        ],
        # steps run in order once the cluster is up
        Steps=[
            {
                'Name': 'Setup Debugging',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['state-pusher-script']
                }
            },
            {
                # copy the script from S3 to the master node
                'Name': 'setup - copy files',
                'ActionOnFailure': 'CANCEL_AND_WAIT',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
                }
            },
            {
                # run the copied script with spark-submit
                'Name': 'Run Spark',
                'ActionOnFailure': 'CANCEL_AND_WAIT',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit', '/home/hadoop/main.py']
                }
            }
        ],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole'
    )
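The step dictionaries passed to run_job_flow all share the same shape, so a small helper can cut down on the repetition. The following is only a sketch and not part of the original answer: the function name spark_submit_step, its parameters, and the example S3 prefixes are hypothetical, and it assumes the script has already been copied to the master node as in the 'setup - copy files' step.

```python
def spark_submit_step(name, script_path, script_args=(),
                      action_on_failure='CANCEL_AND_WAIT'):
    """Build one EMR step dict that runs spark-submit via command-runner.jar."""
    return {
        'Name': name,
        'ActionOnFailure': action_on_failure,
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # arguments after the script path are handed to the script itself
            'Args': ['spark-submit', script_path, *script_args],
        },
    }


# Hypothetical usage: pass the input and output prefixes to main.py.
step = spark_submit_step(
    'Run Spark',
    '/home/hadoop/main.py',
    script_args=['s3://MyS3Bucket/logs/', 's3://MyS3Bucket/parsed/'],
)
```

The resulting dictionary can be placed in the Steps list of run_job_flow or passed to add_job_flow_steps.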

You can also add steps to a running cluster if you know the job flow identifier:

    job_flow_id = response['JobFlowId']
    print("Job flow ID:", job_flow_id)

    # SomeMoreSteps is a list of step dictionaries like the ones passed to Steps above
    step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)
    step_ids = step_response['StepIds']

    print("Step IDs:", step_ids)
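To automate the whole pipeline, the script usually has to know when a step has finished before it moves on, for example before loading the output into Redshift. The following is a minimal sketch, not part of the original answer: the function name wait_for_step and its polling parameters are hypothetical, and it assumes the client is the boto3 EMR client created earlier. It polls describe_step until the step reaches a terminal state.

```python
import time

# Step states after which EMR no longer changes the step's status.
TERMINAL_STATES = {'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'}


def wait_for_step(emr_client, cluster_id, step_id, poll_seconds=30, max_polls=120):
    """Poll describe_step until the step is in a terminal state; return that state."""
    for _ in range(max_polls):
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response['Step']['Status']['State']
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(
        'Step {} did not finish after {} polls'.format(step_id, max_polls)
    )
```

With the variables from the snippet above, it could be called as wait_for_step(client, job_flow_id, step_ids[0]). boto3 also ships a built-in waiter, client.get_waiter('step_complete'), which may be enough if a custom timeout or the final state is not needed.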

For more configurations, check out sparksteps.


Source: https://habr.com/ru/post/1247393/

