My MapReduce job should read records from HBase and write them to zip files. Our client specifically requires that the reducer's output files be in .zip format only.
To do this, I wrote a ZipFileOutputFormat wrapper to compress records and write them to zip files.
Also, we cannot buffer all the lines in memory and then iterate over them, because some files contain 19 GB of records, and doing so would throw a java.lang.OutOfMemoryError.
Everything looks fine, but there is one problem:
A separate .zip file is created for each key. Inside my output directory, I see a lot of output files, and the records are split line by line across them. I do not know how to combine them into a single zip file.
Here is my implementation of ZipFileOutputFormat.java:
public class ZipFileOutputFormat<K, V> extends FileOutputFormat<K, V> { public static class ZipRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> { private ZipOutputStream zipOut; public ZipRecordWriter(FSDataOutputStream fileOut) { zipOut = new ZipOutputStream(fileOut); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub zipOut.closeEntry(); zipOut.finish(); zipOut.close(); zipOut.flush(); } @Override public void write(K key, V value) throws IOException { String fname = null; if (key instanceof BytesWritable) { BytesWritable bk = (BytesWritable) key; fname = new String(bk.getBytes(), 0, bk.getLength()); } else { fname = key.toString(); } ZipEntry ze = new ZipEntry(fname); zipOut.closeEntry(); zipOut.putNextEntry(ze); if (value instanceof BytesWritable) { zipOut.write(((BytesWritable) value).getBytes(), 0, ((BytesWritable) value).getLength()); } else { zipOut.write(value.toString().getBytes()); } } } // // @Override // public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf // job, // String name, Progressable progress) throws IOException { // Path file = FileOutputFormat.getTaskOutputPath(job, name); // FileSystem fs = file.getFileSystem(job); // FSDataOutputStream fileOut = fs.create(file, progress); // return new ZipRecordWriter<K, V>(fileOut); // } @Override public org.apache.hadoop.mapreduce.RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // TODO Auto-generated method stub Configuration conf = job.getConfiguration(); getOutputCommitter(job); getOutputName(job); Path file = getDefaultWorkFile(job, ".zip"); // Path file = new Path(committer.getWorkPath()+"/"+fileName); FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fileOut = fs.create(file); return new ZipRecordWriter<K, V>(fileOut); } }