본문 바로가기
dev/java

Java 23 : Structured Concurrency

by igooo 2024. 9. 28.
728x90

개요

Structured Concurrency는 서로 다른 스레드에서 실행되는 관련된 작업 그룹을 하나의 작업 단위로 처리하여 오류 처리와 작업 취소를 간소화하고, 안정성을 향상하며, 관찰성을 강화하는 방식이다. 

Java에서는 Structured concurrency를 위한 API를 도입하여 동시성 프로그래밍을 간소화함을 목표로 한다.

 

Kotlin에서는 코루틴 구문을 사용하여 Structured Concurrency(구조적 동시성)을 지원하지만 자바에서는 아직 Preview 상태이고 Java 23에서도 아직 Preview 상태다. 하지만 이후 버전에서는 정식 기능으로 나오기를 희망한다.

 

Motivation

개발자는 task를 여러 개의 subtask로 나눠서 복잡성을 관리한다. 일반적인 단일 스레드 코드에서는 subtask는 순차적으로 실행된다. 하지만 subtask가 서로 충분히 독립적이고 충분한 하드웨어 리소스가 있는 경우 subtask를 동시에 실행하여 전체 작업을 더 빠르게(낮은 대기 시간으로) 실행할 수 있다. 예를 들어, 여러 I/O 작업이 필요한 경우 I/O 작업을 여러 스레드에서 동시에 실행하면 더 빠르게 처리된다. Virtual Thread를 사용하여 이러한 I/O 작업에 대하여 대하여 대응하면 매우 효율 적이지만 결과적으로 발생할 수 있는 엄청난 수의 스레드를 관리하는 것은 여전히 어려운 일이다.

Single Thread, ExecutorService, Structured Concurrency를 사용하는 코드를 각각 살펴보면서 Structured Concurrency가 가지는 장점에 대하여 알아본다. 

 

Single Thread

Response handle() throws IOException {
    String theUser = findUser();
    int theOrder = fetchOrder();
    return new Response(theUser, theOrder);
}

단일 스레드 버전의 handle()에서는 task-subtask의 관계가 구문 구조에서 분명하게 드러난다.

위 코드에서는 subTask인 fetchOrder()는 findUser()가 완료되기 전까지 시작되지 않는다. 만약 findUser()가 실패하면 fetchOrder()는 실행되지 않고 handle() 태스크는 암묵적으로 실패하게 된다. subTask가 부모로만 돌아갈 수 있다는 사실은 중요하다. 부모 작업이 한 하위 작업의 실패를 암묵적으로 처리하여 완료되지 않는 다른 하위 작업을 취소하고 스스로 실패할 수 있음을 의미한다.

 

Unstructured concurrency with ExecutorService

Response handle() throws ExecutionException, InterruptedException {
    Future<String>  user  = esvc.submit(() -> findUser());
    Future<Integer> order = esvc.submit(() -> fetchOrder());
    String theUser  = user.get();   // Join findUser     
    int    theOrder = order.get();  // Join fetchOrder     
    return new Response(theUser, theOrder);
}

handle() 메서드는 두 개의 하위 작업을 ExecutorService에 submit 하여 하위 작업을 처리한다. findUser(), fetchOrder()는 handle() 메서드의 하위 작업으로 각 메서도 호출 시 Future를 바로 반환하고 Executor 스켈줄링 정책에 따라 하위 작업을 동시에 실행한다. handle() 메서드는 Future의 .get() 메서드를 통해 두 개의 하위 작업이 완료되기를 기다리고 이를 조인(join) 한다고 한다.

하위 작업은(subtask) 동시에 실행되기 때문에 각 하위 작업은 독립적으로 성공하거나 실패할 수 있다. handle()의 하위 작업 중 하나가 실패하면 스레드의 수명을 이해하는 것은 놀라울 정도로 복잡해질 수 있다.

  • findUser()가 Exception을 발생시키면 handle()는 호출한 사용자에게 exception을 던진다. 하지만 fetchOrder()는 자체 스레드에서 계속 실행된다. 이는 스레드 누수로(thread leak) 시스템 리소스를 낭비하거나 최악의 경우 fetchOder() 스레드가 다른 작업을 방해할 수 있다.
  • 실행 중인 handle()가 중지되면 하위 작업에 hadler()의 중지가 전파되지 않는다. findUser(), fetchOrder()는 스레드 누수가 발생하고, handle()가 실패한 후에도 계속 실행된다.
  • findUser()는 실행에 오랜 시간이 걸리지만, fetchOrder()는 그 사이에 실패한다면 handle()는 findUser()의 Future를 취소하는 대신 findUser()가 완료되기를 기다린다. findUser()가 완료되어 .get()을 호출해야 예외가 발생하고 handle()는 실패하게 된다.

