Java线程池
线程池的基本概念
什么是线程池
Java中的线程池是一种管理和复用线程的机制,它可以有效地控制并发执行的多个任务。线程池包含一组预先创建的线程,这些线程可以被任务动态地分配和重复利用,而不是为每个任务都创建一个新线程。这种做法有助于减少线程创建和销毁的开销,并且可以更有效地利用系统资源。
使用线程池有哪些优势
- 降低资源消耗:通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。
- 提高响应速度:因为省去了创建线程这个步骤,所以在任务来的时候,可以立刻开始执行。
- 提高线程的可管理性:线程池进行统一的分配、调优和监控。
- 避免任务过载: 当系统负载过高时,线程池可以通过控制任务队列的大小或者拒绝策略来避免任务过载。
- 提供更多更强大的功能:线程池的可拓展性使得我们可以自己加入新的功能,比如说定时、延时来执行某些线程。
JUC线程池架构
JUC指的是Java的并发工具包java.util.concurrent
。
ThreadPoolExecutor的顶层接口是Executor:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需要提供Runnable对象,将任务运行逻辑提交到执行器(Executor)中,由Executor框架来调配和执行。
ThreadPoolExecutor
ThreadPoolExecutor部分源码。
1 2 3 4 5 6 7 8 9 10 11
| public ThreadPoolExecutor(int corePoolSize, // 核心线程数量 int maximumPoolSize,// 最大线程数 long keepAliveTime, // 最大空闲时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 任务队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 饱和处理机制 ) { ... }
|
线程池参数解析
- corePoolSize用于设置核心(Core)线程池数量。
- 线程池接收到新任务,当前工作线程数少于corePoolSize,即使有空闲的工作线程,也会创建新的线程来处理该请求,直到线程数达到corePoolSize。
- maximumPoolSize用于设置最大线程数量。
- 当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程。
- maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务。
- BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。
- keepAliveTime空闲线程存活时间,当前的线程数量大于corePoolSize,那么在指定的时间后,这个空闲的线程将被销毁,这个指定的时间就是keepAliveTime。
- unit空闲线程存活时间单位
- threadFactory线程工厂,创建新线程的时候使用的工厂,可以用来指定线程名等等。
线程池的工作流程
自定义线程池
自定义线程池步骤:
编写任务类(MyTask),实现Runnable接口;
编写线程类(MyWorker),用于执行任务,需要持有所有任务;
编写线程池类(MyThreadPool),包含提交任务,执行任务的能力;
编写测试类(MyTest),创建线程池对象,提交多个任务测试。
1、编写任务类(MyTask),实现Runnable接口;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package cn.lrwjz.mythreadpool;
public class MyTask implements Runnable{ private int id;
public MyTask(int id) { this.id = id; }
@Override public void run() { String name = Thread.currentThread().getName(); System.out.println("线程:"+name+"即将执行任务:"+id); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程:"+name+"完成了任务:"+id); }
@Override public String toString() { return "MyTask{" + "id=" + id + '}'; } }
|
2、编写线程类(MyWorker),用于执行任务,需要持有所有任务;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package cn.lrwjz.mythreadpool;
import java.util.List;
public class MyWorker extends Thread { private String name; private List<Runnable> tasks;
public MyWorker(String name, List<Runnable> tasks) { super(name); this.tasks = tasks; }
@Override public void run() { while (tasks.size() > 0){ Runnable runnable = tasks.remove(0); runnable.run(); } } }
|
3、编写线程池类(MyThreadPool),包含提交任务,执行任务的能力;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package cn.lrwjz.mythreadpool;
import java.util.Collections; import java.util.LinkedList; import java.util.List;
public class MyThreadPool { private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>()); private int num; private int corePoolSize; private int maxSize; private int workSize;
public MyThreadPool(int corePoolSize, int maxSize, int workSize) { this.corePoolSize = corePoolSize; this.maxSize = maxSize; this.workSize = workSize; }
public void submit(Runnable r){ if(tasks.size() >= workSize){ System.out.println("任务:"+r+"被丢弃了...."); }else{ tasks.add(r); execTask(r); }
}
private void execTask(Runnable r) { if(num < corePoolSize){ new MyWorker("核心线程:"+(num+1), tasks).start(); num++; }else if(num < maxSize){ new MyWorker("非核心线程:"+(num+1), tasks).start(); num++; }else{ System.out.println("任务:"+r+"被缓存了...."); } } }
|
4、编写测试类(MyTest),创建线程池对象,提交多个任务测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package cn.lrwjz.mythreadpool;
public class MyTest { public static void main(String[] args) { MyThreadPool myThreadPool = new MyThreadPool(2, 4, 10); for (int i = 1; i <= 3; i++) { MyTask myTask = new MyTask(i); myThreadPool.submit(myTask); } } }
|
ExecutorService
ExecutorService创建线程池的三种方式。
ExecutorService接口是java内置的线程池接口,获取ExecutorService可以利用JDK中的Executors 类中的静态方法,常用获取方式如下:
static ExecutorService newCachedThreadPool()
:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
:线程池中的所有线程都使用ThreadFactory来创建,这样的线程无需手动启动,自动执行
static ExecutorService newFixedThreadPool(int nThreads)
:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
:创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建。
static ExecutorService newSingleThreadExecutor()
:创建单个线程数的线程池,它可以保证先进先出的执行顺序。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
:创建一个使用单个 worker 线程的 Executor,且线程池中的所有线程都使用ThreadFactory来创建。
常用方法:
void shutdown()
启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
List<Runnable> shutdownNow()
停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
<T> Future<T> submit(Callable<T> task)
执行带返回值的任务,返回一个Future对象。
Future<?> submit(Runnable task)
执行 Runnable 任务,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result)
执行 Runnable 任务,并返回一个表示该任务的 Future。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| package cn.lrwjz.threadpool;
import org.junit.Test;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory;
public class ExecutorServiceTest { @Test public void test01(){ ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { pool.submit(new MyRunnable(i)); } }
@Test public void test02(){ ExecutorService pool = Executors.newFixedThreadPool(4); for (int i = 0; i < 20; i++) { pool.submit(new MyRunnable(i)); } }
@Test public void test03(){ ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { pool.submit(new MyRunnable(i)); } }
@Test public void test04(){ ExecutorService pool = Executors.newSingleThreadExecutor(new ThreadFactory() { private int n = 1; @Override public Thread newThread(Runnable r) { return new Thread(r, "自定义的线程名称:"+n++); } }); for (int i = 0; i < 20; i++) { pool.submit(new MyRunnable(i)); } }
@Test public void test05(){ ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { pool.submit(new MyRunnable(i)); } pool.shutdown();
}
@Test public void test06(){ ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { pool.submit(new MyRunnable(i)); } pool.shutdownNow(); }
class MyRunnable implements Runnable{ private int id;
public MyRunnable(int id) { this.id = id; }
@Override public void run() { String name = Thread.currentThread().getName(); System.out.println(name+"执行了任务..."+id); } } }
|
ScheduledExecutorService
ScheduledExecutorService创建线程池的两种方式。
ScheduledExecutorService是ExecutorService的子接口,具备了延迟运行或定期执行任务的能力,常用获取方式如下:
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
:创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
:创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建,且允许延迟运行或定期执行任务。
static ScheduledExecutorService newSingleThreadScheduledExecutor()
:创建一个单线程执行程序,且允许延迟运行或定期执行任务。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
:创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
常用方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
- 延迟时间单位是unit,数量是delay的时间后执行callable。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- 延迟时间单位是unit,数量是delay的时间后执行command。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- 延迟时间单位是unit,数量是initialDelay的时间后,每间隔period时间重复执行一次command。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package cn.lrwjz.threadpool;
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit;
public class ScheduleExecutorServiceTest {
public static void main(String[] args) { test03(); }
private static void test01() { ScheduledExecutorService pool = Executors.newScheduledThreadPool(3); for (int i = 1; i <= 10; i++) { pool.schedule(new MyRunnable(i), 2, TimeUnit.SECONDS); } }
private static void test02() { ScheduledExecutorService pool = Executors.newScheduledThreadPool(3, new ThreadFactory() { private int n = 1; @Override public Thread newThread(Runnable r) { return new Thread(r, "自定义的线程名称:"+n++); } });
pool.scheduleAtFixedRate(new MyRunnable(1), 1,2, TimeUnit.SECONDS); }
private static void test03() { ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(); pool.scheduleWithFixedDelay(new MyRunnable(1), 1,2, TimeUnit.SECONDS); } } class MyRunnable implements Runnable{ private int id;
public MyRunnable(int id) { this.id = id; }
@Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } String name = Thread.currentThread().getName(); System.out.println(name+"执行了任务..."+id); } }
|
Future异步计算结果
开发中,我们有时需要利用线程进行一些计算,然后获取这些计算的结果,而java中的Future接口就是专门用于描述异步计算结果的,我们可以通过Future 对象获取线程计算的结果,Future 的常用方法如下:
boolean cancel(boolean mayInterruptIfRunning)
试图取消对此任务的执行。
V get()
如有必要,等待计算完成,然后获取其结果。
V get(long timeout, TimeUnit unit)
如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
boolean isCancelled()
如果在任务正常完成前将其取消,则返回 true。
boolean isDone()
如果任务已完成,则返回 true。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package cn.lrwjz.threadpool;
import java.util.concurrent.*;
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newCachedThreadPool(); Future<Integer> future = pool.submit(new MyCallable(1, 2)); boolean done = future.isDone(); System.out.println(done); boolean cancelled = future.isCancelled(); System.out.println(cancelled); Integer integer = future.get(); System.out.println(integer); boolean done1 = future.isDone(); System.out.println(done1); } }
class MyCallable implements Callable<Integer>{ private int a; private int b;
public MyCallable(int a, int b) { this.a = a; this.b = b; }
@Override public Integer call() throws Exception { String name = Thread.currentThread().getName(); System.out.println(name+"开始计算结果...."); Thread.sleep(2000); System.out.println(name+"结束计算...."); return a+b; } }
|
总结
线程池的使用步骤可以归纳总结为五步 :
- 利用Executors工厂类的静态方法,创建线程池对象。
- 编写Runnable或Callable实现类的实例对象。
- 利用ExecutorService的submit方法或ScheduledExecutorService的schedule方法提交并执行线程任务。
- 如果有执行结果,则处理异步执行结果(Future)。
- 调用shutdown()方法,关闭线程池。
创建线程池的几种方式:
在 Java 语言中,并发编程都是通过创建线程池来实现的,而线程池的创建方式也有很多种,每种线程池的创建方式都对应了不同的使用场景,总体来说线程池的创建可以分为以下两类:
- 通过 ThreadPoolExecutor 手动创建线程池。
- 通过 Executors 执行器自动创建线程池。
而以上两类创建线程池的方式,又有 7 种具体实现方法,这 7 种实现方法分别是:
Executors.newFixedThreadPool
:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
Executors.newCachedThreadPool
:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
Executors.newSingleThreadExecutor
:创建单个线程数的线程池,它可以保证先进先出的执行顺序。
Executors.newScheduledThreadPool
:创建一个可以执行延迟任务的线程池。
Executors.newSingleThreadScheduledExecutor
:创建一个单线程的可以执行延迟任务的线程池。
Executors.newWorkStealingPool
:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。
ThreadPoolExecutor
:手动创建线程池的方式,它创建时最多可以设置 7 个参数。
业务场景练习
模拟银行取款业务
设计一个程序,使用两个线程模拟在两个地点同时从一个账号中取钱,假如卡中一共有1000元,每个线程取800元,要求演示结果一个线程取款成功,剩余200元,另一个线程取款失败,余额不足。
要求:
思路提示:
- 线程池可以利用Executors工厂类的静态方法,创建线程池对象。
- 解决线程安全问题可以使用synchronized方法控制取钱的操作。
- 在取款前,先判断余额是否足够,且保证余额判断和取钱行为的原子性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package cn.lrwjz.threadpool;
import java.util.concurrent.*;
public class WithdrawalTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(4); Future<Integer> future = null; for (int i = 1; i <= 3; i++) { future = pool.submit(new MyTask("用户:" + i, 400));
} Integer balance = future.get(); System.out.println("余额为:"+ balance); pool.shutdown(); } }
class MyTask implements Callable<Integer> { private static int money = 1000; private String userName; private int withdrawal;
public MyTask(String userName, int withdrawal) { this.userName = userName; this.withdrawal = withdrawal; }
@Override public Integer call() throws Exception { String name = Thread.currentThread().getName(); System.out.println(userName+"正在取款,通过"+name+"进行...."); Thread.sleep(2000); synchronized (MyTask.class) { if (money > 0 && withdrawal <= money) { System.out.println(userName + "通过" + name + "取款成功...."); money = money - withdrawal;
} else { System.out.println(userName + "通过" + name + "取款失败,余额不足...."); } } return money; } }
|
执行结果:
