27Java多线程之线程间通信

合理的使用Java多线程可以更好地利用服务器资源。一般来讲,线程内部有自己私有的线程上下文,互不干扰。但是当需要多个线程间相互协作时,就需要掌握线程的通信方式。

通信方式

实际上只有进程间需要通信,同一进程的线程共享地址空间,没有通信的必要,但要做好同步/互斥,保护共享的全局变量。

而进程间通信无论是 信号、管道pipe、共享内存 都由操作系统保证,是系统调用。

进程间通信

管道(pipe)

管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。

有名管道 (namedpipe)

有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。

信号量(semaphore)

信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。

消息队列(messagequeue)

消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。

信号 (sinal)

信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。

共享内存(shared memory)

共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。

套接字(socket)

套接口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同设备及其间的进程通信。

线程间通信

锁机制:包括互斥锁、条件变量、读写锁

互斥锁提供了以排他方式防止数据结构被并发修改的方法。
读写锁允许多个线程同时读共享数据,而对写操作是互斥的。
条件变量可以以原子的方式阻塞进程,直到某个特定条件为真为止。对条件的测试是在互斥锁的保护下进行的。条件变量始终与互斥锁一起使用。

wait/notify 等待唤醒机制

Volatile 内存共享

CountDownLatch 并发工具

CyclicBarrier 并发工具

信号量机制(Semaphore)

包括无名线程信号量和命名线程信号量。

信号机制(Signal)

类似进程间的信号处理。

线程间的通信目的主要是用于线程同步,所以线程没有像进程通信中的用于数据交换的通信机制。

等待/通知范式

synchronized+wait()和notify()

notify()方法会随机叫醒一个正在等待的线程,而notifyAll()会叫醒所有正在等待的线程。

Object的监视器方法 wait()、notify() 配合 synchronized 实现等待通知。

1、JAVA 中,每个对象有且只有一把锁 (lock),也叫监视器 (monitor)。

2、wait()、notify() 方法属于 Object 类,并且无法被重写。

3、调用 wait()、notify() 方法的线程必需获得监视器锁并由锁对象调用,否则会抛出 IllegalMonitorStateException 异常。

4、其它线程调用锁对象的 notify() 方法时,系统会在当前挂起等待的多个线程中,随机唤醒一个,被唤醒的线程需要重新获取锁,获取锁后将继续执行。

image-20210720130343411

image-20210720130231621

等待者(也可以称之为消费者):

1
2
3
4
5
6
synchronized (对象lock) {
while (条件不满足) {
对象.wait(); // 释放锁
}
// TODO 处理逻辑
}

通知者(也可以称之为生产者):

1
2
3
4
5
6
synchronized (对象lock) {
while (条件满足) {
改变条件
对象.notify();
}
}

注意:

实际开发中最好采用的是超时等待/通知模式,例如 join(long millis, int nanos)

等待/通知机制使用的是使用同一个对象锁,如果你两个线程使用的是不同的对象锁,那它们之间是不能用等待/通知机制通信的。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Test {
public static void main(String[] args) throws InterruptedException {
Task task = new Task();
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
Thread thread3 = new Thread(task);
thread1.start();
thread2.start();
thread3.start();

Thread.sleep(1000);

synchronized (task) {
// 唤醒线程
// task.notify(); // 唤醒正在该对象的监视器上等待的单个线程,随机唤醒一个
task.notifyAll(); // 唤醒在此对象监视器上等待的所有线程
}
}
}

