반응형

자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스.

  • Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.

 

Future로는 하기 어렵던 작업들

  • Future를 외부에서 완료 시킬 수 없다.
    • 취소하거나, get()에 타임아웃을 설정할 수는 있다.
  • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
  • 여러 Future를 조합할 수 없다.
    • 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
  • 예외 처리용 API를 제공하지 않는다.

 

CompletableFuture

  • Completable 이름이 붙은 이유는 외부에서 Complet을 명시적으로 시킬 수 있다.
    • 예) 몇 초 이내에 응답이 오지않으면 기본 값으로 설정
  • 명시적으로 Executor(쓰레드풀)를 선언해서 사용하지 않아도 된다.
  • main 쓰레드 입장에서는 get()을 사용해야 CompletableFuture에 정의한 동작이 수행된다.
    • main 쓰레드에서 sleep 또는 get() 메소드를 사용하지 않으면 그 작업을 기다리지 않고 바로 끝나기 때문에 사실상 해당 future 작업을 볼 수 없어서 아무일도 일어나지 않는다.
    • 그러나 ForkJoinPool에서 가져온 쓰레드는 sleep 또는 get()이 없어도 CompletableFuture에 정의된 코드를 실행한다.
  • ForkJoinPool을 사용하여 Executor(쓰레드풀)을 따로 정의하지 않고도 쓰레드를 사용할 수 있다.
    • Executor(쓰레드풀)를 구현한 구현체(Dequeue를 사용)
    • 자기 쓰레드가 할 일이 없으면 직접 Dequeue에서 할 일을 가져와서 처리하는 방식의 프레임워크
    • 작업 단위를 자기가 파생시킨 세부적인 서브 태스크가 있다면 서브 태스크들을 잘게 쪼개서 다른 쓰레드에 분산시켜서 작업을 처리하고 모아서 결과 값을 도출한다.
  • Implements Future

 

비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()
  • 리턴값이 있는 경우: supplyAsync()
  • 필요하다면 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 commonPool())

 

콜백 제공하기

  • thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백
  • thenAccept(Consumer): 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
  • thenRun(Runnable): 리턴값 받지 다른 작업을 처리하는 콜백
  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.

 

조합하기

  • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합
  • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
  • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
  • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

 

예외처리

  • exeptionally(Function) : 에러 발생 시 콜백 실행
  • handle(BiFunction)

 

CompletableFuture 예제

