JUC入门

JUC简介

在 Java 中,线程部分是一个重点,本篇文章说的 JUC 也是关于线程的。JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5 开始出现的。

多线程回顾

进程和线程

进程(Process) 指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程— —资源分配的最小单位。

线程(thread) 系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个 单元执行流。线程——程序执行的最小单位。

线程的状态

线程状态枚举类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW, // 新建

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE, // 准备就绪

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED, // 阻塞

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING, // 不见不散

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING, // 过时不候

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED; // 终结
}

wait和sleep

wait和sleep的区别:

  • sleep 是 Thread 的静态方法。wait 是 Object 的方法,任何对象实例都能调用。
  • sleep 不会释放锁,它也不需要占用锁。wait 会释放锁,但调用它的前提是当前线程占有锁(即代码要在 synchronized 中)。
  • 它们都可以被 interrupted 方法中断。

并行和并发

串行并行并发的概念:

  • 串行:一个任务一个任务按顺序执行,互相阻塞,效率较低。
  • 并行:多个任务同时执行,每个任务有自己的执行线程或处理器核心,效率高。
  • 并发:多个任务在一段时间内交替执行,通过任务切换实现任务间共享处理器时间。

管程

在Java中,管程(Monitor)也叫监视器或者叫锁是一种并发编程的同步机制,用于在多线程环境下协调对共享资源的访问。管程提供了一种结构化的方式来管理共享资源,并确保多线程之间的同步和互斥访问。

在Java中,管程通常是通过使用synchronized关键字来实现的。synchronized关键字可以用于修饰方法或代码块,在使用synchronized修饰的方法或代码块内部,Java会自动地对当前对象(或指定的锁对象)进行加锁和解锁操作。

当多个线程同时访问被synchronized修饰的方法或代码块时,只有一个线程可以进入临界区(即被synchronized包围的代码块或方法),其他线程会被阻塞,直到进入临界区的线程执行完毕并释放锁。

简单来说,管程确保在同一时刻只有一个线程可以进入临界区,从而保证了共享资源的安全访问,避免了多个线程同时修改共享资源导致的数据不一致和竞争条件问题。

用户线程和守护线程

用户线程:平时用到的普通线程,自定义线程。

守护线程:运行在后台,是一种特殊的线程,比如垃圾回收。

当主线程结束后,用户线程还在运行,JVM 存活。

如果没有用户线程,都是守护线程,JVM 结束。

Synchronized

synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:

  • 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{} 括起来的代码,作用的对象是调用这个代码块的对象;
  • 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;
  • 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的 所有对象;
  • 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用的对象是这个类的所有对象。

如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:

  • 获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
  • 线程执行发生异常,此时 JVM 会让线程自动释放锁。

Lock接口

Lock接口

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

Lock与的Synchronized区别

  • Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问;

  • Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。

lock方法

lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。

采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的:

1
2
3
4
5
6
7
8
9
Lock lock = ...; 
lock.lock();
try{
//处理任务
} catch(Exception ex){

} finally{
lock.unlock(); //释放锁
}

newCondition

关键字synchronized与wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock锁的newContition()方法返回Condition对象,Condition类也可以实现等待/通知模式。

用notify()通知时,JVM会随机唤醒某个等待的线程, 使用Condition类可以进行选择性通知, Condition比较常用的两个方法:

• await()会使当前线程等待,同时会释放锁,当其他线程调用signal()时,线程会重新获得锁并继续执行。

• signal()用于唤醒一个等待的线程。

注意:在调用Condition的await()/signal()方法前,也需要线程持有相关的Lock锁,调用await()后线程会释放这个锁,在singal()调用后会从当前Condition对象的等待队列中,唤醒一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行。

ReentrantLock

ReentrantLock,意思是“可重入锁”。

ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。

ReadWriteLock

ReadWriteLock也是一个接口,在它里面只定义了两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口。

ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。

注意:

  • 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
  • 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

Lock和synchronized的区别

  1. Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
  2. synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
  3. Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  4. 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  5. Lock可以提高多个线程进行读操作的效率。

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。

synchronized实现案例

1
2
3
4
5
6
7
8
9
public class Ticket {
private Integer count = 30;

public synchronized void sale(){
if (count > 0){
System.out.println(Thread.currentThread().getName() + "卖出第" + (count--) + "张票");
}
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
},"AA").start();

new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
},"BB").start();

