Java의 ThreadPoolExecutor, Spring의 ThreadPoolTaskExecutor
작성일: 2020-09-20 16:55
- Java의 ThreadPoolExecutor를 통해 ThreadPool을 설정하는 방법과 Spring의 ThreadPoolTaskExecutor가 ThreadPoolExecutor를 어떻게 활용하는지 알아보겠습니다.
# Java의 ThreadPoolExecutor
- ThreadPool은 애플리케이션의 특징 및 JVM을 구동시키는 하드웨어 사양에 따라 세밀하게 조정이 필요합니다.
- 자바 1.5이후 ExecutorService의 구현체인 ThreadPoolExecutor를 통해 스레드 풀을 직접 설정하여 사용할 수 있습니다.
# ThreadPoolExecutor 생성자
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
- corePoolSize
- 스레드 풀에 계속해서 유지시킬 스레드 개수입니다.
- maximumPoolSize
- 스레드 풀에 최대한으로 가질 수 있는 스레드 개수입니다.
- workQueue가 가득찼을 때 여기서 지정된 숫자로 스레드가 증가하며 workQueue가 가득차지 않으면 스레드 개수는 증가하지 않습니다.
- keepAliveTime
- workQueue가 가득차 maximumPoolSize까지 스레드가 개수가 증가한 후 증가된 스레드를 유지시킬 시간을 정의합니다.
- unit
- keepAliveTime에서 사용한 TimeUnit 타입을 정의합니다.
- workQueue
- 작업이 corePoolSize를 초과할 때 나머지 작업들을 보관할 BlockingQueue를 정의합니다.
- threadFactory (생략 가능)
- 스레드 풀에서 스레드를 생성할 때 사용할 threadFactory를 정의합니다.
- handler (생략 가능)
- 스레드 풀이 가득차 더 이상 작업을 수용할 수 없을 때 사용되는 핸들러를 정의합니다.
스레드 풀의 스레드 개수의 증가 절차는
corePoolSize 증가 -> workQueue가 가득찰 때까지 queue에 추가 -> maximumPoolSize 증가
입니다.
# Executors 팩토리를 활용하여 ThreadPoolExecutor 생성하기
- Java에서 제공하는 Executors 팩토리에는 ThreadPoolExecutor를 이용한 팩토리 메서드들을 제공합니다.
# Executors.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1
2
3
4
5
2
3
4
5
- newFixedThreadPool은 메서드 명와 동일하게 고정된 크기의 스레드 풀을 생성할 수 있습니다.
- 해당 메서드는 작업 큐를 크기가 제한되지 않은 LinkedBlokingQueue로 지정했기 때문에 위의 사실상 maximumPoolSize는 사용되지 않습니다.
- 즉 작업이 계속해서 들어오더라도 지정된 스레드 개수만큼 스레드가 증가하고 그 이후에 작업 큐에 계속해서 작업이 쌓이게 됩니다.
# 동작 확인
// 100ms 만큼 block되는 작업
private Runnable getTask() {
return () -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
}
// threadPool를 종료하고 모든 작업이 끝날 때까지 기다린다.
private void shutDownAndWaitUntilTerminated(ExecutorService executorService) {
try {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Test
void testFixedThreadPool() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// 100개의 작업을 수행
IntStream.range(0, 100).forEach(i -> threadPoolExecutor.execute(getTask()));
int poolSize = threadPoolExecutor.getPoolSize();
int queueSize = threadPoolExecutor.getQueue().size();
assertThat(poolSize).isEqualTo(5);
assertThat(queueSize).isEqualTo(95);
String message = String.format("CurrentPoolSize: %s, WorkQueueSize: %s", poolSize, queueSize);
System.out.println(message);
shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
// 촐력결과 -> CurrentPoolSize: 5, WorkQueueSize: 95
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
- 5개의 고정된 스레드 풀을 할당하였으므로 100개의 작업을 동시에 실행하더라도 나머지 95개의 작업은 작업 큐에 쌓이는 것을 확인할 수 있습니다.
newFixedThreadPool는 workQueue 사이즈가 제한되지 않기 때문에 작업이 계속해서 들어온다면 메모리 초과가 발생할 수 있으므로 상용에서 사용하기엔 무리가 있습니다.
# Executors.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1
2
3
4
5
2
3
4
5
- newCachedThreadPool은 corePoolSize를 0개로 지정하고, maximumPoolSize를 Integer.MAX_VALUE입니다.
- 즉 작업이 계속 추가되면 Integer.MAX_VALUE까지 스레드는 계속해서 늘어날 수 있습니다.
- 늘어난 스레드는 keepAliveTime에 지정된 만큼 60초동안 유지된 후 제거됩니다.
- 그리고 workQueue는 SynchronousQueue로 지정된 것을 알 수 있는데 SynchronousQueue는 이름을 Queue이지만 기존 Queue와는 다르게 들어오는 작업을 즉시 작업 스레드에게 넘겨줍니다.
- 그러므로 작업이 계속해서 추가되면 SynchronousQueue는 스레드에게 작업을 즉시 넘겨주게 되므로 스레드 풀의 스레드는 계속해서 늘어나게 됩니다.
# 동작 확인
@Test
void testCachedThreadPool() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
IntStream.range(0, 1000).forEach(i -> threadPoolExecutor.execute(getTask()));
int poolSize = threadPoolExecutor.getPoolSize();
int queueSize = threadPoolExecutor.getQueue().size();
assertThat(poolSize).isEqualTo(1000);
assertThat(queueSize).isEqualTo(0);
TimeUnit.SECONDS.sleep(65);
// keepAlive 시간이후엔 스레드들이 제거된다.
assertThat(threadPoolExecutor.getPoolSize()).isEqualTo(0);
shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- newCachedThreadPool를 활용하여 1000개의 작업을 동시에 수행시키면 즉시 poolSize는 1000개까지 증가하여 작업을 수행합니다.
- 그리고 keepAliveTime이 60초이므로 그 이후엔 추가기된 스레드는 제거됩니다.
newCachedThreadPool은 작업이 계속해서 들어온다면 스레드 풀의 스레드 개수를 거의 무제한으로 증가시키기 때문에 상용에서 사용하기엔 무리가 있습니다.
# ThreadPoolExecutor 직접 생성하기
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 15, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5));
1
- corePoolSize 5개, maximumPoolSize 15개, queueSize 5개로 제한된 ThreadPoolExecutor를 이용하여 ThreadPoolExecutor가 어떻게 동작하는지 확인해보겠습니다.
- 그리고 keepAliveTime은 5초로 주겠습니다.
# 10개의 작업이 동시에 수행될 때
@Test
void testCustomThreadPoolWithTenTasks() throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 15, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5));
IntStream.range(0, 10).forEach(i -> threadPoolExecutor.execute(getTask()));
int poolSize = threadPoolExecutor.getPoolSize();
int queueSize = threadPoolExecutor.getQueue().size();
assertThat(poolSize).isEqualTo(5);
assertThat(queueSize).isEqualTo(5);
shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
- 10개의 작업이 동시에 수행되면 corePoolSize(5)만큼 작업이 동시에 수행되고, 나머지 5개는 workQueue에 쌓이게 됩니다.
# 20개의 작업이 동시에 수행될 때
@Test
void testCustomThreadPoolWithTwentyTasks() throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 15, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5));
IntStream.range(0, 20).forEach(i -> threadPoolExecutor.execute(getTask()));
int poolSize = threadPoolExecutor.getPoolSize();
int queueSize = threadPoolExecutor.getQueue().size();
assertThat(poolSize).isEqualTo(15);
assertThat(queueSize).isEqualTo(5);
TimeUnit.SECONDS.sleep(6);
// keepAlive 시간이후엔 corePoolSize만큼 돌아온다.
assertThat(threadPoolExecutor.getPoolSize()).isEqualTo(5);
shutDownAndWaitUntilTerminated(threadPoolExecutor);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 20개의 작업을 동시에 수행하면 corePoolSize(5)만큼 작업이 동시에 수행되고 나머지 15개는 workQueue에 쌓으려고 하지만 queueSize는 5이므로 maximumPoolSize까지 threadPool이 증가됩니다.
- 그리고 keepAliveTime(5초)이 지나면 poolSize는 corePoolSize만큼 돌아오게 됩니다.
# 30개의 작업이 동시에 수행될 때
@Test
void testCustomThreadPoolWithThirtyTasks() throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 15, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5));
// RejectedExecutionException 발생
Assertions.assertThrows(RejectedExecutionException.class, () -> {
IntStream.range(0, 30).forEach(i -> threadPoolExecutor.execute(getTask()));
});
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
- 30개의 작업이 동시에 수행되면 maximumPoolSize + queueSize = 20개이므로 스레드 풀이 수용할 수 있는 최대 작업수를 초과하게되어 RejectedExecutionHandler가 동작하게 됩니다.
- ThreadPoolExecutor 생성 시 따로 RejectedExecutionHandler를 정의하지 않으면 DefaultHandler로 ThreadPoolExecutor.AbortPolicy가 사용되며 해당 핸들러는 RejectedExecutionException를 발생시킵니다.
# Spring의 ThreadPoolTaskExecutor
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private int corePoolSize = 1;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
// BlockingQueue 생성
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
// 생략 ...
// ThreadPoolExecutor 생성
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
// 생략 ...
return executor;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- ThreadPoolExecutor에 더하여 모니터링 등 다양한 기능을 제공하는 Spring ThreadPoolTaskExecutor도 내부적으로 ThreadPoolExecutor를 생성하여 사용하고 있습니다.
@Test
void testThreadPoolTaskExecutor()throws Exception {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setQueueCapacity(5);
threadPoolTaskExecutor.setMaxPoolSize(15);
threadPoolTaskExecutor.setKeepAliveSeconds(5);
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
- 이렇게 ThreadPoolTaskExecutor를 생성한다면 위에서 생성한 ThreadPoolExecutor와 동일하게 설정할 수 있습니다.
- ThreadPoolExecutor와 다른점은 Queue을 직접 생성하지 않고 QueueCapacity만 지정하는 것인데요.
// ThreadPoolTaskExecutor의 createQueue
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);
}
else {
return new SynchronousQueue<>();
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
- 큐를 생성하는 메서드를 확인해보면 지정된 QueueCapacity가 0보다 크다면 LinkedBlokingQueue를 사용하고, 그렇지 않으면 작업을 즉시 작업 스레드에게 넘겨주는 SynchronousQueue를 사용하는 것을 알 수 있습니다.