7.Spring Batch의 Chunk와 동작원리 살펴보기
Spring Batch

7.Spring Batch의 Chunk와 동작원리 살펴보기

반응형

Chunk

1.기본개념

  • Chunk 란 여러 개의 아이템을 묶은 하나의 덩어리, 블록을 의미
  • 한번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리함, 즉 Chunk 단위의 Commit 과 Rollback 이 이루어짐
  • 일반적으로 대용량 데이터를 한번에 처리하는 것이 아닌 청크 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용됨

  • Chunk vs Chunk
    • Chunk 는 ItemReader 로 읽은 하나의 아이템을 Chunk 에서 정한 개수만큼 반복해서 저장하는 타입
    • Chunk 는 ItemReader 로부터 전달받은 Chunk 를 참조해서 ItemProcessor 에서 적절하게 가공, 필터링한 다음 ItemWriter 에 전달하는 타입

2.아키텍쳐 흐름도

  • Chunk는 ChunkSize 개수에 될 때까지 반복한다
  • ChunkSize에 도달한 이후에는 List에 저장된 Chunk정보를 Iteratior를 통해 반복문을 돌며 Chunk를 만든다.
  • 이후 ItemWriter를 활용해 DB저장한다.

💡 그림의 빨간 화살표의 ItemReader,ItemProcessor, ItemWriter의 특징을 기억하자

3.Chunk 구조

예제 코드

package com.example.springbatch_6_13_chunkconfiguration;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * packageName    : com.example.springbatch_6_13_chunkconfiguration
 * fileName       : ChunkConfiguration
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * chunk 테스트 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */

@RequiredArgsConstructor
@Configuration
public class ChunkConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(5)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3","item4","item5")))
                .processor(new ItemProcessor<String, String>() {
                    @Override
                    public String process(String item) throws Exception {
                        Thread.sleep(300);
                        System.out.println("item = " + item);
                        return "my" + item;
                    }
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        Thread.sleep(300);
                        System.out.println("items = " + items);
                    }
                })
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">> step2 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

ChunkOrientedTasklet

1.기본 개념

  • ChunkOrientedTasklet 은 스프링 배치에서 제공하는 Tasklet 의 구현체로서 Chunk 지향 프로세싱를 담당하는 도메인 객체
  • ItemReader, ItemWriter, ItemProcessor 를 사용해 Chunk 기반의 데이터 입출력 처리를 담당한다
  • TaskletStep 에 의해서 반복적으로 실행되며 ChunkOrientedTasklet 이 실행 될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다
  • exception이 발생할 경우, 해당 Chunk는 롤백 되며 이전에 커밋한 Chunk는 완료된 상태가 유지된다
  • 내부적으로 ItemReader 를 핸들링 하는 ChunkProvider 와 ItemProcessor, ItemWriter 를 핸들링하는 ChunkProcessor 타입의 구현체를 가진다

2.구조

3.API 소개

4.현재 학습중인 영역

오타 TaskletStepBuilder → SimpleStepBuilder

예제 코드

package com.example.springbatch_6_14_chunkorientedtasklet;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * packageName    : com.example.springbatch_6_14_chunkorientedtasklet
 * fileName       : ChunkOrientedTaskletConfiguration
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * Chunk가 2개씩 도달할 때 마다 출력되는 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class ChunkOrientedTaskletConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    @JobScope
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String,String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1","item2","item3","item4","item5","item6")))
                .processor(new ItemProcessor<String, String>() {
                    @Override
                    public String process(String item) throws Exception {
                        return "my_" + item;
                    }
                })
                .writer(new ItemWriter<String>() {
                    int count = 0;
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("===========Cur Repeat " + count++ + " ============");
                        items.forEach(item -> System.out.println(item));
                    }
                })
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}
  • 청크가 2개씩 읽을 때마다 출력하는 예제이다.
  • 갯수가 남았다면 다시 돌아가서 2개를 읽고 출력한다.

ChunkkProvider/ChunkProcessor