public class Main {
    public static void main(String[] args){
        /*
            기존 Future의 문제점
            - Future에서 get()하기 전까지 Future의 결과 값을 이용한 작업을 할 수 없다.
         */
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> hello = () -> {
            Thread.sleep(1000L);
            return "Hello";
        };

        Future<String> future = executorService.submit(hello);
        try {
            future.get();
            //Future의 값을 이용한 로직은 future.get() 이후에 등장하게 된다.
            executorService.shutdown();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            CompletableFuture
            - 외부에서 명시적으로 Complet 시킬 수 있다.
         */
        CompletableFuture<String> future2 = new CompletableFuture<>();
        future2.complete("Dev History"); //Future의 기본값 설정과 동시에 작업 완료 처리가 된다.
        System.out.println(future2.isDone()); //상태 true 출력
        try {
            System.out.println(future2.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");
        //위와 결과 동일
        CompletableFuture<String> future3 = CompletableFuture.completedFuture("Dev History");
        System.out.println(future3.isDone()); //상태 true 출력
        try {
            System.out.println(future3.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            비동기로 작업 실행하기
            - runAsync() : 리턴 값이 없는 경우
            - supplyAsync() : 리턴 값이 있는 경우
         */
        CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
        });
        try {
            System.out.println(future4.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");
        CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        try {
            System.out.println(future5.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");
        /*
            CompletableFuture는 Future와 달리 콜백 주는 것이 가능하다.
            그리고 get() 이전에 처리 로직을 작성하는 것이 가능해졌다.
            - thenApply(Function) : 리턴 값을 받아서 다른 값으로 바꾸는 콜백
            - thenAccept(Consumer) : 리턴 값으로 또 다른 작업을 처리하는 콜백(리턴없이)
            - thenRun(Runnable) : 리턴 값을 다른 쓰레드의 작업에서 사용하는 콜백
         */
        //받은 결과 값을 UpperCase로 변경.
        CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply((s) -> {
            System.out.println(Thread.currentThread().getName());
            return s.toUpperCase();
        });
        try {
            System.out.println(future6.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        //받은 결과 값을 UpperCase로 변경.
        CompletableFuture<Void> future7 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept((s) -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println(s.toLowerCase());
        });
        try {
            future7.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        //리턴 값 받지않고 다른 작업 처리
        CompletableFuture<Void> future8 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenRun(() -> {
            System.out.println(Thread.currentThread().getName());
        });
        try {
            future8.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            CompletableFuture Executor(쓰레드풀) 변경 실행에 따른 사용 쓰레드 변경 확인
         */
        ExecutorService executorService2 = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> future9 = CompletableFuture.supplyAsync(() -> {//쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-1
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }, executorService2).thenRun(() -> {//사용 쓰레드 : pool-thread-1 또는 main (thenRun()으로 전달한 콜백은 앞선 콜백을 실행한 쓰레드(pool-thread-1)나 그 쓰레드를 파생시킨 부모(main)에서 실행한다)
            System.out.println(Thread.currentThread().getName());
        }).thenRunAsync(() -> { //쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-2
            System.out.println(Thread.currentThread().getName());
        }, executorService2).thenRun(() -> { //사용 쓰레드 : pool-thread-2 또는 main (thenRun()으로 전달한 콜백은 앞선 콜백을 실행한 쓰레드(pool-thread-2)나 그 쓰레드를 파생시킨 부모(main)에서 실행한다)
            System.out.println(Thread.currentThread().getName());
        }).thenRunAsync(() -> { //쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-3
            System.out.println(Thread.currentThread().getName());
        }, executorService2).thenRunAsync(() -> { //쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-4
            System.out.println(Thread.currentThread().getName());
        }, executorService2);
        try {
            future9.get();
            executorService2.shutdown();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            비동기 작업 조합하기
            - thenCompose() : 두 작업이 서로 이어서 실행하도록 조합(CompletableFuture 두 개를 연결하여 처리한 하나의 CompletableFuture가 나온다.)
            - thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
            - allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
            - anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
         */
        //서로 의존관계 : thenCompose()
        CompletableFuture<String> dev = CompletableFuture.supplyAsync(() -> {
            System.out.println("Dev " + Thread.currentThread().getName());
            return "Dev";
        });
        try {
            CompletableFuture<String> future10 = dev.thenCompose(Main::getWorld);
            System.out.println(future10.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        //서로 의존관계 X : thenCombine()
        CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
            System.out.println("Apple " + Thread.currentThread().getName());
            return "Apple";
        });

        CompletableFuture<String> history = CompletableFuture.supplyAsync(() -> {
            System.out.println("History " + Thread.currentThread().getName());
            return "History";
        });
        //둘 다 모두 작업완료 했을때, 실행할 콜백 작성
        CompletableFuture<String> future11 = apple.thenCombine(history, (a, h) -> {
            return a + " " + h;
        });
        try {
            System.out.println(future11.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            2개 이상일 때, 여러 서브 태스크들을 합쳐서 처리하는 방법
            - allOf에 넘긴 모든 태스크들이 다 끝났을 때 모든 작업 결과에 콜백을 실행한다.
            - 문제는 여러 태스크들의 결과가 동일한 타입임을 보장할 수 없기 때문에 처리가 어렵다.
         */
        //join와 get은 똑같은데 예외처리방식에서 다르다.
        //get은 checked Exceptions, join은 unchecked Exceptions 발생
        //결과 값을 콜렉션으로 처리하기. 아래와 같이 처리하면 블로킹이 발생하지 않는다.
        List<CompletableFuture<String>> futures = Arrays.asList(apple, history);
        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

        CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        try {
            results.get().forEach(System.out::println);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            anyOf에 넘긴 태스크들 중 아무거나 빨리 끝나는 작업 하나의 결과에 콜백 실행
         */
        CompletableFuture<Void> results2 = CompletableFuture.anyOf(apple, history).thenAccept(System.out::println);
        try {
            results2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        /*
            에러 발생 시, 콜백 실행
            - exceptionally(Function) : 에러가 발생했을 경우 콜백 실행
            - handle(BiFunction) : 정상적으로 종료되었을 경우, 에러가 발생했을 경우 모두 사용 가능
               -> 첫 번째 파라미터는 정상적으로 종료되었을 경우의 값, 두 번째 파라미터는 에러가 발생한 경우의 값
         */
        boolean throwError = true;
        CompletableFuture<String> banana = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Banana " + Thread.currentThread().getName());
            return "Banana";
        }).exceptionally(ex -> {
            System.out.println(ex);
            return "Error!";
        });
        try {
            System.out.println(banana.get()); //에러 발생시 Error! 출력
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("#######################");

        CompletableFuture<String> grape = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Grape " + Thread.currentThread().getName());
            return "Grape";
        }).handle((result, ex) -> {
            if(ex != null){
                System.out.println(ex);
                return "ERROR!";
            }
            return result;
        });
        try {
            System.out.println(grape.get()); //에러 발생시 Error! 출력
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

[참고자료]

더 자바, Java 8, 백기선

Class ForkJoinPool

Class CompletableFuture<T>

 

반응형

+ Recent posts