Spring Batch Multi Threading - How to make each thread read unique records?

This question has been asked many times on many forums, but I have not found an answer that suits me. I am trying to implement a multi-threaded step in my Spring Batch job.

  • I have a staging table with 100,000 records.

  • I want to process it in 10 threads with a commit interval of 300 per thread, so 3,000 records at any given time.

  • I defined a task executor and referenced it inside the step that I want to be multi-threaded.

  • My idea is to first get the thread pool size (10) and update the thread_id column with a value (say 1-10) for each of the 100,000 records. With 10 threads and 100,000 records, each identifier would be assigned to 10,000 records. I'm trying to implement a StagingStepListener for this.

  • I wrote a reader for this staging table. The task executor will create 10 readers, and each reader must read 300 different records and process them. Now, how do I pass a common identifier between the step listener and the reader, so that each thread has its own set of records to process?
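The tagging scheme described in the bullets above can be checked with a little plain Java. This is a hypothetical model, not the asker's code: the class `ThreadIdAssignment`, its methods and the round-robin rule `rowIndex % poolSize + 1` are assumptions, and no database is involved. It only verifies the arithmetic: 10 ids of 10,000 rows each, 34 chunks per thread at a commit interval of 300 (33 full chunks plus one of 100 rows), and 10 × 300 = 3,000 rows in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the asker's idea: tag each staging row with a
// thread_id in 1..poolSize, so that thread N would read only rows tagged N.
class ThreadIdAssignment {

    /** thread_id (1..poolSize) for the row at zero-based position rowIndex, assigned round-robin. */
    static int threadIdFor(long rowIndex, int poolSize) {
        return (int) (rowIndex % poolSize) + 1;
    }

    /** Counts how many of totalRows rows each thread_id receives. */
    static Map<Integer, Long> rowsPerThread(long totalRows, int poolSize) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (long i = 0; i < totalRows; i++) {
            counts.merge(threadIdFor(i, poolSize), 1L, Long::sum);
        }
        return counts;
    }

    /** Splits one thread's rows into commit-interval-sized chunks and returns each chunk's size. */
    static List<Long> chunkSizes(long rowsForThread, int commitInterval) {
        List<Long> sizes = new ArrayList<>();
        for (long start = 0; start < rowsForThread; start += commitInterval) {
            sizes.add(Math.min(commitInterval, rowsForThread - start));
        }
        return sizes;
    }
}
```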

At the moment I have only one JVM, so I am thinking of doing this as a multi-threaded step rather than a partition-based approach.

Please help.

I consulted the Spring Batch book and created a staging step listener that takes the run id from the job parameters in the XML job configuration, as shown below:

<beans:bean id="stagingStepListener" class="com.apress.springbatch.statement.listener.StagingStepListener" scope="step">
    <beans:property name="dataSource" ref="dataSource"/>
    <beans:property name="tableName" value="transaction"/>
    <beans:property name="whereClause" value="where jobId is null and processed is null"/>
    <!-- the key contains a dot, so it is quoted inside the SpEL indexer -->
    <beans:property name="jobId" value="#{jobParameters['run.id']}"/>
</beans:bean>

What I could not find is where this "run.id" comes from; I do not see it defined anywhere in the book. I copied the same implementation into my Spring Batch job, and when I run it, I get an exception saying that run.id cannot be resolved. How do I fix this?

1 answer

  • Where does this "run.id" come from?

JobParameters

It is just a parameter that you pass in the JobParameters. A different run.id (a conventional name) is usually used for each job instance, because the framework has no way of knowing which changes to the JobParameters make it the "next" instance of the job.

You can pass this "run.id" to jobParameters as:

 new JobParametersBuilder().addLong("run.id", 1L).toJobParameters() 

For more details, see the JobParametersIncrementer documentation.
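In practice, "run.id" is the default key of Spring Batch's RunIdIncrementer, which takes the previous job parameters and returns a copy with run.id increased by one (starting at 1 when it is absent). The sketch below is a simplified, JDK-only model of that behavior, not the real class: `RunIdModel` is a hypothetical name, and job parameters are modeled as a plain `Map`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of RunIdIncrementer-style behavior (NOT the Spring Batch class):
// copy the previous parameters and set "run.id" to previous + 1, or 1 if absent.
class RunIdModel {
    static final String KEY = "run.id"; // default key used by RunIdIncrementer

    static Map<String, Object> next(Map<String, Object> previous) {
        Map<String, Object> next = new HashMap<>();
        if (previous != null) {
            next.putAll(previous); // keep the other parameters unchanged
        }
        Object current = next.get(KEY);
        long id = (current == null) ? 1L : Long.parseLong(current.toString()) + 1;
        next.put(KEY, id);
        return next;
    }
}
```

In a real job, the incrementer is registered on the job itself (for example `.incrementer(new RunIdIncrementer())` with the Java job builder, or the `incrementer` attribute of `<job>` in XML) and is applied when the job is started as the "next" instance, such as with the `-next` option of CommandLineJobRunner or `JobOperator.startNextInstance`.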


  • How do I pass a common identifier between a step listener and a reader, so that each thread has its own set of records to process?

Don't

This is a rather dangerous route, because many participants in a Step (such as readers and writers) are stateful, and if that state is not segregated by thread, those components cannot be used in a multi-threaded step. In particular, most of the off-the-shelf readers and writers in Spring Batch are not designed for multi-threaded use.
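If you do stay with a multi-threaded step, the state in question is mainly the reader's cursor: all threads share the step's single reader, so uniqueness per thread comes from making `read()` thread-safe rather than from tagging rows with a thread_id. The JDK-only sketch below models that idea; `SharedReaderDemo` and `SynchronizedReader` are hypothetical names. The closest real Spring Batch counterpart is wrapping the reader in `SynchronizedItemStreamReader` (available since Spring Batch 4.1). This only guarantees that no record is handed out twice; restart state generally remains unreliable with a shared cursor, which is part of why the answer steers away from this route.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only model of a multi-threaded step: ONE stateful reader is shared by
// every worker thread, so its cursor must be guarded.
class SharedReaderDemo {

    /** Stateful reader; synchronized read() keeps the shared cursor consistent. */
    static final class SynchronizedReader<T> {
        private final Iterator<T> cursor; // shared mutable state

        SynchronizedReader(Iterator<T> cursor) {
            this.cursor = cursor;
        }

        /** Returns the next record, or null when exhausted (like ItemReader.read()). */
        synchronized T read() {
            return cursor.hasNext() ? cursor.next() : null;
        }
    }

    /** Drains all (non-null) records with the given number of threads; returns {totalReads, distinctRecords}. */
    static int[] drain(List<Integer> records, int threads) {
        SynchronizedReader<Integer> reader = new SynchronizedReader<>(records.iterator());
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    reads.incrementAndGet();
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {reads.get(), seen.size()};
    }
}
```

If every record is read exactly once, the two returned numbers are equal to each other and to the input size.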

Partition

I would recommend using partitioning. It is much simpler than it sounds, and you can use multiple threads with it. Take a look at the partitioning examples that come with the Spring Batch Samples, which:

show multi-threaded Step execution using the PartitionHandler SPI. The example uses a TaskExecutorPartitionHandler to spread the work of reading some files across multiple threads, with one Step execution per thread. The key components are the PartitionStep and the MultiResourcePartitioner, which are responsible for dividing up the work. Note that the readers and writers in the Step being partitioned are step-scoped, so their state is not shared across threads of execution.
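Applied to the question's staging table, partitioning can split the id range into one disjoint [min, max] range per thread, so each worker has its own reader state and needs no thread_id column. The sketch below is a JDK-only model under stated assumptions: ids are contiguous from 1 to 100,000, `RangePartitioner` is a hypothetical name, and the `partitionN` keys and `long[]` ranges merely stand in for the `Map<String, ExecutionContext>` that a Spring Batch `Partitioner.partition(gridSize)` returns (the samples include a similar `ColumnRangePartitioner`).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// JDK-only model of partitioning: each worker gets its own [min, max] id range,
// so no reader state is ever shared between threads.
class RangePartitioner {

    /** Splits [minId, maxId] into at most gridSize contiguous, disjoint ranges keyed "partition0", "partition1", ... */
    static Map<String, long[]> partition(long minId, long maxId, int gridSize) {
        long total = maxId - minId + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        Map<String, long[]> result = new LinkedHashMap<>();
        long start = minId;
        for (int i = 0; i < gridSize && start <= maxId; i++) {
            long end = Math.min(start + size - 1, maxId);
            result.put("partition" + i, new long[] {start, end});
            start = end + 1;
        }
        return result;
    }

    /** Runs one worker per partition, each walking its own range in commitInterval-sized chunks; returns rows processed. */
    static long runAll(Map<String, long[]> partitions, int commitInterval) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        AtomicLong processed = new AtomicLong();
        for (long[] range : partitions.values()) {
            pool.submit(() -> {
                // Private per-partition cursor: never visible to other threads.
                for (long chunkStart = range[0]; chunkStart <= range[1]; chunkStart += commitInterval) {
                    long chunkEnd = Math.min(chunkStart + commitInterval - 1, range[1]);
                    // A real step-scoped reader would fetch the rows with ids in this sub-range.
                    processed.addAndGet(chunkEnd - chunkStart + 1);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }
}
```

With `partition(1, 100_000, 10)`, the first range is 1 to 10,000 and the last is 90,001 to 100,000. In a real job, each partition's reader would be step-scoped and would read its bounds from the step execution context.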


Source: https://habr.com/ru/post/1394070/