1.ChunkProvider 기본개념

  • ItemReader 를 사용해서 소스로부터 아이템을 Chunk size 만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체
  • Chunk 를 만들고 내부적으로 반복문을 사용해서 ItemReader.read() 를 계속 호출하면서 item 을 Chunk 에 쌓는다
  • 외부로 부터 ChunkProvider 가 호출될 때마다 항상 새로운 Chunk 가 생성된다
  • 반복문 종료 시점
    • Chunk size 만큼 item 을 읽으면 반복문 종료되고 ChunkProcessor 로 넘어감
    • ItemReader 가 읽은 item 이 null 일 경우 반복문 종료 및 해당 Step 반복문까지 종료
  • 기본 구현체로서 SimpleChunkProvider 와 FaultTolerantChunkProvider 가 있다

2.구조

3.코드 분석

4.ChunkProcessor 기본 개념

  • ItemProcessor 를 사용해서 Item 을 변형, 가공, 필터링하고 ItemWriter 를 사용해서 Chunk 데이터를 저장, 출력한다
  • Chunk 를 만들고 앞에서 넘어온 Chunk 의 item 을 한 건씩 처리한 후 Chunk 에 저장한다
  • 외부로 부터 ChunkProcessor 가 호출될 때마다 항상 새로운 Chunk 가 생성된다
  • ItemProcessor 는 설정 시 선택사항으로서 만약 객체가 존재하지 않을 경우 ItemReader 에서 읽은 item 그대로가 Chunk 에 저장된다
  • ItemProcessor 처리가 완료되면 Chunk 에 있는 List 을 ItemWriter 에게 전달한다
  • ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료하게 되고 Step 반복문에서 ChunkOrientedTasklet 가 새롭게 실행된다
  • ItemWriter 는 Chunk size 만큼 데이터를 Commit 처리 하기 때문에 Chunk size 는 곧 Commit Interval 이 된다
    • commit interval은 커밋주기로 해석
  • 기본 구현체로서 SimpleChunkProcessor 와 FaultTolerantChunkProcessor 가 있다

5.구조

package com.example.springbatch_6_14_chunkorientedtasklet;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * packageName    : com.example.springbatch_6_14_chunkorientedtasklet
 * fileName       : ChunkOrientedTaskletConfiguration
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * Provider 흐름 확인하는 예제.
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class ChunkOrientedTaskletConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    @JobScope
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String,String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1","item2","item3","item4","item5","item6")))
                .processor(new ItemProcessor<String, String>() {
                    @Override
                    public String process(String item) throws Exception {
                        return "my_" + item;
                    }
                })
                .writer(new ItemWriter<String>() {
                    int count = 0;
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("===========Cur Repeat " + count++ + " ============");
                        items.forEach(item -> System.out.println(item));
                    }
                })
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

ItemReader

1.기본개념

  • 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
    • 플랫(Flat) 파일 – csv, txt (고정 위치로 정의된 데이터 필드나 특수문자로 구별된 데이터의 행)
    • XML, Json
    • Database
    • JMS(Java Message Service), RabbitMQ 와 같은 Messag Queuing 서비스
    • Custom Reader - 구현 시 멀티 스레드 환경에서 스레드에 안전하게 구현할 필요가 있음
  • ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다

2.구조

  • ItemRead는 한 건의 데이터만 읽는것을 자료 반환형을 통해 확인이 가능하다.
  • T read()
    • 입력 데이터를 읽고 다음 데이터로 이동한다
    • 아이템 하나를 리턴하며 더 이상 아이템이 없는 경우 null 리턴
    • 아이템 하나는 파일의 한줄, DB 의 한 row 혹은 XML 파일에서 하나의 엘리먼트가 될 수 있다
    • 더 이상 처리해야 할 Item 이 없어도 예외가 발생하지 않고 ItemProcessor 와 같은 다음 단계로 넘어 간다

