多线程与线程池

Soul Lv2

今天我们来稍微了解一下线程与线程池的相关知识

多线程的基本操作

我们首先从最基本的Thread类讲起。我们知道多线程的基本使用方法大概如下

1
2
3
4
5
6
7
8
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("Hello World");
});
thread.start();
}
}

每一个Thread对象都是一个单独的线程,当调用这个对象的start方法时这个线程就会被启动。我们可以看看Thread类的构造方法

1
2
3
public Thread(Runnable task) {
this((ThreadGroup)null, (String)null, 0, task, 0L, (AccessControlContext)null);
}

emm具体的构造方法到底是怎么实现的暂时先不用管,我们这里只是用来明确我们刚才那个lambda表达式实际上是一个Runnable接口的实现,所以理所当然的,我们可以这样去实现多线程

1
2
3
4
5
6
public class Test implements Runnable {
@Override
public void run() {
System.out.println("Hello World");
}
}

然后把这这个类的对象直接丢到线程类的构造方法中。或者你也可以选择继承Thread类,此时可以选择重写Thread类的run方法,然后直接调用子类的start方法。通过这样的方法可以给线程中运行的程序添加一些需要使用的参数

但这种操作存在一个问题,run方法是void的,所以在某些情况下我们如果需要返回值,可以使用回调类Callable来实现

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.concurrent.Callable;

class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}

先随便定义一个回调类然后通过ExecutorService(实际上就是一个线程池)来运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);//新建线程池
MyCallable callable = new MyCallable();
Future<Integer> future1 = executor.submit(callable);
Future<Integer> future2 = executor.submit(callable);
//这里稍微解释一下Future类,当我们使用Future去保存异步方法的返回值时及时运算还没有完成程序也会继续向下运行,直到调用get方法获取值时如果没有运算完成才会造成阻塞
try {
System.out.println("线程 1 的结果:" + future1.get());
System.out.println("线程 2 的结果:" + future2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}

如果你有一定的基础,上面的这些内容对你来说一定是非常简单的,下面我们来聊聊睡眠与锁的问题

首先来看一个相当经典的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package Winter;

public class Test {
private static int a = 0;

public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
a++;
}
});
Thread thread2 = new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
a++;
}
});
thread.start();
thread2.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(a);
}

}

此时a的值有很大的可能不是20000,为什么?两个线程分别加了10000次,为什么不是两千呢。非常简单,我们想象一下这个相加的过程:a被保存在内存中,线程1会首先从内存中读取这个值,然后放到累加器中进行累加,再将得到的值写回内存。但是这个过程存在一个问题:线程2也在做同样的事情,如果在线程1读取之后写回之前线程2进行读取,那么必然会导致最终的运算结果小于2000.所以这个问题该怎么解决?

有一个相当愚蠢的做法是直接睡眠

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package Winter;

public class Test {
private static int a = 0;

public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
a++;
}
});
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int i = 1; i <= 10000; i++) {
a++;
}
});
thread.start();
thread2.start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(a);
}

}

一个干完另一个继续干,这样就不会冲突了。但这样只能保证10000时不出问题,那么如果是一亿呢,睡1秒时间不够呢,如果是10呢,睡一秒又太过奢侈,所以我们这里又创造了更细致的控制方法,首先是interrupt方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package Winter;

public class Test {
private static int a = 0;

public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (true){
if(Thread.currentThread().isInterrupted()){
break;
}
}//在没有收到中断信号之前始终处于死循环状态
Thread.interrupted();//将终端标记复位
for (int i = 1; i <= 10000; i++) {
a++;
}
});
Thread thread2 = new Thread(() -> {
for (int i = 1; i <= 10000; i++) {
a++;
}
thread.interrupt();//向线程1发送中断信号
});
thread2.start();
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(a);
}
}

我们可以看到通过这种方式可以灵活的控制一个线程的睡眠时间,确保结果正确

当然,其实java在这方面的控制方法还是相当丰富的,我们随便写几个

