Background
We are using Spring Batch to process data stored in MySQL.
We use JdbcCursorItemReader as the Reader.
It reads and processes items one by one through a cursor, but the underlying query fetches the whole result set at once.
Since we handle a lot of data (for example, one day of data for all customers), everything is kept in memory, and the job eventually fails with an OutOfMemoryError.
In effect, the query is SELECT * FROM Table; not SELECT * FROM Table LIMIT 10 OFFSET 10;
JdbcPagingItemReader
To paginate the reads, JdbcPagingItemReader looks like a good fit.
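For reference, JdbcPagingItemReader does not page with OFFSET. Its query provider builds a first-page query with only a LIMIT, and for every following page it adds a condition on the sort key using the last value read, so the cost per page stays constant even deep into the table. A rough sketch of the idea in plain Java (the class and method names here are illustrative, not Spring Batch API):

```java
public class KeysetPagingSketch {

    // First page: no offset, just a limit, ordered by the sort key.
    static String firstPageSql(int pageSize) {
        return "SELECT id, member_id, point, grant_date FROM loga "
                + "ORDER BY id ASC LIMIT " + pageSize;
    }

    // Later pages: restart after the last id seen, still no offset.
    static String nextPageSql(long lastId, int pageSize) {
        return "SELECT id, member_id, point, grant_date FROM loga "
                + "WHERE id > " + lastId + " ORDER BY id ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        System.out.println(firstPageSql(10));
        System.out.println(nextPageSql(20, 10));
    }
}
```

Only one page of rows is ever held in memory at a time, which is exactly what the cursor-based setup failed to guarantee with the MySQL driver.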
Preparation
This trial uses the following environment:
- Spring Boot 2.6.4
- MySQL 8.0
Before starting, I prepared a MySQL schema for the test.
In this example, we read data, process it (convert the object), and write the result to a different table.
There are two tables, one to read from and one to write to.
loga table
CREATE TABLE `testdb`.`loga` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `member_id` VARCHAR(255) NOT NULL,
  `point` INT NULL,
  `grant_date` DATE NULL,
  PRIMARY KEY (`id`));
logb table
CREATE TABLE `testdb`.`logb` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `member_id` VARCHAR(255) NOT NULL,
  `point` INT NULL,
  `converted_date` DATE NULL,
  PRIMARY KEY (`id`));
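To have something to read, a few sample rows can be inserted into loga (the values are illustrative; the first two rows use the grant_date that the Reader will filter on later):

```sql
INSERT INTO `testdb`.`loga` (`member_id`, `point`, `grant_date`) VALUES
  ('member-001', 100, '2022-03-16'),
  ('member-002', 200, '2022-03-16'),
  ('member-003', 300, '2022-03-17');
```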
Codes
Let's start with the code.
build.gradle
plugins {
    id 'org.springframework.boot' version '2.6.4'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'jp.co.rakuten.reward'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
    implementation 'org.springframework.boot:spring-boot-starter-logging'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'mysql:mysql-connector-java'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
}

tasks.named('test') {
    useJUnitPlatform()
}
application.yml
spring.profiles.active: local
spring:
  batch:
    job:
      enabled: true
    initialize-schema: always
logging:
  level:
    org:
      springframework:
        jdbc:
          core: trace
      hibernate:
        sql: debug
        type: trace
---
######################################
# LOCAL Environment
######################################
spring:
  profiles: local
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/testdb?autoReconnect=true&useSSL=false
    username: root
    password: xxxxxx
  jpa:
    show-sql: true
    properties:
      hibernate:
        format_sql: true
DemoApplication.java
This is the Spring Boot entry point.
@ComponentScan({"com.daiji110.demo"})
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
BatchConfig.java
Job definition and batch configuration code.
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private static final Integer CHUNK_SIZE = 12500;

    @Bean
    public Step converterStep(ConverterReader reader,
                              ConverterProcessor converterProcessor,
                              ConverterWriter writer) {
        return stepBuilderFactory.get("converterStep")
                .<PointConverterDto, PointConverterDto>chunk(CHUNK_SIZE)
                .reader(reader)
                .processor(converterProcessor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job job(Step converterStep) {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(converterStep)
                .build();
    }
}
LogA.java
These are the table model classes.
@Data
public class LogA {
    private Long id;
    private String memberId;
    private Integer point;
    private LocalDate grantDate;
}
LogB.java
@Data
public class LogB {
    private Long id;
    private String memberId;
    private Integer point;
    private LocalDate convertedDate;
}
PointConverterDto.java
This DTO is shared by the Reader, Processor, and Writer.
@Data
public class PointConverterDto {
    private LogA logA;
    private LogB logB;
}
ConverterReader.java
This is the main topic of this article.
The Reader extends JdbcPagingItemReader.
It sets the page size (10) and a row mapper that converts each row into an object.
@Component
@StepScope
public class ConverterReader extends JdbcPagingItemReader<PointConverterDto> {

    ConverterReader(DataSource dataSource) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("grantdate", "2022-03-16");
        setDataSource(dataSource);
        setPageSize(10);
        setQueryProvider(createQueryProvider());
        setParameterValues(parameterValues);
        setRowMapper(new LogARowMapper());
    }

    private PagingQueryProvider createQueryProvider() {
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("SELECT id, member_id, point, grant_date");
        queryProvider.setFromClause("FROM loga");
        queryProvider.setWhereClause("WHERE grant_date = :grantdate");
        queryProvider.setSortKeys(sortKeys);
        return queryProvider;
    }
}
LogARowMapper.java
This mapper converts each ResultSet row into an object.
public class LogARowMapper implements RowMapper<PointConverterDto> {

    @Override
    public PointConverterDto mapRow(ResultSet rs, int rowNum) throws SQLException {
        PointConverterDto dto = new PointConverterDto();
        LogA logA = new LogA();
        logA.setId(rs.getLong("id"));
        logA.setMemberId(rs.getString("member_id"));
        logA.setPoint(rs.getInt("point"));
        // grant_date is a DATE column, so convert via java.sql.Date
        logA.setGrantDate(rs.getDate("grant_date").toLocalDate());
        dto.setLogA(logA);
        return dto;
    }
}
ConverterProcessor.java
This is the Processor part.
It takes each item that was read and creates the new object to write.
@Component
public class ConverterProcessor implements ItemProcessor<PointConverterDto, PointConverterDto> {

    @Override
    public PointConverterDto process(PointConverterDto item) throws Exception {
        LogA logA = item.getLogA();
        if (logA != null) {
            LogB logB = new LogB();
            logB.setConvertedDate(logA.getGrantDate().plusDays(1));
            logB.setPoint(logA.getPoint());
            logB.setMemberId(logA.getMemberId());
            item.setLogB(logB);
        }
        return item;
    }
}
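The conversion rule itself is plain java.time arithmetic, so it can be checked in isolation:

```java
import java.time.LocalDate;

public class ConversionRuleDemo {
    public static void main(String[] args) {
        // converted_date is simply grant_date plus one day
        LocalDate grantDate = LocalDate.parse("2022-03-16");
        LocalDate convertedDate = grantDate.plusDays(1);
        System.out.println(convertedDate); // 2022-03-17
    }
}
```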
ConverterWriter.java
This is the last part.
The Writer stores the data in the table.
@Component
public class ConverterWriter extends JdbcBatchItemWriter<PointConverterDto> {

    public ConverterWriter(DataSource dataSource) {
        this.setDataSource(dataSource);
        this.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        this.setSql(insertTenantPointLogsSQL());
    }

    @Override
    public void write(List<? extends PointConverterDto> items) throws Exception {
        super.write(items);
    }

    private String insertTenantPointLogsSQL() {
        return "INSERT INTO logb(member_id, point, converted_date) "
                + "VALUES(:logB.memberId, :logB.point, :logB.convertedDate)";
    }
}
Run
If you run this program, you can see the SQL in the log:
SELECT * FROM loga ORDER BY id ASC LIMIT 10;
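After the first page, the queries in the log should switch to a keyset condition on the sort key instead of an OFFSET. Roughly like this (illustrative, based on how MySqlPagingQueryProvider builds its "remaining pages" query from the clauses we configured; the second placeholder is bound to the last id of the previous page):

```sql
SELECT id, member_id, point, grant_date
FROM loga
WHERE grant_date = ? AND id > ?
ORDER BY id ASC
LIMIT 10;
```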