3.Spring Batch가 제공하는 다양한 Reader

  • 다수의 구현체들이 ItemReader 와 ItemStream 인터페이스를 동시에 구현하고 있음
    • 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 입력 장치 초기화 등의 작업
    • ExecutionContext 에 read 와 관련된 여러가지 상태 정보를 저장해서 재시작 시 다시 참조 하도록 지원
  • 일부를 제외하고 하위 클래스들은 기본적으로 스레드에 안전하지 않기 때문에 병렬 처리시 데이터 정합성을 위한 동기화 처리 필요

  • Spring Batch는 File 타입의 Reader는 4개(FlatFileItemReader, StateEventItemreader, JsonItemReader, MultiResourceItemReader)를 지원한다.
  • DB 타입 Reader는 5개(JdbcCursorItemReader, JpaCursorItemReader, JdbcPagingItemReader, JpaPagingItemReader, ItemReaderAdpater)를 지원한다.
  • 사용자가 커스텀 할 수 있는 기능은 2개(SynchronizedItemStreamReader, CustomItemReader)를 지원한다.

ItemWriter

1.기본개념

  • Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스
    • 플랫(Flat) 파일 – csv, txt
    • XML, Json
    • Database
    • JMS, RabbitMQ 와 같은 Messag Queuing 서비스
    • Mail Service
    • Custom Writer
  • 아이템 하나가 아닌 아이템 리스트를 전달 받는다.
  • ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다

2.구조

  • void write(List<? extends T> items)
    • 출력 데이터를 아이템 리스트로 받아 처리한다
    • 출력이 완료되고 트랜잭션이 종료되면 새로운 Chunk 단위 프로세스로 이동한다

3.Spring Batch가 제공하는 다양한 Writer

  • 다수의 구현체들이 ItemWriter 와 ItemStream 을 동시에 구현하고 있다
    • 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 출력 장치 초기화 등의 작업
  • 보통 ItemReader 구현체와 1:1 대응 관계인 구현체들로 구성되어 있다
  • 이번 글에서는 노란색박스로 그려진 JpaItemWriter, JdbcBatchItemWriter, StatEventItemWriter, FlatFileItemWeriter + JsonItemWriter를 학습한다.

ItemProcessor

1.기본개념

  • 데이터를 출력하기 전에 데이터를 가공, 변형, 필터링하는 역할
  • ItemReader 및 ItemWriter 와 분리되어 비즈니스 로직을 구현할 수 있다
  • ItemReader 로 부터 받은 아이템을 특정 타입으로 변환해서 ItemWriter에 넘겨 줄 수 있다
  • ItemReader 로 부터 받은 아이템들 중 필터과정을 거쳐 원하는 아이템들만 ItemWriter 에게 넘겨줄 수 있다
    • ItemProcessor 에서 process() 실행결과 null을 반환하면 Chunk 에 저장되지 않기 때문에 결국 ItemWriter에 전달되지 않는다
  • ChunkOrientedTasklet 실행 시 선택적 요소이기 때문에 청크 기반 프로세싱에서 ItemProcessor 단계가 반드시 필요한 것은 아니다.

2.구조

  • O Process
  • 제네릭은 ItemReader에서 받을 데이터 타입 지정
  • 제네릭은 ItemWriter 에게 보낼 데이터 타입 지정
  • 아이템 하나씩 가공 처리하며 null 리턴할 경우 해당 아이템은 Chunk 에 저장되지 않음

4.Spring Batch가 제공하는 다양한 Reader

  • ItemStream 을 구현하지 않는다
  • 거의 대부분 Customizing 해서 사용하기 때문에 기본적으로 제공되는 구현체가 적다

예제 코드.

package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * packageName    : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * 테스트 예제 자료형
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
@Data
@AllArgsConstructor
public class Customer {
    private String name;
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;

import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * packageName    : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
 * fileName       : CustomerItemWriter
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * 사용자정의 writer
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
public class CustomerItemWriter implements ItemWriter<Customer> {

