본문 바로가기
dev/spring

Spring - Modulith - Working with Applicaton Events

by igooo 2024. 6. 21.
728x90

개요

Spring Modulith는 Spring Boot 애플리케이션에서 논리 모듈을 구련하는 개발자를 지원한다. 모듈을 문서화하고, 모듈에 대한 통합 테스트를 실행하고, 모듈 간 상호 작용을 관찰하고, 모듈 간의 느슨한 결합으로(loosely coupled) 상호 작용을 구현할 수 있도록 한다. 이전글 참고 (https://blog.igooo.org/118)

 

Working with Application Events

Spring Modulith로 구현된 모듈 사이에는 모듈에서 공개된 기능만(public method) 호출하여 사용할 수도 있지만 각 모듈은 서로 가능한 분리된 상태로 모듈 간 연동은 이벤트 게시 및 소비로 연동하여야 한다.

 

모노리스로 구현된 애플리케이션에서는 아래와 같은 코드를 볼 수 있다. Order를 처리하는 모듈에서 Inventory 업데이트가 필요한 경우 Invetory 패키지의 객체를 직접 사용하는 코드다 

@Service
public class OrderManagement {

	private final InventoryManagement inventory;

	@Transactional
	public void complete(Order order) {

		// State transition on the order aggregate go here

		// Invoke related functionality
		inventory.updateStockFor(order);
	}
}

 

Spring Modulith를 사용하면 아래와 같은 이벤트 기반으로 모듈간 연동을 변경할 수 있다.

@Service
public class OrderManagement {

	private final ApplicationEventPublisher events;
	private final OrderInternal dependency;

	@Transactional
	public void complete(Order order) {

		// State transition on the order aggregate go here

		events.publishEvent(new OrderCompleted(order.getId()));
	}
}

다른 모듈의 Spring Bean에(InvetoryManagement) 의존하는 대신 ApplicatonEventPublisher를 사용하여 이벤트를 개시하고 해당 이벤트를 소비하는 부분을 작성하여 이벤트를 소비할 수 있다.

이벤트 방식으로 변경하는 변경하여 작성하는 경우 아래 항목들에 대하여 고민이 필요하다.

  • 이벤트 게시는 기본적으로 동기적으로 발생한다.
    • 위 코드에서 주문의 상태 변경과 (OrderManagement.complete) 재고 업데이트는 모두 성공하거나 둘 중 하나도 성공하지 못함
  • 동기적으로 처리되는 경우 매우 간단한 일관성 모델을 얻을 수 있다.
  • 이벤트로 처리해야하는 관련 기능이 많이 트리거 된다면 트랜젝션의 경계가 넓어진다는 단점도 가지고 있다.
    • 필수로 처리되지 않아야 하는 기능 때문에 잠재적으로 전체 트랜젝션이 실패할 수 있다.

 

위 고려사항에 대한 다른 접근 방식은 이벤트 소비를 트랜잭션 커밋 시 비동기 처리하고 보조 기능을 아래와 같이 처리하는 것이다.

@Service
class InventoryManagement {

	@Async
	@TransactionalEventListener
	void on(OrderCompleted event) { /* … */ }
}

 

Application Module Listener

트랜잭션 자체에서 트랜잭션 이벤트 리스너를 실행하려면 @Transacrional로 주석을 달아야 한다.

@Component
class InventoryManagement {

	@Async
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	@TransactionalEventListener
	void on(OrderCompleted event) { /* … */ }
}

하지만 Spring Modulith에서는 위 선언을 쉽게 하기 위해서 @ApplicationModuleListener을 제공한다.

@Component
class InventoryManagement {

	@ApplicationModuleListener
	void on(OrderCompleted event) { /* … */ }
}

 

The Event Publication Registry

Spring Modulith는 이벤트가 게시될 때 이벤트를 전달할 트랜잭션 이벤트 리스너에 대하여 알아내고 각 항목(진한 파란색)에 대한 항목을 원래 비즈니스 트랜잭션의 일부로 이벤트 게시 로그에 기록한다. 각 트랜잭션 이벤트는 이벤트 리스너 실행이 성공하는 경우 로그 항목을 완료로 표시하고, 실패하는 항목은 재시도하거나, 완료되지 않는 이벤트 게시는 애플리케이션이 다시 시작될 때 실행된다.

 

Managing Event Publications

dependencies {
	implementation 'org.springframework.modulith:spring-modulith-events-api:1.1.5'
}

spring-modulith-events-api를 사용하며 아래 두 개의 인터페이스를 사용하여 이벤트를 처리할 수 있다.

 

  • CompletedEventPublications : 완료된 이벤트에 대하여 접근 가능하다.
  • IncompleteEventPublications : 완료되지 않은 이벤트에 대하여 접근 가능하고, 이벤트 게시를 다시 할 수 있다.

 

Event Publication Repositories

실제로 이벤트 게시 로그를 작성하기 위해서는 EventPublicationRepository JPA, JDBC 및 MongoDB와 같이 트랜젝션을 지원하는 저장소를 하용하여 이벤트 게시 로그를 저장할 수 있다.

JDBC를 사용하는 경우 spring.modulith.events.jdbc.schema-initialization.enabled 설정을 사용하여 테이블을 생성할 수 있다.

 

 

 

Example

주문을(Order) 저장하고, 저장한 주문에 추가된 아이템 정보가 있으면 재고를(Inventory) 업데이트 하는 간단한 프로그램을 작성한다. (Dependencies : spring-boot-starter-data-jdbc, spring-boot-starter-web, spring-modulith-starter-core, spring-modulith-starter-jdbc, org.hsqldb:hsqldb)

package org.igooo.service.order;

@RestController
@RequestMapping("/orders")
class OrderController {
	private Logger logger = LoggerFactory.getLogger(OrderController.class);

	private final OrderRepository orderRepository;

	OrderController(OrderRepository orderRepository) {
		this.orderRepository = orderRepository;
	}

	@PostMapping
	void create() {
		Order order = new Order(null, Set.of(new OrderItem(null, 1), new OrderItem(null, 2)));
		var saved = this.orderRepository.save(order);

		logger.info("Saved {}", saved);
	}

}

@Repository
interface OrderRepository extends CrudRepository<Order, Long> {}

@Table("ORDERS")
record Order(@Id Long id, Set<OrderItem> items) {}

@Table("ORDERS_ITEMS")
record OrderItem(@Id Long id, int product) {}

 

API 호출결과

2024-06-21T00:39:46.366+09:00  INFO 10084[service] [nio-8080-exec-1]o.igooo.service.order.OrderController Saved Order[id=1, items=[OrderItem[id=1, product=2], OrderItem[id=2, product=1]]]

 

 

이벤트 개시, 소비

Inventory 관련 파일은 inventory 패키지 하위로 생성하고, OrderController에서는 publisher를 사용하여 이벤트를 발행한다.

package org.igooo.service.inventory;

// 이벤트
public record InventoryUpdatedEvent(int product) {}

// Listner
@Service
class InventoryService {
	private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);

	@EventListener
	void onInventoryUpdatedEvent(InventoryUpdatedEvent event) {
		logger.info("Updated {}", event);
	}
}


