ThreadPoolExecutor – Ejemplo de grupo de hilos en Java

Java thread pool administra el conjunto de hilos de trabajo. Contiene una cola que mantiene las tareas en espera para ser ejecutadas. Podemos usar ThreadPoolExecutor para crear un grupo de hilos en Java. El grupo de hilos de Java administra la colección de hilos Runnable. Los hilos de trabajo ejecutan los hilos Runnable desde la cola. java.util.concurrent.Executors proporciona métodos de fábrica y de soporte para la interfaz java.util.concurrent.Executor para crear el grupo de hilos en Java. Executors es una clase de utilidad que también proporciona métodos útiles para trabajar con ExecutorService, ScheduledExecutorService, ThreadFactory y las clases Callable a través de varios métodos de fábrica. Escribamos un programa simple para explicar su funcionamiento. Primero, necesitamos tener una clase Runnable, llamada WorkerThread.java

package com.journaldev.threadpool;

public class WorkerThread implements Runnable {
  
    private String command;
    
    public WorkerThread(String s){
        this.command=s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
        processCommand();
        System.out.println(Thread.currentThread().getName()+" End.");
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString(){
        return this.command;
    }
}

Ejemplo de ExecutorService

Aquí está la clase del programa de prueba SimpleThreadPool.java, donde estamos creando un grupo de hilos fijos desde el marco de Executors.

package com.journaldev.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleThreadPool {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            Runnable worker = new WorkerThread("" + i);
            executor.execute(worker);
          }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

En el programa anterior, estamos creando un grupo de hilos de tamaño fijo de 5 hilos de trabajo. Luego, estamos enviando 10 trabajos a este grupo, ya que el tamaño del grupo es 5, comenzará a trabajar en 5 trabajos y otros trabajos estarán en estado de espera. Tan pronto como uno de los trabajos haya terminado, otro trabajo de la cola de espera será recogido por el hilo de trabajo y se ejecutará. Aquí está la salida del programa anterior.

pool-1-thread-2 Start. Command = 1
pool-1-thread-4 Start. Command = 3
pool-1-thread-1 Start. Command = 0
pool-1-thread-3 Start. Command = 2
pool-1-thread-5 Start. Command = 4
pool-1-thread-4 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-3 Start. Command = 8
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = 9
pool-1-thread-1 Start. Command = 7
pool-1-thread-5 Start. Command = 6
pool-1-thread-4 Start. Command = 5
pool-1-thread-2 End.
pool-1-thread-4 End.
pool-1-thread-3 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
Finished all threads

La salida confirma que hay cinco hilos en el grupo llamados “pool-1-thread-1” a “pool-1-thread-5” y son responsables de ejecutar las tareas enviadas al grupo.

Ejemplo de ThreadPoolExecutor

La clase Executors proporciona una implementación simple de ExecutorService utilizando ThreadPoolExecutor, pero ThreadPoolExecutor ofrece muchas más características que eso. Podemos especificar el número de hilos que estarán activos cuando creemos una instancia de ThreadPoolExecutor y podemos limitar el tamaño del grupo de hilos y crear nuestra propia implementación de RejectedExecutionHandler para manejar los trabajos que no pueden caber en la cola de trabajadores. Aquí está nuestra implementación personalizada de la interfaz RejectedExecutionHandler.

package com.journaldev.threadpool;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " is rejected");
    }

}

ThreadPoolExecutor proporciona varios métodos mediante los cuales podemos averiguar el estado actual del ejecutor, el tamaño del grupo, el recuento de hilos activos y el recuento de tareas. Así que tengo un hilo de monitorización que imprimirá la información del ejecutor en un cierto intervalo de tiempo.

package com.journaldev.threadpool;

import java.util.concurrent.ThreadPoolExecutor;

public class MyMonitorThread implements Runnable
{
    private ThreadPoolExecutor executor;
    private int seconds;
    private boolean run=true;

    public MyMonitorThread(ThreadPoolExecutor executor, int delay)
    {
        this.executor = executor;
        this.seconds=delay;
    }
    public void shutdown(){
        this.run=false;
    }
    @Override
    public void run()
    {
        while(run){
                System.out.println(
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                        this.executor.getPoolSize(),
                        this.executor.getCorePoolSize(),
                        this.executor.getActiveCount(),
                        this.executor.getCompletedTaskCount(),
                        this.executor.getTaskCount(),
                        this.executor.isShutdown(),
                        this.executor.isTerminated()));
                try {
                    Thread.sleep(seconds*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
            
    }
}

Aquí está el ejemplo de implementación del grupo de hilos usando ThreadPoolExecutor.

package com.journaldev.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerPool {

    public static void main(String args[]) throws InterruptedException{
        Implementación de RejectedExecutionHandler
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        Obtén la implementación de ThreadFactory para usar
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        creando el ThreadPoolExecutor
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(2), threadFactory, rejectionHandler);
        inicia el hilo de monitoreo
        MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        envía trabajo al grupo de hilos
        for(int i=0; i<10; i++){
            executorPool.execute(new WorkerThread("cmd"+i));
        }
        
        Thread.sleep(30000);
        cierra el grupo
        executorPool.shutdown();
        cierra el hilo de monitorización
        Thread.sleep(5000);
        monitor.shutdown();
        
    }
}

Observa que al inicializar el ThreadPoolExecutor, mantenemos el tamaño del grupo inicial en 2, el tamaño máximo del grupo en 4 y el tamaño de la cola de trabajo en 2. Por lo tanto, si hay 4 tareas en ejecución y se envían más tareas, la cola de trabajo retendrá solo 2 de ellas y el resto será manejado por RejectedExecutionHandlerImpl. Aquí está la salida del programa anterior que confirma la declaración anterior.

pool-1-thread-1 Start. Command = cmd0
pool-1-thread-4 Start. Command = cmd5
cmd6 is rejected
pool-1-thread-3 Start. Command = cmd4
pool-1-thread-2 Start. Command = cmd1
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-4 End.
pool-1-thread-1 End.
pool-1-thread-2 End.
pool-1-thread-3 End.
pool-1-thread-1 Start. Command = cmd3
pool-1-thread-4 Start. Command = cmd2
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-4 End.
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

Observa el cambio en el recuento de tareas activas, completadas y completadas totales del ejecutor. Podemos invocar el método shutdown() para finalizar la ejecución de todas las tareas enviadas y terminar el grupo de hilos. Si deseas programar una tarea para que se ejecute con retraso o periódicamente, puedes usar la clase ScheduledThreadPoolExecutor. Lee más sobre ellos en Java Schedule Thread Pool Executor.

Source:
https://www.digitalocean.com/community/tutorials/threadpoolexecutor-java-thread-pool-example-executorservice