반응형
Chunk
1.기본개념
- Chunk 란 여러 개의 아이템을 묶은 하나의 덩어리, 블록을 의미
- 한번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리함, 즉 Chunk 단위의 Commit 과 Rollback 이 이루어짐
- 일반적으로 대용량 데이터를 한번에 처리하는 것이 아닌 청크 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용됨
- Chunk vs Chunk
- Chunk 는 ItemReader 로 읽은 하나의 아이템을 Chunk 에서 정한 개수만큼 반복해서 저장하는 타입
- Chunk 는 ItemReader 로부터 전달받은 Chunk 를 참조해서 ItemProcessor 에서 적절하게 가공, 필터링한 다음 ItemWriter 에 전달하는 타입
2.아키텍쳐 흐름도
- Chunk는 ChunkSize 개수에 될 때까지 반복한다
- ChunkSize에 도달한 이후에는 List에 저장된 Chunk정보를 Iteratior를 통해 반복문을 돌며 Chunk를 만든다.
- 이후 ItemWriter를 활용해 DB저장한다.
💡 그림의 빨간 화살표의 ItemReader,ItemProcessor, ItemWriter의 특징을 기억하자
3.Chunk 구조
예제 코드
package com.example.springbatch_6_13_chunkconfiguration;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
* packageName : com.example.springbatch_6_13_chunkconfiguration
* fileName : ChunkConfiguration
* author : namhyeop
* date : 2022/08/06
* description :
* chunk 테스트 예제
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
@RequiredArgsConstructor
@Configuration
public class ChunkConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.<String, String>chunk(5)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3","item4","item5")))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
Thread.sleep(300);
System.out.println("item = " + item);
return "my" + item;
}
})
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
Thread.sleep(300);
System.out.println("items = " + items);
}
})
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println(">> step2 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
}
ChunkOrientedTasklet
1.기본 개념
- ChunkOrientedTasklet 은 스프링 배치에서 제공하는 Tasklet 의 구현체로서 Chunk 지향 프로세싱를 담당하는 도메인 객체
- ItemReader, ItemWriter, ItemProcessor 를 사용해 Chunk 기반의 데이터 입출력 처리를 담당한다
- TaskletStep 에 의해서 반복적으로 실행되며 ChunkOrientedTasklet 이 실행 될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다
- exception이 발생할 경우, 해당 Chunk는 롤백 되며 이전에 커밋한 Chunk는 완료된 상태가 유지된다
- 내부적으로 ItemReader 를 핸들링 하는 ChunkProvider 와 ItemProcessor, ItemWriter 를 핸들링하는 ChunkProcessor 타입의 구현체를 가진다
2.구조
3.API 소개
4.현재 학습중인 영역
예제 코드
package com.example.springbatch_6_14_chunkorientedtasklet;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
* packageName : com.example.springbatch_6_14_chunkorientedtasklet
* fileName : ChunkOrientedTaskletConfiguration
* author : namhyeop
* date : 2022/08/06
* description :
* Chunk가 2개씩 도달할 때 마다 출력되는 예제
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
@RequiredArgsConstructor
@Configuration
public class ChunkOrientedTaskletConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.start(step1())
.next(step2())
.build();
}
@Bean
@JobScope
public Step step1(){
return stepBuilderFactory.get("step1")
.<String,String>chunk(2)
.reader(new ListItemReader<>(Arrays.asList("item1","item2","item3","item4","item5","item6")))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
return "my_" + item;
}
})
.writer(new ItemWriter<String>() {
int count = 0;
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println("===========Cur Repeat " + count++ + " ============");
items.forEach(item -> System.out.println(item));
}
})
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("step2 has executed");
return RepeatStatus.FINISHED;
}).build();
}
}
- 청크가 2개씩 읽을 때마다 출력하는 예제이다.
- 갯수가 남았다면 다시 돌아가서 2개를 읽고 출력한다.
ChunkkProvider/ChunkProcessor
1.ChunkProvider 기본개념
- ItemReader 를 사용해서 소스로부터 아이템을 Chunk size 만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체
- Chunk 를 만들고 내부적으로 반복문을 사용해서 ItemReader.read() 를 계속 호출하면서 item 을 Chunk 에 쌓는다
- 외부로 부터 ChunkProvider 가 호출될 때마다 항상 새로운 Chunk 가 생성된다
- 반복문 종료 시점
- Chunk size 만큼 item 을 읽으면 반복문 종료되고 ChunkProcessor 로 넘어감
- ItemReader 가 읽은 item 이 null 일 경우 반복문 종료 및 해당 Step 반복문까지 종료
- 기본 구현체로서 SimpleChunkProvider 와 FaultTolerantChunkProvider 가 있다
2.구조
3.코드 분석
4.ChunkProcessor 기본 개념
- ItemProcessor 를 사용해서 Item 을 변형, 가공, 필터링하고 ItemWriter 를 사용해서 Chunk 데이터를 저장, 출력한다
- Chunk 를 만들고 앞에서 넘어온 Chunk 의 item 을 한 건씩 처리한 후 Chunk 에 저장한다
- 외부로 부터 ChunkProcessor 가 호출될 때마다 항상 새로운 Chunk 가 생성된다
- ItemProcessor 는 설정 시 선택사항으로서 만약 객체가 존재하지 않을 경우 ItemReader 에서 읽은 item 그대로가 Chunk 에 저장된다
- ItemProcessor 처리가 완료되면 Chunk 에 있는 List 을 ItemWriter 에게 전달한다
- ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료하게 되고 Step 반복문에서 ChunkOrientedTasklet 가 새롭게 실행된다
- ItemWriter 는 Chunk size 만큼 데이터를 Commit 처리 하기 때문에 Chunk size 는 곧 Commit Interval 이 된다
- commit interval은 커밋주기로 해석
- 기본 구현체로서 SimpleChunkProcessor 와 FaultTolerantChunkProcessor 가 있다
5.구조
package com.example.springbatch_6_14_chunkorientedtasklet;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
* packageName : com.example.springbatch_6_14_chunkorientedtasklet
* fileName : ChunkOrientedTaskletConfiguration
* author : namhyeop
* date : 2022/08/06
* description :
* Provider 흐름 확인하는 예제.
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
@RequiredArgsConstructor
@Configuration
public class ChunkOrientedTaskletConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.start(step1())
.next(step2())
.build();
}
@Bean
@JobScope
public Step step1(){
return stepBuilderFactory.get("step1")
.<String,String>chunk(2)
.reader(new ListItemReader<>(Arrays.asList("item1","item2","item3","item4","item5","item6")))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
return "my_" + item;
}
})
.writer(new ItemWriter<String>() {
int count = 0;
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println("===========Cur Repeat " + count++ + " ============");
items.forEach(item -> System.out.println(item));
}
})
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("step2 has executed");
return RepeatStatus.FINISHED;
}).build();
}
}
ItemReader
1.기본개념
- 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
- 플랫(Flat) 파일 – csv, txt (고정 위치로 정의된 데이터 필드나 특수문자로 구별된 데이터의 행)
- XML, Json
- Database
- JMS(Java Message Service), RabbitMQ 와 같은 Messag Queuing 서비스
- Custom Reader - 구현 시 멀티 스레드 환경에서 스레드에 안전하게 구현할 필요가 있음
- ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다
2.구조
- ItemRead는 한 건의 데이터만 읽는것을 자료 반환형을 통해 확인이 가능하다.
- T read()
- 입력 데이터를 읽고 다음 데이터로 이동한다
- 아이템 하나를 리턴하며 더 이상 아이템이 없는 경우 null 리턴
- 아이템 하나는 파일의 한줄, DB 의 한 row 혹은 XML 파일에서 하나의 엘리먼트가 될 수 있다
- 더 이상 처리해야 할 Item 이 없어도 예외가 발생하지 않고 ItemProcessor 와 같은 다음 단계로 넘어 간다
3.Spring Batch가 제공하는 다양한 Reader
- 다수의 구현체들이 ItemReader 와 ItemStream 인터페이스를 동시에 구현하고 있음
- 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 입력 장치 초기화 등의 작업
- ExecutionContext 에 read 와 관련된 여러가지 상태 정보를 저장해서 재시작 시 다시 참조 하도록 지원
- 일부를 제외하고 하위 클래스들은 기본적으로 스레드에 안전하지 않기 때문에 병렬 처리시 데이터 정합성을 위한 동기화 처리 필요
- Spring Batch는 File 타입의 Reader는 4개(FlatFileItemReader, StateEventItemreader, JsonItemReader, MultiResourceItemReader)를 지원한다.
- DB 타입 Reader는 5개(JdbcCursorItemReader, JpaCursorItemReader, JdbcPagingItemReader, JpaPagingItemReader, ItemReaderAdpater)를 지원한다.
- 사용자가 커스텀 할 수 있는 기능은 2개(SynchronizedItemStreamReader, CustomItemReader)를 지원한다.
ItemWriter
1.기본개념
- Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스
- 플랫(Flat) 파일 – csv, txt
- XML, Json
- Database
- JMS, RabbitMQ 와 같은 Messag Queuing 서비스
- Mail Service
- Custom Writer
- 아이템 하나가 아닌 아이템 리스트를 전달 받는다.
- ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다
2.구조
- void write(List<? extends T> items)
- 출력 데이터를 아이템 리스트로 받아 처리한다
- 출력이 완료되고 트랜잭션이 종료되면 새로운 Chunk 단위 프로세스로 이동한다
3.Spring Batch가 제공하는 다양한 Writer
- 다수의 구현체들이 ItemWriter 와 ItemStream 을 동시에 구현하고 있다
- 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 출력 장치 초기화 등의 작업
- 보통 ItemReader 구현체와 1:1 대응 관계인 구현체들로 구성되어 있다
- 이번 글에서는 노란색박스로 그려진 JpaItemWriter, JdbcBatchItemWriter, StatEventItemWriter, FlatFileItemWeriter + JsonItemWriter를 학습한다.
ItemProcessor
1.기본개념
- 데이터를 출력하기 전에 데이터를 가공, 변형, 필터링하는 역할
- ItemReader 및 ItemWriter 와 분리되어 비즈니스 로직을 구현할 수 있다
- ItemReader 로 부터 받은 아이템을 특정 타입으로 변환해서 ItemWriter에 넘겨 줄 수 있다
- ItemReader 로 부터 받은 아이템들 중 필터과정을 거쳐 원하는 아이템들만 ItemWriter 에게 넘겨줄 수 있다
- ItemProcessor 에서 process() 실행결과 null을 반환하면 Chunk 에 저장되지 않기 때문에 결국 ItemWriter에 전달되지 않는다
- ChunkOrientedTasklet 실행 시 선택적 요소이기 때문에 청크 기반 프로세싱에서 ItemProcessor 단계가 반드시 필요한 것은 아니다.
2.구조
- O Process
- 제네릭은 ItemReader에서 받을 데이터 타입 지정
- 제네릭은 ItemWriter 에게 보낼 데이터 타입 지정
- 아이템 하나씩 가공 처리하며 null 리턴할 경우 해당 아이템은 Chunk 에 저장되지 않음
4.Spring Batch가 제공하는 다양한 Reader
- ItemStream 을 구현하지 않는다
- 거의 대부분 Customizing 해서 사용하기 때문에 기본적으로 제공되는 구현체가 적다
예제 코드.
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* packageName : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
* fileName : Customer
* author : namhyeop
* date : 2022/08/06
* description :
* 테스트 예제 자료형
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
@Data
@AllArgsConstructor
public class Customer {
private String name;
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
* packageName : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
* fileName : CustomerItemWriter
* author : namhyeop
* date : 2022/08/06
* description :
* 사용자정의 writer
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
public class CustomerItemWriter implements ItemWriter<Customer> {
@Override
public void write(List<? extends Customer> items) throws Exception {
items.forEach(item -> System.out.println(item));
}
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;
import org.springframework.batch.item.ItemProcessor;
import java.util.Locale;
/**
* packageName : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
* fileName : CustomItemProcessor
* author : namhyeop
* date : 2022/08/06
* description :
* 사용자 정의 Processor. reader로 데이터가 넘어올 때마다 customer 객체의 name을 대문자로 바꿔주는 Processor이다.
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
public class CustomItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer customer) throws Exception {
customer.setName(customer.getName().toUpperCase());
return customer;
}
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.util.ArrayList;
import java.util.List;
/**
* packageName : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
* fileName : CustomItemReader
* author : namhyeop
* date : 2022/08/06
* description :
* 사용자 정의 reader
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
public class CustomItemReader implements ItemReader<Customer> {
private List<Customer> list;
public CustomItemReader(List<Customer> list){
this.list = new ArrayList<>(list);
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(!list.isEmpty()){
return list.remove(0);
}
return null;
}
}
package com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
/**
* packageName : com.example.springbatch_6_15_itemreaderanditemwriteranditemprocessor
* fileName : ItemReader_ItemProcessor_ItemWriter_Configuration
* author : namhyeop
* date : 2022/08/06
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
@RequiredArgsConstructor
@Configuration
public class ItemReader_ItemProcessor_ItemWriter_Configuration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.<Customer, Customer> chunk(3)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public ItemWriter<? super Customer> itemWriter(){
return new CustomerItemWriter();
}
@Bean
public ItemProcessor<? super Customer, ? extends Customer> itemProcessor(){
return new CustomItemProcessor();
}
@Bean
public ItemReader<Customer> itemReader(){
return new CustomItemReader(Arrays.asList(
new Customer("user1"),
new Customer("user2"),
new Customer("user3")));
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("step2 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
}
ItemStream
1.기본 개념
- ItemReader 와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재 시작 하도록 지원
- 리소스를 열고(open) 닫아야(close) 하며 입출력 장치 초기화 등의 작업을 해야 하는 경우
- ExecutionContext 를 매개변수로 받아서 상태 정보를 업데이트(update) 한다
- ItemReader 및 ItemWriter 는 ItemStream 을 구현해야 한다
2.구조
예제코드
package com.example.springbatch_7_5_itemstream;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* packageName : com.example.springbatch_7_5_itemstream
* fileName : ItemStreamConfiguration
* author : namhyeop
* date : 2022/08/06
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
@RequiredArgsConstructor
@Configuration
public class ItemStreamConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.<String,String > chunk(5)
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public ItemWriter itemWriter() {
return new CustomItemWriter();
}
@Bean
public CustomItemStreamReader itemReader(){
List<String> items = new ArrayList<>(10);
for(int i = 0; i <= 10; i++){
items.add(String.valueOf(i));
}
return new CustomItemStreamReader(items);
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("step2 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
}
package com.example.springbatch_7_5_itemstream;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
* packageName : com.example.springbatch_7_5_itemstream
* fileName : CustomItemWriter
* author : namhyeop
* date : 2022/08/06
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
public class CustomItemWriter implements ItemStreamWriter<String> {
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println("ItemStreamWriter - write");
items.forEach(item -> System.out.println(item));
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("ItemStreamWriter - open");
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
System.out.println("ItemStreamWriter - update");
}
@Override
public void close() throws ItemStreamException {
System.out.println("ItemStreamWriter - close");
}
}
package com.example.springbatch_7_5_itemstream;
import org.springframework.batch.item.*;
import java.util.List;
/**
* packageName : com.example.springbatch_7_5_itemstream
* fileName : CustomItemStreamReader
* author : namhyeop
* date : 2022/08/06
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/06 namhyeop 최초 생성
*/
public class CustomItemStreamReader implements ItemStreamReader<String> {
private final List<String> items;
private int index = -1;
private boolean restart = false;
public CustomItemStreamReader(List<String> items){
this.items = items;
this.index = 0;
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
String item = null;
if(this.index < this.items.size()){
item = this.items.get(index);
index++;
}
if(this.index == 6 && !restart){
throw new RuntimeException("Restart is required");
}
return item;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
if(executionContext.containsKey("index")){
index = executionContext.getInt("index");
this.restart = true;
}else{
index = 0;
executionContext.put("index", index);
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.put("index", index);
}
@Override
public void close() throws ItemStreamException {
System.out.println("close");
}
}
아키텍쳐
반응형
'Spring Batch' 카테고리의 다른 글
9.Spring Batch의 Chunk와 ItemWriter (0) | 2024.11.08 |
---|---|
8.Spring Batch의 Chunk와 ItemReader (0) | 2024.11.08 |
6.Spring Batch의 Flow (1) | 2024.11.07 |
5.Spring Batch의 Step (0) | 2024.11.07 |
4.Spring Batch의 Job (0) | 2024.11.06 |