-
Notifications
You must be signed in to change notification settings - Fork 2
Open
Labels
🐇 11 Concurrency11장 동시성11장 동시성
Description
Chapter : 11. 동시성
Item : 80. 스레드보다는 실행자, 태스크, 스트림을 애용하라
Assignee : byunghyunkim0
🍑 서론
과거에는 단순한 작업 큐(work queue)를 작성하기 위해서 많은 코드를 작성해야 했다.
🍑 본론
java.util.concurrent
- java.util.concurrent 패키지에는 실행자 프레임워크(Executor Framework)가 있다.
- 과거에 작성한 작업 큐를 다음의 한 줄로 생성할 수 있게 되었다.
// 작업 큐를 생성
ExecutorService exec = Executors.newSingleThreadExecutor();
// 실행자에 실행할 태스크를 넘기는 방법
exec.execute(runnable);
// 실행자 종료 (이 작업이 실패하면 VM 자체가 종료되지 않을 것이다.)
exec.shutdown();실행자 프레임워크 주요 기능
- 특정 태스크가 완료되기를 기다린다.
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println("Started!");
System.out.println(helloFuture.get()); // hello가 완료될때까지 기다림
System.out.println("End!!");
executorService.shutdown();
}
}- 태스크 모음 중 아무것 하나(invokeAny 메서드) 혹은 모든 태스크(invokeAll 메서드)가 완료되기를 기다린다.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(4000L);
return "Java";
};
Callable<String> study = () -> {
Thread.sleep(1000L);
return "Study";
};
List<Future<String>> allFutures = executorService.invokeAll(Arrays.asList(hello, java, study));
for (Future<String> f : allFutures) {
System.out.println(f.get()); // Hello, Java, Study 순서대로 출력된다.
}
String anyFutures = executorService.invokeAny(Arrays.asList(hello, java, study));
System.out.println(anyFutures); // Study
executorService.shutdown();
}
}- 실행자 서비스가 종료하기를 기다린다. (awaitTermination 메서드)
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws InterruptedException {
// awaitTermination 를 이용해서 시간측정을 하려고 한다.
// 일단 시작 시간을 저장한다.
LocalDateTime startTime = LocalDateTime.now();
// 쓰레드풀 생성
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ? ~ 5 초 사이에 끝나는 Runnable 들을 submit 한다.
executorService.submit(getRunnable(new Random().nextLong(3000,5000)));
executorService.submit(getRunnable(new Random().nextLong(2000,5000)));
executorService.submit(getRunnable(new Random().nextLong(4000,5000)));
executorService.submit(getRunnable(new Random().nextLong(3000,5000)));
// ExecutorService shutdown!
executorService.shutdown();
// executorService.shutdown(); 를 호출한 main 쓰레드가
// executorService 가 완전히 종료될 때까지 기다리는(= blocking)
// 하는 작업을 수행한다. 기다리는 시간을 첫번째 파라미터로 넣는데,
// 여기서는 무한정 대기인 Long.MAX_VALUE 을 준다.
System.out.println("Blocking 시작!");
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
// blocking 하는 데 걸린 시간 측정
LocalDateTime endTime = LocalDateTime.now();
System.out.println("걸린 시간: "
+ Duration.between(startTime, endTime).toSeconds() + "초");
System.out.println("main thread 끝!");
}
private static Runnable getRunnable(Long time) {
return () -> {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
System.out.println("InterruptedException occurred from ... "
+ Thread.currentThread().getName());
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
};
}
}- 완료된 태스크들의 결과를 차례로 받는다 (ExecutorCompletionService 이용)
package test;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// 스레드 풀 생성
ExecutorService executorService = Executors.newFixedThreadPool(3);
// ExecutorCompletionService 생성
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
// 작업 제출
completionService.submit(new MyTask("Study"));
completionService.submit(new MyTask("Hello"));
completionService.submit(new MyTask("Java"));
try {
// 작업 결과 순서대로 받기
for (int i = 0; i < 3; i++) {
Future<String> future = completionService.take();
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 스레드 풀 종료
executorService.shutdown();
}
}
private record MyTask(String name) implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep((int) (Math.random() * 3000)); // 작업 처리 시간 지연
return name;
}
}
}- 태스크를 특정 시간에 혹은 주기적으로 실행하게 한다 (ScheduledThreadPoolExecutor 이용)
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// ScheduledThreadPoolExecutor 생성
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 1초 후에 작업 실행
scheduler.schedule(new MyTask("Task 1"), 1, TimeUnit.SECONDS);
// 2초 후에 작업 실행, 그 후 3초 간격으로 반복 실행
scheduler.scheduleAtFixedRate(new MyTask("Task 2"), 2, 3, TimeUnit.SECONDS);
// 3초 후에 작업 실행, 그 후 5초 간격으로 반복 실행
scheduler.scheduleWithFixedDelay(new MyTask("Task 3"), 3, 5, TimeUnit.SECONDS);
// 10초 후에 스레드 풀 종료
scheduler.schedule(() -> scheduler.shutdown(), 10, TimeUnit.SECONDS);
}
private record MyTask(String name) implements Runnable {
@Override
public void run() {
System.out.println(name + " started");
try {
Thread.sleep(2000); // 2초 동안 작업 수행
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " finished");
}
}
}ThreadPool 종류
ThreadPoolExecutor
- 평범하지 않은 실행자를 원한다면 사용
- 스레드 풀 동작을 결정하는 모든 속성을 설정할 수 있다.
Executors.newCachedThreadPool
- 가벼운 프로그램을 실행하는 서버에 적합 (특별하게 설정할게 없고 일반적인 용도에 적합)
- 무거운 프로덕션 서버에는 좋지 못함
- CachedThreadPool은 요청받은 태스크들이 큐에 쌓이지 않고 즉시 스레드에 위임돼 실행하기 때문에 가용한 스레드가 없다면 새로 하나를 생성
- 서버가 무겁다면 CPU 이용률이 100%로 치닫게되고, 새로운 태스크가 또 다른 스레드를 생성함
Executors.newFixedThreadPool
- 무거운 프로덕션 서버에는 Executors.newFixedThreadPool을 선택하여 스레드 개수를 고정하는게 좋음
주의점
- 작업 큐를 직접 만들거나 스레드를 직접 다루는 것을 삼가는게 좋다.
- 스레드를 직접 다루지 말고, 실행자 프레임워크를 이용해라.
포크-조인(fork-join)
- java7부터 지원
- ForkJoinTask의 인스턴스는 작은 하위 태스크로 나뉠 수 있고 ForkJoinPool을 구성하는 스레드들이 이 태스크들을 처리하며, 일을 먼저 끝낸 스레드가 다른 스레드의 남은 태스크를 가져와 대신 처리할 수도 있다.
- 최대한의 CPU 활용을 통해서 높은 처리량과 낮은 지연시간을 달성한다.
Callable & Runable
Callable과 Runnable은 자바에서 비동기 작업을 정의하기 위해 사용되는 두 가지 인터페이스입니다.
-
Callable
- Callable 인터페이스는 call() 메서드를 정의하고 있습니다.
- call() 메서드는 작업 수행 후 결과값을 반환할 수 있습니다.
- Callable은 작업의 결과를 반환할 수 있기 때문에, 주로 작업의 결과를 활용해야 하는 경우에 사용됩니다.
- Callable 인터페이스는 Future 객체와 함께 사용되어, 작업 결과를 비동기적으로 가져올 수 있습니다.
-
Runnable
- Runnable 인터페이스는 run() 메서드를 정의하고 있습니다.
- run() 메서드는 작업 수행 후 결과값을 반환할 수 없습니다.
- Runnable은 작업의 결과를 반환할 수 없기 때문에, 주로 작업 자체만 수행하면 되는 경우에 사용됩니다.
- Runnable 인터페이스는 Thread 객체와 함께 사용되어, 작업을 비동기적으로 실행할 수 있습니다.
차이점
- Callable은 작업 결과를 반환할 수 있지만, Runnable은 작업 결과를 반환할 수 없습니다.
- Callable은 Future 객체와 함께 사용되고, Runnable은 Thread 객체와 함께 사용됩니다.
- Callable은 예외를 throw할 수 있지만, Runnable은 예외를 throw할 수 없습니다.
사용 사례
- Callable은 작업의 결과가 필요한 경우에 사용됩니다. 예를 들어, 데이터베이스 조회, 파일 읽기 등의 작업에 사용할 수 있습니다.
- Runnable은 작업의 결과가 필요 없고 단순히 작업을 수행하면 되는 경우에 사용됩니다. 예를 들어, 로그 기록, 이미지 처리 등의 작업에 사용할 수 있습니다.
🍑 결론
실행자 프레임워클르 통해서 스레드를 관리해라
Referenced by
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
🐇 11 Concurrency11장 동시성11장 동시성