I have a web application written using Spring Boot and a Spring Batch job that reads from a CSV file and writes data to an RDS table (MySQL) using Spring's JdbcTemplate. I have split the Spring Batch metadata tables into a separate database to avoid mixing them with my business-logic tables. I need connection-pool capabilities for both databases, and I am using the default connection pool provided by Spring Boot (the Tomcat JDBC pool). I set up a separate JdbcTemplate for each database, as shown in the configuration file below. I had to put the @Primary annotation on the job-metadata DataSource, because otherwise the Spring Batch job cannot identify its own database. Because of this, when I checked JConsole I found that only one connection pool was created — for the @Primary DataSource. How can I enable connection pooling for both data sources in this case?
My data source configuration section:
@Primary
@Bean(name = "mysqlDs")
@ConfigurationProperties(prefix = "datasource.sql.jobMetaDataDb")
public DataSource sqlDataSource() {
    // Pin the pool implementation explicitly. Without .type(...), Spring Boot fully
    // configures (and JMX-registers) the connection pool only for the @Primary
    // DataSource; forcing the Tomcat JDBC pool type makes pooling explicit and
    // consistent for both DataSource beans. The pool settings still come from the
    // "datasource.sql.jobMetaDataDb.*" properties via @ConfigurationProperties.
    return DataSourceBuilder.create()
            .type(org.apache.tomcat.jdbc.pool.DataSource.class)
            .build();
}
/**
 * JdbcTemplate bound to the Spring Batch metadata database ("mysqlDs").
 *
 * Note: the previous {@code @Autowired} on this method was removed — parameters of a
 * {@code @Bean} method are resolved by the container automatically, so the annotation
 * is redundant.
 *
 * @param mysqlDs the metadata-database DataSource (qualifier "mysqlDs")
 * @return a JdbcTemplate backed by the metadata DataSource
 */
@Bean(name = "mysql")
public JdbcTemplate slaveJdbcTemplate(@Qualifier("mysqlDs") DataSource mysqlDs) {
    return new JdbcTemplate(mysqlDs);
}
@Bean(name = "rdsDataSource")
@ConfigurationProperties(prefix = "datasource.sql.rdsWriterDb")
public DataSource rdsDataSource() {
    // This is the non-primary DataSource — the one whose pool was missing in JConsole.
    // Explicitly selecting the Tomcat JDBC pool via .type(...) ensures this bean is
    // built as a pooled DataSource as well, instead of relying on the auto-configuration
    // that only fully applies to the @Primary DataSource. Pool properties
    // ("datasource.sql.rdsWriterDb.*", incl. jmx-enabled) are bound onto the pool.
    return DataSourceBuilder.create()
            .type(org.apache.tomcat.jdbc.pool.DataSource.class)
            .build();
}
/**
 * JdbcTemplate bound to the business (RDS writer) database ("rdsDataSource").
 *
 * Note: the previous {@code @Autowired} on this method was removed — parameters of a
 * {@code @Bean} method are resolved by the container automatically, so the annotation
 * is redundant.
 *
 * @param rdsDataSource the business-database DataSource (qualifier "rdsDataSource")
 * @return a JdbcTemplate backed by the RDS writer DataSource
 */
@Bean(name = "rdsJdbcTemplate")
public JdbcTemplate rdsJdbcTemplate(@Qualifier("rdsDataSource") DataSource rdsDataSource) {
    return new JdbcTemplate(rdsDataSource);
}
:
#MySQL DB endpoint configuration for
#Spring Batch job metadata.
datasource.sql.jobMetaDataDb.url=jdbc:mysql:
datasource.sql.jobMetaDataDb.username=root
datasource.sql.jobMetaDataDb.password=root
datasource.sql.jobMetaDataDb.driverClassName=com.mysql.jdbc.Driver
datasource.sql.jobMetaDataDb.jmx-enabled=true
#Configuration to avoid wait_timeout
datasource.sql.jobMetaDataDb.testWhileIdle = true
datasource.sql.jobMetaDataDb.timeBetweenEvictionRunsMillis = 7200000
datasource.sql.jobMetaDataDb.validationQuery = SELECT 1
#Database configuration for RdsWriter.
datasource.sql.rdsWriterDb.url=jdbc:mysql:
datasource.sql.rdsWriterDb.username=root
datasource.sql.rdsWriterDb.password=root
datasource.sql.rdsWriterDb.driverClassName=com.mysql.jdbc.Driver
#Configuration to avoid wait_timeout
datasource.sql.rdsWriterDb.testWhileIdle = true
datasource.sql.rdsWriterDb.timeBetweenEvictionRunsMillis = 7200000
datasource.sql.rdsWriterDb.validationQuery = SELECT 1
datasource.sql.rdsWriterDb.jmx-enabled=true
spring.datasource.jmx-enabled=true
Tags: RDS, Spring, JdbcTemplate.
package com.fastretailing.catalogPlatformSCMProducer.producerjob.writer.rds;
import com.fastretailing.catalogPlatformSCMProducer.constants.ProducerJobConstants;
import com.fastretailing.catalogPlatformSCMProducer.model.Configuration;
import com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage;
import com.fastretailing.catalogPlatformSCMProducer.model.RDSColumnInfo;
import com.fastretailing.catalogPlatformSCMProducer.notification.JobStatus;
import com.fastretailing.catalogPlatformSCMProducer.util.ProducerUtil;
import com.fastretailing.catalogPlatformSCMProducer.util.QueryGenerator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import java.sql.BatchUpdateException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring Batch item writer that persists {@link ProducerMessage} records into an RDS
 * (MySQL) table through the dedicated "rdsJdbcTemplate".
 *
 * For each message it first runs a timestamp check against the existing row (by primary
 * key) and only batches the row for insert when it is not older than the stored one.
 * Rows that fail processing or the timestamp check are counted via {@link JobStatus}.
 */
