Статьи по Java

Что делать, если пул потоков занят? – RejectedExecutionHandler

Чтобы ускорить работу наших приложений, мы очень часто решаем распараллелить работу. При изменении рабочей модели мы предоставляем пул потоков, которые будут отвечать за параллельное выполнение задач. Однако иногда, при работе в многопоточной среде, может возникнуть ситуация, когда мы насытим весь пул потоков. Java предоставляет механизмы (политики), которые инструктируют пул, как вести себя в случае перенасыщения ресурсов.

Создание пула потоков

Наиболее распространенным способом создания пула потоков является использование базовых методов, предоставляемых Executors.

Например, если мы хотим создать пул с 10 потоками, мы вызываем метод:

Executors.newFixedThreadPool(10);
Code language: CSS (css)

Под ним вызывается класс ThreadPoolExecutor:

Executors.newFixedThreadPool(10); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
Code language: PHP (php)

Как вы можете видеть выше, мы передали только значение 10, но получили множество значений по умолчанию, включая очередь, фабрику потоков и rejected execution handler.

Очередь

Не все знают об этом, но большинство пулов потоков имеют встроенную очередь. Его задача – поставить в очередь задания, которые в данный момент не могут быть выполнены, потому что весь пул в данный момент занят. Однако стоит задуматься, что произойдет, если очередь также уже заполнена?

Постановка задачи

Итак, давайте обратимся к реализации, чтобы найти ответ на этот вопрос. Выполнение задачи осуществляется методом execute:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
Code language: JavaScript (javascript)

Как видите, если пул потоков закрыт (метод isRunning) или задание не удалось добавить (метод addWorker), вызывается метод reject:

final void reject(Runnable command) { handler.rejectedExecution(command, this); }
Code language: JavaScript (javascript)

Этот метод на типе, реализующем RejectedExecutionHandler, вызывает метод rejectedExecution.

RejectedExecutionHandler

Мы уже знаем, что во время выполнения рабочего задания может возникнуть ситуация, когда наше задание будет отклонено. Эта задача выполняется обработчиком RejectedExecutionHandler:

public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
Code language: PHP (php)

Политика AbortPolicy применяется к пулам потоков, созданных классом Executors:

Executors.newFixedThreadPool(10); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
Code language: PHP (php)

AbortPolicy

Как я уже писал выше, AbortPolicy – это политика, которая используется по умолчанию при создании пула с помощью класса Executors. Его единственная задача – бросать RejectedExecutionException:

public static class AbortPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Задача " + r.toString() + " отклонена " + e.toString()); } }
Code language: PHP (php)

Вы можете проверить это очень легко:

@Test void shouldThrowRejectedExecutionExceptionWithAbortPolicy() { // given final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); // when threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); // then assertThatExceptionOfType(RejectedExecutionException.class) .isThrownBy(() -> threadPool.submit(() -> sleep(10_000))); }
Code language: JavaScript (javascript)

DiscardPolicy

Еще одна встроенная политика (которую можно установить в конструкторе ThreadPoolExecutor) – это политика DiscardPolicy.

Это противоположность AbortPolicy, т.е. ничего не происходит, если новая задача не может быть выполнена:

public static class DiscardPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
Code language: PHP (php)

И тест для этой политики может выглядеть следующим образом:

@Test void shouldNotThrowRejectedExecutionExceptionWithDiscardPolicy() { // given final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); // when threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); // then assertThatCode(() -> threadPool.submit(() -> sleep(10_000))) .doesNotThrowAnyException(); }
Code language: JavaScript (javascript)

DiscardOldestPolicy

Эта политика очень похожа на политику DiscardPolicy с той разницей, что самая старая задача аутсорсера прекращается, а на ее место “перепрыгивает” наша самая новая задача:

public static class DiscardOldestPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
Code language: PHP (php)

Это хорошая политика, если вы заботитесь о выполнении только самых новых задач и можете позволить себе нарушить старые. Тесты для этой политики:

@Test void shouldReturnNewestElementsWithDiscardOldestPolicy() throws InterruptedException { // given final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy() ); // when threadPool.execute(() -> sleep(100)); BlockingQueue<String> queue = new LinkedBlockingDeque<>(); threadPool.execute(() -> queue.offer("Старые")); threadPool.execute(() -> queue.offer("Работа")); threadPool.execute(() -> queue.offer("Новые")); threadPool.awaitTermination(100, TimeUnit.MILLISECONDS); List<String> results = new ArrayList<>(); queue.drainTo(results); // then assertThat(results).containsExactlyInAnyOrder("Работа", "Новые") .doesNotContain("Старые"); }
Code language: JavaScript (javascript)

Политика Caller-Runs

Другой встроенной политикой (которую можно установить в конструкторе ThreadPoolExecutor) является политика CallerRunsPolicy.

Это очень интересное решение, которое может позволить нам реализовать дросселирование. Это техника, которая позволяет избежать слишком интенсивного использования ресурсов, что может иметь такие последствия, как замедление работы приложения или его полное отключение. Когда заказывается новое задание и пул заполнен, задание выполняется в потоке, который заказал задание.

Таким образом, мы блокируем поток, который заказывает задание, чтобы новые задания не были добавлены в пул.

В это время пул потоков завершит часть своих задач и начнет принимать новые:

public static class CallerRunsPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
Code language: PHP (php)

Давайте попробуем написать тесты для этого:

@Test void shouldBlockCallerThread() { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy()); threadPool.submit(() -> sleep(1_000)); threadPool.submit(() -> sleep(1_000)); long startTime = System.currentTimeMillis(); threadPool.submit(() -> sleep(1_000)); long blockedDuration = System.currentTimeMillis() - startTime; assertThat(blockedDuration).isGreaterThanOrEqualTo(1_000); }
Code language: JavaScript (javascript)

Собственная политика

Знание того, как отклоняются задания, позволяет нам написать нашу реализацию RejectedExecutionHandler. Например, мы можем сохранить отклоненные задания в базе данных:

class CustomRejectedExecutionHandler implements RejectedExecutionHandler { private final RejectedTasksRepository rejectedTasksRepository; CustomRejectedExecutionHandler(final RejectedTasksRepository rejectedTasksRepository) { this.rejectedTasksRepository = rejectedTasksRepository; } @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { rejectedTasksRepository.save(r.toString()); } }
Code language: PHP (php)

Последний тест касается нашей политики:

@Test void shouldSaveRejectedTasksWithCustomPolicy() { // given RejectedTasksRepository repository = Mockito.mock(RejectedTasksRepository.class); final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new CustomRejectedExecutionHandler(repository) ); // when threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); threadPool.submit(() -> sleep(10_000)); // then verify(repository).save(anyString()); }
Code language: JavaScript (javascript)

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *