잘 정리해보자
Spring Batch - Tasklet, Chunk 간단 비교 본문
Step에서 데이터를 처리하는 방식은 Tasklet과 chunksize 가 있다.
Tasklet
: 데이터 처리과정이 tasklet안에서 한번에 이뤄진다.
배치 처리과정이 쉬운 경우 쉽게 사용되며, 대량처리 경우 더 복잡해질 수 있다.
Chunksize
: chunksize 단위로 데이터가 페이징처럼 처리된다.
대용량 데이터를 처리할때 사용되며, reader / processor / writer 로 구분되어 처리된다.
(reader와 writer는 필수이며, processor는 사용안해도 된다.)
- reader
: (파일/DB) 데이터(item)를 읽어오며, reader안에서도 페이징처리가 가능한 bean들이 있다.
ItemReader, MybatisPagingItemReader, JpaPagingItemReader, ...
- processor
: reader에서 읽어온 데이터(item)를 필터/처리 하고 write로 보내는 역할.
item을 필터 도중 null로 리턴하면, 그 item은 write로 전달되지 못한다.
ex) 10개 read 후 processor에서 4개 필터링 해서 6개만 리턴하면 write에서는 6개만 처리한다.
- writer
: processor에서 처리된 데이터들(items : List<item>)을 파일이나 DB에 적재하는 역할.
writer는 기본으로 List단위로 처리되며, List는 chunksize에 의해 처리된다.
1. Tasklet 테스트
private String JOBNAME = "testTaskChunkJob";
@Bean
public Job testTaskChunkJob(){
return jobBuilderFactory.get(JOBNAME)
.incrementer(new RunIdIncrementer())
.start(this.taskStep1())
.build();
}
@Bean
public Step taskStep1(){
return stepBuilderFactory.get("taskStep1")
.tasklet(tasklet())
.build();
}
private Tasklet tasklet(){ //tasklet으로 모두 처리
return (contribution, chunkContext) -> {
List<String> items = getItems();
log.info("items : "+ items.toString());
return RepeatStatus.FINISHED;
};
}
private List<String> getItems() {
List<String> items = new ArrayList<>();
for (int i = 0; i < 100; i++) {
items.add(i + " test!");
}
return items;
}
단순하게 100개의 숫자를 가져와 log를 출력하는 tasklet이다.
tasklet은 함수안에서 모두 처리되서 1~100까지 모두 출력된다.
결과 :
items : [0 test!, 1 test!, 2 test!, 3 test!, ... 99 test!]
2. chunksize로 처리 테스트
private String JOBNAME = "testTaskChunkJob";
@Bean
public Job testTaskChunkJob(){
return jobBuilderFactory.get(JOBNAME)
.incrementer(new RunIdIncrementer())
.start(this.taskStep1()) //tasklet 처리
.next(this.chunkStep1()) //chunk 처리
.build();
}
// ...
@Bean
public Step chunkStep1() { //chunksize로 처리
return stepBuilderFactory.get("chunkStep1")
.<String,String>chunk(10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private ItemReader<String> itemReader() {
return new ListItemReader<>(getItems());
}
private ItemProcessor<String, String> itemProcessor() {
return item -> item + " now processor!!";
}
private ItemWriter<String> itemWriter() {
return items -> log.info("### writer : " + items.toString());
}
chunksize로 처리 시, Step에 .chunksize(크기) 로 정의하며 앞에 처리될 타입을 정의한다.
.<String,String>chunk(10)
<String,String>
앞의 String은 reader에서 읽은 데이터의 타입이고
뒤의 String은 writer에서 받을 데이터의 타입이다.
따라서 processor 처리 함수에서는 앞/뒤 타입을 모두 선언해서 필터나 데이터 변형 처리가 가능하다.
- reader
private ItemReader<String> itemReader() {
return new ListItemReader<>(getItems());
}
reader에서 리턴으로 선언한 ListItemReader는 spring batch에서 제공하는 item처리 클래스이다.
getItems에서 가져온 List<String>을 LIstItemReader에서 알아서 item반환해준다.
- processor
private ItemProcessor<String, String> itemProcessor() {
return item -> item + " now processor!!";
}
processor에서는 앞서 말한것과 같이 <reader에서 읽은 타입, writer에 보낼 타입> 으로 정의해 처리한다.
지금은 그냥 읽은 item에 문자열만 더해서 처리.
- writer
private ItemWriter<String> itemWriter() {
return items -> log.info("### writer : " + items.toString());
}
writer에서 <String>이지만, List<String>단위로 처리되며,
items를 받아 log를 출력하면 chunksize=10개씩 읽은 String이 List로 출력된다.
결과 :
writer : [0 test! now processor!!, 1 test! now processor!!, ... 9 test! now processor!!]
writer : [10 test! now processor!!, 11 test! now processor!!, ... 19 test! now processor!!]
...
writer : [90 test! now processor!!, 91 test! now processor!!, ... 99 test! now processor!!]
log를 총 10개 출력했으며, 각 로그는 items를 출력한다.
reader에서 10개씩 읽고 List<String> 으로 items 가 writer에서 chunksize=10만큼 처리된다.
3. Tasklet으로 페이징 처리 테스트
@Bean
public Job testTaskChunkJob(){
return jobBuilderFactory.get(JOBNAME)
.incrementer(new RunIdIncrementer())
.start(this.taskStep1())
.next(this.chunkStep1())
.next(this.taskPagingStep1()) //tasklet으로 페이징 처리
.build();
}
//...
private Tasklet pagingTasklet() {
List<String> items = getItems();
return (contribution, chunkContext) -> {
//stepexecution : 읽은 item을 저장
StepExecution stepExecution = contribution.getStepExecution();
int chunksize = 10;
int readCnt = stepExecution.getReadCount();
int idx = readCnt + chunksize;
if(idx > items.size()){
return RepeatStatus.FINISHED;
}
//sublist : list 중간 데이터 읽기
List<String> sublist = items.subList(readCnt,idx);
log.info("### sublist size : " + sublist.toString());
stepExecution.setReadCount(idx); //read count 읽은만큼 다시 저장
return RepeatStatus.CONTINUABLE;
};
}
tasklet의 contributution에서 StepExecution 으로 step이 얼마나 처리되고 있는지의 정보를 알 수 있다.
(chunksize를 10으로 정의하고 readCount로 페이징을 처리)
tasklet으로 chunksize=10으로 정해 처리할 수 있지만, 소스가 길어지고 복잡해짐을 볼 수 있다.
결과 :
sublist size : [0 test!, 1 test!, 2 test!, ... 9 test!]
sublist size : [10 test!, 11 test!, 12 test!, ... 19 test!]
...
sublist size : [90 test!, 91 test!, 92 test!, ... 99 test!]