각각의 경우에서 문제는 프로그램이 작업 - 하위 작업(task - subtask) 관계로 논리적으로 구조화되어 있지만, 이러한 관계는 개발자의 마음속에만 존재한다는 것이다. 또한 위와 같은 코드는 오류를 진단하고 문제를 해결하는 것을 더 어렵게 만든다. 예를 들어 스레드 덤프를 확인해 봐도 스레드의 호출 스택에서는 3개의(handle(), findUser(), fetchOrder()) 스레드가 표현되지만 task - subtask 관계에 대한 힌트는 찾을 수 없다.

ExecutorService와 Future는 위와 같은 비구조적 사용을 허용하기 때문에 작업과 하위 작업 간의 관계를 강제하거나 추적하지 못하고, 이러한 관례는 일반적인 경우에 유용하기 때문이다. 그러므로 개발자는 작업과 하위 작업 간의 라이프 사이클과 관계를 수동으로 관리하도록 요청하는 대신 구조적 동시성을 사용하여 안정적으로 자동화하려고 한다.

 

Structured Concurrency

Structured Concurrency는 작업과 하위 작업 사이에 자연스러운 관계를 보전하는 concurrent programming 방식으로 읽기 쉽고, 유지보수하기 쉽고, 신뢰할 수 있는 동시성 코드를 제공한다.

Structured Concurrency는 다음 간단한 원칙에서 파생도니다.

작업이(task) 하위 작업으로(subtask) 분할되면 모두 같은 위치, 즉 작업 코드 블록으로 돌아간다.

 

Structured Concurrency에서 하위 작업은 작업을 대신하여 작동한다. 작업은 하우 작업의 결과를 기다리고 실패 여부를 모니터링한다. 단일 스레드의 코드에 대한 구조화된 프로그래밍 기술과 마찬가지로 여러 스레드에 대한 Structured Concurrency의 힘은 두 가지 아이디어에서 나온다.

  1. 코드 블록을 통한 실행 흐름에 대한 잘 정의된 진입점과 종료점
  2. 코드에서 구문적 중첩을 반영하는 방식으로 작업의 수명을 엄격하게 중첩한다.

Description

Structured Concurrency API의 주요 클래스는 java.util.concurrent 패키지에 있는 StructuredTaskScope 클래스다. 이 클래스를 사용하면 개발자가 concurrent subtask를 그룹 하여 구조화할 수 있고, unit 단위로 조정할 수 있다. 하위 작업은 fork 되어 개별적인 스레드로 실행되어 결합하고, unit 단이로 취소도 가능하다. 하위 작업의 성공적인 결과 또는 예외는 집계되어 부모 작업에 의해 처리된다.

 

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String>  user  = scope.fork(() -> findUser());
        Supplier<Integer> order = scope.fork(() -> fetchOrder());

        scope.join()            // Join both subtasks
            .throwIfFailed();  // ... and propagate errors
       // Here, both subtasks have succeeded, so compose their 
       results         return new Response(user.get(), order.get());
    }
}

모든 조건에서 스레드의 수명은 try-with-resources 구문으로 제한된다. 또한 StructuredTaskScope를 사용하면 아래의 소성이 보장된다.

  • Error hadling with short-circuiting(단락 회로를 사용한 에러처리) : 하위 작업 findUser() 또는 fetchOrder() 작업 중 하나가 실패하면 아직 완료되지 않은 작업은 취소된다. (이는 ShutdownOnFailure에 구현된 종료 정책에 의해 관리된다.)
  • Cancellation propagation(취소 전파) : 실행 중인 스레드 handle()이 join() 호출 중이거나 호출 전에 중지(interrupted)되면 스레드가 코드 범위를 벗어날 때 두 하위 작업은 모두 자동 취소된다.
  • Clarity(명확성) : 위 코드는 구조가 명확하다. 하위 작업을 설정하고 완료되거나 취소될 때까지 대기후 성공할지(그리고 이미 완료된 하위 작업의 결과를 처리할지) 아니면 실패할지를(그리고 하위 작업이 이미 완료되었다면 정리할 것이 없음) 결정한다.
  • Observability(관찰성) : 스레드 덤프는 작업 계층 구조를 명확하게 표시하며, findUser(), fetchOrder()는 범위(Scopr)의 자식으로 표현된다.

