Чтобы ускорить работу наших приложений, мы очень часто решаем распараллелить работу. При изменении рабочей модели мы предоставляем пул потоков, которые будут отвечать за параллельное выполнение задач. Однако иногда, при работе в многопоточной среде, может возникнуть ситуация, когда мы насытим весь пул потоков. Java предоставляет механизмы (политики), которые инструктируют пул, как вести себя в случае перенасыщения ресурсов.
Содержание
Создание пула потоков
Наиболее распространенным способом создания пула потоков является использование базовых методов, предоставляемых Executors.
Например, если мы хотим создать пул с 10 потоками, мы вызываем метод:
Executors.newFixedThreadPool(10);
Code language: CSS (css)
Под ним вызывается класс ThreadPoolExecutor:
Executors.newFixedThreadPool(10);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
Code language: PHP (php)
Как вы можете видеть выше, мы передали только значение 10, но получили множество значений по умолчанию, включая очередь, фабрику потоков и rejected execution handler.
Очередь
Не все знают об этом, но большинство пулов потоков имеют встроенную очередь. Его задача – поставить в очередь задания, которые в данный момент не могут быть выполнены, потому что весь пул в данный момент занят. Однако стоит задуматься, что произойдет, если очередь также уже заполнена?
Постановка задачи
Итак, давайте обратимся к реализации, чтобы найти ответ на этот вопрос. Выполнение задачи осуществляется методом execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Code language: JavaScript (javascript)
Как видите, если пул потоков закрыт (метод isRunning) или задание не удалось добавить (метод addWorker), вызывается метод reject:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
Code language: JavaScript (javascript)
Этот метод на типе, реализующем RejectedExecutionHandler, вызывает метод rejectedExecution.
RejectedExecutionHandler
Мы уже знаем, что во время выполнения рабочего задания может возникнуть ситуация, когда наше задание будет отклонено. Эта задача выполняется обработчиком RejectedExecutionHandler:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Code language: PHP (php)
Политика AbortPolicy применяется к пулам потоков, созданных классом Executors:
Executors.newFixedThreadPool(10);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
Code language: PHP (php)
AbortPolicy
Как я уже писал выше, AbortPolicy – это политика, которая используется по умолчанию при создании пула с помощью класса Executors. Его единственная задача – бросать RejectedExecutionException:
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Задача " + r.toString() +
" отклонена " +
e.toString());
}
}
Code language: PHP (php)
Вы можете проверить это очень легко:
@Test
void shouldThrowRejectedExecutionExceptionWithAbortPolicy() {
// given
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// when
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
// then
assertThatExceptionOfType(RejectedExecutionException.class)
.isThrownBy(() -> threadPool.submit(() -> sleep(10_000)));
}
Code language: JavaScript (javascript)
DiscardPolicy
Еще одна встроенная политика (которую можно установить в конструкторе ThreadPoolExecutor) – это политика DiscardPolicy.
Это противоположность AbortPolicy, т.е. ничего не происходит, если новая задача не может быть выполнена:
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
Code language: PHP (php)
И тест для этой политики может выглядеть следующим образом:
@Test
void shouldNotThrowRejectedExecutionExceptionWithDiscardPolicy() {
// given
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);
// when
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
// then
assertThatCode(() -> threadPool.submit(() -> sleep(10_000)))
.doesNotThrowAnyException();
}
Code language: JavaScript (javascript)
DiscardOldestPolicy
Эта политика очень похожа на политику DiscardPolicy с той разницей, что самая старая задача аутсорсера прекращается, а на ее место “перепрыгивает” наша самая новая задача:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
Code language: PHP (php)
Это хорошая политика, если вы заботитесь о выполнении только самых новых задач и можете позволить себе нарушить старые. Тесты для этой политики:
@Test
void shouldReturnNewestElementsWithDiscardOldestPolicy() throws InterruptedException {
// given
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
// when
threadPool.execute(() -> sleep(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
threadPool.execute(() -> queue.offer("Старые"));
threadPool.execute(() -> queue.offer("Работа"));
threadPool.execute(() -> queue.offer("Новые"));
threadPool.awaitTermination(100, TimeUnit.MILLISECONDS);
List<String> results = new ArrayList<>();
queue.drainTo(results);
// then
assertThat(results).containsExactlyInAnyOrder("Работа", "Новые")
.doesNotContain("Старые");
}
Code language: JavaScript (javascript)
Политика Caller-Runs
Другой встроенной политикой (которую можно установить в конструкторе ThreadPoolExecutor) является политика CallerRunsPolicy.
Это очень интересное решение, которое может позволить нам реализовать дросселирование. Это техника, которая позволяет избежать слишком интенсивного использования ресурсов, что может иметь такие последствия, как замедление работы приложения или его полное отключение. Когда заказывается новое задание и пул заполнен, задание выполняется в потоке, который заказал задание.
Таким образом, мы блокируем поток, который заказывает задание, чтобы новые задания не были добавлены в пул.
В это время пул потоков завершит часть своих задач и начнет принимать новые:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
Code language: PHP (php)
Давайте попробуем написать тесты для этого:
@Test
void shouldBlockCallerThread() {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
threadPool.submit(() -> sleep(1_000));
threadPool.submit(() -> sleep(1_000));
long startTime = System.currentTimeMillis();
threadPool.submit(() -> sleep(1_000));
long blockedDuration = System.currentTimeMillis() - startTime;
assertThat(blockedDuration).isGreaterThanOrEqualTo(1_000);
}
Code language: JavaScript (javascript)
Собственная политика
Знание того, как отклоняются задания, позволяет нам написать нашу реализацию RejectedExecutionHandler. Например, мы можем сохранить отклоненные задания в базе данных:
class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
private final RejectedTasksRepository rejectedTasksRepository;
CustomRejectedExecutionHandler(final RejectedTasksRepository rejectedTasksRepository) {
this.rejectedTasksRepository = rejectedTasksRepository;
}
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
rejectedTasksRepository.save(r.toString());
}
}
Code language: PHP (php)
Последний тест касается нашей политики:
@Test
void shouldSaveRejectedTasksWithCustomPolicy() {
// given
RejectedTasksRepository repository = Mockito.mock(RejectedTasksRepository.class);
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1,
1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new CustomRejectedExecutionHandler(repository)
);
// when
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
threadPool.submit(() -> sleep(10_000));
// then
verify(repository).save(anyString());
}
Code language: JavaScript (javascript)