class Task implements Runnable {
@Override
public void run() {
synchronized (this) {
try {
// 是当前线程等待
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("等待唤醒机制");
}
}
}

Lock+Condition

Lock 锁的等待唤醒机制和 synchronized 稍微有点不同。Lock 锁的是靠 Condition 实现的,Condition 是通过 Lock 锁的 newCondition() 方法获取,每调用一次都会返回一个新的 Condition 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Condition {
// 使当前线程等待,直到唤醒或被中断为止。
void await() throws InterruptedException;
// 使当前线程等待,直到唤醒为止,无法被中断。
void awaitUninterruptibly();
// 使当前线程等待,直到唤醒或被中断或经过指定的等待时间为止。
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程。
void signal();
// 唤醒所有等待线程
void signalAll();
}

Condition 接口中的方法和 synchronized 同步锁的方法对应关系,红色字体是 Condition 的常用方法:

synchronized Condition
wait() await()
awaitUninterruptibly()
wait(long timeout) awaitNanos(long nanosTimeout)
wait(long timeout, int nanos) await(long time, TimeUnit unit)
awaitUntil(Date deadline)
notify() signal()
notifyAll() signalAll()

Condition配合Lock实现等待唤醒机制的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Test {
public static void main(String[] args) {
Lock lock = new ReentrantLock(); // 创建锁
Condition condition = lock.newCondition(); // 创建条件
Task task = new Task(lock, condition);
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
Thread thread3 = new Thread(task);
thread1.start();
thread2.start();
thread3.start();

// 唤醒等待线程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.lock(); // 获取锁
condition.signal(); // 唤醒单个等待线程
condition.signalAll(); // 唤醒所有等待线程
lock.unlock(); // 释放锁
}
}

class Task implements Runnable {
// 注意:Lock 和 Condition 都是在Task外部创建的,因为唤醒操作需要先获取锁,才能调用Condition方法的signal()或signalAll()
private Lock lock;
private Condition condition;

public Task(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}

@Override
public void run() {
lock.lock(); // 获取锁,阻塞
try {
condition.await(); // 使当前线程等待
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
}

LockSupport

LockSupport实现互斥锁(等待唤醒机制):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class Test {
public static void main(String[] args) {
Task task = new Task();
Thread thread0 = new Thread(task);
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread0.start();
thread1.start();
thread2.start();
}
}

class Task implements Runnable {
/**
* 互斥锁
*/
private final FIFOMutex lock = new FIFOMutex();

@Override
public void run() {
lock.lock();
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

/**
* 先进先出互斥锁
*/
class FIFOMutex {
/**
* 锁标志
*/
private final AtomicBoolean locked = new AtomicBoolean(false);
/**
* 线程等待队列
*/
private final Queue<Thread> waiters = new ConcurrentLinkedDeque<>();

public void lock() {
// 是否被中断
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
// 加入线程等待队列
waiters.add(current);
// 当前线程不是队列中的第一个或没有获取到锁时
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
// 使当前线程等待
LockSupport.park(this);
// 被唤醒后,判断当前线程是否被中断并清除中断状态
if (Thread.interrupted()) {
// 记录当前线程被中断
// 但在等到时忽略中断
wasInterrupted = true;
}
}

// 获取锁之后将当前线程移出队列
waiters.remove();
// 当当前线程被中断时
if (wasInterrupted) {
// 在退出时重新标记中断状态
current.interrupt();
}
}

public void unlock() {
// 将锁标记设置为false
locked.set(false);
// 唤醒指定线程
LockSupport.unpark(waiters.peek());

}
}

生产者与消费者

synchronized+wait()和notify()

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class Test {
public static void main(String[] args) {
// 创建数据
Data data = new Data();
// 创建生产者、消费者任务
Producer producer = new Producer(data);
Consumer consumer = new Consumer(data);
// 创建生产者、消费者线程
Thread producerThread1 = new Thread(producer);
Thread producerThread2 = new Thread(producer);
Thread consumerThread1 = new Thread(consumer);
Thread consumerThread2 = new Thread(consumer);
// 启动线程
producerThread1.start();
producerThread2.start();
consumerThread1.start();
consumerThread2.start();
}
}

class Producer implements Runnable {
private Data data;

public Producer(Data data) {
this.data = data;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
synchronized (data) {
if (!data.isFlag()) {
data.setDataNum(data.getDataNum() + 1);
System.out.println(Thread.currentThread().getName() + " 已生产 数据编号: " + data.getDataNum());
// 标记已生产状态
data.setFlag(true);
}
// 唤醒消费者线程
data.notify();
try {
// 使当前线程等待
data.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

class Consumer implements Runnable {
private Data data;

public Consumer(Data data) {
this.data = data;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
synchronized (data) {
if (data.isFlag()) {
System.out.println(Thread.currentThread().getName() + " 已消费 数据编号: " + data.getDataNum());
// 标记未生产状态
data.setFlag(false);
}
// 唤醒生产者线程
data.notify();
try {
// 使当前线程等待
data.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

class Data {
// 数据编号
private int dataNum = 0;

/**
* 数据生产状态,默认false。
* 已生产: true;
* 未生产: false;
*/
private boolean flag = false;

public void setDataNum(int dataNum) {
this.dataNum = dataNum;
}

public int getDataNum() {
return dataNum;
}

public void setFlag(boolean flag) {
this.flag = flag;
}

public boolean isFlag() {
return flag;
}
}

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class Test {
public static void main(String[] args) {
// 创建数据
Data data = new Data();
// 创建生产者、消费者任务
Producer producer = new Producer(data);
Consumer consumer = new Consumer(data);
// 创建生产者、消费者线程
Thread producerThread1 = new Thread(producer);
Thread producerThread2 = new Thread(producer);
Thread consumerThread1 = new Thread(consumer);
Thread consumerThread2 = new Thread(consumer);
// 启动线程
producerThread1.start();
producerThread2.start();
consumerThread1.start();
consumerThread2.start();
}
}

class Consumer implements Runnable {
private final Data data;

public Consumer(Data data) {
this.data = data;
}

@Override
public void run() {
// 无限消费
while (!Thread.currentThread().isInterrupted()) {
synchronized (data) {
// 当数据不为空时
if (data.getMessage() != null) {
// 消费数据
System.out.println("消费: " + data.getMessage());
// 重置数据
data.setMessage(null);
}
// 唤醒生产者线程
data.notify();
try {
// 使当前线程等待
data.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}


class Producer implements Runnable {
private final Data data;

public Producer(Data data) {
this.data = data;
}

@Override
public void run() {
// 计数器
int i = 0;
// 无限生产
while (!Thread.currentThread().isInterrupted()) {
synchronized (data) {
// 当数据为空时
if (data.getMessage() == null) {
// 正产数据
data.setMessage("香蕉" + i++);
System.out.println("生产: " + data.getMessage());
}
// 唤醒消费者线程
data.notify();
try {
// 使当前线程等待
data.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

/**
* 数据类
*/
class Data {
// 消息
private String message;

public void setMessage(String message) {
this.message = message;
}

public String getMessage() {
return message;
}
}

Lock+Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public class Test {
public static void main(String[] args) {
// 创建数据
Data data = new Data();
// 创建生产者、消费者任务
Producer producer = new Producer(data);
Consumer consumer = new Consumer(data);
// 创建生产者、消费者线程
Thread producerThread1 = new Thread(producer);
Thread producerThread2 = new Thread(producer);
Thread consumerThread1 = new Thread(consumer);
Thread consumerThread2 = new Thread(consumer);
// 启动线程
producerThread1.start();
producerThread2.start();
consumerThread1.start();
consumerThread2.start();
}
}

class Consumer implements Runnable {
private final Data data;

public Consumer(Data data) {
this.data = data;
}

@Override
public void run() {
// 无限消费
while (!Thread.currentThread().isInterrupted()) {
try {
// 消费数据
System.out.println("消费: " + data.getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


class Producer implements Runnable {
private final Data data;

public Producer(Data data) {
this.data = data;
}

@Override
public void run() {
// 计数器
int i = 0;
// 无限生产
while (!Thread.currentThread().isInterrupted()) {
try {
// 生产数据
data.setMessage("香蕉" + i++);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 数据类
*/
class Data {
// 锁
private Lock lock = new ReentrantLock();
// 生产者条件
private Condition producerCondition = lock.newCondition();
// 消费者条件
private Condition consumerCondition = lock.newCondition();
// 消息
private String message;

public void setMessage(String message) throws InterruptedException {
lock.lock(); // 获取锁,阻塞
try {
// 当数据不为空时
while (this.message != null) {
// 使生产者线程等待
producerCondition.await();
}
// 设置数据
this.message = message;
System.out.println("生产: " + this.message);
// 唤醒所有消费者线程
consumerCondition.signalAll();
} finally {
lock.unlock(); // 释放锁
}
}

public String getMessage() throws InterruptedException {
lock.lock(); // 获取锁,阻塞
try {
// 当数据为空时
while (this.message == null) {
// 使生产者线程等待
consumerCondition.await();
}
// 唤醒所有生产者线程
producerCondition.signalAll();
// 返回数据
return message;
} finally {
// 重置数据
message = null;
lock.unlock(); // 释放锁
}
}
}

信号量

JDK提供了一个类似于“信号量”功能的类 Semaphore。但本文不是要介绍这个类,而是介绍一种基于 volatile 关键字实现的信号量通信。

比如现在有一个需求,想让线程A输出0,然后线程B输出1,再然后线程A输出2…以此类推。应该怎样实现呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class Signal {
private static volatile int signal = 0;
private static int num = 10000;

public static void main(String[] args) throws InterruptedException {
RunnableA runnableA = new RunnableA();
new Thread(runnableA, "ThreadA").start();
new Thread(runnableA, "ThreadA").start();

Thread.sleep(1000);

RunnableB runnableB = new RunnableB();
new Thread(runnableB, "ThreadB").start();
new Thread(runnableB, "ThreadB").start();
}

static class RunnableA implements Runnable {
@Override
public void run() {
while (true) {
// 其实这里sleep或yield一下就不需要用volatile修饰了
if (signal % 2 == 0 && signal < num) {
synchronized (this) {
if (signal % 2 == 0 && signal < num) {
System.out.println(Thread.currentThread().getName() + " " + signal);
signal++;
}
}
} else if (signal >= num) {
break;
}
// 其实这里sleep或yield一下就不需要用volatile修饰了
}
}
}

static class RunnableB implements Runnable {
@Override
public void run() {
while (true) {
// 其实这里sleep或yield一下就不需要用volatile修饰了
if (signal % 2 == 1 && signal < num) {
synchronized (this) {
if (signal % 2 == 1 && signal < num) {
System.out.println(Thread.currentThread().getName() + " " + signal);
signal = signal + 1;
}
}
} else if (signal >= num) {
break;
}
// 其实这里sleep或yield一下就不需要用volatile修饰了
}
}
}
}

// 输出:
threadA: 0
threadB: 1
threadA: 2
threadB: 3
threadA: 4

可以看到,使用 volatile 变量 signal 实现了“信号量”的模型。但需要注意,volatile 变量需要进行原子操作。signal++ 并不是一个原子操作,所以一旦有多个 ThreadA 或 ThreadB 就需要使用 synchronized 给它“上锁”。

这种实现方式并不一定高效,本例只是演示信号量

信号量的应用场景:

在公共停车场中,车位是公共资源,先到先得,车辆看做线程,看门的管理员就是起到了“信号量”的作用。

因为在这种场景下,多个线程(超过2个)需要相互合作,我们用简单的“锁”和“等待通知机制”就不那么方便了。这个时候就可以用到信号量。

其实JDK中提供的很多多线程通信工具类都是基于信号量模型的。