Spring Batch Reader Pagination


Background

We are using Spring Batch to process data stored in MySQL.

We were using JdbcCursorItemReader as the Reader.

It reads and processes items one at a time through a JDBC cursor, but the underlying query still selects the entire result set.

We read a large amount of data (for example, one day of data for all customers), and with the MySQL driver's default fetch settings the whole result set is buffered on the client side, so the job eventually ran Out of Memory.

In other words, the reader effectively runs SELECT * FROM Table; rather than SELECT * FROM Table LIMIT 10 OFFSET 10;

JdbcPagingItemReader

To support pagination, JdbcPagingItemReader looks like a good fit.
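The point of paging is that the reader issues one small, bounded query per page instead of a single unbounded query. As a minimal sketch of the idea (plain string building, not Spring Batch's implementation; offset-based paging is shown here only for simplicity):

```java
public class OffsetPagingSketch {

    // Build the bounded query for one page: page 0 -> OFFSET 0, page 1 -> OFFSET 10, ...
    static String pageQuery(String baseQuery, int pageSize, int page) {
        return baseQuery + " LIMIT " + pageSize + " OFFSET " + (pageSize * page);
    }

    public static void main(String[] args) {
        String base = "SELECT * FROM loga ORDER BY id ASC";
        System.out.println(pageQuery(base, 10, 0)); // first page
        System.out.println(pageQuery(base, 10, 1)); // second page
    }
}
```

Each page query returns at most `pageSize` rows, so only one page ever sits in memory at a time.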

Preparation

This is the environment used for this trial:

  • Spring Boot 2.6.4
  • MySQL 8.0

Before starting, I prepared a MySQL schema to test with.

In this example we read data, process it (convert the object), and write it to a different table.

There are two tables: one to read from and one to write to.

loga table

CREATE TABLE `testdb`.`loga` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `member_id` VARCHAR(255) NOT NULL,
  `point` INT NULL,
  `grant_date` DATE NULL,
  PRIMARY KEY (`id`));

logb table

CREATE TABLE `testdb`.`logb` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `member_id` VARCHAR(255) NOT NULL,
  `point` INT NULL,
  `converted_date` DATE NULL,
  PRIMARY KEY (`id`));

Codes

Let’s start with the code part.

build.gradle

plugins {
	id 'org.springframework.boot' version '2.6.4'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
}

group = 'jp.co.rakuten.reward'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-batch'
	implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
	implementation 'org.springframework.boot:spring-boot-starter-logging'
	compileOnly 'org.projectlombok:lombok'
	runtimeOnly 'mysql:mysql-connector-java'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.batch:spring-batch-test'
}

tasks.named('test') {
	useJUnitPlatform()
}

application.yml

spring.profiles.active: local

spring:
  batch:
    job:
      enabled: true
    jdbc:
      initialize-schema: always
logging:
  level:
    org:
      springframework:
        jdbc:
          core: trace
      hibernate:
        sql: debug
        type: trace

---
######################################
# LOCAL Environment
######################################
spring:
  config:
    activate:
      on-profile: local
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/testdb?autoReconnect=true&useSSL=false
    username: root
    password: xxxxxx
  jpa:
    show-sql: true
    properties:
      hibernate:
        format_sql: true

DemoApplication.java

This is the Spring Boot entry point.

@ComponentScan({"com.daiji110.demo"})
@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}

}

BatchConfig.java

Job definition and batch configuration code.

@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private static final Integer CHUNK_SIZE = 12500;

    @Bean
    public Step converterStep(ConverterReader reader, ConverterProcessor converterProcessor, ConverterWriter writer) {
        return stepBuilderFactory.get("converterStep")
                .<PointConverterDto, PointConverterDto> chunk(CHUNK_SIZE)
                .reader(reader)
                .processor(converterProcessor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job job(Step converterStep) {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(converterStep)
                .build();
    }
}
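The step above is chunk-oriented: it repeats read → process until CHUNK_SIZE items have been collected, then hands the whole list to the writer in one call. A minimal stand-alone sketch of that loop (plain Java, not the actual Spring Batch internals; note that the reader's page size and the step's chunk size are independent settings):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkLoopSketch {

    // Simulate one chunk-oriented step: read items one by one, process each,
    // and "write" them in groups of chunkSize. Returns the written chunks.
    static <I, O> List<List<O>> runStep(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {   // chunk full -> one writer call
                written.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {                // final partial chunk
            written.add(chunk);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5);
        // chunkSize 2 -> the writer receives [10, 20], then [30, 40], then [50]
        System.out.println(runStep(items.iterator(), i -> i * 10, 2));
    }
}
```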

LogA.java

This is the table model code.

@Data
public class LogA {

    private Long id;

    private String memberId;

    private Integer point;

    private LocalDate grantDate;
}

LogB.java

@Data
public class LogB {
    private Long id;

    private String memberId;

    private Integer point;

    private LocalDate convertedDate;
}

PointConverterDto.java

This DTO is shared by the Reader, Processor, and Writer.

@Data
public class PointConverterDto {
    private LogA logA;

    private LogB logB;
}

ConverterReader.java

This is the main topic of this article.

The Reader extends JdbcPagingItemReader.

It sets the page size (10) and a row mapper that converts each row.

@Component
@StepScope
public class ConverterReader extends JdbcPagingItemReader<PointConverterDto> {

    ConverterReader(DataSource dataSource) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("grantdate", "2022-03-16");

        setDataSource(dataSource);
        setPageSize(10);
        setQueryProvider(createQueryProvider());
        setParameterValues(parameterValues);
        setRowMapper(new LogARowMapper());
    }

    private PagingQueryProvider createQueryProvider() {
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("SELECT id, member_id, point, grant_date");
        queryProvider.setFromClause("FROM loga");
        queryProvider.setWhereClause("WHERE grant_date = :grantdate");
        queryProvider.setSortKeys(sortKeys);
        return queryProvider;
    }
}
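One detail worth knowing: MySqlPagingQueryProvider does not page with OFFSET. The first page is a plain LIMIT query, and each later page restricts on the last sort-key value seen on the previous page. Roughly, the two query shapes look like this (a string-building sketch assembled from the clauses set above, not the provider's exact output):

```java
public class KeysetQuerySketch {

    // First page: just bound the result with LIMIT.
    static String firstPageQuery(int pageSize) {
        return "SELECT id, member_id, point, grant_date FROM loga "
                + "WHERE grant_date = :grantdate ORDER BY id ASC LIMIT " + pageSize;
    }

    // Later pages: continue after the last id seen on the previous page.
    static String remainingPagesQuery(int pageSize) {
        return "SELECT id, member_id, point, grant_date FROM loga "
                + "WHERE grant_date = :grantdate AND id > :_id ORDER BY id ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        System.out.println(firstPageQuery(10));
        System.out.println(remainingPagesQuery(10));
    }
}
```

Because each page seeks directly to `id > last seen id`, performance stays stable even on late pages, unlike large OFFSET values. This is why the sort key must be unique.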

LogARowMapper.java

This is the RowMapper that converts a ResultSet row into an object.

public class LogARowMapper implements RowMapper<PointConverterDto> {

    @Override
    public PointConverterDto mapRow(ResultSet rs, int rowNum) throws SQLException {
        PointConverterDto dto = new PointConverterDto();
        LogA logA = new LogA();
        logA.setId(rs.getLong("id"));
        logA.setMemberId(rs.getString("member_id"));
        logA.setPoint(rs.getInt("point"));
        // grant_date is nullable in the schema, so guard against null
        java.sql.Date grantDate = rs.getDate("grant_date");
        logA.setGrantDate(grantDate != null ? grantDate.toLocalDate() : null);
        dto.setLogA(logA);
        return dto;
    }
}

ConverterProcessor.java

This is the Processor part.

It takes each item read and creates the new object to be written.

@Component
public class ConverterProcessor implements ItemProcessor<PointConverterDto, PointConverterDto> {

    @Override
    public PointConverterDto process(PointConverterDto item) throws Exception {
        LogA logA = item.getLogA();
        if (logA != null) {
            LogB logB = new LogB();
            logB.setConvertedDate(logA.getGrantDate().plusDays(1));
            logB.setPoint(logA.getPoint());
            logB.setMemberId(logA.getMemberId());
            item.setLogB(logB);
        }
        return item;
    }
}

ConverterWriter.java

This is the last part.

It writes the converted data into the logb table.

@Component
public class ConverterWriter extends JdbcBatchItemWriter<PointConverterDto> {

    public ConverterWriter(DataSource dataSource) {
        this.setDataSource(dataSource);
        this.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        this.setSql(insertTenantPointLogsSQL());
    }

    private String insertTenantPointLogsSQL() {
        return "INSERT INTO logb(member_id, point, converted_date) VALUES(:logB.memberId, :logB.point, :logB.convertedDate)";
    }
}
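The `:logB.memberId` placeholders work because BeanPropertyItemSqlParameterSourceProvider wraps each item in a BeanPropertySqlParameterSource, which resolves nested property paths through getters. A rough stand-alone sketch of that resolution (a hypothetical helper using plain reflection; the nested classes here are simplified stand-ins for the Lombok-generated beans):

```java
import java.lang.reflect.Method;

public class NestedPropertySketch {

    // Minimal stand-ins for the article's Lombok-generated beans.
    public static class LogB {
        private String memberId;
        public String getMemberId() { return memberId; }
        public void setMemberId(String v) { memberId = v; }
    }
    public static class Dto {
        private LogB logB;
        public LogB getLogB() { return logB; }
        public void setLogB(LogB v) { logB = v; }
    }

    // Resolve a nested path like "logB.memberId" by chaining getters,
    // roughly what happens when the writer binds :logB.memberId.
    static Object resolve(Object bean, String path) throws Exception {
        Object current = bean;
        for (String part : path.split("\\.")) {
            String getter = "get" + Character.toUpperCase(part.charAt(0)) + part.substring(1);
            Method m = current.getClass().getMethod(getter);
            current = m.invoke(current);
        }
        return current;
    }

    public static void main(String[] args) throws Exception {
        Dto dto = new Dto();
        LogB b = new LogB();
        b.setMemberId("m-001");
        dto.setLogB(b);
        System.out.println(resolve(dto, "logB.memberId")); // prints m-001
    }
}
```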

Run

If you run this program, you can see the generated paging SQL in the log (first page; the columns and WHERE clause match what the reader configured):

SELECT id, member_id, point, grant_date FROM loga WHERE grant_date = ? ORDER BY id ASC LIMIT 10
