본문 바로가기
개발

ThreadPoolTaskExecutor와 CompletableFutre를 사용하여 비동기처리하기

by autocat 2024. 9. 30.

CompletableFuture 간단히 사용해보기

Java 8에서 추가된 CompletableFuture는 비동기 작업을 쉽게 처리하고, 메서드들을 결합해 더 직관적인 코드를 작성할 수 있도록 돕는 클래스입니다. 기존의 Future는 에러 핸들링이나 메서드 체이닝에 제한적이었으나, CompletableFuture는 이를 보완하여 더욱 유연한 비동기 작업이 가능하게 해줍니다.

예를 들어, 아래와 같은 코드를 작성할 수 있습니다

CompletableFuture.supplyAsync(() -> "Hello")  
    .thenApply(s -> s + " World")  
    .thenApply(String::toUpperCase)  
    .thenAccept(s -> System.out.println(s));

이 코드는 다음과 같은 과정을 거칩니다:

  1. supplyAsync()는 비동기 작업을 시작합니다. 여기서는 "Hello"라는 값을 반환하는 작업을 실행합니다.
  2. thenApply()는 첫 번째 비동기 작업의 결과인 "Hello"를 받아 "Hello World"로 문자열을 추가합니다.
  3. 또 다른 thenApply()는 결과 문자열을 대문자로 변환합니다.
  4. 마지막으로 thenAccept()는 처리된 결과를 출력합니다. 이 메서드는 반환값이 없고, 비동기 작업이 완료됨을 의미합니다.

에러 핸들링도 간단히!

비동기 작업에서 에러가 발생할 수 있습니다. CompletableFuture에서는 exceptionally() 메서드를 사용하여 에러를 처리할 수 있습니다.

CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Error"); })
    .exceptionally(throwable -> {  
        System.out.println("Error occurred");
        return "error";
    })
    .thenApply(s -> s + " world")
    .thenApply(String::toUpperCase)
    .thenAccept(System.out::println);

이 코드를 실행하면 비동기 작업 중 에러가 발생하지만, exceptionally() 블록에서 에러를 처리하고 "error"라는 값을 반환한 후, 이후 체이닝된 메서드들이 정상적으로 실행됩니다.

출력 결과는 다음과 같습니다:

Error occurred
ERROR WORLD

ForkJoinPool 기본 사용

CompletableFuture.supplyAsync()를 보면, 내부적으로 ForkJoinPoolcommonPool을 사용한다는 것을 알 수 있습니다. ForkJoinPool은 워크 스틸링(work-stealing) 알고리즘을 통해 스레드 풀을 관리하며, 비동기 작업의 효율적인 분산 처리를 돕습니다. 기본적으로 시스템의 가용 프로세서 수에 맞춰 스레드 풀 크기를 설정합니다.


ForkJoinPool의 기본 원리

CompletableFuture는 기본적으로 ForkJoinPool.commonPool()을 사용해 비동기 작업을 처리합니다. ForkJoinPool은 효율적인 스레드 분산을 위해 설계되었으며, 특히 워크 스틸링(work-stealing) 알고리즘을 사용해 부하를 분산시킵니다. 간단히 말해, 각 스레드가 할당된 작업을 처리하다가 다른 스레드가 바쁘면 그 스레드의 남은 작업을 가져와 처리하는 방식입니다.

ForkJoinPool의 특징은 아래와 같습니다:

  • 작업 분할(Fork)과 병합(Join): 큰 작업을 작은 단위로 나눠서 처리하고, 그 결과를 다시 병합하는 방식으로 병렬 처리를 최적화합니다.
  • 스레드 재사용: CPU 코어 수에 맞춰 스레드 풀 크기를 자동으로 조정하며, 공용 풀(common pool)을 통해 시스템 자원을 효율적으로 사용합니다.

하지만 모든 상황에서 ForkJoinPool을 사용하는 것이 적합한 것은 아닙니다. 특히, CPU 집약적인 작업이 아닌 경우에는 다른 스레드 풀을 사용하는 것이 더 나을 수 있습니다.


왜 ThreadPoolTaskExecutor를 사용할까?

