개요
Spring Batch 데이터 수집 중 보통은 ItemReader에서 데이터를 조작하는 작업을 하지 않지만 특수한 케이스에서 페이징 한 데이터를 기반으로 추가로 데이터를 수집해야 하는 경우(join이 불가능하거나...) AbstractPagingItemReader를 구현한 객체를 사용하는데 이때 주의해서 사용할 점에 대하여 정리한다.
PagingItemReader 동작 방식
AbstractPagingItemReader의 맴벼 변수(protected volaatitle List<T> results)는 AbstractPagingItemReader의 doRead() 메서드에서 AbstractPagingItemReader를 상속하는 구현체의 doReadPage()의 호출 결과로 값이 정해진다.
PagingItemReader 수집 로직
- ItemReader에 doRead()를 통해 조회한 데이터를 하니씩 조회한다.
- doRead()에서는 doReadPage() 메서드를 통해 페이징한 데이터를 가져온다
- result에 데이터가 있으면 result에서 데이터를 하나씩 꺼내서 가져온다.
AbstractItemCountingItemStreamItemReader
@Nullable
@Override
public T read() throws Exception {
if (currentItemCount >= maxItemCount) {
return null;
}
currentItemCount++;
T item = doRead();
if (item instanceof ItemCountAware) {
((ItemCountAware) item).setItemCount(currentItemCount);
}
return item;
}
AbstractPagingItemReader
@Nullable
@Override
protected T doRead() throws Exception {
this.lock.lock();
try {
if (results == null || current >= pageSize) {
if (logger.isDebugEnabled()) {
logger.debug("Reading page " + getPage());
}
doReadPage();
page++;
if (current >= pageSize) {
current = 0;
}
}
int next = current++;
if (next < results.size()) {
return results.get(next);
}
else {
return null;
}
}
finally {
this.lock.unlock();
}
}
JdbcPagingItemReader
@Override
protected void doReadPage() {
if (results == null) {
results = new CopyOnWriteArrayList<>();
}
else {
results.clear();
}
PagingRowMapper rowCallback = new PagingRowMapper();
List<T> query;
if (getPage() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
}
if (parameterValues != null && parameterValues.size() > 0) {
if (this.queryProvider.isUsingNamedParameters()) {
query = namedParameterJdbcTemplate.query(firstPageSql, getParameterMap(parameterValues, null),
rowCallback);
}
else {
query = getJdbcTemplate().query(firstPageSql, rowCallback,
getParameterList(parameterValues, null).toArray());
}
}
else {
query = getJdbcTemplate().query(firstPageSql, rowCallback);
}
}
else if (startAfterValues != null) {
previousStartAfterValues = startAfterValues;
if (logger.isDebugEnabled()) {
logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
}
if (this.queryProvider.isUsingNamedParameters()) {
query = namedParameterJdbcTemplate.query(remainingPagesSql,
getParameterMap(parameterValues, startAfterValues), rowCallback);
}
else {
query = getJdbcTemplate().query(remainingPagesSql, rowCallback,
getParameterList(parameterValues, startAfterValues).toArray());
}
}
else {
query = Collections.emptyList();
}
results.addAll(query);
}
문제 상황
DB 구조로 인하여 Join이 불가능한 상황에서 Processer에서 데이터를 조회하는 경우 1건 단위로 조회를 해야 하기 때문에 성능 이슈로 ItemReader에서 데이터를 페이징 단위로 조회하고, 조회된 데이터의 id 기반으로 다른 DB에서 다시 조회하여 머지하는 코드를 작성하였는데 이때 result 변수에 데이터를 변경하는 경우 문제가 발생할 수 있음
문제의 코드
@Override
protected void doReadPage() {
super.doReadPage();
if (this.result.isEmpty()) {
return;
}
this.result.forEach((item) -> {
preocessMergeExtraData(item);
// 추가 데이터를 조회한 후 추가 데이터가 없는 경우 데이터를 제외
if(item.containsKey("extra_key")) {
this.result.remove(item); // 문제가 되는 코드
return
}
};
문제 원인
아래는 AbstractPagingItemReader의 doRead() 메서드다 current 변수는 paging한 데이터의 현재 위치고 pageSize는 페이지의 사이즈를 저장한다. 문제의 코드에서 데이터를 머지 후 processor로 전달하기 전 유요하지 않는 데이터를 제외하기 위해 result의 데이터를 삭제하게 되면 아래 코드의 current >= pageSize, next < result.size() 코드에 의하여 해당 페이지가 마지막 페이지로 판단할 수 있다.
(!) 페이징으로 데이터를 조회하는 경우 이후 데이터가 남아 있는 경우 조회된 페이지의 사이즈는 PageSize와 항상 같아야 한다. (적으면 마지막 페이지)
@Nullable
@Override
protected T doRead() throws Exception {
this.lock.lock();
try {
if (results == null || current >= pageSize) {
if (logger.isDebugEnabled()) {
logger.debug("Reading page " + getPage());
}
doReadPage();
page++;
if (current >= pageSize) {
current = 0;
}
}
int next = current++;
if (next < results.size()) {
return results.get(next);
}
else {
return null;
}
}
finally {
this.lock.unlock();
}
}
성능을 위해 고려된 코드로 인하여 의도치 않게 전체 데이터를 조회하지 못하는 경우가 발생 할 수 있다.
'dev > spring' 카테고리의 다른 글
Spring Boot + vue, react 환경 구성 (1) | 2024.09.05 |
---|---|
[Spring] Data Commons Auditing MongoDB (0) | 2024.08.26 |
[Spring Batch] 5.1.2 Features (0) | 2024.08.21 |
[Spring] Flyway DB Migration (0) | 2024.08.05 |
Java JVM - Checkpoint Restore (CRaC) (0) | 2024.07.20 |