1
2
Thread.currentThread().suspend();//在本线程内调度,直接暂停某个线程
t.resume();//恢复t的运行

你可以试着使用这两个方法对上面的代码进行修改,这两个方法相较interrrupt方法的优越性在与线程的暂停本质上是使当前线程处于阻塞状态,此时当前线程不会占用CPU资源,而上面的循环却会无意义的占用CPU资源。

再看一看这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println("线程1开始运行!");
for (int i = 0; i < 50; i++) {
System.out.println("1打印:"+i);
}
System.out.println("线程1结束!");
});
Thread t2 = new Thread(() -> {
System.out.println("线程2开始运行!");
for (int i = 0; i < 50; i++) {
System.out.println("2打印:"+i);
if(i == 10){
try {
System.out.println("线程1加入到此线程!");
t1.join(); //在i==10时,让线程1加入,先完成线程1的内容,在继续当前内容
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t1.start();
t2.start();
}

这里使用到了join方法,join方法其实可以认为是刚才上面的三个方法的结合,首先将t2阻塞,然后等待t1运行完成后t2收到信号恢复运行。

接下来继续想办法改进这一段代码,我们需要引入锁的概念,我们之前提到了之所以在多线程状态下会出现问题是因为不同哦ing线程在竞争同一个资源,所以我们之前的解决方法是优先使某个线程使用该资源,在一个线程使用完成后再让其他线程使用。但是一个线程中真正使用资源的时间只占一部分,那么我们能不能想办法把这部分资源节约出来呢?我们就需要使用synchronized关键字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package Winter;

public class Test {
private static int a = 0;

public static void main(String[] args) {
Thread thread = new Thread(() -> {
synchronized (Test.class) {
for (int i = 1; i <= 10000; i++) {
a++;
}
}

});
Thread thread2 = new Thread(() -> {
synchronized (Test.class) {
for (int i = 1; i <= 10000; i++) {
a++;
}
}
});
thread2.start();
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(a);
}

}

所有的synchronized内的代码只能有一个线程执行,其余线程处于阻塞状态(其实你可以试着不用这个关键字而使用上面提到的方法来实现差不多的效果)

synchronized关键自后括号内的是被加锁的对象,我们称这种锁为对象锁,该对象的锁只能被一个线程持有,只有持有锁的线程才能执行被加锁的代码。(这里使用的是Test类的Class对象),但不同的对象锁之间相互无影响

当然,我们也有更灵活的写法,

1
2
3
private static synchronized void add(){
value++;
}

这种写法被成为方法锁,只能有一个线程调用该方法。当然,这种写法实质上和对象锁是同一种东西,如果是静态方法,加锁对象为类的Class对象,如果是动态方法,加锁对象为方法所在对象。

使用锁可以极大的发挥多线程优势,当然,要注意下面这种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws InterruptedException {
Object o1 = new Object();
Object o2 = new Object();
Thread t1 = new Thread(() -> {
synchronized (o1){//t1先启动,开始持有o1锁
try {
Thread.sleep(1000);
synchronized (o2){//在持有o1锁的同时需要使用o2锁,等待o2锁,当前线程阻塞
System.out.println("线程1");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(() -> {
synchronized (o2){//线程2持有o2锁
try {
Thread.sleep(1000);
synchronized (o1){//等待o1锁,线程阻塞
System.out.println("线程2");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}

此时两个线程均开始等待对方持有的锁,同时陷入阻塞,那么这两个线程都永远不会被唤醒,这种状态我们称之为死锁。

另外,考虑到某些需求,我们需要在满足某些条件时主动的释放锁,交给其他线程,此时可以这样写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) throws InterruptedException {
Object o1 = new Object();
Thread t1 = new Thread(() -> {
synchronized (o1){
try {
System.out.println("开始等待");
o1.wait(); //进入等待状态并释放锁
System.out.println("等待结束!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(() -> {
synchronized (o1){
System.out.println("开始唤醒!");
o1.notify(); //唤醒处于等待状态的线程
for (int i = 0; i < 50; i++) {
System.out.println(i);
}
//唤醒后依然需要等待这里的锁释放之前等待的线程才能继续
}
});
t1.start();
Thread.sleep(1000);
t2.start();
}

这种方法可以很好的控制锁的获取与释放。此外还有一个notifyall方法,这个方法可以直接唤醒所有处于等待的线程,而上面提到的notify只能唤醒随机的一个

一些简单的进阶知识

synchronized的实现机制

首先回答我一个问题:锁机制的底层实现是怎样的?你可以猜一猜。

在比较久远的时代,Java中的synchronized被成为重量级锁,实质上将获取锁与释放锁的操作映射为CPU实际线程的阻塞与唤醒,这一过程由操作系统负责调度。但是有一点令人不爽:实际的阻塞与唤醒操作是一种开销很大的操作,过于频繁的阻塞与唤醒会消耗大量资源,后来人们针对性的做出了一定的改进。

改进的成果被称为自旋锁,什么是自旋锁?当线程需要获取锁时不再被阻塞,而是进入一个循环反复试图获取锁。等等,这难道不是越改经越回去了吗?但是仔细想想,在一个锁的所有权被频繁切换的场景下,这种操作是不是就可理解多了,最多不过是浪费几次循环的算力,相比阻塞与唤醒的操作,其消耗的资源少得多。但是在另一个方面,如果锁的切换不是那么频繁呢?那样就会消耗大量资源,所以经过改进的synchronized 实际上在初始状态下是一个自旋锁,在自旋时间超过一定的界限后就自动转换为一个重量级锁。

但改进并没有停止,既然有重量级锁,那么必然有轻量级锁。轻量级锁是一种乐观锁,上面提到的两种锁都总是认为有其他的线程想要和自己抢夺资源,所以总是要自己实质上拥有锁,但乐观锁不这么认为,乐观锁如轻量级锁总是认为没有人和自己争抢锁,所以在运行时直接读内存并进行计算,但同时保存内存中的初始值,在运算完成后先将保存的初始值与当前内存中的值做比较,如果内存中的值与刚刚保存的值一致,说明这期间没有其他线程修改,直接将运算结果写回,如果发生了变化则说明有其他线程进行了修改,放弃此次操作,重头开始,如果这种机制多次失败,那么向上转化为自旋锁

但优化还没有停止,人们又创造出了偏向锁。人们注意到,在某些情况下,一个对象很多时候都只被一个线程使用,所以直接在这个锁内部添加添加一条信息用于记录线程的id,如果id匹配,那么当前线程直接将这个对象当作无锁对象使用,直到有别的线程来尝试获取这个锁,此时偏向锁向轻量级锁升级。

可以看到synchronized的机制实质上是一个逐渐升级的过程,从偏向锁开始逐渐的随着竞争的程度向上升级直到使用重量级锁。这个流程被称为锁粗化。

内存可见性问题

我们来看一看下面这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Test {  
private static int a = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (a == 0);
System.out.println("线程结束!");
}).start();

Thread.sleep(1000);
System.out.println("正在修改a的值...");
a = 1;
}
}

表面上看这段代码似乎不存在任何问题,但实际上这玩意会变成一个死循环,为什么呢?这里我们可以简单的介绍以下java的内存模型

java的内存分为主内存和工作内存两个部分,所有的线程在实际的操作中会先从主内存读取值并存储到自己的工作内存中,然后进行使用,在更新值之后再将值写回主内存,其余时间一直在使用工作内存中的值。在上面的例子中,主线程更新了主内存中的值,但另一个线程实际上一直在循环使用工作线程中的值,并不知道主内存中的值已经被更新,导致出现死循环。

这个问题到底该如何解决?最直接的办法就是加锁,当一个对象被加锁后每次调用时都会从工作内存刷新这个值,可以参考下面的这段代码,此时受到锁机制的影响,代码的循环是有限的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Main {
private static int a = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (a == 0) {
synchronized (Main.class){}
}
System.out.println("线程结束!");
}).start();

Thread.sleep(1000);
System.out.println("正在修改a的值...");
synchronized (Main.class){
a = 1;
}
}
}

这里稍微解释一下,加锁的对象的Main的Class对象,Main类的静态属性正是包含在Class对象中的,此时对Class对象加锁,那么每次获得锁时都会从主内存内获取属性的实际值

但是除了这种方法以外再没有别的办法了吗,还是有的,这里我们再介绍一个关键字volitale,我们可以重新修改之前的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Main {
//添加volatile关键字
private static volatile int a = 0;

public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (a == 0) ;
System.out.println("线程结束!");
}).start();

Thread.sleep(1000);
System.out.println("正在修改a的值...");
a = 1;
}
}

此时的代码也不会出现问题,这个关键字的功能在于两点

  • 保证不同线程之间数据的可见性
  • 阻止编译器的重排序,编译器在编译时可能对部分代码进行优化,其中一个比较重要的过程就是进行重排序调整代码的执行顺序,但是在多线程环境下这种重排序是可能出现问题的,所以对于跨线程调用的变量即使不加锁也至少应该使用这个关键字

当然,还有一个小知识点,既然我们可以要求某个变量在多个线程内可见,那么也自然可以创造出仅在线程的工作内存内使用的属性,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package Winter;

public class Main {
public static void main(String[] args) {
ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
threadLocal.set(0);
Thread thread = new Thread(() -> {
threadLocal.set(1);
System.out.println(threadLocal.get());
});
thread.start();
System.out.println(threadLocal.get());

}
}

ThreadLocal类中的值存储在每个线程的工作内存中,各自独立,互不干涉。可以被用来实现一些比较奇怪的需求,不过我个人似乎没怎么用过。

现代锁框架

接下来我们聊一聊现代锁框架。从java5开始,java的锁机制发生了一次重大更新,诞生了除了synchronized之外的另一套锁机制,这套锁机制更加的灵活,这里简单的介绍一下。

现代锁框架主要涉及Lock类,Condition类以及几个原子类,我们首先可以看一看Lock类的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Lock {  
void lock();//获取锁,如果拿不到锁会造成阻塞

void lockInterruptibly() throws InterruptedException;//获取锁且响应中断

boolean tryLock(); //尝试获取锁,但不会造成阻塞

boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;//尝试获取锁,其中两个参数用来设置最大等待时间

void unlock(); //释放锁

Condition newCondition(); //这个下面讲
}

这里再对lockInterruptibly方法做一点解释,这个方法提供了对中断的支持,我们前面提到每个线程都会有一个interrupt方法,这个方法不会真的打断线程,只是对线程进行一个通知,Lock类中的lockInterruptibly方法可以监测这个通知,当收到通知时即使处在阻塞状态也会直接抛出InterruptedException异常

此时的锁从一个依赖于具体对象的关键字变成了一个实际存在的锁对象,这在设计上显然符合Java的万物皆对象的思想,接下来我们可以来简单的了解一下Condition类,这个类可以被认为是Object类中wait方法的上位替代,可以简单的看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Condition {  
void await() throws InterruptedException; //相当与wait

void awaitUninterruptibly(); //相当于wait但不会被中断

long awaitNanos(long var1) throws InterruptedException;// 等待固定的时间,如果在时间内唤醒则返回剩余时间,如果超时则返回负数,注意单位是纳秒

boolean await(long var1, TimeUnit var3) throws InterruptedException;//同上但支持更细致的时间控制

boolean awaitUntil(Date var1) throws InterruptedException; //等待到固定的时间点

void signal(); //唤醒随机的一个

void signalAll(); //唤醒全部
}

一个Lock下可以包含多个Conditon,来实现更细致的控制。

特别说明,上面两个类的方法中涉及类TimeUtil类,这是一个枚举类,指的是时间的单位

可重入锁

接下来介绍Lock接口的两个重要实现:可重入锁与读写锁,我们先来了解一下可重入锁。
所谓的可重入锁指的是可以被多次加锁的锁对象,大概就像这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Main {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
lock.lock();//可以反复加锁
lock.lock();
ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
threadLocal.set(0);
Thread thread = new Thread(() -> {
lock.lock();//尝试获得锁,进入阻塞
System.out.println("获得锁");
});
thread.start();
lock.unlock();
System.out.println("第一次释放锁");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("第二次释放锁");
lock.unlock();

}
}

可以看到我们在第一次释放锁后thread并没有获得锁,而是等到两次释放锁之后才真正获得了锁。

此处我们进一步引入公平锁与非公平锁的概念,两者的区别如下

  • 公平锁中维护了一个等待队列,所有尝试获取锁的线程都将进入这个队列,按照先进先出的顺序来获取锁
  • 非公平锁中同样拥有一个等待队列,但此时当一个线程需要获取锁时会尝试先获取锁,如果获取失败则进入等待队列

我们上面展示的可重入锁就是非公平锁,不过我们也可以选择公平锁模式,就像这样

1
Lock lock = new ReentrantLock(true);

此时的锁就是一个公平锁

读写锁

接下来我们介绍读写锁,这种锁的创造时考虑到了这样一个事实:对同一个变量,部分线程只需要读取,部分线程需要修改,而对于只有读取需求的线程来说,可以不占用锁。在这种情况下,读写锁将锁分成了两部分:

  • 读锁:在没有任何线程占用写锁的情况下可以被多个线程获取
  • 写锁:在没有任何线程占用读锁的情况下可以被唯一一个线程获取

大致的使用方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Main {
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Thread thread = new Thread(() -> {
System.out.println("加写锁");
lock.writeLock().lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("释放写锁");
lock.writeLock().unlock();
System.out.println("加读锁");
lock.readLock().lock();
});
thread.start();
lock.writeLock().lock();
System.out.println("获得写锁");
lock.writeLock().unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.readLock().lock();
System.out.println("获取写锁");

}

}

这种锁在部分线程需要读取部分线程需要写入时很好的满足了需求

原子类

有些时候我们只需要一些简单的操作,此时为了简化操作,人们又创造了原子类,类中所有的方法都是线程安全的。原子类包含三种

  • AtomicInteger:原子更新int
  • AtomicLong:原子更新long
  • AtomicBoolean:原子更新boolean
    由于在使用上比较简单,我这里只放一个例子,就不做具体的解释了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main {
private static AtomicInteger i = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Runnable r = () -> {
for (int j = 0; j < 100000; j++)
i.getAndIncrement();
System.out.println("自增完成!");
};
new Thread(r).start();
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(i.get());
}
}

并发容器

最后再介绍一下jdk提供的并发容器,我们之前习惯的ArrayList,HashMap等容器实际上是线程不安全的,我们这里可以使用线程安全的并发容器,比较常用的包括

  • CopyOnWriteArrayList
  • ConcurrentHashMap
  • BlockingQueue
    由于在使用时与普通容器没什么大的区别,这里就不做介绍了

线程池

接下来我们来了解一下java的线程池相关内容,线程的创建与销毁是一个开销很大的工作,所以我们倾向于在有长期使用需要时创建一个线程池,一次性创建足够的线程病保存在池中,之后只使用池中线程
java提供了一个原生的线程池实现,大概使用方式如下

1
2
3
4
5
6
7
8
9
10
11
12
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(200));
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
System.exit(0);
}
}

接下来我们去底层看一看到底是做了什么,下面是全参的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (corePoolSize >= 0 && maximumPoolSize > 0 && maximumPoolSize >= corePoolSize && keepAliveTime >= 0L) {
if (workQueue != null && threadFactory != null && handler != null) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
String name = Objects.toIdentityString(this);
this.container = SharedThreadContainer.create(name);
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}

我们先简单的看一下各个参数

1
2

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

下面是名词解释:

  • corePoolSize:核心线程数量
  • maximumPoolSize:最大线程数量
  • keepAliveTime:非核心线程等待时间
  • unit:等待时间的单位
  • workQueue:任务队列
  • threadFactory:线程工厂
  • handaler:拒绝策略
    线程池中的线程被分为核心线程与非核心线程,所有的任务都被放入任务队列中,线程从任务队列中获取任务进行执行,如果所有核心线程都有任务,就注册新的非核心线程进行处理,如果达到最大线程数量就将任务保留在任务队列中,如果任务队列已满,则按照拒绝策略进行处理

我们再来看一看exucute方法具体的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
} else {
int c = this.ctl.get();//此处稍微解释一下,ctl这个变量同时存储了线程池中工作线程的数量和线程池的状态,需要通过位运算分离
//如果正在工作的线程数小于核心线程数,直接将任务给核心线程中
if (workerCountOf(c) < this.corePoolSize) {
if (this.addWorker(command, true)) {
return;
}

c = this.ctl.get();//此时没有成功的插入任务,说明发生了某种异常,重新获取ctl
}
//isRunning方法检查线程池是否正在运行,尝试将任务放到任务队列
if (isRunning(c) && this.workQueue.offer(command)) {
int recheck = this.ctl.get();//再次检查线程池的工作状态
if (!isRunning(recheck) && this.remove(command)) {
this.reject(command);//此分支说明线程池被设置为拒绝加入,使用拒绝策略
} else if (workerCountOf(recheck) == 0) {
this.addWorker((Runnable)null, false);//此分支说明线程池中无线程
}
} else if (!this.addWorker(command, false)) {//尝试将线程放到非核心线程中
this.reject(command);
}

}
}

