반응형
자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스.
- Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.
Future로는 하기 어렵던 작업들
- Future를 외부에서 완료 시킬 수 없다.
- 취소하거나, get()에 타임아웃을 설정할 수는 있다.
- 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
- 여러 Future를 조합할 수 없다.
- 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
- 예외 처리용 API를 제공하지 않는다.
- Completable 이름이 붙은 이유는 외부에서 Complet을 명시적으로 시킬 수 있다.
- 예) 몇 초 이내에 응답이 오지않으면 기본 값으로 설정
- 명시적으로 Executor(쓰레드풀)를 선언해서 사용하지 않아도 된다.
- main 쓰레드 입장에서는
get()
을 사용해야CompletableFuture
에 정의한 동작이 수행된다.- main 쓰레드에서
sleep
또는get()
메소드를 사용하지 않으면 그 작업을 기다리지 않고 바로 끝나기 때문에 사실상 해당 future 작업을 볼 수 없어서 아무일도 일어나지 않는다.
- 그러나
ForkJoinPool
에서 가져온 쓰레드는sleep
또는get()
이 없어도CompletableFuture
에 정의된 코드를 실행한다.
- main 쓰레드에서
ForkJoinPool
을 사용하여 Executor(쓰레드풀)을 따로 정의하지 않고도 쓰레드를 사용할 수 있다.- Executor(쓰레드풀)를 구현한 구현체(Dequeue를 사용)
- 자기 쓰레드가 할 일이 없으면 직접 Dequeue에서 할 일을 가져와서 처리하는 방식의 프레임워크
- 작업 단위를 자기가 파생시킨 세부적인 서브 태스크가 있다면 서브 태스크들을 잘게 쪼개서 다른 쓰레드에 분산시켜서 작업을 처리하고 모아서 결과 값을 도출한다.
- Implements Future
- Implements CompletionStage
비동기로 작업 실행하기
- 리턴값이 없는 경우:
runAsync()
- 리턴값이 있는 경우:
supplyAsync()
- 필요하다면 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은
commonPool()
)
콜백 제공하기
thenApply(Function)
: 리턴값을 받아서 다른 값으로 바꾸는 콜백
thenAccept(Consumer)
: 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
thenRun(Runnable)
: 리턴값 받지 다른 작업을 처리하는 콜백
- 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
조합하기
thenCompose()
: 두 작업이 서로 이어서 실행하도록 조합
thenCombine()
: 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
allOf()
: 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
anyOf()
: 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
예외처리
exeptionally(Function)
: 에러 발생 시 콜백 실행
handle(BiFunction)
CompletableFuture 예제
public class Main {
public static void main(String[] args){
/*
기존 Future의 문제점
- Future에서 get()하기 전까지 Future의 결과 값을 이용한 작업을 할 수 없다.
*/
ExecutorService executorService = Executors.newFixedThreadPool(4);
Callable<String> hello = () -> {
Thread.sleep(1000L);
return "Hello";
};
Future<String> future = executorService.submit(hello);
try {
future.get();
//Future의 값을 이용한 로직은 future.get() 이후에 등장하게 된다.
executorService.shutdown();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
CompletableFuture
- 외부에서 명시적으로 Complet 시킬 수 있다.
*/
CompletableFuture<String> future2 = new CompletableFuture<>();
future2.complete("Dev History"); //Future의 기본값 설정과 동시에 작업 완료 처리가 된다.
System.out.println(future2.isDone()); //상태 true 출력
try {
System.out.println(future2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
//위와 결과 동일
CompletableFuture<String> future3 = CompletableFuture.completedFuture("Dev History");
System.out.println(future3.isDone()); //상태 true 출력
try {
System.out.println(future3.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
비동기로 작업 실행하기
- runAsync() : 리턴 값이 없는 경우
- supplyAsync() : 리턴 값이 있는 경우
*/
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
});
try {
System.out.println(future4.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
try {
System.out.println(future5.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
CompletableFuture는 Future와 달리 콜백 주는 것이 가능하다.
그리고 get() 이전에 처리 로직을 작성하는 것이 가능해졌다.
- thenApply(Function) : 리턴 값을 받아서 다른 값으로 바꾸는 콜백
- thenAccept(Consumer) : 리턴 값으로 또 다른 작업을 처리하는 콜백(리턴없이)
- thenRun(Runnable) : 리턴 값을 다른 쓰레드의 작업에서 사용하는 콜백
*/
//받은 결과 값을 UpperCase로 변경.
CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
try {
System.out.println(future6.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
//받은 결과 값을 UpperCase로 변경.
CompletableFuture<Void> future7 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenAccept((s) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toLowerCase());
});
try {
future7.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
//리턴 값 받지않고 다른 작업 처리
CompletableFuture<Void> future8 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenRun(() -> {
System.out.println(Thread.currentThread().getName());
});
try {
future8.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
CompletableFuture Executor(쓰레드풀) 변경 실행에 따른 사용 쓰레드 변경 확인
*/
ExecutorService executorService2 = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future9 = CompletableFuture.supplyAsync(() -> {//쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-1
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}, executorService2).thenRun(() -> {//사용 쓰레드 : pool-thread-1 또는 main (thenRun()으로 전달한 콜백은 앞선 콜백을 실행한 쓰레드(pool-thread-1)나 그 쓰레드를 파생시킨 부모(main)에서 실행한다)
System.out.println(Thread.currentThread().getName());
}).thenRunAsync(() -> { //쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-2
System.out.println(Thread.currentThread().getName());
}, executorService2).thenRun(() -> { //사용 쓰레드 : pool-thread-2 또는 main (thenRun()으로 전달한 콜백은 앞선 콜백을 실행한 쓰레드(pool-thread-2)나 그 쓰레드를 파생시킨 부모(main)에서 실행한다)
System.out.println(Thread.currentThread().getName());
}).thenRunAsync(() -> { //쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-3
System.out.println(Thread.currentThread().getName());
}, executorService2).thenRunAsync(() -> { //쓰레드풀 변경 실행. 사용 쓰레드 : pool-thread-4
System.out.println(Thread.currentThread().getName());
}, executorService2);
try {
future9.get();
executorService2.shutdown();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
비동기 작업 조합하기
- thenCompose() : 두 작업이 서로 이어서 실행하도록 조합(CompletableFuture 두 개를 연결하여 처리한 하나의 CompletableFuture가 나온다.)
- thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
- allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
- anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
*/
//서로 의존관계 : thenCompose()
CompletableFuture<String> dev = CompletableFuture.supplyAsync(() -> {
System.out.println("Dev " + Thread.currentThread().getName());
return "Dev";
});
try {
CompletableFuture<String> future10 = dev.thenCompose(Main::getWorld);
System.out.println(future10.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
//서로 의존관계 X : thenCombine()
CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<String> history = CompletableFuture.supplyAsync(() -> {
System.out.println("History " + Thread.currentThread().getName());
return "History";
});
//둘 다 모두 작업완료 했을때, 실행할 콜백 작성
CompletableFuture<String> future11 = apple.thenCombine(history, (a, h) -> {
return a + " " + h;
});
try {
System.out.println(future11.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
2개 이상일 때, 여러 서브 태스크들을 합쳐서 처리하는 방법
- allOf에 넘긴 모든 태스크들이 다 끝났을 때 모든 작업 결과에 콜백을 실행한다.
- 문제는 여러 태스크들의 결과가 동일한 타입임을 보장할 수 없기 때문에 처리가 어렵다.
*/
//join와 get은 똑같은데 예외처리방식에서 다르다.
//get은 checked Exceptions, join은 unchecked Exceptions 발생
//결과 값을 콜렉션으로 처리하기. 아래와 같이 처리하면 블로킹이 발생하지 않는다.
List<CompletableFuture<String>> futures = Arrays.asList(apple, history);
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
try {
results.get().forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
anyOf에 넘긴 태스크들 중 아무거나 빨리 끝나는 작업 하나의 결과에 콜백 실행
*/
CompletableFuture<Void> results2 = CompletableFuture.anyOf(apple, history).thenAccept(System.out::println);
try {
results2.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
/*
에러 발생 시, 콜백 실행
- exceptionally(Function) : 에러가 발생했을 경우 콜백 실행
- handle(BiFunction) : 정상적으로 종료되었을 경우, 에러가 발생했을 경우 모두 사용 가능
-> 첫 번째 파라미터는 정상적으로 종료되었을 경우의 값, 두 번째 파라미터는 에러가 발생한 경우의 값
*/
boolean throwError = true;
CompletableFuture<String> banana = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("Banana " + Thread.currentThread().getName());
return "Banana";
}).exceptionally(ex -> {
System.out.println(ex);
return "Error!";
});
try {
System.out.println(banana.get()); //에러 발생시 Error! 출력
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("#######################");
CompletableFuture<String> grape = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("Grape " + Thread.currentThread().getName());
return "Grape";
}).handle((result, ex) -> {
if(ex != null){
System.out.println(ex);
return "ERROR!";
}
return result;
});
try {
System.out.println(grape.get()); //에러 발생시 Error! 출력
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + " World";
});
}
}
[참고자료]
반응형
'Java > 기본' 카테고리의 다른 글
자바 배열 정렬(Java Array Sort) - 정렬(Arrays.sort), 병렬 정렬(Arrays.parallelSort) 비교 (0) | 2021.07.05 |
---|---|
자바 애노테이션(Java Annotation)의 변화 (0) | 2021.07.05 |
자바 동시성(Java Concurrent) - 3 (Callable, Future) (0) | 2021.07.04 |
자바 동시성(Java Concurrent) - 2 (Executors) (0) | 2021.07.04 |
자바 동시성(Java Concurrent) - 1 (Thread) (0) | 2021.07.02 |