잘 정리해보자

Spring Batch - Tasklet, Chunk 간단 비교 본문

Spring/Spring Boot

Spring Batch - Tasklet, Chunk 간단 비교

토마토오이 2022. 1. 13. 00:10

Step에서 데이터를 처리하는 방식은 Tasklet과 chunksize 가 있다.

 

 

Tasklet

: 데이터 처리과정이 tasklet안에서 한번에 이뤄진다.

배치 처리과정이 쉬운 경우 쉽게 사용되며, 대량처리 경우 더 복잡해질 수 있다.

 

Chunksize

: chunksize 단위로 데이터가 페이징처럼 처리된다.

대용량 데이터를 처리할때 사용되며, reader / processor / writer 로 구분되어 처리된다.

(reader와 writer는 필수이며, processor는 사용안해도 된다.)

 

- reader

: (파일/DB) 데이터(item)를 읽어오며, reader안에서도 페이징처리가 가능한 bean들이 있다.

ItemReader, MybatisPagingItemReader, JpaPagingItemReader, ...

 

- processor

: reader에서 읽어온 데이터(item)를 필터/처리 하고 write로 보내는 역할.

item을 필터 도중 null로 리턴하면, 그 item은 write로 전달되지 못한다.

 

ex) 10개 read 후 processor에서 4개 필터링 해서 6개만 리턴하면 write에서는 6개만 처리한다.

 

- writer

: processor에서 처리된 데이터들(items : List<item>)을 파일이나 DB에 적재하는 역할.

writer는 기본으로 List단위로 처리되며, List는 chunksize에 의해 처리된다.

 

 

 

 

 

1. Tasklet 테스트

private String JOBNAME = "testTaskChunkJob";

    @Bean
    public Job testTaskChunkJob(){
        return jobBuilderFactory.get(JOBNAME)
                .incrementer(new RunIdIncrementer())
                .start(this.taskStep1())
                .build();
    }

    @Bean
    public Step taskStep1(){
        return stepBuilderFactory.get("taskStep1")
                .tasklet(tasklet())
                .build();
    }

    private Tasklet tasklet(){ //tasklet으로 모두 처리
        return (contribution, chunkContext) -> {
            List<String> items = getItems();
            
            log.info("items : "+ items.toString());

            return RepeatStatus.FINISHED;
        };
    }
    private List<String> getItems() {
        List<String> items = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            items.add(i + " test!");
        }

        return items;
    }

단순하게 100개의 숫자를 가져와 log를 출력하는 tasklet이다.

 

tasklet은 함수안에서 모두 처리되서 1~100까지 모두 출력된다.

 

결과 :

items : [0 test!, 1 test!, 2 test!, 3 test!, ... 99 test!]

 

 

 

2. chunksize로 처리 테스트

private String JOBNAME = "testTaskChunkJob";

    @Bean
    public Job testTaskChunkJob(){
        return jobBuilderFactory.get(JOBNAME)
                .incrementer(new RunIdIncrementer())
                .start(this.taskStep1()) //tasklet 처리
                .next(this.chunkStep1()) //chunk 처리
                .build();
    }

// ...

    @Bean
    public Step chunkStep1() { //chunksize로 처리
        return stepBuilderFactory.get("chunkStep1")
                .<String,String>chunk(10)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    private ItemReader<String> itemReader() {
        return new ListItemReader<>(getItems());
    }
    
    private ItemProcessor<String, String> itemProcessor() {
        return item -> item + " now processor!!";
    }

    private ItemWriter<String> itemWriter() {
        return items -> log.info("### writer : " + items.toString());
    }

 

chunksize로 처리 시, Step에 .chunksize(크기) 로 정의하며 앞에 처리될 타입을 정의한다.

.<String,String>chunk(10)

<String,String>

앞의 Stringreader에서 읽은 데이터의 타입이고

뒤의 Stringwriter에서 받을 데이터의 타입이다.

 

따라서 processor 처리 함수에서는 앞/뒤 타입을 모두 선언해서 필터나 데이터 변형 처리가 가능하다.

 

 

- reader

private ItemReader<String> itemReader() {
    return new ListItemReader<>(getItems());
}

reader에서 리턴으로 선언한 ListItemReader는 spring batch에서 제공하는 item처리 클래스이다.

getItems에서 가져온 List<String>을 LIstItemReader에서 알아서 item반환해준다.

 

 

- processor

private ItemProcessor<String, String> itemProcessor() {
    return item -> item + " now processor!!";
}

processor에서는 앞서 말한것과 같이 <reader에서 읽은 타입, writer에 보낼 타입> 으로 정의해 처리한다.

지금은 그냥 읽은 item에 문자열만 더해서 처리.

 

 

- writer

private ItemWriter<String> itemWriter() {
    return items -> log.info("### writer : " + items.toString());
}

writer에서 <String>이지만, List<String>단위로 처리되며, 

items를 받아 log를 출력하면 chunksize=10개씩 읽은 String이 List로 출력된다.

 

 

 

결과 :

writer : [0 test! now processor!!, 1 test! now processor!!, ... 9 test! now processor!!]
writer : [10 test! now processor!!, 11 test! now processor!!, ... 19 test! now processor!!]
...
writer : [90 test! now processor!!, 91 test! now processor!!, ... 99 test! now processor!!]

 

log를 총 10개 출력했으며, 각 로그는 items를 출력한다.

 

reader에서 10개씩 읽고 List<String> 으로 items 가 writer에서 chunksize=10만큼 처리된다.

 

 

 

 

 

3. Tasklet으로 페이징 처리 테스트

    
    @Bean
    public Job testTaskChunkJob(){
        return jobBuilderFactory.get(JOBNAME)
                .incrementer(new RunIdIncrementer())
                .start(this.taskStep1())
                .next(this.chunkStep1())
                .next(this.taskPagingStep1()) //tasklet으로 페이징 처리
                .build();
    }
    
//...
    
    private Tasklet pagingTasklet() {
        List<String> items = getItems();

        return (contribution, chunkContext) -> {
            //stepexecution : 읽은 item을 저장
            StepExecution stepExecution = contribution.getStepExecution();
            int chunksize = 10;
            int readCnt = stepExecution.getReadCount();
            int idx = readCnt + chunksize;

            if(idx > items.size()){
                return RepeatStatus.FINISHED;
            }

            //sublist : list 중간 데이터 읽기
            List<String> sublist = items.subList(readCnt,idx);

            log.info("### sublist size : " + sublist.toString());
            stepExecution.setReadCount(idx); //read count 읽은만큼 다시 저장

            return RepeatStatus.CONTINUABLE;
        };
    }

 

tasklet의 contributution에서 StepExecution 으로 step이 얼마나 처리되고 있는지의 정보를 알 수 있다.

(chunksize를 10으로 정의하고 readCount로 페이징을 처리)

 

 

tasklet으로 chunksize=10으로 정해 처리할 수 있지만, 소스가 길어지고 복잡해짐을 볼 수 있다.

 

 

결과 :

sublist size : [0 test!, 1 test!, 2 test!, ... 9 test!]
sublist size : [10 test!, 11 test!, 12 test!, ... 19 test!]
...
sublist size : [90 test!, 91 test!, 92 test!, ... 99 test!]

 

 

 

 

Comments