这里有一个额外需要解释的点事ctl,这个原子整数中保存了一个32位的int,其中部分位用来保存线程池的状态,部分位用来保存线程池中线程的数量,通过位运算获得具体位的数字得到信息

我们可以继续深入,看看addWorker方法干了些什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private boolean addWorker(Runnable firstTask, boolean core) {
for(int c = this.ctl.get(); !runStateAtLeast(c, 0) || !runStateAtLeast(c, 536870912) && firstTask == null && !this.workQueue.isEmpty(); c = this.ctl.get()) {
while(workerCountOf(c) < ((core ? this.corePoolSize : this.maximumPoolSize) & 536870911)) {//这个奇奇怪怪的长整数转换为二进制为 `00011111111111111111111111111111`,是用来和ctl做位运算获得线程池的不同信息的
if (this.compareAndIncrementWorkerCount(c)) {//此处利用CAS操作增加c的值
c = 0;
boolean workerAdded = false;
Worker w = null;

try {
w = new Worker(firstTask);//创建一个Worker,其实就是Thread套皮
Thread t = w.thread;
if (t != null) {
ReentrantLock mainLock = this.mainLock;
mainLock.lock();

try {
int c = this.ctl.get();
if (isRunning(c) || runStateLessThan(c, 536870912) && firstTask == null) {
if (t.getState() != State.NEW) {
throw new IllegalThreadStateException();
}

this.workers.add(w);
workerAdded = true;
int s = this.workers.size();
if (s > this.largestPoolSize) {
this.largestPoolSize = s;
}
}
} finally {
mainLock.unlock();
}

if (workerAdded) {
this.container.start(t);
c = 1;
}
}
} finally {
if (!c) {
this.addWorkerFailed(w);
}

}

return (boolean)c;
}
}

return false;
}

return false;
}

