Spring Batch JpaPagingItemReader: why are some rows not read?

I am using Spring Batch (3.0.1.RELEASE) with JPA and an HSQLDB server database. I need to go through an entire table (using paging) and update the rows one by one, so I used JpaPagingItemReader. But when I run the job, I see that some rows are skipped, and the number of skipped rows equals the page size. For example, if my table has 12 rows and the JpaPagingItemReader page size is 3, the job reads rows 1, 2, 3 and then rows 7, 8, 9 (skipping rows 4, 5, 6) ... Could you tell me what is wrong in my code or configuration, or is there a problem with HSQLDB paging? Below is my code:

[EDIT] The problem is with my ItemProcessor, which modifies the entity POJOs. Since JpaPagingItemReader flushes between each read, the entities are updated (this is what I want). But it seems that the paging offset also advances (as seen in the log: rows with IDs 4, 5 and 6 were skipped). How can I solve this problem?

    @Configuration
    @EnableBatchProcessing(modular = true)
    public class AppBatchConfig {

        @Inject
        private InfrastructureConfiguration infrastructureConfiguration;

        @Inject
        private JobBuilderFactory jobs;

        @Inject
        private StepBuilderFactory steps;

        @Bean
        public Job job() {
            return jobs.get("Myjob1").start(step1()).build();
        }

        @Bean
        public Step step1() {
            return steps.get("step1")
                    .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                    .reader(reader())
                    .processor(processor())
                    .build();
        }

        @Bean(destroyMethod = "")
        @JobScope
        public ItemStreamReader<SNUserPerCampaign> reader() {
            JpaPagingItemReader<SNUserPerCampaign> reader = new JpaPagingItemReader<SNUserPerCampaign>();
            reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
            // The where clause filters on isactive, the same field the processor updates.
            reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
            reader.setPageSize(3);
            return reader;
        }

        @Bean
        @JobScope
        public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {
            return new MyItemProcessor();
        }
    }

    @Configuration
    @EnableBatchProcessing
    public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

        @Inject
        private EntityManagerFactory emf;

        @Override
        public EntityManagerFactory getEntityManagerFactory() {
            return emf;
        }
    }

from my ItemProcessor:

    @Override
    public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
        // do some stuff …
        // then, if (condition), update the entity POJO:
        item.setModificationDate(new Timestamp(System.currentTimeMillis()));
        item.setIsactive(false);
        return item;
    }

from Spring xml configuration file:

    <tx:annotation-driven transaction-manager="transactionManager" />

    <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
        <property name="entityManagerFactory" ref="entityManagerFactory" />
    </bean>

    <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
        <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
        <property name="username" value="sa" />
        <property name="password" value="" />
    </bean>

trace / log summarized:

    11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
    11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
    11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]
    11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
    11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
    11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
    11:16:06.681 DEBUG SQL- select ...etc... from SNUSER_CAMPAIGN snuserperc0_
    11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
    11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
    11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]
4 answers

org.springframework.batch.item.database.JpaPagingItemReader creates its own instance of entityManager

(from org.springframework.batch.item.database.JpaPagingItemReader # doOpen):

 entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap); 

If the reader is transacted, it seems the entities it reads are not detached (from org.springframework.batch.item.database.JpaPagingItemReader # doReadPage):

    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

For this reason, when you update an item in the processor or writer, that item is still managed by the reader's EntityManager.

When the item reader reads the next page of data, it flushes the persistence context to the database.

So, in your case, after the first chunk is processed the database contains:

    | id | active |
    | 1  | false  |
    | 2  | false  |
    | 3  | false  |

org.springframework.batch.item.database.JpaPagingItemReader uses limit and offset to retrieve paginated data. So the next select issued by the reader looks like this:

    select * from table where active = true limit 3 offset 3

The reader skips the items with ids 4, 5 and 6: rows 1, 2 and 3 no longer match the where clause, so 4, 5 and 6 are now the first three matching rows, and the offset of 3 jumps over them.
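The effect described above can be reproduced without Spring or a database. The following is a minimal sketch, not the reader's actual implementation: the class and method names are made up for illustration, an in-memory boolean array stands in for the `active` column, and every row of a page is assumed to be deactivated before the next page is queried.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of limit/offset paging over "where active = true"
// while each page is deactivated (flushed) before the next page is read.
class OffsetPagingDemo {

    // Returns the ids the simulated reader actually sees, in read order.
    static List<Integer> readWithOffsetPaging(int rowCount, int pageSize) {
        boolean[] active = new boolean[rowCount + 1]; // index = id (1-based)
        Arrays.fill(active, true);
        List<Integer> seen = new ArrayList<>();

        for (int page = 0; ; page++) {
            // Equivalent of: select id where active = true order by id
            List<Integer> matching = new ArrayList<>();
            for (int id = 1; id <= rowCount; id++) {
                if (active[id]) {
                    matching.add(id);
                }
            }
            // Equivalent of: limit pageSize offset page * pageSize
            int from = page * pageSize;
            if (from >= matching.size()) {
                break; // empty page: the reader reports end of data
            }
            int to = Math.min(from + pageSize, matching.size());
            List<Integer> pageIds = new ArrayList<>(matching.subList(from, to));

            seen.addAll(pageIds);
            for (int id : pageIds) {
                active[id] = false; // the update is flushed before the next page
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(readWithOffsetPaging(12, 3));
    }
}
```

With 12 rows and a page size of 3, the simulated reader sees ids 1, 2, 3, 7, 8, 9 and then stops, which matches the behaviour reported in the question: half of the rows are never read.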

As a workaround, you can use the JDBC implementation (org.springframework.batch.item.database.JdbcPagingItemReader), as it does not use limit and offset. It pages on a sorted column (usually the id column), so you won't miss any data. Of course, you will have to update your data in the writer (using either JPA or a pure JDBC implementation).

The reader is more verbose:

    @Bean
    public ItemReader<? extends Entity> reader() {
        JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();

        final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
        sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
        sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
        sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
        sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
        // Each page resumes after the last sort key value instead of using an offset.
        sqlPagingQueryProviderFactoryBean.setSortKey("id");

        try {
            reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
        } catch (Exception e) {
            e.printStackTrace();
        }

        reader.setDataSource(dataSource);
        reader.setPageSize(3);
        reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
        return reader;
    }
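To see why paging on a sort key is not affected by the updates, here is a small plain-Java sketch under the same assumptions as the question (every row read is deactivated before the next page is fetched). It is an illustration only: the class and method names are hypothetical, and it models each page as "rows with id greater than the last id seen", which is the idea the answer describes rather than the exact SQL the reader generates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of sort-key paging:
// each page is "where active = true and id > lastId order by id", capped at pageSize.
class SortKeyPagingDemo {

    // Returns the ids the simulated reader actually sees, in read order.
    static List<Integer> readWithSortKeyPaging(int rowCount, int pageSize) {
        boolean[] active = new boolean[rowCount + 1]; // index = id (1-based)
        Arrays.fill(active, true);
        List<Integer> seen = new ArrayList<>();
        int lastId = 0; // sort key of the last row read so far

        while (true) {
            List<Integer> pageIds = new ArrayList<>();
            for (int id = lastId + 1; id <= rowCount && pageIds.size() < pageSize; id++) {
                if (active[id]) {
                    pageIds.add(id);
                }
            }
            if (pageIds.isEmpty()) {
                break; // no more rows after lastId
            }
            seen.addAll(pageIds);
            for (int id : pageIds) {
                active[id] = false; // update written before the next page is read
            }
            lastId = pageIds.get(pageIds.size() - 1);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(readWithSortKeyPaging(12, 3));
    }
}
```

Because the position in the table is tracked by the last id rather than by a row count, deactivating rows that were already read does not shift the window, and all 12 ids are visited.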

A few notes:

  • All entities returned from JpaPagingItemReader are detached. We do this in one of two ways. We either create a transaction before querying for the page and then commit the transaction (which detaches all entities associated with the EntityManager for that transaction), or we explicitly call entityManager.detach. We do this so that features like retry and skip can be performed correctly.
  • Since you didn't post all the code in your processor, I suspect that in the //do some stuff section your item is being reattached, which is why the update is happening. However, without being able to see that code, I cannot be sure.
  • In either case, an explicit ItemWriter should be used. In fact, I consider it a bug that we do not require an ItemWriter when using Java config (we do for XML).
  • For your specific problem of missing records, you need to keep in mind that a cursor is not used by any of the *PagingItemReaders. They all execute independent queries for each page of data. Therefore, if you update the underlying data between pages, this may affect the items returned on later pages. For example, if my paging query specifies where val1 > 4, and I have a record whose val1 was 1 and is updated to 5, that item can be returned in chunk 2, since it now meets the criteria. If you need to update values that appear in your where clause (thereby affecting what falls into the data set you are processing), it is best to add a processed flag that you can query by instead.

I had the same issue with rows being skipped based on pageSize. If I have a pageSize of 2, it reads 2, ignores 2, reads 2, ignores 2, etc.

I created a daemon handler that queries the database table "Query" for records in the status "Waiting to be processed." The daemon is designed to run in the background.

I had a "status" field that was used in a @NamedQuery which selected records whose status was "10": waiting for processing. After a record was processed, the status field was updated to "20": error or "30": success. This turned out to be the cause of the problem: I was updating the field that the query filtered on. If I introduced a "processed" field and updated that instead of the "status" field, there was no problem and all the records were read.

As a possible solution that still updates the status field, I set maxItemCount to the same value as pageSize; this way the records are correctly updated before the step completes. Then I keep re-running the step until a request is made to stop the daemon. OK, maybe this is not the most efficient way to do this (but I still get the ease of use that JPA provides), and I think it would be better to use JdbcPagingItemReader (described above, thanks!). Opinions on the best approach to this database polling problem would be welcome :)


I ran into the same case: my reader was a JpaPagingItemReader whose query filtered on a field that was updated in the writer. Consequently, half of the items that needed to be updated were skipped, because the page window kept advancing while the items already read no longer fell within the query's result set.

The simplest workaround for me was to override the getPage method of JpaPagingItemReader to always return the first page.

    JpaPagingItemReader<XXXXX> jpaPagingItemReader = new JpaPagingItemReader<XXXXX>() {
        @Override
        public int getPage() {
            return 0;
        }
    };
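The following plain-Java sketch illustrates what "always read the first page" implies. It is a simulation with hypothetical names, not Spring Batch code: the `deactivates` predicate is an assumption standing in for the processor or writer deciding whether an item leaves the query's result set, and `maxPages` is only there to keep the simulation finite.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical simulation of a reader that always queries the first page
// of "where active = true order by id".
class FirstPageDemo {

    // deactivates: whether processing a given id removes it from the filter.
    // maxPages: safety cap so the simulation terminates even if rows keep matching.
    static List<Integer> readAlwaysFirstPage(int rowCount, int pageSize,
                                             IntPredicate deactivates, int maxPages) {
        boolean[] active = new boolean[rowCount + 1]; // index = id (1-based)
        Arrays.fill(active, true);
        List<Integer> seen = new ArrayList<>();

        for (int page = 0; page < maxPages; page++) {
            // First pageSize rows that still match the filter (offset is always 0).
            List<Integer> pageIds = new ArrayList<>();
            for (int id = 1; id <= rowCount && pageIds.size() < pageSize; id++) {
                if (active[id]) {
                    pageIds.add(id);
                }
            }
            if (pageIds.isEmpty()) {
                break; // nothing matches any more
            }
            for (int id : pageIds) {
                seen.add(id);
                if (deactivates.test(id)) {
                    active[id] = false;
                }
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(readAlwaysFirstPage(6, 2, id -> true, 100));
        System.out.println(readAlwaysFirstPage(6, 2, id -> false, 3));
    }
}
```

In this model the workaround visits every row exactly once as long as every processed item stops matching the query. If some items keep matching (for example when the update is conditional, as in the question), the same first page comes back again, so the approach relies on each item being removed from the result set after processing.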

Source: https://habr.com/ru/post/1205275/