Text data is not written correctly to the MarkLogic database using the Hadoop connector

I read sample CSV data and then use the Hadoop Connector API to write it to the MarkLogic database as text. The problem is that only some of the data is written to the database, and some records are written a random number of times.

For example, suppose I store 10 records, so there should be 10 inserts into the MarkLogic database. Instead, only a few of the records show up, some of them inserted several times, seemingly at random. Can someone explain why this is happening?

Here is the mapper code:

public static class CSVMapper extends Mapper<LongWritable, Text, DocumentURI, Text> {
    static int i = 1;   // document counter (static, per JVM)

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        ObjectMapper mapper = new ObjectMapper();
        String line = value.toString();   // one line of the CSV file
        System.out.println("line value is - " + line);

        String[] singleData = line.split("\n");
        for (String lineData : singleData) {
            String[] fields = lineData.split(",");
            Sample sd = new Sample(fields[0], fields[1], fields[2].trim(), fields[3]);

            // Serialize the record to JSON and wrap it in a Text value.
            String jsonInString = mapper.writeValueAsString(sd);
            Text txt = new Text();
            txt.set(jsonInString);
            System.out.println("line Data is    - " + line);
            System.out.println("jsonInString is -  " + jsonInString);

            // Build the document URI from the static counter.
            final DocumentURI outputURI1 = new DocumentURI("HadoopMarklogicNPPES-" + i + ".json");
            i++;

            context.write(outputURI1, txt);
        }
    }
}
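
My current suspicion is the static counter: each mapper task runs in its own JVM, so `i` would restart at 1 in every task, different records would get the same document URI, and later inserts would silently overwrite earlier ones. This is a sketch of what I mean by a collision-free URI, built from the byte-offset key that TextInputFormat already passes to map() (untested, not my current code; offsets are only unique within a single input file):

    // Sketch: the LongWritable key is the byte offset of the line,
    // so it is unique for every line of one input file.
    final DocumentURI outputURI = new DocumentURI(
            "HadoopMarklogicNPPES-" + key.get() + ".json");
    context.write(outputURI, txt);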

Here is the main method:

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    Job job = Job.getInstance(conf, "Hadoop Marklogic MarklogicHadoopCSVDataDump");
    job.setJarByClass(MarklogicHadoopCSVDataDump.class);

    // Map-related configuration
    job.setMapperClass(CSVMapper.class);
    job.setMapOutputKeyClass(DocumentURI.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(ContentOutputFormat.class);
    ContentInputFormatTest.setInputPaths(job, new Path("/marklogic/sampleData.csv"));

    // Load the MarkLogic connection properties into the job's configuration.
    conf = job.getConfiguration();
    conf.addResource("hadoopMarklogic.xml");

    try {
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (ClassNotFoundException | InterruptedException e) {
        e.printStackTrace();
    }
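
For comparison, here is roughly how I understand the connector examples order the setup, loading hadoopMarklogic.xml before Job.getInstance() copies the configuration and declaring the input format explicitly (a sketch under those assumptions, not my current code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import com.marklogic.mapreduce.ContentOutputFormat;
    import com.marklogic.mapreduce.DocumentURI;

    Configuration conf = new Configuration();
    // Add the connection properties before the Job clones the configuration.
    conf.addResource("hadoopMarklogic.xml");

    Job job = Job.getInstance(conf, "Hadoop Marklogic MarklogicHadoopCSVDataDump");
    job.setJarByClass(MarklogicHadoopCSVDataDump.class);

    job.setMapperClass(CSVMapper.class);
    job.setMapOutputKeyClass(DocumentURI.class);
    job.setMapOutputValueClass(Text.class);

    // Read the CSV line by line; the connector writes each record out.
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.setInputPaths(job, new Path("/marklogic/sampleData.csv"));
    job.setOutputFormatClass(ContentOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);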

Here is some sample CSV data:

"Complaint ID "," Product "," Sub-product "," Issue "
"1350210 "," Bank account or service "," Other bank product/service "," Account opening  closing  or management "
"1348006 "," Debt collection "," Other (phone  health club  etc.) "," Improper contact or sharing of info "
"1351347 "," Bank account or service "," Checking account "," Problems caused by my funds being low"
"1347916 "," Debt collection "," Payday loan "," Communication tactics"
"1348296 "," Credit card ","  "," Identity theft / Fraud / Embezzlement"
"1348136 "," Money transfers "," International money transfer "," Money was not available when promised"
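One more thing I noticed while preparing this sample: the first row is a header, and as written the mapper will serialize it into a JSON document like any other record. A guard I considered adding to map() (a sketch; with TextInputFormat the LongWritable key is the byte offset, so it is 0 only for the first line of the file):

    // Skip the CSV header row, which sits at byte offset 0 of the file.
    if (key.get() == 0 && line.startsWith("\"Complaint ID")) {
        return;
    }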

Source: https://habr.com/ru/post/1666239/