StructuredTaskScope is a preview API, disabled by default

StructuredTaskScope API는 Java 23에서 아직 preview로 제공하는 API로 사용하려면 미리 보기 API를 활성화해서 사용할 수 있다.

  • Compile : javac --release 21 --enable-preview
  • Run : java --enable-preview

 

Using StructuredTaskScope

public class StructuredTaskScope<T> implements AutoCloseable {

    public <U extends T> Subtask<U> fork(Callable<? extends U> task);
    public void shutdown();

    public StructuredTaskScope<T> join() throws InterruptedException;
    public StructuredTaskScope<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException;
    public void close();

    protected void handleComplete(Subtask<? extends T> handle);
    protected final void ensureOwnerAndJoined();

}

StructuredTaskScope의 일반적인 워크플로우는 아래와 같다.

  1. scope를 (범위) 만든다 scope를 만드는 스레드는 소유자다.(owner)
  2. scope에서 하위 작업을 만드는 방법은 fork(Callable) 메서드를 사용한다.
  3. 언제든지 어떤 하위 작업이나 scope 소유자가 shutdown() 메서드를 호출하여 완료되지 않은 하위 작업을 취소하거나 새로운 하위 작업이 생성(forking) 되지 않도록 방지한다.
  4. scope의 소유자는 모든 하위 작업을 unit 단위로 조인한다. 소유자는 scope의 join() 메서드를 호출하여 각 작업이 완료(successfully or not) 되거나 shutdown()을 통해 취소될 때까지 기다릴 수 있다. joinUntil(java.time.Instant) 메서드로 데드라인을 정하고 대기할 수 있다.
  5. 조인 이후 하위 작업의 오류를 처리하고 해당 결과를 처리한다.
  6. 일반적으로 try-with-resources 구문을 통해 암시적으로 scope를 닫는다. 아직 종료되지 않은 경우 scope를 종료하고 취소되었지만 완료되지 않은 하위 작업을 완료될 때까지 기다린다.

 

Shutdown policies(종료 정책)

Concurrent subtask를 처리할 때 불필요한 작업을 하지 않기 위해서 short-circuiting(단락 회로) 패턴을 사용하는 것이 일반 적이다. 예를 들어 하위 작업 중 하나가 실패하면(i.e., invoke all) 모든 하위 작업을 취소하거나, 그중 하나가 성공하면(i.e., invoke any) 모든 하위 작업을 취소하는 것이 합리적인 경우가 있다. StructuredTaskScope의 하우 클래스인인 ShutdownOnFailure, ShutdownOnSuccess는 각각 첫 번째 하위 작업이 실패하거나 성공할 때 범위를 종료하는 정책을 통해 이러한 패턴을 지원하며, StructuredTaskScope를 상속받아 protected handleComplete()를 재구현하여 사용자가 원하는 사용자 정의 종료 정책을 구현할 수 있다.

 

Example : ShutdownOnFailure()

여러 작업을 동시에 실행하고 그 중 하나라도 실패하면 실패하는 예제

<T> List<T> runAll(List<Callable<T>> tasks) 
        throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();
        scope.join()
             .throwIfFailed();  // Propagate exception if any subtask fails
        // Here, all tasks have succeeded, so compose their results
        return suppliers.stream().map(Supplier::get).toList();
    }
}

 

Example : ShutdownOnSuccess()

하위 작업 중 처음으로 성공하는 하위 작업의 결과를 반환하는 예제 (하위 작업 하나가 성공하자 마자 범위는 자동으로 종료되고 완료되지 않는 하위 작업은 자동으로 취소된다.)

<T> T race(List<Callable<T>> tasks, Instant deadline) 
        throws InterruptedException, ExecutionException, TimeoutException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
        for (var task : tasks) {
            scope.fork(task);
        }
        return scope.joinUntil(deadline)
                    .result();  // Throws if none of the subtasks completed successfully
    }
}

 

 

Processing results

