10. Chunk and ItemProcessor in Spring Batch

CompositeItemProcessor

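Basic concepts

  • When ItemProcessors are chained and the work is delegated to them, each ItemProcessor is executed in turn.
    • For example, suppose there are three ItemProcessors and the object being processed has to be handed to each of them:
      • ItemProcessor 1 converts the whole string to uppercase.
      • ItemProcessor 2 appends a UUID to the string.
      • ItemProcessor 3 replaces A with B.
    • In this case each ItemProcessor must return the string it is working on. CompositeItemProcessor arranges the ItemProcessors in a chain and passes the object being processed along it.
  • The return value of the previous ItemProcessor becomes the input of the next ItemProcessor.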
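
The three-step example above can be sketched in plain Java. This is only an illustration of the chaining idea, not Spring Batch code: java.util.function.Function stands in for ItemProcessor, and the names ChainingExample, buildChain and idSuffix are made up for this sketch.

```java
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

public class ChainingExample {

    // Builds the three-step chain. idSuffix is a hypothetical hook that supplies
    // the text appended in step 2, so the UUID can be swapped out when testing.
    static Function<String, String> buildChain(Supplier<String> idSuffix) {
        Function<String, String> toUpper = s -> s.toUpperCase();          // step 1
        Function<String, String> appendId = s -> s + "-" + idSuffix.get(); // step 2
        Function<String, String> replaceAWithB = s -> s.replace('A', 'B'); // step 3
        // andThen feeds the return value of one step into the next one
        return toUpper.andThen(appendId).andThen(replaceAWithB);
    }

    public static void main(String[] args) {
        Function<String, String> chain = buildChain(() -> UUID.randomUUID().toString());
        // Prints BBNBNB- followed by a random UUID
        System.out.println(chain.apply("banana"));
    }
}
```

Each step returns the string it worked on, which is exactly what lets the next step pick it up.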

API

Flow of CompositeItemProcessor

Core methods of CompositeItemProcessor

  • These are screenshots of the core methods of CompositeItemProcessor.
  • The left image shows the chained ItemProcessors being collected into a single List.
  • The right image shows the logic in which the current ItemProcessor runs and its result is handed to the next ItemProcessor.
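
Since the screenshots are not reproduced here, the following is a simplified stand-in for the two pieces they describe: a setter that stores the delegates in one List, and a process method that loops over them. It is a sketch under assumptions, not the library source. SimpleCompositeProcessor is a hypothetical class, Function replaces ItemProcessor, and the items are fixed to String for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SimpleCompositeProcessor {

    // All chained delegates are kept in one list, in the order they should run
    private final List<Function<String, String>> delegates = new ArrayList<>();

    public void setDelegates(List<Function<String, String>> delegates) {
        this.delegates.clear();
        this.delegates.addAll(delegates);
    }

    public String process(String item) {
        String result = item;
        for (Function<String, String> delegate : delegates) {
            // A null result means the item was filtered out, so the chain stops here
            if (result == null) {
                return null;
            }
            // The current result becomes the input of the next delegate
            result = delegate.apply(result);
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleCompositeProcessor composite = new SimpleCompositeProcessor();
        composite.setDelegates(List.of(s -> s + " p1", s -> s + " p2"));
        System.out.println(composite.process("item")); // prints "item p1 p2"
    }
}
```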

Example code

package com.example.springbatch_10_1_compositeitemprocessor;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * packageName    : com.example.springbatch_10_1_compositeitemprocessor
 * fileName       : CompositionItemConfiguration
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * Configuration that wires up the CompositeItemProcessor
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
@RequiredArgsConstructor
@Configuration
public class CompositionItemConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<String,String>chunk(5)
                .reader(new ItemReader<String>() {
                    int i = 0;
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        i++;
                        return i > 10 ? null : "item";
                    }
                })
                // A step holds a single processor, so a second .processor(...) call would
                // replace the first one; the composite processor chains the delegates instead
                .processor(customItemProcessor())
                .writer(items -> {
                    System.out.println(items);
                    System.out.println("===================");
                })
                .build();
    }

    @Bean
    public ItemProcessor<String, String> customItemProcessor() {
        // Delegates run in list order; each output becomes the next delegate's input
        List<ItemProcessor<String, String>> delegates = new ArrayList<>();
        delegates.add(new CustomItemProcessor());
        delegates.add(new CustomItemProcessor2());

        return new CompositeItemProcessorBuilder<String, String>()
                .delegates(delegates)
                .build();
    }
}

package com.example.springbatch_10_1_compositeitemprocessor;

import org.springframework.batch.item.ItemProcessor;

/**
 * packageName    : com.example.springbatch_10_1_compositeitemprocessor
 * fileName       : CustomItemProcessor
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * First ItemProcessor in the CompositeItemProcessor chain
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
public class CustomItemProcessor implements ItemProcessor<String, String> {

    int cnt = 0;

    @Override
    public String process(String item) throws Exception {
        cnt++;
        return (item + " this is process1 " + cnt);
    }
}

package com.example.springbatch_10_1_compositeitemprocessor;

import org.springframework.batch.item.ItemProcessor;

/**
 * packageName    : com.example.springbatch_10_1_compositeitemprocessor
 * fileName       : CustomItemProcessor2
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * Second ItemProcessor in the CompositeItemProcessor chain
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
public class CustomItemProcessor2 implements ItemProcessor<String, String> {

    int cnt = 0;

    @Override
    public String process(String item) throws Exception {
        cnt++;
        return (item + " this is process2 " + cnt);
    }
}

ClassifierCompositeItemProcessor

Basic concepts

  • Implements the routing pattern with a Classifier and invokes exactly one of the registered ItemProcessor implementations for each item.

API

  • In Classifier<C, T>, C is the type of the object to be classified and T is the type of the value the Classifier returns.
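
To make the two type parameters concrete, here is a small sketch. SimpleClassifier is a hypothetical stand-in that only mirrors the shape of Classifier<C, T> (one classify method taking a C and returning a T); the parity example is invented for illustration.

```java
public class ClassifierShape {

    // Hypothetical stand-in with the same shape as Classifier<C, T>:
    // C is what gets classified, T is what comes back
    interface SimpleClassifier<C, T> {
        T classify(C classifiable);
    }

    // Here C = Integer (the input) and T = String (the returned label)
    static SimpleClassifier<Integer, String> parity() {
        return n -> n % 2 == 0 ? "even" : "odd";
    }

    public static void main(String[] args) {
        System.out.println(parity().classify(4)); // prints "even"
        System.out.println(parity().classify(7)); // prints "odd"
    }
}
```

In ClassifierCompositeItemProcessor the returned T is an ItemProcessor rather than a label, which is what turns classification into routing.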

How ClassifierCompositeItemProcessor works

  • In the diagram, OrderItemProcessor and PayItemProcessor are example processors made up to aid understanding.
  • When the Classifier returns 1, OrderItemProcessor is selected; when it returns 2, PayItemProcessor is selected.
  • Finally, the result is passed to the ItemWriter.
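
The routing described above can be sketched without Spring as a map lookup followed by a call to the selected processor. Everything here is an assumption made for illustration: RoutingSketch, the "order:" and "pay:" prefixes, and the use of Function in place of OrderItemProcessor and PayItemProcessor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RoutingSketch {

    // Key 1 stands for the order processor, key 2 for the pay processor
    private static final Map<Integer, Function<String, String>> PROCESSORS = new HashMap<>();

    static {
        PROCESSORS.put(1, item -> "order:" + item);
        PROCESSORS.put(2, item -> "pay:" + item);
    }

    // Plays the classifier role: picks one processor for the given key
    static Function<String, String> classify(int key) {
        Function<String, String> processor = PROCESSORS.get(key);
        if (processor == null) {
            // Fail loudly instead of returning null for an unmapped key
            throw new IllegalArgumentException("No processor registered for key " + key);
        }
        return processor;
    }

    // Only the selected processor runs; its result is what a writer would receive
    static String process(int key, String item) {
        return classify(key).apply(item);
    }

    public static void main(String[] args) {
        System.out.println(process(1, "A-100")); // prints "order:A-100"
        System.out.println(process(2, "A-100")); // prints "pay:A-100"
    }
}
```

Unlike the chained composite, only one processor touches each item here.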

Example code

package com.example.springbatch_10_2_classifiercompositeitemprocessor;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.ClassifierCompositeItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * packageName    : com.example.springbatch_10_2_classifiercompositeitemprocessor
 * fileName       : ClassfileConfiguration
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * ClassifierCompositeItemProcessor example: the Classifier picks one of three ItemProcessors
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
@RequiredArgsConstructor
@Configuration
public class ClassfileConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<ProcessorInfo, ProcessorInfo>chunk(10)
                .reader(new ItemReader<ProcessorInfo>() {

                    int i = 0;

                    @Override
                    public ProcessorInfo read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        i++;
                        ProcessorInfo processorInfo = ProcessorInfo.builder().id(i).build();
                        return i > 3 ?  null : processorInfo;
                    }
                })
                .processor(customItemProcessor())
                .writer(items -> System.out.println(items))
                .build();
    }

    // Define the classifier-based processor
    @Bean
    public ItemProcessor<? super ProcessorInfo, ? extends ProcessorInfo> customItemProcessor() {
        ClassifierCompositeItemProcessor<ProcessorInfo, ProcessorInfo> processor = new ClassifierCompositeItemProcessor<>();

        // Set up the classifier
        ProcessorClassifier<ProcessorInfo, ItemProcessor<?, ? extends ProcessorInfo>> classifier = new ProcessorClassifier<>();
        Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();
        // id 1 runs CustomItemProcessor1. The lookup itself happens in ProcessorClassifier.classify().
        processorMap.put(1, new CustomItemProcessor1());
        // id 2 runs CustomItemProcessor2.
        processorMap.put(2, new CustomItemProcessor2());
        // id 3 runs CustomItemProcessor3.
        processorMap.put(3, new CustomItemProcessor3());
        classifier.setProcessorMap(processorMap);

        // Register the classifier with the ClassifierCompositeItemProcessor
        processor.setClassifier(classifier);

        return processor;
    }
}

package com.example.springbatch_10_2_classifiercompositeitemprocessor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.classify.Classifier;

import java.util.HashMap;
import java.util.Map;

/**
 * packageName    : com.example.springbatch_10_2_classifiercompositeitemprocessor
 * fileName       : ProcessorClassifier
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * Holds the registered processors and selects one by the item's id
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
public class ProcessorClassifier<C, T> implements Classifier<C,T> {

    private Map<Integer, ItemProcessor<ProcessorInfo,ProcessorInfo>> processorMap = new HashMap<>();

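    @Override
    @SuppressWarnings("unchecked")
    public T classify(C classifiable) {
        // Look up the processor registered under the item's id; yields null if no entry exists
        return (T) processorMap.get(((ProcessorInfo) classifiable).getId());
    }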

    public void setProcessorMap(Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap) {
        this.processorMap = processorMap;
    }
}

package com.example.springbatch_10_2_classifiercompositeitemprocessor;

import lombok.Builder;
import lombok.Data;

/**
 * packageName    : com.example.springbatch_10_2_classifiercompositeitemprocessor
 * fileName       : ProcessorInfo
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * Class whose id is the criterion for selecting an ItemProcessor
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
@Data
@Builder
public class ProcessorInfo {
    private int id;
}

package com.example.springbatch_10_2_classifiercompositeitemprocessor;

import org.springframework.batch.item.ItemProcessor;

/**
 * packageName    : com.example.springbatch_10_2_classifiercompositeitemprocessor
 * fileName       : CustomItemProcessor1
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
public class CustomItemProcessor1 implements ItemProcessor<ProcessorInfo, ProcessorInfo> {

    @Override
    public ProcessorInfo process(ProcessorInfo processorInfo) throws Exception {
        System.out.println("Checked CustomItemProcessor1");
        System.out.println("=============================");
        return processorInfo;
    }
}

package com.example.springbatch_10_2_classifiercompositeitemprocessor;

import org.springframework.batch.item.ItemProcessor;

/**
 * packageName    : com.example.springbatch_10_2_classifiercompositeitemprocessor
 * fileName       : CustomItemProcessor2
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
public class CustomItemProcessor2 implements ItemProcessor<ProcessorInfo, ProcessorInfo> {

    @Override
    public ProcessorInfo process(ProcessorInfo processorInfo) throws Exception {
        System.out.println("Checked CustomItemProcessor2");
        System.out.println("=============================");
        return processorInfo;
    }
}

package com.example.springbatch_10_2_classifiercompositeitemprocessor;

import org.springframework.batch.item.ItemProcessor;

/**
 * packageName    : com.example.springbatch_10_2_classifiercompositeitemprocessor
 * fileName       : CustomItemProcessor3
 * author         : namhyeop
 * date           : 2022/08/16
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/16        namhyeop       Initial creation
 */
public class CustomItemProcessor3 implements ItemProcessor<ProcessorInfo, ProcessorInfo> {

    @Override
    public ProcessorInfo process(ProcessorInfo processorInfo) throws Exception {
        System.out.println("Checked CustomItemProcessor3");
        System.out.println("=============================");
        return processorInfo;
    }
}