ThreadPoolExecutor – Beispiel für Java-Thread-Pool

Java-Thread-Pool verwaltet die Gruppe von Worker-Threads. Es enthält eine Warteschlange, die Aufgaben enthält, die darauf warten, ausgeführt zu werden. Wir können ThreadPoolExecutor verwenden, um in Java einen Thread-Pool zu erstellen. Der Java-Thread-Pool verwaltet die Sammlung von ausführbaren Threads. Die Worker-Threads führen ausführbare Threads aus der Warteschlange aus. java.util.concurrent.Executors bietet Fabrik- und Unterstützungsmethoden für das java.util.concurrent.Executor-Interface, um den Thread-Pool in Java zu erstellen. Executors ist eine Hilfsklasse, die auch nützliche Methoden bereitstellt, um mit ExecutorService, ScheduledExecutorService, ThreadFactory und Callable-Klassen durch verschiedene Fabrikmethode zu arbeiten. Lassen Sie uns ein einfaches Programm schreiben, um sein Funktionieren zu erklären. Zuerst benötigen wir eine Runnable-Klasse, die WorkerThread.java benannt ist

package com.journaldev.threadpool;

public class WorkerThread implements Runnable {
  
    private String command;
    
    public WorkerThread(String s){
        this.command=s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
        processCommand();
        System.out.println(Thread.currentThread().getName()+" End.");
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString(){
        return this.command;
    }
}

ExecutorService-Beispiel

Hier ist die Testprogrammklasse SimpleThreadPool.java, in der wir einen Thread-Pool mit fester Größe von Executors-Framework erstellen.

package com.journaldev.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleThreadPool {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            Runnable worker = new WorkerThread("" + i);
            executor.execute(worker);
          }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

In dem obigen Programm erstellen wir einen Thread-Pool mit fester Größe von 5 Worker-Threads. Dann übergeben wir diesem Pool 10 Jobs, da die Poolgröße 5 beträgt, werden 5 Jobs gestartet und andere Jobs werden im Wartezustand sein, sobald einer der Jobs beendet ist, wird ein weiterer Job aus der Warteschlange von Worker-Thread aufgegriffen und ausgeführt. Hier ist die Ausgabe des obigen Programms.

pool-1-thread-2 Start. Command = 1
pool-1-thread-4 Start. Command = 3
pool-1-thread-1 Start. Command = 0
pool-1-thread-3 Start. Command = 2
pool-1-thread-5 Start. Command = 4
pool-1-thread-4 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-3 Start. Command = 8
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = 9
pool-1-thread-1 Start. Command = 7
pool-1-thread-5 Start. Command = 6
pool-1-thread-4 Start. Command = 5
pool-1-thread-2 End.
pool-1-thread-4 End.
pool-1-thread-3 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
Finished all threads

Die Ausgabe bestätigt, dass es fünf Threads im Pool namens „pool-1-thread-1“ bis „pool-1-thread-5“ gibt und sie dafür verantwortlich sind, die übergebenen Aufgaben im Pool auszuführen.

ThreadPoolExecutor Beispiel

Die Klasse Executors bietet eine einfache Implementierung von ExecutorService unter Verwendung von ThreadPoolExecutor, aber ThreadPoolExecutor bietet viele weitere Funktionen als das. Wir können die Anzahl der Threads angeben, die beim Erstellen einer ThreadPoolExecutor-Instanz aktiv sein sollen, und die Größe des Thread-Pools begrenzen sowie unsere eigene Implementierung von RejectedExecutionHandler erstellen, um mit den Aufgaben umzugehen, die nicht in die Worker-Warteschlange passen. Hier ist unsere benutzerdefinierte Implementierung des RejectedExecutionHandler-Interfaces.

package com.journaldev.threadpool;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " is rejected");
    }

}

ThreadPoolExecutor bietet mehrere Methoden, mit denen wir den aktuellen Zustand des Executors, die Poolgröße, die Anzahl der aktiven Threads und die Aufgabenanzahl ermitteln können. Daher habe ich einen Überwachungsthread, der die Executor-Informationen in bestimmten Zeitintervallen ausgibt.

package com.journaldev.threadpool;

import java.util.concurrent.ThreadPoolExecutor;

public class MyMonitorThread implements Runnable
{
    private ThreadPoolExecutor executor;
    private int seconds;
    private boolean run=true;

    public MyMonitorThread(ThreadPoolExecutor executor, int delay)
    {
        this.executor = executor;
        this.seconds=delay;
    }
    public void shutdown(){
        this.run=false;
    }
    @Override
    public void run()
    {
        while(run){
                System.out.println(
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                        this.executor.getPoolSize(),
                        this.executor.getCorePoolSize(),
                        this.executor.getActiveCount(),
                        this.executor.getCompletedTaskCount(),
                        this.executor.getTaskCount(),
                        this.executor.isShutdown(),
                        this.executor.isTerminated()));
                try {
                    Thread.sleep(seconds*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
            
    }
}

Hier ist ein Beispiel für die Thread-Pool-Implementierung unter Verwendung von ThreadPoolExecutor.

package com.journaldev.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerPool {

    public static void main(String args[]) throws InterruptedException{
        //Implementierung des RejectedExecutionHandler
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        //Holen Sie sich die ThreadFactory-Implementierung
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //Erstellen des ThreadPoolExecutor
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(2), threadFactory, rejectionHandler);
        //Starten des Überwachungsthreads
        MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        //Arbeit zum Thread-Pool übermitteln
        for(int i=0; i<10; i++){
            executorPool.execute(new WorkerThread("cmd"+i));
        }
        
        Thread.sleep(30000);
        //Pool herunterfahren
        executorPool.shutdown();
        //Überwachungsthread herunterfahren
        Thread.sleep(5000);
        monitor.shutdown();
        
    }
}

Beachten Sie, dass beim Initialisieren des ThreadPoolExecutor die Anfangsgröße des Pools auf 2, die maximale Poolgröße auf 4 und die Größe der Arbeitswarteschlange auf 2 gesetzt wird. Wenn also 4 Aufgaben ausgeführt werden und weitere Aufgaben eingereicht werden, wird die Arbeitswarteschlange nur 2 von ihnen halten, und der Rest wird von RejectedExecutionHandlerImpl behandelt. Hier ist die Ausgabe des obigen Programms, die die obige Aussage bestätigt.

pool-1-thread-1 Start. Command = cmd0
pool-1-thread-4 Start. Command = cmd5
cmd6 is rejected
pool-1-thread-3 Start. Command = cmd4
pool-1-thread-2 Start. Command = cmd1
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-4 End.
pool-1-thread-1 End.
pool-1-thread-2 End.
pool-1-thread-3 End.
pool-1-thread-1 Start. Command = cmd3
pool-1-thread-4 Start. Command = cmd2
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-4 End.
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

Beachten Sie die Änderung der Anzahl der aktiven, abgeschlossenen und insgesamt abgeschlossenen Aufgaben des Executors. Wir können die Methode shutdown() aufrufen, um die Ausführung aller eingereichten Aufgaben zu beenden und den Thread-Pool zu beenden. Wenn Sie eine Aufgabe planen möchten, die mit Verzögerung oder regelmäßig ausgeführt wird, können Sie die Klasse ScheduledThreadPoolExecutor verwenden. Lesen Sie mehr darüber unter Java Schedule Thread Pool Executor.

Source:
https://www.digitalocean.com/community/tutorials/threadpoolexecutor-java-thread-pool-example-executorservice