Hadoop MultipleOutputs: AlreadyBeingCreatedException

The code below runs fine on small data, but when I run the same job with multiple outputs on a larger dataset, I get the error below. Please help!

    org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException:
    failed to create file /home/users/mlakshm/alop176/data-r-00001 for DFSClient_attempt_201208010142_0043_r_000001_1
    on client 10.0.1.100, because this file is already being created by
    DFSClient_attempt_201208010142_0043_r_000001_0 on 10.0.1.130
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:1406)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1246)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1188)
        at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:628)
        at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)
        at org.apache.hadoop.ipc.Client.call(Client.java:1070)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
        at $Proxy2.create(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
        at $Proxy2.create(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.<init>(DFSClient.java:3248)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:713)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:182)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:555)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:455)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:118)
        at com.a.MultipleOutputs$InternalFileOutputFormat.getRecordWriter(MultipleOutputs.java:565)
        at com.a.MultipleOutputs.getRecordWriter(MultipleOutputs.java:432)
        at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:518)
        at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:482)
        at com.a.ReduceThree1.reduce(ReduceThree1.java:56)
        at com.a.ReduceThree1.reduce(ReduceThree1.java:1)
        at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:416)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)


The reducer class is as follows:

    public class ReduceThree1 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        // @SuppressWarnings("unchecked")
        private MultipleOutputs mos;

        public void configure(JobConf conf1) {
            mos = new MultipleOutputs(conf1);
        }

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            int sum = 0;
            ArrayList<CustomMapI> alcmap = new ArrayList<CustomMapI>();

            while (values.hasNext()) {
                String val = values.next().toString();
                StringTokenizer st = new StringTokenizer(val);
                String uid = st.nextToken();
                String f_val = st.nextToken();
                CustomMapI cmap = new CustomMapI(uid, f_val);
                alcmap.add(cmap);
                sum += Integer.parseInt(f_val);
            }

            StringTokenizer st = new StringTokenizer(key.toString());
            String t = st.nextToken();
            String data = st.nextToken();

            for (int i = 0; i < alcmap.size(); i++) {
                String str_key = t + " " + alcmap.get(i).getUid();
                String str_val = data + " " + alcmap.get(i).getF_val() + " " + sum;
                // output.collect(new Text(str_key), new Text(str_val));
                mos.getCollector("/home/users/mlakshm/alop176/data", reporter)
                   .collect(new Text(str_key), new Text(str_val));

                for (int j = 1; j < alcmap.size(); j++) {
                    if ((j > i) && (!alcmap.get(i).equals(alcmap.get(j)))) {
                        String mul_key = "null";
                        String uidi = alcmap.get(i).getUid();
                        String uidj = alcmap.get(j).getUid();
                        ArrayList<String> alsort = new ArrayList<String>();
                        alsort.add(uidi);
                        alsort.add(uidj);
                        Collections.sort(alsort);
                        int fi = Integer.parseInt(alcmap.get(i).getF_val());
                        int fj = Integer.parseInt(alcmap.get(j).getF_val());
                        String intersection = "null";
                        if (fi < fj) {
                            intersection = String.valueOf(fi);
                        } else {
                            intersection = String.valueOf(fj);
                        }
                        String mul_val = t + " " + alsort.get(0) + " " + alsort.get(1) + " " + intersection;
                        // System.out.println(mul_key + " " + mul_val);
                        mos.getCollector("/home/users/mlakshm/alop177/datepairs", reporter)
                           .collect(new Text(mul_key), new Text(mul_val));
                    }
                }
            }
        }

        public void close() throws IOException {
            mos.close();
        }
    }

The job configuration looks as follows:

    Configuration config1 = new Configuration();

    JobConf conf1 = new JobConf(config1, DJob.class);
    conf1.setJobName("DJob1");
    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);
    // conf.setMapOutputValueClass(Text.class);
    // conf.setMapOutputKeyClass(Text.class);
    // conf.setNumMapTasks(20);
    conf.setNumReduceTasks(10);
    conf1.setMapperClass(MapThree1.class);
    // conf.setCombinerClass(Combiner.class);
    conf1.setReducerClass(ReduceThree1.class);
    conf1.setPartitionerClass(CustomPartitioner.class);
    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);
    // mos = new MultipleOutputs(conf1);
    MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop176/data",
            TextOutputFormat.class, LongWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop177/datepairs",
            TextOutputFormat.class, LongWritable.class, Text.class);
    FileInputFormat.setInputPaths(conf1, new Path(other_args.get(2)));
    FileOutputFormat.setOutputPath(conf1, new Path(other_args.get(3)));
    JobClient.runJob(conf1);
1 answer

Most likely you have speculative execution enabled, and two different attempts of reduce task 1 are both trying to create the file /home/users/mlakshm/alop176/data-r-00001. This probably succeeds on smaller jobs because the tasks finish before the framework launches a second, speculative attempt.
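If that is the cause, one quick way to confirm it is to turn speculative execution off. This is a minimal sketch of my own, not part of the original answer; `conf1` is the JobConf from your driver code, and these setters exist on org.apache.hadoop.mapred.JobConf in the old API:

    // Sketch: stop the framework from launching a second, speculative
    // attempt of each reduce task, so only one attempt ever tries to
    // create the side file on HDFS.
    conf1.setReduceSpeculativeExecution(false);
    // or disable it for maps and reduces alike:
    conf1.setSpeculativeExecution(false);

Note this only hides the collision: a failed-and-retried task can still trip over a leftover file, so the proper fix is the one below.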

Since your MultipleOutputs implementation is not the stock one (com.a.MultipleOutputs), make sure it writes all HDFS data into the task attempt's working directory and lets the OutputCommitter move the files to the final output directory when the attempt commits. If you can share that code, we can take a look.
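For comparison, here is a minimal sketch (my own, under the assumption that the stock class fits your job) of the same two outputs using the standard org.apache.hadoop.mapred.lib.MultipleOutputs. It only accepts alphanumeric named-output names, not HDFS paths, and it opens its record writers inside the task attempt's working directory, so concurrent attempts never collide. I also registered Text keys to match what the reducer actually emits, where your config registered LongWritable:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.MultipleOutputs;

    public class NamedOutputsSketch {

        // Driver side: register plain alphanumeric names, not HDFS paths.
        static void configure(JobConf conf1) {
            MultipleOutputs.addNamedOutput(conf1, "data",
                    TextOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(conf1, "datepairs",
                    TextOutputFormat.class, Text.class, Text.class);
        }

        // Reducer side: collect through the named output. The file is
        // created in the task attempt's working directory and promoted
        // by the OutputCommitter when the attempt commits, so two
        // attempts of the same task never fight over one HDFS file.
        @SuppressWarnings("unchecked")
        static void emit(MultipleOutputs mos, Reporter reporter,
                         Text key, Text value) throws IOException {
            mos.getCollector("data", reporter).collect(key, value);
        }
    }

The named outputs then appear as data-r-00001, datepairs-r-00001, and so on under the single directory set by FileOutputFormat.setOutputPath, instead of at absolute HDFS paths.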