看不懂没关系,我们来直接自己实现一个线程池,只实现最基本的功能,一切追求简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ThreadPool {
private AtomicInteger coreThreadCount;
private AtomicInteger maxThreadCount;
private ArrayBlockingQueue<Runnable> workQueue;
private AtomicInteger activeThreadCount;
private long activeTime;

public ThreadPool(int coreThreadCount, int maxThreadCount,int workQueueSize, int activeThreadCount) {
this.coreThreadCount = new AtomicInteger(coreThreadCount);
this.maxThreadCount = new AtomicInteger(maxThreadCount);
this.workQueue = new ArrayBlockingQueue<>(workQueueSize);
this.activeThreadCount = new AtomicInteger(0);
this.activeTime = activeThreadCount;
}//简单的构造方法,全都是直接赋值

public void excute(Runnable task) {
if(activeThreadCount.get()<coreThreadCount.get()){
Thread thread = new Thread(()->{
task.run();
while(true){
Runnable r = workQueue.poll();//这里直接利用阻塞队列的特性,如果队列中没有任务会陷入阻塞
if(r!=null){
r.run();
}
}
});//核心线程反复尝试获取任务,注册后用不销毁
thread.start();
activeThreadCount.incrementAndGet();
coreThreadCount.incrementAndGet();
} else if (activeThreadCount.get()<maxThreadCount.get()) {
Thread thread = new Thread(()->{
task.run();
while(true){
try {
Runnable r = workQueue.poll(activeTime,TimeUnit.MILLISECONDS);
//此处设置非核心线程的等待时间,如果时间超过了设置的时间还没有任务则退出
if(r!=null){
r.run();
}
}catch (InterruptedException e){
activeThreadCount.decrementAndGet();//记得在销毁线程时活动线程数减1
break;
}
}
});
thread.start();
activeThreadCount.incrementAndGet();
}else {
throw new RuntimeException("超出最大线程数量");
}
}

}