public class RdsWriter extends AbstractItemStreamItemWriter<ProducerMessage> {

    @Autowired
    @Qualifier("rdsJdbcTemplate")
    JdbcTemplate rdsJdbcTemplate;

    @Autowired
    Configuration configuration;

    // Lazily created from the first message seen; assumes all messages target the same
    // table/schema. NOTE(review): this field makes the writer stateful — confirm the
    // step is single-threaded, otherwise this is not thread-safe.
    QueryGenerator queryGenerator;

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    @Override
    public void write(List<? extends ProducerMessage> list) throws Exception {
        // NOTE(review): calling handleRecord() directly is a self-invocation, which
        // bypasses the Spring proxy — the @Transactional below is therefore ineffective
        // here. In practice the enclosing Spring Batch chunk transaction applies.
        handleRecord(list);
    }

    /**
     * Validates each message's timestamp against the stored row, collects the surviving
     * rows into a single batch, and executes one batch insert for the whole chunk.
     * All failures are swallowed and reported through {@link JobStatus} so the job
     * continues with the remaining rows.
     */
    @Transactional
    public void handleRecord(List<? extends ProducerMessage> list) {
        List<Object[]> insertBatch = new ArrayList<>();
        String tableName = null;
        for (ProducerMessage message : list) {
            try {
                List<Object> rowValues = new ArrayList<>();
                if (null == queryGenerator) {
                    queryGenerator = new QueryGenerator(message);
                }
                if (null == tableName) {
                    tableName = message.getTableName();
                }
                // Timestamp check: fetch the stored timestamp for this primary key;
                // absence of a row means a plain insert.
                String timestampValidationPS = ProducerUtil.generateTimestampCheckPS(message);
                Long existingTimestamp;
                try {
                    existingTimestamp = rdsJdbcTemplate.queryForObject(timestampValidationPS,
                            ProducerUtil.generatePrimaryKeyObjectList(message), Long.class);
                } catch (EmptyResultDataAccessException e) {
                    LOGGER.debug("Primary key not exists in RDS table. This will insert new row");
                    existingTimestamp = null;
                }
                if (null == existingTimestamp || existingTimestamp <= message.getTimeStamp()) {
                    for (RDSColumnInfo columnInfo : message.getRecord()) {
                        // Empty values for non-VARCHAR columns are written as SQL NULL;
                        // empty VARCHARs are kept as empty strings.
                        if (columnInfo.getRdsColumnValue().isEmpty()
                                && !columnInfo.getRdsVarType()
                                        .equalsIgnoreCase(ProducerJobConstants.TYPE_VARCHAR)) {
                            rowValues.add(null);
                        } else {
                            rowValues.add(columnInfo.getRdsColumnValue());
                        }
                    }
                    insertBatch.add(rowValues.toArray());
                } else {
                    JobStatus.addRowsSkippedWriting(1);
                    LOGGER.debug("Skipped row due to timestamp check failure for feedName {}",
                            message.getFeedConfigName());
                }
            } catch (Exception e) {
                JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
                JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
                JobStatus.addRowsSkippedWriting(1);
                LOGGER.error("Exception while processing records for RDS write. These records will be skipped from writing.",
                        e);
            }
        }
        try {
            // FIX: was "objectList_insert != null", which is always true because the
            // list is constructed above — on an empty chunk that caused an NPE on the
            // still-null queryGenerator. Checking isEmpty() skips the batch entirely.
            if (!insertBatch.isEmpty()) {
                String insertQuery = queryGenerator.generateRdsInsertPS(tableName);
                LOGGER.debug("Executing Query {}", insertQuery);
                rdsJdbcTemplate.batchUpdate(insertQuery, insertBatch);
                JobStatus.addRowsWritten(insertBatch.size());
            }
        } catch (Exception e) {
            // FIX: the BatchUpdateException may sit deeper than getCause(); the old
            // blind cast of the immediate cause could throw ClassCastException. Walk
            // the chain to the index reported by indexOfThrowable instead.
            int idx = ExceptionUtils.indexOfThrowable(e, BatchUpdateException.class);
            if (idx != -1) {
                BatchUpdateException be =
                        (BatchUpdateException) ExceptionUtils.getThrowables(e)[idx];
                handleUpdateCountOnException(be);
            }
            JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
            JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
            LOGGER.error("Exception while writing records to RDS table. These records will be skipped from writing.",
                    e);
        }
    }

    /**
     * Translates the per-row results of a failed batch update into job counters:
     * rows marked EXECUTE_FAILED are counted as skipped, all others as written.
     */
    private void handleUpdateCountOnException(BatchUpdateException be) {
        for (int count : be.getUpdateCounts()) {
            if (count == Statement.EXECUTE_FAILED) {
                JobStatus.addRowsSkippedWriting(1);
            } else {
                JobStatus.addRowsWritten(1);
            }
        }
    }
}