new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
},"CC").start();
}

lock实现案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class LTicket {
private Integer count = 30;

private ReentrantLock lock = new ReentrantLock();

public void sale(){
lock.lock();
try {
if (count > 0){
System.out.println(Thread.currentThread().getName() + "卖出第" + (count--) + "张票");
}
} catch (Exception e){

} finally {
lock.unlock();
}
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
LTicket ticket = new LTicket();
new Thread(()->{
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "AA").start();

new Thread(()->{
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "BB").start();

new Thread(()->{
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "CC").start();
}

线程间通信

线程间通信的模型有两种:共享内存和消息传递。

场景—两个线程,一个线程对当前数值加1,另一个线程对当前数值减1,要求用线程间通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class ThreadCommunication {

// 资源类
static class Operator {
//加减对象
private int number = 0;
//声明锁
private Lock lock = new ReentrantLock();
//声明钥匙
private Condition condition = lock.newCondition();

/**
* 加 1
*/
public void increment() {
try {
lock.lock();
while (number != 0) { // 该处不能使用if进行判断,会产生虚假唤醒的问题
condition.await(); // 在哪里等待,就会在哪唤醒
}
number++;
System.out.println(Thread.currentThread().getName() + "加一成功,值为:" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

/**
* 减一
*/
public void decrement() {
try {
lock.lock();
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "减一成功,值为:" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
Operator operator = new Operator();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.increment();
}
}, "AA").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.decrement();
}
}, "BB").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.increment();
}
}, "CC").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.decrement();
}
}, "DD").start();
}
}
}

线程间定制化通信

问题: A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C,按照 此顺序循环 10 轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class CustomizedCommunication {

static class Operator {
//标志位
private int flag = 1;
//声明锁
private final Lock lock = new ReentrantLock();
//声明钥匙
private final Condition condition1 = lock.newCondition();
private final Condition condition2 = lock.newCondition();
private final Condition condition3 = lock.newCondition();


public void print5() {
try {
lock.lock();
while (flag != 1) { // 该处不能使用if进行判断,会产生虚假唤醒的问题
condition1.await(); // 在哪里等待,就会在哪唤醒
}
flag = 2;
System.out.println("AA*****");
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void print10() {
try {
lock.lock();
while (flag != 2) { // 该处不能使用if进行判断,会产生虚假唤醒的问题
condition2.await(); // 在哪里等待,就会在哪唤醒
}
flag = 3;
System.out.println("BB**********");
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void print15() {
try {
lock.lock();
while (flag != 3) { // 该处不能使用if进行判断,会产生虚假唤醒的问题
condition3.await(); // 在哪里等待,就会在哪唤醒
}
flag = 1;
System.out.println("CC***************");
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}



public static void main(String[] args) {
Operator operator = new Operator();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.print5();
}
}, "AA").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.print10();
}
}, "BB").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
operator.print15();
}
}, "CC").start();
}
}
}

小结:多线程编程步骤

  • 创建资源类,在资源类创建属性和操作方法

  • 创建多个线程,调用资源类的操作方法

  • 防止虚假唤醒问题

集合的线程安全

ArrayList线程安全问题

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadDemo {

/**
* list集合线程不安全问题
*/
public static void main(String[] args) {
// list集合线程不安全问题
// List<String> list = new ArrayList<>();
// 方案一:使用线程安全集合vector
// List<String> list = new Vector<>();
// 方案二:使用Collections工具类
// List<String> list = Collections.synchronizedList(new ArrayList<>());
// 方案三:使用JUC的CopyOnWriteArrayList,写时复制技术
List<String> list = new CopyOnWriteArrayList<>();

for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}).start();
}
}

}

ArrayList的add方法并发读写时发生java.util.ConcurrentModificationException异常,它并不是一个线程安全的集合。

使用Vector或Collections.synchronizedList来保证list并发执行时的线程安全(底层使用synchronized)。

CopyOnWriteArrayList

它相当于线程安全的ArrayList。和ArrayList一样,它是个可变数组;但是和ArrayList不同的时,它具有以下特性:

  1. 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
  2. 它是线程安全的。
  3. 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
  4. 迭代器支持hasNext(),next()等不可变操作,但不支持可变 remove()等操作。
  5. 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
private static final long serialVersionUID = 8673264195747942595L;

/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

final Object[] getArray() {
return array;
}
final void setArray(Object[] a) {
array = a;
}

/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

