Writing multiple outputs with Hadoop MultipleOutputs when speculative execution is enabled

I wrote a MapReduce log-handling program. In addition to its actual output, the job writes side data to a location outside the output path set in the driver code. With speculative execution enabled, however, the output of killed task attempts is not deleted. Is there any way to avoid this problem, other than writing to the usual output location and copying to the external location after the task completes?

Can this problem be solved with an OutputCommitter?

Has anyone tried this? Any help would be appreciated.

2 answers

Yes, you can use FileOutputCommitter, which moves the contents of the temporary task attempt directory to the final output directory when the task succeeds, and deletes the temporary directory otherwise.

I believe that most of the built-in output formats in Hadoop extend FileOutputFormat and use an OutputCommitter; by default this is FileOutputCommitter.

Here is the relevant code from FileOutputFormat:

    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException {
      if (committer == null) {
        Path output = getOutputPath(context);
        committer = new FileOutputCommitter(output, context);
      }
      return committer;
    }

To write to multiple paths, you can look at MultipleOutputs, which goes through the job's OutputCommitter by default, so its files are committed or discarded together with the regular output.
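As an illustration, here is a minimal sketch of MultipleOutputs with the new API (org.apache.hadoop.mapreduce.lib.output.MultipleOutputs); the named output "side", the key/value types, and the reducer class name are assumptions for the example, not details from the question:

    // Driver: register a named output next to the regular one (assumed name/types).
    Job job = Job.getInstance(conf, "log handling");
    MultipleOutputs.addNamedOutput(job, "side", TextOutputFormat.class,
        Text.class, Text.class);

    // Reducer: write side data through MultipleOutputs so it lands in the
    // task attempt's work directory and is committed or aborted atomically.
    public static class LogReducer extends Reducer<Text, Text, Text, Text> {
      private MultipleOutputs<Text, Text> mos;

      @Override
      protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        for (Text value : values) {
          context.write(key, value);      // normal job output
          mos.write("side", key, value);  // side output under the named output "side"
        }
      }

      @Override
      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        mos.close();  // flush side files before the task commits
      }
    }

Note that MultipleOutputs writes inside the job output directory (files like side-r-00000), not to an arbitrary external path, which is exactly why the committer can clean up failed attempts.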

Alternatively, you can create your own output format that extends FileOutputFormat, override the getOutputCommitter() method shown above, and supply your own OutputCommitter implementation modeled on FileOutputCommitter (a sketch follows the excerpt below).

In the FileOutputCommitter code you will find a function that may interest you:

    /** Delete the work directory. */
    @Override
    public void abortTask(TaskAttemptContext context) {
      try {
        if (workPath != null) {
          context.progress();
          outputFileSystem.delete(workPath, true);
        }
      } catch (IOException ie) {
        LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
      }
    }

If the task succeeds, commitTask() is called, which by default moves the temporary task output directory (which has the task attempt ID in its name to avoid conflicts between speculative attempts) to the final output path, ${mapred.output.dir}. Otherwise, the framework calls abortTask(), which deletes the temporary task output directory.
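Putting the pieces together, here is a minimal sketch of a custom committer that also cleans up side data written outside the output path. The external root /data/side, the per-attempt directory layout, and the class names are assumptions for illustration only, not part of Hadoop:

    // Hypothetical committer: extends FileOutputCommitter and additionally
    // manages per-attempt side data under an assumed external root directory.
    public class SideDataOutputCommitter extends FileOutputCommitter {
      private static final String SIDE_DATA_ROOT = "/data/side";  // assumed

      public SideDataOutputCommitter(Path outputPath, TaskAttemptContext context)
          throws IOException {
        super(outputPath, context);
      }

      // One side-data directory per attempt so speculative attempts never collide.
      private Path sideDataPath(TaskAttemptContext context) {
        return new Path(SIDE_DATA_ROOT, context.getTaskAttemptID().toString());
      }

      @Override
      public void commitTask(TaskAttemptContext context) throws IOException {
        super.commitTask(context);  // promotes the work dir to the output path
        Path side = sideDataPath(context);
        FileSystem fs = side.getFileSystem(context.getConfiguration());
        // Promote the winning attempt's side data to a stable, task-level name.
        Path finalSide = new Path(SIDE_DATA_ROOT,
            context.getTaskAttemptID().getTaskID().toString());
        if (fs.exists(side)) {
          fs.rename(side, finalSide);
        }
      }

      @Override
      public void abortTask(TaskAttemptContext context) throws IOException {
        super.abortTask(context);   // deletes the attempt's work directory
        Path side = sideDataPath(context);
        FileSystem fs = side.getFileSystem(context.getConfiguration());
        fs.delete(side, true);      // also drop this attempt's side data
      }
    }

    // Hypothetical output format that plugs the committer in, mirroring the
    // getOutputCommitter() override shown earlier.
    public class SideDataTextOutputFormat extends TextOutputFormat<Text, Text> {
      private OutputCommitter committer;

      @Override
      public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
          throws IOException {
        if (committer == null) {
          committer = new SideDataOutputCommitter(getOutputPath(context), context);
        }
        return committer;
      }
    }

Tasks would then write their side data under sideDataPath(context) themselves; the committer only promotes or deletes that directory depending on the attempt's outcome.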


To avoid creating _logs and _SUCCESS files in the MapReduce output folder, you can use the following settings:
    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
    conf.set("hadoop.job.history.user.location", "none");
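For context, a minimal sketch of where these settings go in a driver; the job name and the surrounding setup are assumptions:

    Configuration conf = new Configuration();
    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
    conf.set("hadoop.job.history.user.location", "none");

    Job job = Job.getInstance(conf, "log handling");  // assumed job name
    // ... set mapper, reducer, input/output paths as usual ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);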