종료 정책을 통해 조인과 예외를 중앙에서 처리한 후 Scope의 소유자는 정책에 의해 처리되지 않는 경우 fork()  호추에서 반환된 Subtask Object를 사용하여 하위 작업의 결과를 처리할 수 있다.

Scopre  수유자가 종료 정책을 사용하는 대신 하위 작업 예외를 처리하여 복합 결과를 생성하는 경우 예외는 하위 작업의 값으로 반환될 수 있다. 예를 들어, 다음 작업 목록을 병렬로 실행하고 각 작업의 성공 또는 예외 결과가 포함된  completed Futrures 목록을 반환하는 예제는 아래와 같다.

<T> List<Future<T>> executeAll(List<Callable<T>> tasks)
        throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    	  List<? extends Supplier<Future<T>>> futures = tasks.stream()
    	      .map(task -> asFuture(task))
     	      .map(scope::fork)
     	      .toList();
    	  scope.join();
    	  return futures.stream().map(Supplier::get).toList();
    }
}

static <T> Callable<Future<T>> asFuture(Callable<T> task) {
   return () -> {
       try {
           return CompletableFuture.completedFuture(task.call());
       } catch (Exception ex) {
           return CompletableFuture.failedFuture(ex);
       }
   };
}

 

Custom shutdown policies

StructuredTaskScope는 상속할 수 있으며, handleComplete(...)를 오버라이드하여 재정의 할 수 있다. 하위 작업이 완료되면 shutdown(()이 호출된 후에도 하위 작업으로 handleComplete(...)에 보고된다.

public sealed interface Subtask<T> extends Supplier<T> {
    enum State { SUCCESS, FAILED, UNAVAILABLE }

    State state();
    Callable<? extends T> task();
    T get();
    Throwable exception();
}

아래 예제는 성공적으로 완료된 하위 작업의 결과를 수집하는 StructuredTaskScope 하위 클래스의 예제다. 결과를 검색하기 위해 기본 작업에서 사용할 results()를 정의한다.

class MyScope<T> extends StructuredTaskScope<T> {

    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    MyScope() { super(null, Thread.ofVirtual().factory()); }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS)
            results.add(subtask.get());
    }

    @Override
    public MyScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    // Returns a stream of results from the subtasks that completed successfully
    public Stream<T> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }

}

 

Fan-in scenario

지금까지의 예제는 여러개의 동시에 외부로 호출하는 I/O 작업을 관리하는 fan-out  시나리오를 살펴보았다. StructuredTaskScope를 사용하면 여러개의 동시에 들어오는 I/O 작업을 관리하는 fan-in 시나리오에서도 유용하게 사용 가능하다.

아래 코드는 SocketServer로 연결 처리를 하기 위해 작업을 fork하는 서버의 예제다.

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                var socket = serverSocket.accept();
                scope.fork(() -> handle(socket));
            }
        } finally {
            // If there's been an error or we're interrupted, we stop accepting
            scope.shutdown();  // Close all active connections
            scope.join();
        }
    }
}

동시성의 광점에서 위 시나리오는 요청 방향이 아니라 작업의 시간과 수에서 fan-out 예제와 크게 다르다. 이전 예제와 다른게 scope 소유자는 기간에 제한이 없다. 즉 serve()가 중단 될 때만 중지되고, 하위 작업의 수도 알 수 없다. 외부 이벤트에 대한 응답으로 동적으로 포크되기 때문이다.

모든 연결 처리 하위 작업은 범위 내에서 생성되므로 스레드 덤프에서 해당 목적을 쉽게 확인할 수 있으며, 스레드 덤프는 이를 범위 소유자의 자식으로 표시한다. 또한 전체 서비스를 하나의 단위로 종료하는 것도 쉽다.

 

Observability

JSON 스레드 덤프 형식을 확장하여 StructuredTaskScope 스레드를 계층 구조로 그룹화한것을 확인할 수 있다.

$ jcmd <pid> Thread.dump_to_file -format=json <file>

 

참고

 

'dev > java' 카테고리의 다른 글

Building a SpringBoot Monorepo with Gradle  (2) 2024.11.06
Java - ReentrantLock  (0) 2024.07.19
Generational ZGC in Java 21  (0) 2024.07.02
Java Virtual Threads 사용 시 synchronized 주의  (0) 2024.06.04
Virtual Threads  (0) 2024.06.02