小结:解决List集合线程不安全问题

  • 使用线程安全集合vector
  • 使用Collections工具类
  • 使用JUC的CopyOnWriteArrayList,写时复制技术

HashSet线程安全问题

测试代码:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
// Set<String> list = new HashSet<>();
// Set<String> list = Collections.synchronizedSet(new HashSet<>());
Set<String> list = new CopyOnWriteArraySet<>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,6));
System.out.println(list);
}, "aa").start();
}
}

使用Collections.synchronizedSet或CopyOnWriteArraySet解决Set集合线程安全问题。

HashMap线程安全问题

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
// Map<String, String> list = new HashMap<>();
// Map<String, String> list = Collections.synchronizedMap(new HashMap<>());
Map<String, String> list = new ConcurrentHashMap<>();
for (int i = 0; i < 20; i++) {
String key = String.valueOf(i);
new Thread(() -> {
list.put(key, UUID.randomUUID().toString().substring(0,6));
System.out.println(list);
}, "aa").start();
}
}

使用Collections.synchronizedMap或ConcurrentHashMap解决Map集合线程安全问题。

多线程锁

同步锁基础

synchronized实现同步的基础:Java中的每一个对象都可以作为锁。

具体表现为以下3种形式:

  • 对于普通同步方法,锁是当前实例对象。
  • 对于静态同步方法,锁是当前类的Class对象。
  • 对于同步方法块,锁是Synchonized括号里配置的对象。

公平锁和非公平锁

ReentrantLock构造方法根据传入参数创建公平锁或非公平锁,默认为非公平锁。非公平锁可能会有线程饿死,但效率高,避免了多线程的上下文切换所带来的开销。

NonfairSync源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

FairSync源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可重入锁

可重入锁(Reentrant Lock),也称为递归锁,是一种支持同一个线程多次获取同一个锁的锁机制。换句话说,当一个线程持有了某个锁,再次尝试获取这个锁时,会成功获取而不会被阻塞,而其他线程在没有释放锁之前是无法获取这个锁的。

可重入锁的设计目的是为了解决线程递归调用或多层次调用同步代码的情况。如果不支持可重入锁,当一个线程已经持有了某个锁,再次尝试获取这个锁时,会产生死锁,因为它自己已经持有了这个锁,但是又在等待自己释放锁,形成了循环等待。

在Java中,synchronized关键字和ReentrantLock类都是可重入锁。当一个线程获取了使用synchronized或ReentrantLock修饰的锁之后,可以再次进入被同一把锁保护的代码块而不会被阻塞。

表锁和行锁

表锁(Table Lock)和行锁(Row Lock)是数据库中的两种不同的锁机制,用于控制对表和行的并发访问。

  1. 表锁(Table Lock):
    • 表锁是对整张表进行锁定,当一个事务获取了表锁后,其他事务无法对表进行修改或访问,直到持有表锁的事务释放锁。
    • 表锁适用于一些特定的场景,例如备份表、重建索引等操作,可以保证在执行这些操作时不会有其他事务对表进行干扰。
    • 表锁对于并发性能来说比较低,因为它会阻塞其他事务对表的访问。
  2. 行锁(Row Lock):
    • 行锁是对表中的单行数据进行锁定,当一个事务获取了某一行的行锁后,其他事务可以继续访问表的其他行,但无法访问被锁定的行,直到持有行锁的事务释放锁。
    • 行锁适用于并发较高的场景,它允许多个事务同时对表进行读取,只有在写入数据时才会对相关行进行行锁定。
    • 行锁可以提高并发性能,但在高并发情况下也可能导致死锁和性能问题,因此需要谨慎使用。

悲观锁和乐观锁

悲观锁和乐观锁是两种不同的并发控制机制,用于处理多线程环境下对共享资源的访问。

  1. 悲观锁:
    • 悲观锁假设在整个事务期间会有其他线程试图访问共享资源,并认为并发冲突是常态。因此,在悲观锁的机制下,当一个线程访问共享资源时,会将资源锁定,其他线程在访问该资源时必须等待锁的释放。
    • 悲观锁适用于写操作较多、读操作较少的场景,因为它在读取数据时也会阻塞其他的读操作,从而避免读-写冲突。
  2. 乐观锁:
    • 乐观锁假设在整个事务期间不会有其他线程试图修改共享资源,并认为并发冲突是不常见的。因此,在乐观锁的机制下,当一个线程访问共享资源时,并不会立即锁定资源,而是先进行读取操作,然后在写入操作前检查资源是否发生了变化。如果资源没有发生变化,则允许写入,否则需要进行冲突解决。
    • 乐观锁适用于读操作较多、写操作较少的场景,因为它允许多个读操作同时进行,避免了读-读冲突。

