Reading multiple files from S3 in parallel (Spark, Java)

I have seen several discussions on this but could not work out the right solution: I want to load a couple of hundred files from S3 into an RDD. Here is how I do it now:

    ObjectListing objectListing = s3.listObjects(new ListObjectsRequest()
            .withBucketName(...)
            .withPrefix(...));
    List<String> keys = new LinkedList<>();
    objectListing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey()));
    // repeat while objectListing.isTruncated()

    JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

ReadFromS3Function does the actual reading using the AmazonS3 client:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();
    }

I kind of "translated" this from the answers I saw to the same question in Scala. I think it is also possible to pass the entire list of paths to sc.textFile(...), but I'm not sure whether that is the best way.

+3
3 answers

The underlying problem is that listing objects in S3 is very slow, and the way S3 is made to look like a directory tree kills performance whenever something walks that tree (as wildcard pattern matching of paths does).

The code in the question lists all children under the prefix in one go, which performs much better. That is essentially what ships with Hadoop 2.8 as the s3a listFiles(path, recursive) call; see HADOOP-13208.

Once you have that listing, you have strings of object keys, which you can map to s3a/s3n paths for Spark to handle as text file inputs, and to which you can then apply your work:

    val files = keys.map(key => s"s3a://$bucket/$key").mkString(",")
    sc.textFile(files).map(...)

And, as requested, here is the Java code:

    String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
    objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix + summary.getKey()));
    // repeat while objectListing is truncated
    JavaRDD<String> events = sc.textFile(String.join(",", keys));

Note that I switched s3n to s3a: provided you have the hadoop-aws and amazon-sdk JARs on your classpath, the s3a connector is the one you should be using. It is better, and it is the one that is maintained and tested against workloads by people (including me). See "The history of Hadoop's S3 connectors".
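The key-to-path step in the Java snippet above can be pulled into a small helper that is testable without Spark or AWS. This is only a sketch: the class name S3aPaths and the method join are hypothetical, and it assumes that no key contains a comma, since the resulting string is passed to sc.textFile as a comma-separated list.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Spark, Hadoop or the AWS SDK.
final class S3aPaths {
    private S3aPaths() {}

    // Prefixes every object key with s3a://<bucket>/ and joins the results
    // with commas, the form used for sc.textFile(...) in the answer above.
    // Assumption: keys do not contain commas.
    static String join(String bucket, List<String> keys) {
        String prefix = "s3a://" + bucket + "/";
        return keys.stream()
                .map(key -> prefix + key)
                .collect(Collectors.joining(","));
    }
}
```

With this in place the last line of the snippet would become sc.textFile(S3aPaths.join(bucket, keys)), with keys holding the raw object keys rather than prefixed paths.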

+3

You can use sc.textFile to read multiple files.

You can pass multiple file URLs as its argument.

You can specify whole directories, use wildcards, and even pass a comma-separated list of directories and wildcards.

Example:

 sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file") 

Reference: this answer

+2

I think that if you parallelize the list of keys, the reads from AWS will run on the executors, which should improve performance:

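    // needs: import scala.collection.JavaConverters._
    val bucketName = xxx
    // List the keys on the driver (first page only).
    val keys = new AmazonS3Client(new BasicAWSCredentials("awsAccessKeyId", "SecretKey"))
      .listObjects(request).getObjectSummaries.asScala.map(_.getKey).toList
    val df = sc.parallelize(keys).flatMap { key =>
      // Build the client inside the closure so it is created on the executor.
      val s3 = new AmazonS3Client(new BasicAWSCredentials("awsAccessKeyId", "SecretKey"))
      Source.fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream).getLines
    }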
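The per-key read above never closes the object's content stream. In Java, the language of the question, a minimal sketch of the reading step that closes the stream could look like the following. LineReader is a hypothetical class name, UTF-8 is an assumed encoding, and the InputStream would come from object.getObjectContent() inside the function that runs on the executor.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; uses only the JDK.
final class LineReader {
    private LineReader() {}

    // Reads every line from the stream into memory and closes the stream,
    // even if reading fails. Assumption: the content is UTF-8 text.
    static List<String> readLines(InputStream in) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

Because the whole file is buffered in a list, this only suits objects that fit comfortably in executor memory.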
0

Source: https://habr.com/ru/post/1011867/

