合理的使用Java多线程可以更好地利用服务器资源。一般来讲,线程内部有自己私有的线程上下文,互不干扰。但是当需要多个线程间相互协作时,就需要掌握线程的通信方式。
通信方式
实际上只有进程间需要通信,同一进程的线程共享地址空间,没有通信的必要,但要做好同步/互斥,保护共享的全局变量。
而进程间通信无论是 信号、管道pipe、共享内存 都由操作系统保证,是系统调用。
进程间通信
管道(pipe)
管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
有名管道 (namedpipe)
有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
信号量(semaphore)
信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
消息队列(messagequeue)
消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
信号 (sinal)
信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
共享内存(shared memory)
共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。
套接字(socket)
套接口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同设备及其间的进程通信。
线程间通信
锁机制:包括互斥锁、条件变量、读写锁
互斥锁提供了以排他方式防止数据结构被并发修改的方法。
读写锁允许多个线程同时读共享数据,而对写操作是互斥的。
条件变量可以以原子的方式阻塞进程,直到某个特定条件为真为止。对条件的测试是在互斥锁的保护下进行的。条件变量始终与互斥锁一起使用。
wait/notify 等待唤醒机制
Volatile 内存共享
CountDownLatch 并发工具
CyclicBarrier 并发工具
信号量机制(Semaphore)
包括无名线程信号量和命名线程信号量。
信号机制(Signal)
类似进程间的信号处理。
线程间的通信目的主要是用于线程同步,所以线程没有像进程通信中的用于数据交换的通信机制。
等待/通知范式
synchronized+wait()和notify()
notify()方法会随机叫醒一个正在等待的线程,而notifyAll()会叫醒所有正在等待的线程。
Object的监视器方法 wait()、notify() 配合 synchronized 实现等待通知。
1、JAVA 中,每个对象有且只有一把锁 (lock),也叫监视器 (monitor)。
2、wait()、notify() 方法属于 Object 类,并且无法被重写。
3、调用 wait()、notify() 方法的线程必需获得监视器锁并由锁对象调用,否则会抛出 IllegalMonitorStateException
异常。
4、其它线程调用锁对象的 notify() 方法时,系统会在当前挂起等待的多个线程中,随机唤醒一个,被唤醒的线程需要重新获取锁,获取锁后将继续执行。