死锁

什么是死锁:

死锁是在多线程编程中常见的一种问题,它发生在两个或多个线程彼此持有对方需要的资源,同时又等待对方释放资源,从而导致所有线程都无法继续执行的情况。

产生死锁的原因:

  1. 竞争资源:多个线程同时竞争有限的资源,并且每个线程都占有部分资源,而需要其他线程释放资源才能继续执行。
  2. 循环等待:多个线程之间形成循环等待资源的情况,每个线程都等待其他线程释放资源,导致了循环等待。
  3. 无法被抢占:线程在持有资源时,不能被其他线程抢占,只能自己主动释放,但线程在等待其他资源时又不能释放资源。
  4. 缺乏资源的释放:线程在获取资源失败后,不主动释放自己占有的资源,导致其他线程无法获取所需资源。

死锁代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) {
Ticket t1 = new Ticket();
Ticket t2 = new Ticket();
new Thread(() -> {
synchronized (t1){
System.out.println("拿到锁t1,准备获取锁t2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (t2){
System.out.println("获取锁t2");
}
}
}, "aa").start();

new Thread(() -> {
synchronized (t2){
System.out.println("拿到锁t2,准备获取锁t1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (t1){
System.out.println("获取锁t1");
}
}
}, "bb").start();
}

死锁的验证方式:

线程检测工具:现代的开发工具和性能分析工具通常提供线程检测功能。例如,在Java中,可以使用一些工具如jstack、jvisualvm、Java Mission Control等来检测线程的状态和堆栈信息,从而判断是否存在死锁。

Callable接口

使用Callable接口创建线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {

FutureTask<Integer> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000);
return 100;
});
new Thread(futureTask).start();

// 判断线程是否执行完毕
while (!futureTask.isDone()) {
System.out.println("wait...");
}

// 获取callable的call方法返回值
System.out.println(futureTask.get());

}
}

小结:Callable接口和Runnable接口的区别

实现Runnable接口需要实现run方法,实现Callable接口需要实现call方法。

  • 相比run方法,call方法可以有返回值。
  • 方法可以抛出异常。
  • 支持泛型的返回值。
  • 需要借助FutureTask类,获取返回结果。

JUC 三大辅助类

CountDownLatch

CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。

  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 减少计数案例
* 当所有同学走后,班长才可以关教室门
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {

// 初始化计数器
CountDownLatch latch = new CountDownLatch(5);

for (int i = 1; i <= 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "同学走人");
// 计数器减一
latch.countDown();

}, String.valueOf(i)).start();
}

// 等待计数器变为0
latch.await();
System.out.println("班长走人,关门");
}
}

CyclicBarrier

CyclicBarrier循环栅栏,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句。

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 循环栅栏案例
* 集齐七颗龙珠召唤神龙
*/
public class CyclicBarrierTest {
public static final Integer NUMBER = 7;
public static void main(String[] args) {
// 设置等待线程的数量和当所有线程都到达栅栏时要执行的回调函数
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> System.out.println("七颗龙珠已集齐,召唤神龙!"));

for (int i = 1; i <= 7; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "星龙珠已被集齐!");
try {
cyclicBarrier.await(); // 等待其他线程
} catch (Exception e) {
throw new RuntimeException(e);
}
}, String.valueOf(i)).start();
}

}
}

Semaphore

Semaphore(信号量)是一种并发控制机制,用于管理对共享资源的访问。它可以控制多个线程同时访问某个共享资源的数量。Semaphore 位于 java.util.concurrent 包中,用于实现信号量机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 信号灯案例
* 六辆车停到三个停车位
*/
public class SemaphoreTest {
public static void main(String[] args) {
// 设置许可证,也就是停车位
Semaphore semaphore = new Semaphore(3);

// 开启六个线程,模拟六辆车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 获取许可,抢占车位
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "号车抢到车位,开始停车!");
// 模拟停车时间
Thread.sleep(new Random().nextInt(5));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 释放许可
semaphore.release();
System.out.println(Thread.currentThread().getName() + "号车离开车位!");
}
}, String.valueOf(i)).start();
}
}
}

