Does Apache Beam support custom output file names?

In a distributed processing environment it is common to use "part" file names such as "part-000". Is it possible to write an extension of some kind that renames the individual output files of Apache Beam (for example, to use a per-window file name)?

To do this, you may need to name the window or specify a file name based on the contents of the window. I would like to know if such an approach is possible.

Regarding whether the solution should be streaming or batch, an example of a streaming mode is preferable.

+4
source share
3 answers

Yes, as suggested by jkff, you can achieve this using TextIO.write().to(FilenamePolicy).

Examples are below:

If you want to write the output to a particular file, you can use:

lines.apply(TextIO.write().to("/path/to/file.txt"));

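Below is a simple example that writes the output using a file name prefix. It targets Google Cloud Storage, but you can use local or S3 paths instead.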

/**
 * Minimal word-count pipeline: reads text lines, splits them into words, counts the occurrences
 * of each distinct word and writes one {@code "word: count"} line per word with
 * {@code TextIO.write()}.
 */
public class MinimalWordCountJava8 {

  /** File pattern of the text files read by the pipeline. */
  private static final String INPUT_FILES = "gs://apache-beam-samples/shakespeare/*";

  /** Regex matching every run of characters that are not Unicode letters; used to split lines. */
  private static final String NON_LETTER_RUN = "[^\\p{L}]+";

  /** Prefix handed to {@code TextIO.write().to(...)}; replace it with a real output location. */
  private static final String OUTPUT_PREFIX = "gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX";

  /**
   * Builds the pipeline, runs it and blocks until it has finished.
   *
   * @param args command-line arguments (not read by this example)
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Three runner-specific adjustments are needed before this pipeline can actually run.
    //
    // CHANGE 1/3: Pick a Beam runner, e.g. BlockingDataflowRunner or FlinkRunner.
    // CHANGE 2/3: Provide the options that runner requires.
    //
    // BlockingDataflowRunner needs a project and a temp location:
    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    //
    // FlinkRunner only needs the runner to be set (see {@code FlinkPipelineOptions} for more):
    //   options.as(FlinkPipelineOptions.class)
    //      .setRunner(FlinkRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Read every line of the input files.
        .apply(TextIO.read().from(INPUT_FILES))
        // Break each line into tokens at every run of non-letter characters.
        .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split(NON_LETTER_RUN))))
        // Splitting can yield empty tokens; drop them.
        .apply(Filter.by((String token) -> !token.isEmpty()))
        // Count how many times each distinct token occurs.
        .apply(Count.<String>perElement())
        // Render every (word, count) pair as "word: count".
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Long> entry) -> entry.getKey() + ": " + entry.getValue()))
        // CHANGE 3/3: The output location must point at a real Google Cloud Storage path.
        .apply(TextIO.write().to(OUTPUT_PREFIX));

    pipeline.run().waitUntilFinish();
  }
}

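For more control over the output file names, you can use a custom FilenamePolicy, as in this example: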

 /**
   * A {@link FilenamePolicy} that names each output file after the window it belongs to.
   *
   * <p>Windowed file names have the form
   * {@code <prefix>-<window start>-<window end>-<shard>-of-<numShards><suffix>} and are resolved
   * against the current directory of the configured prefix. The pane information passed to
   * {@link #windowedFilename} is not used in the name. Unwindowed writes are rejected.
   */
  protected static class PerWindowFiles extends FilenamePolicy {

    /** Resource whose file name (if any) starts every output name and whose directory holds it. */
    private final ResourceId prefix;

    /**
     * @param prefix resource that supplies the file-name prefix and the output directory
     */
    public PerWindowFiles(ResourceId prefix) {
      this.prefix = prefix;
    }

    /**
     * Builds the window-specific part of the file name.
     *
     * @param window window whose start and end are printed with {@code formatter}
     * @return {@code <prefix file name>-<start>-<end>}; the prefix part is empty when the
     *     configured prefix is a directory
     */
    public String filenamePrefixForWindow(IntervalWindow window) {
      String baseName;
      if (prefix.isDirectory()) {
        // A directory prefix contributes no file-name part.
        baseName = "";
      } else {
        baseName = prefix.getFilename();
      }
      String windowStart = formatter.print(window.start());
      String windowEnd = formatter.print(window.end());
      return String.format("%s-%s-%s", baseName, windowStart, windowEnd);
    }

    /**
     * Produces the file name for one shard of a windowed write.
     *
     * @param shardNumber number of the shard being written
     * @param numShards total number of shards
     * @param window window being written; cast to {@link IntervalWindow}
     * @param paneInfo pane information (unused)
     * @param outputFileHints supplies the suggested file-name suffix
     * @return the file resolved in the current directory of the prefix
     */
    @Override
    public ResourceId windowedFilename(
        int shardNumber,
        int numShards,
        BoundedWindow window,
        PaneInfo paneInfo,
        OutputFileHints outputFileHints) {
      IntervalWindow intervalWindow = (IntervalWindow) window;
      String windowPart = filenamePrefixForWindow(intervalWindow);
      String suffix = outputFileHints.getSuggestedFilenameSuffix();
      String filename =
          String.format("%s-%s-of-%s%s", windowPart, shardNumber, numShards, suffix);
      ResourceId outputDirectory = prefix.getCurrentDirectory();
      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    /**
     * Not supported: this policy only names windowed output.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ResourceId unwindowedFilename(
        int shardNumber, int numShards, OutputFileHints outputFileHints) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }

  /**
   * Converts every input element with {@code BuildRowFn} and writes the result as text.
   *
   * <p>When {@code windowed} is set, the rows go through
   * {@code WriteToText.WriteOneFilePerWindow}; otherwise they are written with a plain
   * {@code TextIO.write()}. Both paths use {@code filenamePrefix} as the output location.
   *
   * @param teamAndScore the collection to write
   * @return a {@link PDone} in the pipeline of {@code teamAndScore}
   */
  @Override
  public PDone expand(PCollection<InputT> teamAndScore) {
    if (!windowed) {
      // Unwindowed output: default TextIO file naming under the configured prefix.
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(TextIO.write().to(filenamePrefix));
    } else {
      // Windowed output: delegate file naming to the per-window writer.
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
    }
    return PDone.in(teamAndScore.getPipeline());
  }
+5

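Yes. Per the TextIO documentation:

If you want better control over how file names are generated than the default policy allows, a custom FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy).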

+1

This is a perfectly valid example with Beam 2.1.0. You can apply it to your data (e.g. a PCollection).

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.display.DisplayData;

/**
 * Shows how to plug a custom {@link FilenamePolicy} into {@code TextIO.write()} for windowed
 * writes.
 */
@SuppressWarnings("serial")
public class FilePolicyExample {

    /**
     * Applies a windowed text write that uses {@link WindowedFilenamePolicy} and four shards.
     *
     * @param args command-line arguments (not read by this example)
     */
    public static void main(String[] args) {
        // NOTE(review): `data` is not declared in this snippet; it stands for the caller's
        // own collection (e.g. a PCollection) -- confirm before using.
        data.apply(
            TextIO.write()
                .to("your_DIRECTORY")
                .withFilenamePolicy(new WindowedFilenamePolicy("somePrefix"))
                .withWindowedWrites()
                .withNumShards(4));
    }

    /**
     * File-name policy for windowed output. Names have the form
     * {@code <prefix>-<window>-<shard>-of-<numShards - 1>-pane-<pane index>[-final]<extension>}.
     */
    private static class WindowedFilenamePolicy extends FilenamePolicy {

        /** Text placed at the start of every generated file name. */
        final String outputFilePrefix;

        /**
         * @param outputFilePrefix text placed at the start of every generated file name
         */
        WindowedFilenamePolicy(String outputFilePrefix) {
            this.outputFilePrefix = outputFilePrefix;
        }

        /**
         * Builds the name of one shard file of a window pane.
         *
         * @param outputDirectory directory the file name is resolved against
         * @param input supplies window, shard number, shard count and pane info
         * @param extension appended verbatim at the end of the name
         * @return the resolved file resource
         */
        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory, WindowedContext input, String extension) {
            // The "of" part is the shard count minus one.
            String shardFileName =
                String.format(
                    "%s-%s-%s-of-%s-pane-%s%s%s",
                    outputFilePrefix,
                    input.getWindow(),
                    input.getShardNumber(),
                    input.getNumShards() - 1,
                    input.getPaneInfo().getIndex(),
                    finalMarker(input.getPaneInfo().isLast()),
                    extension);
            return outputDirectory.resolve(shardFileName, StandardResolveOptions.RESOLVE_FILE);
        }

        /**
         * Not supported: this policy only names windowed output.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public ResourceId unwindowedFilename(
                ResourceId outputDirectory, Context input, String extension) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        /** Registers the file-name prefix as display data under the key "fileNamePrefix". */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                DisplayData.item("fileNamePrefix", outputFilePrefix)
                    .withLabel("File Name Prefix"));
        }

        /** Returns "-final" for the last pane and an empty string otherwise. */
        private static String finalMarker(boolean lastPane) {
            if (lastPane) {
                return "-final";
            }
            return "";
        }
    }
}
0
source

Source: https://habr.com/ru/post/1687128/


All Articles