기본적으로 ForkJoinPool을 사용하는 것도 괜찮지만, 몇 가지 이유로 ThreadPoolTaskExecutor를 사용하는 것이 더 적합한 경우가 있습니다.

  1. 스프링과의 통합: ThreadPoolTaskExecutor는 스프링 프레임워크와 잘 통합되어 있어, 스프링 애플리케이션에서 더 쉽게 구성하고 관리할 수 있습니다.
  2. 세밀한 제어: 코어 풀 크기, 최대 풀 크기, 큐 용량 등을 더 세밀하게 설정할 수 있어, 애플리케이션의 특성에 맞게 최적화할 수 있습니다.
  3. 모니터링 및 관리: 스프링 메트릭스와 액추에이터를 사용해 스레드 풀을 쉽게 모니터링하고 관리할 수 있습니다.
  4. 작업 유형에 따른 적합성: ForkJoinPool은 분할 정복 알고리즘에 최적화되어 있지만, 일반적인 비동기 작업에서는 ThreadPoolTaskExecutor가 더 나은 선택이 될 수 있습니다.

다음 코드는 ThreadPoolTaskExecutor를 설정하는 예시입니다.

@Bean(name = "cpuBoundExecutor")  
public Executor cpuBoundExecutor() {  
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
    int processors = Runtime.getRuntime().availableProcessors();  
    executor.setCorePoolSize(processors); // CPU 코어 수만큼 스레드 설정
    executor.setMaxPoolSize(processors + 1); // 최대 스레드 수 설정
    executor.setQueueCapacity(500); // 대기열 용량 설정
    executor.setKeepAliveSeconds(30); // 스레드 유지 시간 설정
    executor.setAllowCoreThreadTimeOut(true); // 코어 스레드도 타임아웃 허용
    executor.setThreadNamePrefix("CPUTask-"); // 스레드 이름 접두사
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 작업 거부 정책 설정
    executor.initialize();  
    return executor;  
}

주요 옵션 설정

  • CorePoolSize: 스레드 풀이 항상 유지할 스레드 수입니다. CPU 바운드 작업의 경우, 코어 수와 동일하거나 ±1 정도로 설정하는 것이 좋습니다.
  • MaxPoolSize: 최대 스레드 수로, 높은 부하 상황에서 추가로 생성할 수 있는 스레드의 한계를 설정합니다.
  • QueueCapacity: 작업 대기열의 크기를 지정하며, 대기 중인 작업을 관리합니다.
  • KeepAliveSeconds: 코어 스레드 외의 추가 스레드가 유휴 상태일 때 얼마 동안 유지할지를 설정합니다.
  • RejectedExecutionHandler: 큐가 가득 찼을 때 어떻게 처리할지 설정합니다. 기본값은 AbortPolicy로, 예외를 발생시킵니다.

CPU 바운드 작업과 I/O 바운드 작업에 맞는 설정

ThreadPoolTaskExecutor의 설정은 작업 유형에 따라 다르게 적용될 수 있습니다. CPU 바운드 작업과 I/O 바운드 작업에 따라 적절한 스레드 풀 설정을 달리할 수 있습니다.

@Bean(name = "ioBoundExecutor")  
public Executor ioBoundExecutor() {  
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
    int processors = Runtime.getRuntime().availableProcessors();  
    executor.setCorePoolSize(processors * 2); // I/O 바운드 작업에는 더 많은 스레드 설정
    executor.setMaxPoolSize(processors * 4); // 최대 스레드 수 설정
    executor.setQueueCapacity(5000); // 더 큰 큐 용량 설정
    executor.setKeepAliveSeconds(300); // 스레드 유지 시간 증가
    executor.setAllowCoreThreadTimeOut(false); // 코어 스레드는 항상 유지
    executor.setThreadNamePrefix("IOTask-"); // I/O 작업에 맞는 스레드 이름
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 작업 거부 시 호출자 스레드에서 실행
    executor.initialize();  
    return executor;  
}

이 코드에서는 I/O 바운드 작업에 적합한 설정을 사용합니다. 예를 들어, 대기 시간이 많은 I/O 작업에는 더 많은 스레드와 대기열 용량이 필요하므로, CPU 코어 수의 2배에서 4배 정도의 스레드 풀 크기를 설정합니다.


CompletableFutureThreadPoolTaskExecutor 적용 예시

이제 ThreadPoolTaskExecutorCompletableFuture를 사용하여 비동기 작업을 처리하는 방법을 알아보겠습니다. 아래 예시는 쿠폰을 대량으로 생성하고 저장하는 서비스를 구현하는 과정입니다.

쿠폰 생성 비동기 처리