package org.igooo.service.order;

@RestController
@RequestMapping("/orders")
class OrderController {
	private Logger logger = LoggerFactory.getLogger(OrderController.class);

	private final OrderRepository orderRepository;
	private final ApplicationEventPublisher publisher;

	OrderController(OrderRepository orderRepository, ApplicationEventPublisher publisher) {
		this.orderRepository = orderRepository;
		this.publisher = publisher;
	}

	@PostMapping
	void create(Order order) {
		Order order = new Order(null, Set.of(new OrderItem(null, 1), new OrderItem(null, 2)));
		var saved = this.orderRepository.save(order);

		logger.info("Saved {}", saved);
		// pulisher로 이벤트 발행
		saved.items().forEach((item) -> this.publisher.publishEvent(new InventoryUpdatedEvent(item.product())));
	}

}

 

API 호출 결과

2024-06-21T00:51:36.529+09:00  INFO 12244[service] [nio-8080-exec-2] o.igooo.service.order.OrderController Saved Order[id=1, items=[OrderItem[id=1, product=2], OrderItem[id=2, product=1]]]
2024-06-21T00:51:36.531+09:00  INFO 12244[service] [nio-8080-exec-2] o.i.service.inventory.InventoryService Updated InventoryUpdatedEvent[product=2]
2024-06-21T00:51:36.531+09:00  INFO 12244[service] [nio-8080-exec-2] o.i.service.inventory.InventoryService Updated InventoryUpdatedEvent[product=1]

 

 

