CompositeItemProcessor
Basic concept
- CompositeItemProcessor chains several ItemProcessors together and delegates to each of them in order.
- For example, suppose there are three ItemProcessors and the object being processed has to be handed from one to the next.
- ItemProcessor 1 converts the whole string to upper case.
- ItemProcessor 2 appends a UUID to the string.
- ItemProcessor 3 replaces A with B.
- In this case each ItemProcessor has to return the string it is working on. CompositeItemProcessor arranges the ItemProcessors as a chain and passes the object being processed along it.
- The return value of the previous ItemProcessor becomes the input of the next ItemProcessor.
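The three-processor scenario above can be sketched in plain Java. This is only an illustration under assumptions: `java.util.function.Function` stands in for ItemProcessor, and the class name `ChainSketch` and the "-" separator in front of the UUID are hypothetical choices that do not come from Spring Batch.

```java
import java.util.Locale;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical sketch: Function<String, String> stands in for ItemProcessor<String, String>.
class ChainSketch {
    // Processor 1: upper-case the whole string.
    static final Function<String, String> UPPER = s -> s.toUpperCase(Locale.ROOT);
    // Processor 2: append a UUID (the "-" separator is an assumption).
    static final Function<String, String> ADD_UUID = s -> s + "-" + UUID.randomUUID();
    // Processor 3: replace every upper-case 'A' with 'B'.
    static final Function<String, String> A_TO_B = s -> s.replace('A', 'B');

    // Chain them: each function receives the previous function's return value.
    static String process(String item) {
        return UPPER.andThen(ADD_UUID).andThen(A_TO_B).apply(item);
    }

    public static void main(String[] args) {
        // Output starts with "BBNBNB-" and ends with a random UUID.
        System.out.println(process("banana"));
    }
}
```

The order matters here: because the UUID is appended after the upper-casing step and its text form is lower-case hex, the final A-to-B replacement only touches the original payload.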
API
Flow of CompositeItemProcessor
Core methods of CompositeItemProcessor
- The screenshots show the core methods of CompositeItemProcessor.
- The left screenshot shows the chained ItemProcessors being collected into a single List.
- The right screenshot shows the logic in which the current ItemProcessor runs and its result is then passed on to the next ItemProcessor.
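The logic described for the two screenshots can be approximated in plain Java. This is a minimal sketch, not the library's source: `CompositeSketch`, `Processor`, and `Composite` are hypothetical stand-ins, and the stand-in `process` method omits the `throws Exception` clause of the real ItemProcessor. The early return on null reflects the Spring Batch convention that a null result filters the item, so later delegates are not called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CompositeSketch {
    // Hypothetical stand-in for ItemProcessor<I, O>.
    interface Processor<I, O> {
        O process(I item);
    }

    // Simplified composite: keeps the delegates in a List and runs them in order.
    static final class Composite<I, O> implements Processor<I, O> {
        private final List<Processor<Object, Object>> delegates = new ArrayList<>();

        @SuppressWarnings("unchecked")
        Composite(List<? extends Processor<?, ?>> delegates) {
            for (Processor<?, ?> delegate : delegates) {
                this.delegates.add((Processor<Object, Object>) delegate);
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public O process(I item) {
            Object result = item;
            for (Processor<Object, Object> delegate : delegates) {
                if (result == null) {
                    return null; // a null result filters the item; remaining delegates are skipped
                }
                // The previous result becomes the next delegate's input.
                result = delegate.process(result);
            }
            return (O) result;
        }
    }

    static String demo(String input) {
        Processor<String, String> first = s -> s + " this is process1";
        Processor<String, String> second = s -> s + " this is process2";
        Composite<String, String> composite = new Composite<>(Arrays.asList(first, second));
        return composite.process(input);
    }

    public static void main(String[] args) {
        System.out.println(demo("item")); // prints "item this is process1 this is process2"
    }
}
```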
Example code
package com.example.springbatch_10_1_compositeitemprocessor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* packageName : com.example.springbatch_10_1_compositeitemprocessor
* fileName : CompositionItemConfiguration
* author : namhyeop
* date : 2022/08/16
* description :
* Configuration that assembles the CompositeItemProcessor
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
@RequiredArgsConstructor
@Configuration
public class CompositionItemConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String,String>chunk(5)
.reader(new ItemReader<String>() {
int i = 0;
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
i++;
return i > 10 ? null : "item";
}
})
// A step holds a single processor, so only the composite processor is registered here.
.processor(customItemProcessor())
.writer(items -> {
System.out.println(items);
System.out.println("===================");
}
)
.build();
}
@Bean
public ItemProcessor<String, String> customItemProcessor() {
// Delegates run in list order; each one receives the previous delegate's return value.
List<ItemProcessor<String, String>> delegates = new ArrayList<>();
delegates.add(new CustomItemProcessor());
delegates.add(new CustomItemProcessor2());
return new CompositeItemProcessorBuilder<String, String>()
.delegates(delegates)
.build();
}
}
package com.example.springbatch_10_1_compositeitemprocessor;
import org.springframework.batch.item.ItemProcessor;
/**
* packageName : com.example.springbatch_10_1_compositeitemprocessor
* fileName : CustomItemProcessor
* author : namhyeop
* date : 2022/08/16
* description :
* First ItemProcessor in the CompositeItemProcessor chain
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
public class CustomItemProcessor implements ItemProcessor<String, String> {
int cnt = 0;
@Override
public String process(String item) throws Exception {
cnt++;
return (item + " this is process1 " + cnt);
}
}
package com.example.springbatch_10_1_compositeitemprocessor;
import org.springframework.batch.item.ItemProcessor;
/**
* packageName : com.example.springbatch_10_1_compositeitemprocessor
* fileName : CustomItemProcessor2
* author : namhyeop
* date : 2022/08/16
* description :
* Second ItemProcessor in the CompositeItemProcessor chain
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
public class CustomItemProcessor2 implements ItemProcessor<String, String> {
int cnt = 0;
@Override
public String process(String item) throws Exception {
cnt++;
return (item + " this is process2 " + cnt);
}
}
ClassifierCompositeItemProcessor
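Basic concept
- ClassifierCompositeItemProcessor implements a routing pattern with a Classifier and invokes exactly one of the registered ItemProcessor implementations for each item.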
API
- In Classifier<C, T>, C is the type of the object being classified and T is the type of the value the Classifier returns.
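To make the two type parameters concrete, here is a minimal sketch. It assumes a local `Classifier` interface that copies only the shape of the interface used in this post (a single `classify` method); `ClassifierSketch`, `BY_ID`, and the labels are hypothetical.

```java
class ClassifierSketch {
    // Same shape as Classifier<C, T>: a C goes in, a T comes out.
    interface Classifier<C, T> {
        T classify(C classifiable);
    }

    // C = Integer (the thing being classified), T = String (the classification result).
    static final Classifier<Integer, String> BY_ID = id -> id == 1 ? "order" : "pay";

    public static void main(String[] args) {
        System.out.println(BY_ID.classify(1)); // prints "order"
        System.out.println(BY_ID.classify(2)); // prints "pay"
    }
}
```

With ClassifierCompositeItemProcessor, T is itself an ItemProcessor, so the Classifier returns the processor that should handle the item.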
Flow of ClassifierCompositeItemProcessor
- OrderItemProcessor and PayItemProcessor in the figure are example processors made up to aid understanding.
- Read the figure as follows: when the classification yields 1, OrderItemProcessor is selected, and when it yields 2, PayItemProcessor is selected.
- Finally, the result is passed to the ItemWriter.
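The routing flow above can be sketched without Spring Batch on the classpath. Everything here is an assumption made for illustration: `RoutingSketch`, `Request`, `ClassifierComposite`, and the local `Processor` and `Classifier` interfaces are hypothetical stand-ins, and the two lambdas only play the roles of OrderItemProcessor and PayItemProcessor.

```java
import java.util.HashMap;
import java.util.Map;

class RoutingSketch {
    // Hypothetical stand-ins for ItemProcessor and Classifier.
    interface Processor<I, O> { O process(I item); }
    interface Classifier<C, T> { T classify(C classifiable); }

    // Hypothetical item type that carries the routing key.
    static final class Request {
        final int type;
        final String body;
        Request(int type, String body) { this.type = type; this.body = body; }
    }

    // Simplified classifier composite: ask the classifier for a processor, then delegate to it.
    static final class ClassifierComposite implements Processor<Request, String> {
        private final Classifier<Request, Processor<Request, String>> classifier;

        ClassifierComposite(Classifier<Request, Processor<Request, String>> classifier) {
            this.classifier = classifier;
        }

        @Override
        public String process(Request item) {
            return classifier.classify(item).process(item);
        }
    }

    static ClassifierComposite build() {
        Map<Integer, Processor<Request, String>> routes = new HashMap<>();
        routes.put(1, r -> "order:" + r.body); // plays the role of OrderItemProcessor
        routes.put(2, r -> "pay:" + r.body);   // plays the role of PayItemProcessor
        return new ClassifierComposite(r -> routes.get(r.type));
    }

    public static void main(String[] args) {
        ClassifierComposite processor = build();
        System.out.println(processor.process(new Request(1, "A-100"))); // prints "order:A-100"
        System.out.println(processor.process(new Request(2, "A-100"))); // prints "pay:A-100"
    }
}
```

In this sketch a key with no registered processor makes the lookup return null and the call fail with a NullPointerException, so a real classifier would probably want a default processor or an explicit error for unmapped keys.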
Example code
package com.example.springbatch_10_2_classifiercompositeitemprocessor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.ClassifierCompositeItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* packageName : com.example.springbatch_10_2_classifiercompositeitemprocessor
* fileName : ClassfileConfiguration
* author : namhyeop
* date : 2022/08/16
* description :
* ClassifierCompositeItemProcessor example in which a Classifier picks one of three ItemProcessors
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
@RequiredArgsConstructor
@Configuration
public class ClassfileConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job(){
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.<ProcessorInfo, ProcessorInfo>chunk(10)
.reader(new ItemReader<ProcessorInfo>() {
int i = 0;
@Override
public ProcessorInfo read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
i++;
ProcessorInfo processorInfo = ProcessorInfo.builder().id(i).build();
return i > 3 ? null : processorInfo;
}
})
.processor(customItemProcessor())
.writer(items -> System.out.println(items))
.build();
}
// Define the classifier-based processor
@Bean
public ItemProcessor<? super ProcessorInfo, ? extends ProcessorInfo> customItemProcessor() {
ClassifierCompositeItemProcessor<ProcessorInfo, ProcessorInfo> processor = new ClassifierCompositeItemProcessor<>();
// Set up the classifier
ProcessorClassifier<ProcessorInfo, ItemProcessor<?, ? extends ProcessorInfo>> classifier = new ProcessorClassifier<>();
Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();
// id 1 runs CustomItemProcessor1. The lookup itself happens in ProcessorClassifier's classify method.
processorMap.put(1, new CustomItemProcessor1());
// id 2 runs CustomItemProcessor2.
processorMap.put(2, new CustomItemProcessor2());
// id 3 runs CustomItemProcessor3.
processorMap.put(3, new CustomItemProcessor3());
classifier.setProcessorMap(processorMap);
// Register the classifier with the ClassifierCompositeItemProcessor
processor.setClassifier(classifier);
return processor;
}
}
package com.example.springbatch_10_2_classifiercompositeitemprocessor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.classify.Classifier;
import java.util.HashMap;
import java.util.Map;
/**
* packageName : com.example.springbatch_10_2_classifiercompositeitemprocessor
* fileName : ProcessorClassifier
* author : namhyeop
* date : 2022/08/16
* description :
* Holds the registered ItemProcessors and selects one by ProcessorInfo id
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
public class ProcessorClassifier<C, T> implements Classifier<C,T> {
private Map<Integer, ItemProcessor<ProcessorInfo,ProcessorInfo>> processorMap = new HashMap<>();
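@Override
@SuppressWarnings("unchecked")
public T classify(C classifiable) {
// Look up the ItemProcessor registered under the item's id; the result is null when no processor is mapped to that id.
return (T)processorMap.get(((ProcessorInfo)classifiable).getId());
}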
public void setProcessorMap(Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap) {
this.processorMap = processorMap;
}
}
package com.example.springbatch_10_2_classifiercompositeitemprocessor;
import lombok.Builder;
import lombok.Data;
/**
* packageName : com.example.springbatch_10_2_classifiercompositeitemprocessor
* fileName : ProcessorInfo
* author : namhyeop
* date : 2022/08/16
* description :
* Class whose id is the criterion for selecting an ItemProcessor
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
@Data
@Builder
public class ProcessorInfo {
private int id;
}
package com.example.springbatch_10_2_classifiercompositeitemprocessor;
import org.springframework.batch.item.ItemProcessor;
/**
* packageName : com.example.springbatch_10_2_classifiercompositeitemprocessor
* fileName : CustomItemProcessor1
* author : namhyeop
* date : 2022/08/16
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
public class CustomItemProcessor1 implements ItemProcessor<ProcessorInfo, ProcessorInfo> {
@Override
public ProcessorInfo process(ProcessorInfo processorInfo) throws Exception {
System.out.println("Checked CustomItemProcessor1");
System.out.println("=============================");
return processorInfo;
}
}
package com.example.springbatch_10_2_classifiercompositeitemprocessor;
import org.springframework.batch.item.ItemProcessor;
/**
* packageName : com.example.springbatch_10_2_classifiercompositeitemprocessor
* fileName : CustomItemProcessor2
* author : namhyeop
* date : 2022/08/16
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
public class CustomItemProcessor2 implements ItemProcessor<ProcessorInfo, ProcessorInfo> {
@Override
public ProcessorInfo process(ProcessorInfo processorInfo) throws Exception {
System.out.println("Checked CustomItemProcessor2");
System.out.println("=============================");
return processorInfo;
}
}
package com.example.springbatch_10_2_classifiercompositeitemprocessor;
import org.springframework.batch.item.ItemProcessor;
/**
* packageName : com.example.springbatch_10_2_classifiercompositeitemprocessor
* fileName : CustomItemProcessor3
* author : namhyeop
* date : 2022/08/16
* description :
* ===========================================================
* DATE AUTHOR NOTE
* -----------------------------------------------------------
* 2022/08/16 namhyeop Initial creation
*/
public class CustomItemProcessor3 implements ItemProcessor<ProcessorInfo, ProcessorInfo> {
@Override
public ProcessorInfo process(ProcessorInfo processorInfo) throws Exception {
System.out.println("Checked CustomItemProcessor3");
System.out.println("=============================");
return processorInfo;
}
}