读写锁

读写锁介绍

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。

针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。

  1. 线程进入读锁的前提条件:

    • 没有其他线程的写锁。

    • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。

  2. 线程进入写锁的前提条件:

    • 没有其他线程的读锁。

    • 没有其他线程的写锁。

而读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

读写锁案例实现

资源类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CustomCache {

private volatile Map<String, String> cacheMap = new HashMap<>();

private ReadWriteLock rwLock = new ReentrantReadWriteLock();

public void put(String key, String value){
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写入数据。。" + key);
TimeUnit.SECONDS.sleep(1);
cacheMap.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完了" + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}

public String get(String key){
rwLock.readLock().lock();
String value = null;
try {
System.out.println(Thread.currentThread().getName() + "正在读数据。。" + key);
TimeUnit.SECONDS.sleep(1);
value = cacheMap.get(key);
System.out.println(Thread.currentThread().getName() + "读完了key=" + key + ",value=" + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
return value;
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
CustomCache cache = new CustomCache();
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
cache.put(String.valueOf(num), String.valueOf(num));
}, String.valueOf(i)).start();
}

for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
cache.get(String.valueOf(num));
}, String.valueOf(i)).start();
}
}

小结:

  • 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
  • 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

原因:当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

阻塞队列

队列概述

常用的队列主要有以下两种:

• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性

• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起 。

为什么需要BlockingQueue:

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

阻塞队列概念

当队列是空的,从队列中获取元素的操作将会被阻塞。

当队列是满的,从队列中添加元素的操作将会被阻塞。

试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。

试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多 个元素或者完全清空,使队列变得空闲起来并后续新增。

BlockingQueue核心方法

drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

//第一组 抛出异常
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
System.out.println("======================================================");

//第二组 特殊值
System.out.println(blockingQueue.offer("1"));
System.out.println(blockingQueue.offer("2"));
System.out.println(blockingQueue.offer("3"));
System.out.println(blockingQueue.offer("4"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println("======================================================");

//第三组 阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
System.out.println("======================================================");

//第四组 超时
System.out.println(blockingQueue.offer("a", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
}

线程池

什么是线程池

线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过的线程,这里的任务就是我们前面学过的实现了Runnable或Callable接口的实例对象;

为什么使用线程池

使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力;当然了,使用线程池的原因不仅仅只有这些,我们可以从线程池自身的优点上来进一步了解线程池的好处;

使用线程池有哪些优势

  • 降低资源消耗:通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。

  • 提高响应速度:因为省去了创建线程这个步骤,所以在任务来的时候,可以立刻开始执行。

  • 提高线程的可管理性:线程池进行统一的分配、调优和监控。

  • 提供更多更强大的功能:线程池的可拓展性使得我们可以自己加入新的功能,比如说定时、延时来执行某些线程。

创建线程池的方式

在 Java 语言中,并发编程都是通过创建线程池来实现的,而线程池的创建方式也有很多种,每种线程池的创建方式都对应了不同的使用场景,总体来说线程池的创建可以分为以下两类:

  • 通过 ThreadPoolExecutor 手动创建线程池。
  • 通过 Executors 执行器自动创建线程池。

而以上两类创建线程池的方式,又有 7 种具体实现方法,这 7 种实现方法分别是:

  • Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
  • Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
  • Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序。
  • Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池。
  • Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池。
  • Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。
  • ThreadPoolExecutor:手动创建线程池的方式,它创建时最多可以设置 7 个参数。

ThreadPoolExecutor参数解析

1
2
3
4
5
6
7
8
9
10
11
构造方法:
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize,// 最大线程数
long keepAliveTime, // 最大空闲时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 饱和处理机制
)
{ ... }

  • corePoolSize用于设置核心(Core)线程池数量。
    • 线程池接收到新任务,当前工作线程数少于corePoolSize, 即使有空闲的工作线程,也会创建新的线程来处理该请求,直到线程数达到corePoolSize。
  • maximumPoolSize用于设置最大线程数量。
    • 当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程。
    • maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务。
  • BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。
  • keepAliveTime空闲线程存活时间,当前的线程数量大于corePoolSize,那么在指定的时间后,这个空闲的线程将被销毁,这个指定的时间就是keepAliveTime。
  • unit空闲线程存活时间单位
  • threadFactory线程工厂,创建新线程的时候使用的工厂,可以用来指定线程名等等。