    @Override
    public void write(List<? extends Customer> items) throws Exception {
        items.forEach(item -> System.out.println(item));
    }
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;

import org.springframework.batch.item.ItemProcessor;

import java.util.Locale;

/**
 * packageName    : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
 * fileName       : CustomItemProcessor
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * 사용자 정의 Processor. reader로 데이터가 넘어올 때마다 customer 객체의 name을 대문자로 바꿔주는 Processor이다.
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
public class CustomItemProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer customer) throws Exception {
        customer.setName(customer.getName().toUpperCase());
        return customer;
    }
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import java.util.ArrayList;
import java.util.List;

/**
 * packageName    : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
 * fileName       : CustomItemReader
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * 사용자 정의 reader
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
public class CustomItemReader implements ItemReader<Customer> {

    private List<Customer> list;

    public CustomItemReader(List<Customer> list){
        this.list = new ArrayList<>(list);
    }

    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if(!list.isEmpty()){
            return list.remove(0);
        }
        return null;
    }
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

/**
 * packageName    : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
 * fileName       : ItemReader_ItemProcessor_ItemWriter_Configuration
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */

@RequiredArgsConstructor
@Configuration
public class ItemReader_ItemProcessor_ItemWriter_Configuration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(3)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemWriter<? super Customer> itemWriter(){
        return new CustomerItemWriter();
    }
    @Bean
    public ItemProcessor<? super Customer, ? extends Customer> itemProcessor(){
        return new CustomItemProcessor();
    }

    @Bean
    public ItemReader<Customer> itemReader(){
        return new CustomItemReader(Arrays.asList(
                new Customer("user1"),
                new Customer("user2"),
                new Customer("user3")));
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

ItemStream

1.기본 개념

  • ItemReader 와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재 시작 하도록 지원
  • 리소스를 열고(open) 닫아야(close) 하며 입출력 장치 초기화 등의 작업을 해야 하는 경우
  • ExecutionContext 를 매개변수로 받아서 상태 정보를 업데이트(update) 한다
  • ItemReader 및 ItemWriter 는 ItemStream 을 구현해야 한다

2.구조

예제코드

package com.example.springbatch_7_5_itemstream;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * packageName    : com.example.springbatch_7_5_itemstream
 * fileName       : ItemStreamConfiguration
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class ItemStreamConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String,String > chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemWriter itemWriter() {
        return new CustomItemWriter();
    }

    @Bean
    public CustomItemStreamReader itemReader(){
        List<String> items = new ArrayList<>(10);
        for(int i = 0; i <= 10; i++){
            items.add(String.valueOf(i));
        }
        return new CustomItemStreamReader(items);
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
package com.example.springbatch_7_5_itemstream;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * packageName    : com.example.springbatch_7_5_itemstream
 * fileName       : CustomItemWriter
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
public class CustomItemWriter implements ItemStreamWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        System.out.println("ItemStreamWriter - write");
        items.forEach(item -> System.out.println(item));
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("ItemStreamWriter - open");

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("ItemStreamWriter - update");
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.println("ItemStreamWriter - close");
    }
}
package com.example.springbatch_7_5_itemstream;

import org.springframework.batch.item.*;

import java.util.List;

/**
 * packageName    : com.example.springbatch_7_5_itemstream
 * fileName       : CustomItemStreamReader
 * author         : namhyeop
 * date           : 2022/08/06
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/06        namhyeop       최초 생성
 */
public class CustomItemStreamReader implements ItemStreamReader<String> {

    private final List<String> items;
    private int index = -1;
    private boolean restart = false;

    public CustomItemStreamReader(List<String> items){
        this.items = items;
        this.index = 0;
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        String item = null;

        if(this.index < this.items.size()){
            item = this.items.get(index);
            index++;
        }

        if(this.index == 6 && !restart){
            throw new RuntimeException("Restart is required");
        }
        return item;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey("index")){
            index = executionContext.getInt("index");
            this.restart = true;
        }else{
            index = 0;
            executionContext.put("index", index);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.put("index", index);
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.println("close");
    }
}

아키텍쳐

반응형

'Spring Batch' 카테고리의 다른 글

9.Spring Batch의 Chunk와 ItemWriter  (0) 2024.11.08
8.Spring Batch의 Chunk와 ItemReader  (0) 2024.11.08
6.Spring Batch의 Flow  (1) 2024.11.07
5.Spring Batch의 Step  (0) 2024.11.07
4.Spring Batch의 Job  (0) 2024.11.06