等待者(也可以称之为消费者):
1 2 3 4 5 6
| synchronized (对象lock) { while (条件不满足) { 对象.wait(); } }
|
通知者(也可以称之为生产者):
1 2 3 4 5 6
| synchronized (对象lock) { while (条件满足) { 改变条件 对象.notify(); } }
|
注意:
实际开发中最好采用的是超时等待/通知模式,例如 join(long millis, int nanos)
。
等待/通知机制使用的是使用同一个对象锁,如果你两个线程使用的是不同的对象锁,那它们之间是不能用等待/通知机制通信的。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Test { public static void main(String[] args) throws InterruptedException { Task task = new Task(); Thread thread1 = new Thread(task); Thread thread2 = new Thread(task); Thread thread3 = new Thread(task); thread1.start(); thread2.start(); thread3.start();
Thread.sleep(1000);
synchronized (task) { task.notifyAll(); } } }
class Task implements Runnable { @Override public void run() { synchronized (this) { try { this.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("等待唤醒机制"); } } }
|
Lock+Condition
Lock 锁的等待唤醒机制和 synchronized 稍微有点不同。Lock 锁的是靠 Condition 实现的,Condition 是通过 Lock 锁的 newCondition()
方法获取,每调用一次都会返回一个新的 Condition 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
|
Condition 接口中的方法和 synchronized 同步锁的方法对应关系,红色字体是 Condition 的常用方法:
synchronized |
Condition |
wait() |
await() |
|
awaitUninterruptibly() |
wait(long timeout) |
awaitNanos(long nanosTimeout) |
wait(long timeout, int nanos) |
await(long time, TimeUnit unit) |
|
awaitUntil(Date deadline) |
notify() |
signal() |
notifyAll() |
signalAll() |
Condition配合Lock实现等待唤醒机制的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class Test { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Task task = new Task(lock, condition); Thread thread1 = new Thread(task); Thread thread2 = new Thread(task); Thread thread3 = new Thread(task); thread1.start(); thread2.start(); thread3.start();
try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } lock.lock(); condition.signal(); condition.signalAll(); lock.unlock(); } }
class Task implements Runnable { private Lock lock; private Condition condition;
public Task(Lock lock, Condition condition) { this.lock = lock; this.condition = condition; }
@Override public void run() { lock.lock(); try { condition.await(); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
|
LockSupport
LockSupport实现互斥锁(等待唤醒机制):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| public class Test { public static void main(String[] args) { Task task = new Task(); Thread thread0 = new Thread(task); Thread thread1 = new Thread(task); Thread thread2 = new Thread(task); thread0.start(); thread1.start(); thread2.start(); } }
class Task implements Runnable {
private final FIFOMutex lock = new FIFOMutex();
@Override public void run() { lock.lock(); try { Thread.sleep(3000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedDeque<>();
public void lock() { boolean wasInterrupted = false; Thread current = Thread.currentThread(); waiters.add(current); while (waiters.peek() != current || !locked.compareAndSet(false, true)) { LockSupport.park(this); if (Thread.interrupted()) { wasInterrupted = true; } }
waiters.remove(); if (wasInterrupted) { current.interrupt(); } }
public void unlock() { locked.set(false); LockSupport.unpark(waiters.peek());
} }
|
生产者与消费者
synchronized+wait()和notify()
示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| public class Test { public static void main(String[] args) { Data data = new Data(); Producer producer = new Producer(data); Consumer consumer = new Consumer(data); Thread producerThread1 = new Thread(producer); Thread producerThread2 = new Thread(producer); Thread consumerThread1 = new Thread(consumer); Thread consumerThread2 = new Thread(consumer); producerThread1.start(); producerThread2.start(); consumerThread1.start(); consumerThread2.start(); } }
class Producer implements Runnable { private Data data;
public Producer(Data data) { this.data = data; }
@Override public void run() { while (!Thread.currentThread().isInterrupted()) { synchronized (data) { if (!data.isFlag()) { data.setDataNum(data.getDataNum() + 1); System.out.println(Thread.currentThread().getName() + " 已生产 数据编号: " + data.getDataNum()); data.setFlag(true); } data.notify(); try { data.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
class Consumer implements Runnable { private Data data;
public Consumer(Data data) { this.data = data; }
@Override public void run() { while (!Thread.currentThread().isInterrupted()) { synchronized (data) { if (data.isFlag()) { System.out.println(Thread.currentThread().getName() + " 已消费 数据编号: " + data.getDataNum()); data.setFlag(false); } data.notify(); try { data.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
class Data { private int dataNum = 0;
private boolean flag = false;
public void setDataNum(int dataNum) { this.dataNum = dataNum; }
public int getDataNum() { return dataNum; }
public void setFlag(boolean flag) { this.flag = flag; }
public boolean isFlag() { return flag; } }
|
示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| public class Test { public static void main(String[] args) { Data data = new Data(); Producer producer = new Producer(data); Consumer consumer = new Consumer(data); Thread producerThread1 = new Thread(producer); Thread producerThread2 = new Thread(producer); Thread consumerThread1 = new Thread(consumer); Thread consumerThread2 = new Thread(consumer); producerThread1.start(); producerThread2.start(); consumerThread1.start(); consumerThread2.start(); } }
class Consumer implements Runnable { private final Data data;
public Consumer(Data data) { this.data = data; }
@Override public void run() { while (!Thread.currentThread().isInterrupted()) { synchronized (data) { if (data.getMessage() != null) { System.out.println("消费: " + data.getMessage()); data.setMessage(null); } data.notify(); try { data.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
class Producer implements Runnable { private final Data data;
public Producer(Data data) { this.data = data; }
@Override public void run() { int i = 0; while (!Thread.currentThread().isInterrupted()) { synchronized (data) { if (data.getMessage() == null) { data.setMessage("香蕉" + i++); System.out.println("生产: " + data.getMessage()); } data.notify(); try { data.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
class Data { private String message;
public void setMessage(String message) { this.message = message; }
public String getMessage() { return message; } }
|
Lock+Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| public class Test { public static void main(String[] args) { Data data = new Data(); Producer producer = new Producer(data); Consumer consumer = new Consumer(data); Thread producerThread1 = new Thread(producer); Thread producerThread2 = new Thread(producer); Thread consumerThread1 = new Thread(consumer); Thread consumerThread2 = new Thread(consumer); producerThread1.start(); producerThread2.start(); consumerThread1.start(); consumerThread2.start(); } }
class Consumer implements Runnable { private final Data data;
public Consumer(Data data) { this.data = data; }
@Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { System.out.println("消费: " + data.getMessage()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
class Producer implements Runnable { private final Data data;
public Producer(Data data) { this.data = data; }
@Override public void run() { int i = 0; while (!Thread.currentThread().isInterrupted()) { try { data.setMessage("香蕉" + i++); } catch (InterruptedException e) { e.printStackTrace(); } } } }
class Data { private Lock lock = new ReentrantLock(); private Condition producerCondition = lock.newCondition(); private Condition consumerCondition = lock.newCondition(); private String message;
public void setMessage(String message) throws InterruptedException { lock.lock(); try { while (this.message != null) { producerCondition.await(); } this.message = message; System.out.println("生产: " + this.message); consumerCondition.signalAll(); } finally { lock.unlock(); } }
public String getMessage() throws InterruptedException { lock.lock(); try { while (this.message == null) { consumerCondition.await(); } producerCondition.signalAll(); return message; } finally { message = null; lock.unlock(); } } }
|
信号量
JDK提供了一个类似于“信号量”功能的类 Semaphore
。但本文不是要介绍这个类,而是介绍一种基于 volatile
关键字实现的信号量通信。
比如现在有一个需求,想让线程A输出0,然后线程B输出1,再然后线程A输出2…以此类推。应该怎样实现呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class Signal { private static volatile int signal = 0; private static int num = 10000;
public static void main(String[] args) throws InterruptedException { RunnableA runnableA = new RunnableA(); new Thread(runnableA, "ThreadA").start(); new Thread(runnableA, "ThreadA").start();
Thread.sleep(1000);
RunnableB runnableB = new RunnableB(); new Thread(runnableB, "ThreadB").start(); new Thread(runnableB, "ThreadB").start(); }
static class RunnableA implements Runnable { @Override public void run() { while (true) { if (signal % 2 == 0 && signal < num) { synchronized (this) { if (signal % 2 == 0 && signal < num) { System.out.println(Thread.currentThread().getName() + " " + signal); signal++; } } } else if (signal >= num) { break; } } } }
static class RunnableB implements Runnable { @Override public void run() { while (true) { if (signal % 2 == 1 && signal < num) { synchronized (this) { if (signal % 2 == 1 && signal < num) { System.out.println(Thread.currentThread().getName() + " " + signal); signal = signal + 1; } } } else if (signal >= num) { break; } } } } }
threadA: 0 threadB: 1 threadA: 2 threadB: 3 threadA: 4
|
可以看到,使用 volatile
变量 signal
实现了“信号量”的模型。但需要注意,volatile
变量需要进行原子操作。signal++
并不是一个原子操作,所以一旦有多个 ThreadA 或 ThreadB 就需要使用 synchronized
给它“上锁”。
这种实现方式并不一定高效,本例只是演示信号量
信号量的应用场景:
在公共停车场中,车位是公共资源,先到先得,车辆看做线程,看门的管理员就是起到了“信号量”的作用。
因为在这种场景下,多个线程(超过2个)需要相互合作,我们用简单的“锁”和“等待通知机制”就不那么方便了。这个时候就可以用到信号量。
其实JDK中提供的很多多线程通信工具类都是基于信号量模型的。