쿠폰을 대량으로 생성하는 작업은 I/O 작업이 많이 발생하는 대표적인 경우입니다. 이런 경우에 ThreadPoolTaskExecutor를 사용하여 비동기 처리 성능을 최적화할 수 있습니다. 다음은 CouponService에서 쿠폰을 비동기로 생성하는 코드 예시입니다.

// CouponService.java
private final Executor customThreadPoolExecutor; 
// ThreadPoolTaskExecutor 의존성 주입

public void insertCoupons() {
    CompletableFuture.supplyAsync(this::generateRandomCoupon, customThreadPoolExecutor)
        .exceptionally(throwable -> {
            log.error("Error occurred while inserting coupon", throwable);
            return Coupon.builder().build();
        })
        .thenAccept(coupon -> {
            if (StringUtils.hasText(coupon.getName())) {
                couponRepository.save(coupon);
            } else {
                log.warn("Coupon name is empty");
            }
        });
}

코드 설명

  1. supplyAsync(): 쿠폰을 생성하는 작업을 비동기로 실행합니다. 여기서는 customThreadPoolExecutor를 두 번째 인자로 전달하여 스프링의 ThreadPoolTaskExecutor를 사용하게 설정했습니다. 이렇게 하면 기본적으로 ForkJoinPool을 사용하는 대신, 더 세밀하게 설정된 스레드 풀에서 작업을 처리하게 됩니다.

  2. exceptionally(): 쿠폰 생성 과정에서 예외가 발생할 경우, 이 블록에서 예외를 처리하고 로그를 남깁니다. 또한, 예외가 발생했을 때 기본 쿠폰 객체를 반환해 후속 처리가 정상적으로 이어지도록 합니다.

  3. thenAccept(): 생성된 쿠폰 객체를 받아서 저장소에 저장합니다. 쿠폰의 이름이 유효한 경우에만 저장을 시도하며, 그렇지 않은 경우에는 경고 로그를 남깁니다.


대량 쿠폰 생성 처리

아래 코드에서는 대량의 쿠폰을 생성하는 작업을 비동기로 처리하는 예시를 볼 수 있습니다. 3000개의 쿠폰을 생성하여 insertCoupons() 메서드를 통해 비동기 처리하도록 했습니다.

// CouponController.java
public void insertMassiveCoupons() {
    for (int i = 0; i < 3000; i++) {
        couponService.insertCoupons();
    }
}

이 코드에서는 3000개의 쿠폰을 순차적으로 생성 요청합니다. insertCoupons() 메서드에서 각 쿠폰 생성 작업은 비동기로 처리되므로, 메인 스레드가 차단되지 않고 동시에 여러 작업이 처리됩니다. 이때, ThreadPoolTaskExecutor를 통해 적절한 스레드 풀에서 비동기 작업이 수행됩니다.


성능 최적화 확인

위 코드를 실행해 보면, CompletableFutureThreadPoolTaskExecutor를 사용해 스레드를 관리하며 비동기 작업을 효율적으로 처리하는 것을 확인할 수 있습니다. 실제로 얼마나 많은 스레드가 생성되고 재사용되는지를 로그로 확인할 수 있습니다.

private Coupon generateRandomCoupon() {
    log.info("generateRandomCoupon() method is called by: {}", Thread.currentThread().getName());
    return Coupon.builder()
            .name(UUID.randomUUID().toString())
            .build();
}

로그에서는 다음과 같은 메시지를 확인할 수 있습니다:

INFO  [CPUTask-1] - generateRandomCoupon() method is called by: CPUTask-1
INFO  [CPUTask-2] - generateRandomCoupon() method is called by: CPUTask-2
INFO  [CPUTask-3] - generateRandomCoupon() method is called by: CPUTask-3
...

이 로그를 통해 ThreadPoolTaskExecutor에서 생성한 스레드들이 재사용되며 작업을 처리하는 것을 확인할 수 있습니다. 이처럼 비동기 처리는 스레드를 효율적으로 관리해 대량 작업을 빠르게 처리할 수 있게 도와줍니다.


결론

CompletableFutureThreadPoolTaskExecutor는 비동기 작업을 효과적으로 처리하는 강력한 도구입니다. ForkJoinPool을 기본으로 사용하는 것도 좋지만, 스프링에서 제공하는 ThreadPoolTaskExecutor를 사용하면 더 세밀한 설정과 모니터링이 가능합니다. 특히, 대량의 I/O 작업이나 CPU 바운드 작업을 처리할 때 각 작업에 맞는 최적의 스레드 풀을 설정하는 것이 성능 최적화의 핵심입니다.