我们总计使用了56行就实现了一个最为简单的线程池,当然,你也可以试着自己优化这个线程池,加点什么状态控制,拒绝策略等等,但后面都是一些很好实现的东西了。

工具类

最后的最后我们还需要了解一下几个简单的工具类

CountDownLatch计数器锁

假设我们存在一个多线程任务,我们需要所有线程都完成后再在主线程执行下一步,各个线程执行时间未知,该怎么实现?我们可以使用计数器锁,大概如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(10);//计数器中有10个数
Runnable runnable = new Runnable() {
public void run() {
try{
Thread.sleep(1000);
countDownLatch.countDown();//每次调用这个方法都将初始的值减1
}catch (InterruptedException e){
throw new RuntimeException(e);
}
}
};
for (int i = 0; i < 10; i++) {
new Thread(runnable).start();
}
countDownLatch.await();//等待减到0才会执行下一步
System.out.println("All done");

}
}

CyclicBarrier循环屏障

上面我们使用for循环来启动了10个线程,但在某些需求下我们可能会要求线程同步启动,此时可以使用循环屏障

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(10, //创建一个初始值为10的循环屏障
() -> System.out.println("飞机马上就要起飞了,各位特种兵请准备!")); //人等够之后执行的任务
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
Thread.sleep((long) (2000 * new Random().nextDouble()));
System.out.println("玩家 "+ finalI +" 进入房间进行等待... ("+barrier.getNumberWaiting()+"/10)");

barrier.await(); //调用await方法进行等待,直到等待的线程足够多为止

//开始游戏,所有玩家一起进入游戏
System.out.println("玩家 "+ finalI +" 进入游戏!");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

直到达到要求的线程数量所有线程才能进入下一步

此外,循环屏障可以被重复使用,每次达到约定的线程后都会重置;

Exchanger数据交换类

借助这个类我们可以实现不同线程之间的数据交换,大概如下

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("收到主线程传递的交换数据:"+exchanger.exchange("AAAA"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("收到子线程传递的交换数据:"+exchanger.exchange("BBBB"));
}

我个人感觉有那么点鸡肋;

结语

好的,到这里多线程的部分就算结束了,下一期我们来研究一下HTTP解析,实现简单的网络通讯。

  • 标题: 多线程与线程池
  • 作者: Soul
  • 创建于 : 2025-04-21 10:40:35
  • 更新于 : 2025-04-29 17:42:39
  • 链接: https://soulmate.org.cn/posts/afff9093/
  • 版权声明: 本文章采用 CC BY-NC 4.0 进行许可。