비동기로 소비하기

@Async 어노테이션을 추가하여 비동기로 이벤트를 처리한다.

@Service
class InventoryService {
	private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);

	@Async
	@EventListener
	void onInventoryUpdatedEvent(InventoryUpdatedEvent event) throws InterruptedException {
		TimeUnit.SECONDS.sleep(5L);
		logger.info("Updated {}", event);
	}

}

API 호출 결과

2024-06-21T00:53:53.529+09:00  INFO 12244[service] [nio-8080-exec-2] o.igooo.service.order.OrderController Saved Order[id=1, items=[OrderItem[id=1, product=2], OrderItem[id=2, product=1]]]
2024-06-21T00:53:58.531+09:00  INFO 12244[service] [         task-1] o.i.service.inventory.InventoryService Updated InventoryUpdatedEvent[product=2]
2024-06-21T00:53:58.531+09:00  INFO 12244[service] [         task-2] o.i.service.inventory.InventoryService Updated InventoryUpdatedEvent[product=1]

이벤트 소비는 다른 thread를(task-1, task-2) 사용하여 5초뒤에 실행된 것을 볼 수 있다.

 

트랜잭션을 사용하여 이벤트 개시하기

OrderController와 InventoryService에 모두 @Transaction 어노테이션을 추가해준다.

이벤트 게시, 소비 모두 트랜잭션을 처리가 되어야 이벤트가 정상적으로 처리된다.
package org.igooo.service.inventory;

@Service
@Transactional
class InventoryService {
	private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);

	@ApplicationModuleListener
	void onInventoryUpdatedEvent(InventoryUpdatedEvent event) {
		logger.info("Updated {}", event);
	}
}


package org.igooo.service.order;

@RestController
@RequestMapping("/orders")
@Transactional
class OrderController {
	private Logger logger = LoggerFactory.getLogger(OrderController.class);

	private final OrderRepository orderRepository;
	private final ApplicationEventPublisher publisher;

	OrderController(OrderRepository orderRepository, ApplicationEventPublisher publisher) {
		this.orderRepository = orderRepository;
		this.publisher = publisher;
	}

	@PostMapping
	void create(Order order) {
		Order order = new Order(null, Set.of(new OrderItem(null, 1), new OrderItem(null, 2)));
		var saved = this.orderRepository.save(order);

		logger.info("Saved {}", saved);
		// pulisher로 이벤트 발행
		saved.items().forEach((item) -> this.publisher.publishEvent(new InventoryUpdatedEvent(item.product())));
	}

}

# application.properties
spring.modulith.events.jdbc.schema-initialization.enabled=true

API 호출 결과

2024-06-21T00:53:53.529+09:00  INFO 12244[service] [nio-8080-exec-2] o.igooo.service.order.OrderController Saved Order[id=1, items=[OrderItem[id=1, product=2], OrderItem[id=2, product=1]]]
2024-06-21T00:53:58.531+09:00  INFO 12244[service] [         task-1] o.i.service.inventory.InventoryService Updated InventoryUpdatedEvent[product=2]
2024-06-21T00:53:58.531+09:00  INFO 12244[service] [         task-2] o.i.service.inventory.InventoryService Updated InventoryUpdatedEvent[product=1]

 

소비되지 않은 이벤트 처리하기

이벤트를 소비하기전에 애플리케이션이 종료되는 경우 아래와 같이 로그가 나온다.

Shutting down with the following publications left unfinished:
 ├─ org.igooo.service.inventory.InventoryUpdatedEvent - org.igooo.service.inventory.InventoryService.onInventoryUpdatedEvent(org.igooo.service.inventory.InventoryUpdatedEvent)
  └─ org.igooo.service.inventory.InventoryUpdatedEvent - org.igooo.service.inventory.InventoryService.onInventoryUpdatedEvent(org.igooo.service.inventory.InventoryUpdatedEvent)

 

소비되지 않은 이벤트에 대하여 처리하는 방법은 위에서 알아봤지만 간당히 설정을 추가하면 애플리케이션이 재 실행되면서 이벤트가 자동으로 발행된다.

# application.properties
spring.modulith.republish-outstanding-events-on-restart=true

 

 

참고