Apache Spark (Structured Streaming): S3 checkpoint support

From spark structured streaming documentation: "This checkpoint location must be a path on an HDFS-compatible file system and can be set as an option in a DataStreamWriter when the query is run."

And, of course, setting a breakpoint on path s3 calls:

 17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

A few questions here:

  • Why is s3 not supported as a breakpoint (regular spark speed supports this)? What makes the file system "compatible with HDFS"?
  • I am using HDFS emphemerally (because clusters can constantly rise or fall) and use s3 as a place to store all the data - what would be the recommendations for storing breakpoint data for structured streaming data in such a setting?
+5
source share
3 answers

What makes FS HDFS "compatible"? it is a file system with the behavior specified in the Hadoop FS specification . The difference between the object store and the FS is covered there, with the key point being "ultimately agreed storage objects without adding or O (1) atomic renaming is not compatible"

For S3 in particular

  • This is consistent: after creating a new blob, the list command often does not display it. The same for removal.
  • When a drop is overwritten or deleted, it may take some time to leave.
  • rename () is implemented by the copy and then deletes

Checkpoints Spark streaming, storing everything in a location, and then renaming it to a checkpoint directory. This makes the time for the control point proportional to the time for copying data to S3, which is ~ 6-10 MB / s.

The current bit of the stream code is not suitable for s3, and although at some point I can fix it, I will not send any new patches until they install my old ones. It makes no sense for me to work on this material if it is ignored.

Now do one of

  • to HDFS and then copy the results
  • per EBS bit allocated and attached to your cluster
  • on S3, but they have a long gap between breakpoints, so time to breakpoint does not disable your streaming application.

If you use EMR, you can pay a premium for the serial, supported by the S3 dynamo server, which gives you better consistency. But the copy time remains the same, so the breakpoint will be just as slow.

+2
source

This is a known issue: https://issues.apache.org/jira/browse/SPARK-19407

Must be fixed in the next version. You can set the default file system to s3 with --conf spark.hadoop.fs.defaultFS=s3 as a workaround.

+4
source

This issue has been fixed at https://issues.apache.org/jira/browse/SPARK-19407 .

However, the Structured Streaming breakpoint does not work well in S3 due to the lack of possible consistency in S3. It is not recommended to use S3 for the checkpoint https://issues.apache.org/jira/browse/SPARK-19013 .

Micheal Armburst said that this will not be fixed in Spark, and the solution should wait for the implementation of S3guard. S3guard was once far away.

+2
source

Source: https://habr.com/ru/post/1263740/


All Articles