JUC入门
基本概念
Thread 类和 Runnable 接口
// Runnable 接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
// Demo 类
public class Demo {
public static class MyThread implements Runnable {
@Override
public void run() {
System.out.println("MyThread");
}
}
public static void main(String[] args) {
new Thread(new MyThread()).start();
// Java 8 函数式编程,可以省略MyThread类
new Thread(() -> {
System.out.println("Java 8 匿名内部类");
}).start();
}
}
Callable、Future与FutureTask
// 自定义Callable
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
// 模拟计算需要一秒
Thread.sleep(1000);
return 2;
}
public static void main(String args[]) throws Exception {
// 使用
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
// 注意调用get方法会阻塞当前线程,直到得到结果。
// 所以实际编码中建议使用可以设置超时时间的重载get方法。
System.out.println(result.get());
}
}
// Future
public abstract interface Future<V> {
public abstract boolean cancel(boolean paramBoolean);
public abstract boolean isCancelled();
public abstract boolean isDone();
public abstract V get() throws InterruptedException, ExecutionException;
public abstract V get(long paramLong, TimeUnit paramTimeUnit)
throws InterruptedException, ExecutionException, TimeoutException;
}
// FutureTask
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
// 模拟计算需要一秒
Thread.sleep(1000);
return 2;
}
public static void main(String args[]) throws Exception {
// 使用
ExecutorService executor = Executors.newCachedThreadPool();
FutureTask<Integer> futureTask = new FutureTask<>(new Task());
executor.submit(futureTask);
System.out.println(futureTask.get());
}
}
线程组和线程优先级
线程组(ThreadGroup)
public class Demo {
public static void main(String[] args) {
Thread testThread = new Thread(() -> {
System.out.println("testThread当前线程组名字:" +
Thread.currentThread().getThreadGroup().getName());
System.out.println("testThread线程名字:" +
Thread.currentThread().getName());
});
testThread.start();
System.out.println("执行main所在线程的线程组名字: " + Thread.currentThread().getThreadGroup().getName());
System.out.println("执行main方法线程名字:" + Thread.currentThread().getName());
}
}
线程优先级
public class Demo {
public static void main(String[] args) {
Thread a = new Thread();
System.out.println("我是默认线程优先级:"+a.getPriority());
Thread b = new Thread();
b.setPriority(10);
System.out.println("我是设置过的线程优先级:"+b.getPriority());
}
}
Java程序中对线程所设置的优先级只是给操作系统一个建议,操作系统不一定会采纳。而真正的调用顺序,是由操作系统的线程调度算法决定的。
public class Demo {
public static class T1 extends Thread {
@Override
public void run() {
super.run();
System.out.println(String.format("当前执行的线程是:%s,优先级:%d",
Thread.currentThread().getName(),
Thread.currentThread().getPriority()));
}
}
public static void main(String[] args) {
IntStream.range(1, 10).forEach(i -> {
Thread thread = new Thread(new T1());
thread.setPriority(i);
thread.start();
});
}
}
如果某个线程优先级大于线程所在线程组的最大优先级,那么该线程的优先级将会失效,取而代之的是线程组的最大优先级。
线程组的常用方法及数据结构
常用方法
// 获取当前的线程组
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
// 复制一个线程组到一个线程数组(获取Thread信息)
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
// 线程组统一异常处理
package com.func.axc.threadgroup;
public class ThreadGroupDemo {
public static void main(String[] args) {
ThreadGroup threadGroup1 = new ThreadGroup("group1") {
// 继承ThreadGroup并重新定义以下方法
// 在线程成员抛出unchecked exception
// 会执行此方法
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName() + ": " + e.getMessage());
}
};
// 这个线程是threadGroup1的一员
Thread thread1 = new Thread(threadGroup1, new Runnable() {
public void run() {
// 抛出unchecked异常
throw new RuntimeException("测试异常");
}
});
thread1.start();
}
}
数据结构
// 线程组数据结构
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
private final ThreadGroup parent; // 父亲ThreadGroup
String name; // ThreadGroupr 的名称
int maxPriority; // 线程最大优先级
boolean destroyed; // 是否被销毁
boolean daemon; // 是否守护线程
boolean vmAllowSuspension; // 是否可以中断
int nUnstartedThreads = 0; // 还未启动的线程
int nthreads; // ThreadGroup中线程数目
Thread threads[]; // ThreadGroup中的线程
int ngroups; // 线程组数目
ThreadGroup groups[]; // 线程组数组
// 私有构造函数
private ThreadGroup() {
this.name = "system";
this.maxPriority = Thread.MAX_PRIORITY;
this.parent = null;
}
// 默认是以当前ThreadGroup传入作为parent ThreadGroup,新线程组的父线程组是目前正在运行线程的线程组。
public ThreadGroup(String name) {
this(Thread.currentThread().getThreadGroup(), name);
}
// 构造函数
public ThreadGroup(ThreadGroup parent, String name) {
this(checkParentAccess(parent), parent, name);
}
// 私有构造函数,主要的构造函数
private ThreadGroup(void unused, ThreadGroup parent, String name) {
this.name = name;
this.maxPriority = parent.maxPriority;
this.daemon = parent.daemon;
this.vmAllowSuspension = parent.vmAllowSuspension;
this.parent = parent;
parent.add(this);
}
// 检查parent ThreadGroup
private static void checkParentAccess(ThreadGroup parent) {
parent.checkAccess();
return null;
}
// 判断当前运行的线程是否具有修改线程组的权限
public final void checkAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccess(this);
}
}
}
线程状态转换
操作系统线程主要有以下三个状态: + 就绪状态(ready) + 线程正在等待使用CPU,经调度程序调用之后可进入running状态。 + 执行状态(running) + 线程正在使用CPU。 + 等待状态(waiting) + 线程经过等待事件的调用或者正在等待其他资源(如I/O)。
Java 线程有6个状态
// Thread.State 源码
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
- NEW
- 创建了线程而并没有调用start()方法
- RUNNABLE
- Java线程的RUNNABLE状态包括了传统操作系统线程的ready和running两个状态
- BLOCKED
- 处于阻塞状态的线程正等待锁的释放以进入同步区
- WAITING
- 处于等待状态的线程变成RUNNABLE状态需要其他线程唤醒
- TIMED_WAITING
- 规定等待时间过后自动唤醒
- 之后拥有了去争夺锁的资格
- TERMINATED
- 线程已执行完毕
BLOCKED与RUNNABLE状态的转换
@Test
public void blockedTest() {
Thread a = new Thread(new Runnable() {
@Override
public void run() {
testMethod();
}
}, "a");
Thread b = new Thread(new Runnable() {
@Override
public void run() {
testMethod();
}
}, "b");
a.start();
b.start();
System.out.println(a.getName() + ":" + a.getState()); // 输出?
System.out.println(b.getName() + ":" + b.getState()); // 输出?
}
// 同步方法争夺锁
private synchronized void testMethod() {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
WAITING状态与RUNNABLE状态的转换
public void blockedTest() {
······
a.start();
a.join();
b.start();
System.out.println(a.getName() + ":" + a.getState()); // 输出 TERMINATED
System.out.println(b.getName() + ":" + b.getState());
}
TIMED_WAITING与RUNNABLE状态转换
public void blockedTest() {
······
a.start();
a.join(1000L);
b.start();
System.out.println(a.getName() + ":" + a.getState()); // 输出 TIEMD_WAITING
System.out.println(b.getName() + ":" + b.getState());
}
线程中断
- Thread.interrupt()
- 中断线程。
- 这里的中断线程并不会立即停止线程,而是设置线程的中断状态为true(默认是flase)
- Thread.interrupted()
- 测试当前线程是否被中断。
- 线程的中断状态受这个方法的影响,意思是调用一次使线程中断状态设置为true,连续调用两次会使得这个线程的中断状态重新转为false
- Thread.isInterrupted()
- 测试当前线程是否被中断。
- 与上面方法不同的是调用这个方法并不会影响线程的中断状态。
线程间的通信
锁与同步
- 无锁
public class NoneLock {
static class ThreadA implements Runnable {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("Thread A " + i);
}
}
}
static class ThreadB implements Runnable {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("Thread B " + i);
}
}
}
public static void main(String[] args) {
new Thread(new ThreadA()).start();
new Thread(new ThreadB()).start();
}
}
- 有锁
public class ObjectLock {
private static Object lock = new Object();
static class ThreadA implements Runnable {
@Override
public void run() {
synchronized (lock) {
for (int i = 0; i < 100; i++) {
System.out.println("Thread A " + i);
}
}
}
}
static class ThreadB implements Runnable {
@Override
public void run() {
synchronized (lock) {
for (int i = 0; i < 100; i++) {
System.out.println("Thread B " + i);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new ThreadA()).start();
Thread.sleep(10);
new Thread(new ThreadB()).start();
}
}
等待/通知机制
Java多线程的等待/通知机制是基于Object类的wait()方法和notify(), notifyAll()方法来实现的。
观察者模式
public class WaitAndNotify {
private static Object lock = new Object();
static class ThreadA implements Runnable {
@Override
public void run() {
synchronized (lock) {
for (int i = 0; i < 5; i++) {
try {
System.out.println("ThreadA: " + i);
lock.notify();
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.notify();
}
}
}
static class ThreadB implements Runnable {
@Override
public void run() {
synchronized (lock) {
for (int i = 0; i < 5; i++) {
try {
System.out.println("ThreadB: " + i);
lock.notify();
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.notify();
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new ThreadA()).start();
Thread.sleep(1000);
new Thread(new ThreadB()).start();
}
}
信号量
volatile关键字能够保证内存的可见性。
如果用volatile关键字声明了一个变量, 在一个线程里面改变了这个变量的值, 那其它线程是立马可见更改后的值的。 这里的信号量和操作系统里的有所区别,更像一种享元。
public class Signal { private static volatile int signal = 0; static class ThreadA implements Runnable { @Override public void run() { while (signal < 5) { if (signal % 2 == 0) { System.out.println("threadA: " + signal); signal++; } } } } static class ThreadB implements Runnable { @Override public void run() { while (signal < 5) { if (signal % 2 == 1) { System.out.println("threadB: " + signal); signal = signal + 1; } } } } public static void main(String[] args) throws InterruptedException { new Thread(new ThreadA()).start(); Thread.sleep(1000); new Thread(new ThreadB()).start(); } }
管道
一个线程需要先另一个线程发送一个信息
线程的责任链模式
public class Pipe {
static class ReaderThread implements Runnable {
private PipedReader reader;
public ReaderThread(PipedReader reader) {
this.reader = reader;
}
@Override
public void run() {
System.out.println("this is reader");
int receive = 0;
try {
while ((receive = reader.read()) != -1) {
System.out.print((char)receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class WriterThread implements Runnable {
private PipedWriter writer;
public WriterThread(PipedWriter writer) {
this.writer = writer;
}
@Override
public void run() {
System.out.println("this is writer");
int receive = 0;
try {
writer.write("test");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException, InterruptedException {
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
writer.connect(reader); // 这里注意一定要连接,才能通信
new Thread(new ReaderThread(reader)).start();
Thread.sleep(1000);
new Thread(new WriterThread(writer)).start();
}
}
其它通信相关
- join方法
让当前线程陷入“等待”状态
public class Join {
static class ThreadA implements Runnable {
@Override
public void run() {
try {
System.out.println("我是子线程,我先睡一秒");
Thread.sleep(1000);
System.out.println("我是子线程,我睡完了一秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new ThreadA());
thread.start();
thread.join();
System.out.println("如果不加join方法,我会先被打出来,加了就不一样了");
}
}
- sleep方法
让当前线程睡眠一段时间 sleep方法是不会释放当前的锁的,而wait方法会
- wait可以指定时间,也可以不指定;而sleep必须指定时间。
- wait释放cpu资源,同时释放锁;sleep释放cpu资源,但是不释放锁,所以易死锁。
-
wait必须放在同步块或同步方法中,而sleep可以在任意位置
-
ThreadLocal类
ThreadLocal类并不属于多线程间的通信,而是让每个线程有自己”独立“的变量
public class ThreadLocalDemo {
static class ThreadA implements Runnable {
private ThreadLocal<String> threadLocal;
public ThreadA(ThreadLocal<String> threadLocal) {
this.threadLocal = threadLocal;
}
@Override
public void run() {
threadLocal.set("A");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ThreadA输出:" + threadLocal.get());
}
static class ThreadB implements Runnable {
private ThreadLocal<String> threadLocal;
public ThreadB(ThreadLocal<String> threadLocal) {
this.threadLocal = threadLocal;
}
@Override
public void run() {
threadLocal.set("B");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ThreadB输出:" + threadLocal.get());
}
}
public static void main(String[] args) {
ThreadLocal<String> threadLocal = new ThreadLocal<>();
new Thread(new ThreadA(threadLocal)).start();
new Thread(new ThreadB(threadLocal)).start();
}
}
}
原理构造
Java 内存模型基础知识
两个关键问题
- 线程间如何通信
- 线程之间以何种机制来交换信息
- 线程间如何同步
- 线程以何种机制来控制不同线程间操作发生的相对顺序
有两种并发模型可以解决这两个问题 消息传递并发模型 共享内存并发模型
内存模型的抽象结构
- 运行时内存的划分
对于每一个线程来说,栈都是私有的,而堆是共有的。
- 内存不可见问题
计算机为了高效,往往会在高速缓存区中缓存共享变量
线程A无法直接访问线程B的工作内存,线程间通信必须经过主内存。 线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取。
-
内存区域划分的区别与联系
-
区别
- 两者是不同的概念层次。
- JMM是抽象的,他是用来描述一组规则,通过这个规则来控制各个变量的访问方式,围绕原子性、有序性、可见性等展开的。
- Java运行时内存的划分是具体的,是JVM运行Java程序时,必要的内存划分。
-
联系
- 都存在私有数据区域和共享数据区域。
- JMM中的主内存属于共享数据区域,他是包含了堆和方法区;
- JMM中的本地内存属于私有数据区域,包含了程序计数器、本地方法栈、虚拟机栈。
重排序与happens-before
重排序的意义
避免流水线中断将运算符放到后面 编译成更快的机器指令 解释器模式
顺序一致性
-
数据竞争 当程序未正确同步的时候,就可能存在数据竞争。那么运行的结果往往充满了不确定性,比如读发生在了写之前,可能就会读到错误的值
-
顺序一致性模型
- 一个线程中的所有操作必须按照程序的顺序(即Java代码的顺序)来执行。
- 不管程序是否同步,所有线程都只能看到一个单一的操作执行顺序。即在顺序一致性模型中,每个操作必须是原子性的,且立刻对所有线程可见。
happens-before
- 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
- 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么JMM也允许这样的重排序。
happens-before关系
- 程序顺序规则
- 一个线程中的每一个操作,happens-before于该线程中的任意后续操作。
- 监视器锁规则
- 对一个锁的解锁,happens-before于随后对这个锁的加锁。
- volatile变量规则
- 对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
- 传递性
- 如果A happens-before B,且B happens-before C,那么A happens-before C。
- start规则
- 如果线程A执行操作ThreadB.start()启动线程B,那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作、
- join规则
- 如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
volatile
内存可见性
指的是线程之间的可见性,当一个线程修改了共享变量时,另一个线程可以读取到这个修改后的值。
public class VolatileExample {
int a = 0;
volatile boolean flag = false;
public void writer() {
a = 1; // step 1
flag = true; // step 2
}
public void reader() {
if (flag) { // step 3
System.out.println(a); // step 4
}
}
}
禁止重排序
编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
// 声明变量
int a = 0; // 声明普通变量
volatile boolean flag = false; // 声明volatile变量
// 以下两个变量的读操作是可以重排序的
int i = a; // 普通变量读
boolean j = flag; // volatile变量读
volatile的用途
在功能上,锁比volatile更强大;在性能上,volatile更有优势。
public class Singleton {
private static Singleton instance; // 不使用volatile关键字
// 双重锁检验
public static Singleton getInstance() {
if (instance == null) { // 第7行
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton(); // 第10行
}
}
}
return instance;
}
}
instance = new Singleton(); // 第10行
// 可以分解为以下三个步骤
1 memory=allocate();// 分配内存 相当于c的malloc
2 ctorInstanc(memory) //初始化对象
3 s=memory //设置s指向刚分配的地址
// 上述三个步骤可能会被重排序为 1-3-2,也就是:
1 memory=allocate();// 分配内存 相当于c的malloc
3 s=memory //设置s指向刚分配的地址
2 ctorInstanc(memory) //初始化对象
synchronized与锁
Java类只有一个Class对象(可以有多个实例对象,多个实例共享这个Class对象),而Class对象也是特殊的Java对象。 所以我们常说的类锁,其实就是Class对象的锁。
Synchronized关键字
// 关键字在实例方法上,锁为当前实例
public synchronized void instanceLock() {
// code
}
// 关键字在静态方法上,锁为当前Class对象
public static synchronized void classLock() {
// code
}
// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
Object o = new Object();
synchronized (o) {
// code
}
}
几种锁
- 无锁状态
- 偏向锁状态
- 轻量级锁状态
- 重量级锁状态
偏向锁
- 为何会引入
锁不仅不存在多线程竞争,而且总是由同一线程多次获得
偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。
也就是说,偏向锁在资源无竞争情况下消除了同步语句,连CAS操作都不做了,提高了程序的运行性能。
- 获得与撤消
-
轻量级锁
-
重量级锁
重量级锁依赖于操作系统的互斥量(mutex) 实现
-
优缺点对比 | 锁 | 优点 | 缺点 | 适用场景 | |----------|--------------------------------------------------------------------|--------------------------------------------------|--------------------------------------| | 偏向锁 | 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 | 适用于只有一个线程访问同步块场景。 | | 轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度。 | 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 | 追求响应时间。同步块执行速度非常快。 | | 重量级锁 | 线程竞争不使用自旋,不会消耗CPU。 | 线程阻塞,响应时间缓慢。 | 追求吞吐量。同步块执行时间较长。 |
CAS与原子操作
乐观锁与悲观锁
-
悲观锁
- 悲观锁就是我们常说的锁。
- 对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,所以必须对每次数据操作加上锁,
- 以保证临界区的程序同一时间只能有一个线程在执行。
-
乐观锁
- 乐观锁又称为“无锁”,顾名思义,它是乐观派。
- 乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。
- 而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。
CAS的概念
- CAS的全称是 比较并交换(Compare And Swap)。
- 在CAS中,有这样三个值
- V:要更新的变量(var)
- E:预期值(expected)
- N:新值(new)
当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败, 但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
Java实现CAS
使用Unsafe类,它在sun.misc包中。它里面是一些native方法,其中就有几个关于CAS的:
boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);
原子操作
JDK提供了一些用于原子操作的类,在java.util.concurrent.atomic包下面
三个问题
- ABA 问题 一个值原来是A,变成了B,又变回了A。这个时候使用CAS是检查不出变化的,但实际上却被更新了两次。
// 使用版本号或者时间戳
// class AtomicStampedReference
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
-
循环时间长开销大 CAS多与自旋结合。如果自旋CAS长时间不成功,会占用大量的CPU资源。 解决思路是让JVM支持处理器提供的
pause
指令。 -
只能保证一个共享变量的原子操作
- 使用 AtomicReference
- 使用锁
AQS
简介
- AQS是AbstractQueuedSynchronizer的简称,即抽象队列同步器,从字面意思上理解:
- 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
- 队列:使用先进先出(FIFO)队列存储数据;
- 同步:实现了同步的功能。
数据结构
资源共享模式
- 两种共享模式(两种同步方式)
- 独占模式(Exclusive) > 资源是独占的,一次只能一个线程获取。 > 如ReentrantLock。
- 共享模式(Share) > 同时可以被多个线程获取,具体的资源个数可以通过参数指定。 > 如Semaphore/CountDownLatch。
static final class Node {
// 标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;
// waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;
// waitStatus的值,表示该结点(对应的线程)在等待某一条件
static final int CONDITION = -2;
/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;
// 等待状态,取值范围,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的结点
// 判断共享模式的方法
final boolean isShared() {
return nextWaiter == SHARED;
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 其它方法忽略,可以参考具体的源码
}
// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
// 使用了Node的这个构造函数
Node node = new Node(Thread.currentThread(), mode);
// 其它代码省略
}
源码解析
AQS的设计基于模板方法模式,它有一些方法必须要子类去实现的。
- isHeldExclusively()
- 该线程是否正在独占资源。
- 只有用到condition才需要去实现它。
- tryAcquire(int)
- 独占方式。
- 尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int)
- 独占方式。
- 尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int)
- 共享方式。
- 尝试获取资源,负数表示失败;
- 0表示成功,但没有剩余可用资源;
- 正数表示成功,且有剩余资源。
- tryReleaseShared(int)
- 共享方式。
- 尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
获取资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}
// 自旋CAS插入等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点。
// 所以head所指的结点,就是当前获取到资源的那个结点或null。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
结点进入等待队列后,是调用park使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。
释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 如果状态是负数,尝试把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到头结点的后继结点head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于0
// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待队列中所有还有用的结点,都向前移动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,
if (s != null)
LockSupport.unpark(s.thread);
}
工具类
线程池原理
为什么要使用线程池
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理。
线程池原理与实现接口
Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。
- ThreadPoolExecutor 结构
// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
static class DefaultThreadFactory implements ThreadFactory {
// 省略属性
// 构造函数
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// 省略
}
2. ThreadPoolExecutor的策略
ThreadPoolExecutor类中定义了一个volatile int变量runState来表示线程池的状态 ,
分别为`RUNNING`、`SHUTDOWN`、`STOP`、`TIDYING` 、`TERMINATED`。
+ 线程池创建后处于RUNNING状态。
+ 调用shutdown()方法后处于SHUTDOWN状态,
+ 线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。
+ 调用shutdownNow()方法后处于STOP状态,
+ 线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。
+ 此时,poolsize=0,阻塞队列的size也为0。
+ 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
+ 接着会执行terminated()函数。
- 主要的任务处理流程
// JDK 1.8
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果不小于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果放入workQueue失败,则创建非核心线程执行任务,
// 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
- 多路复用
// ThreadPoolExecutor.addWorker方法源码上半部分
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 1.如果core是ture,证明需要创建的线程为核心线程,则先判断当前线程是否大于核心线程
// 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数
// 如果不小于,则返回false
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// ThreadPoolExecutor.addWorker方法源码下半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 1.创建一个worker对象
w = new Worker(firstTask);
// 2.实例化一个Thread对象
final Thread t = w.thread;
if (t != null) {
// 3.线程池全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 4.启动这个线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//其余代码略...
}
// Worker.runWorker方法源代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 1.线程启动之后,通过unlock方法释放锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 2.Worker执行firstTask或从workQueue中获取任务,如果getTask方法不返回null,循环不退出
while (task != null || (task = getTask()) != null) {
// 2.1进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 2.2检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 2.3执行beforeExecute
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 2.4执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 2.5执行afterExecute方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 2.6解锁操作
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// Worker.getTask方法源码
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁
// 如果为true,核心线程在keepAliveTime内仍空闲则会被销毁。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 2.如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。
// 如果有设置允许线程超时或者线程数量超过了核心线程数量,
// 并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3.如果timed为true(想想哪些情况下timed为true),则会调用workQueue的poll方法获取任务.
// 超时时间是keepAliveTime。如果超过keepAliveTime时长,
// poll返回了null,上边提到的while循序就会退出,线程也就执行完了。
// 如果timed为false(allowCoreThreadTimeOut为falsefalse
// 且wc > corePoolSize为false),则会调用workQueue的take方法阻塞在当前。
// 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
四种常见的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
阻塞队列
- 在生产者-消费者模式中,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁。
- 阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。
实现类
public ArrayBlockingQueue(int capacity, boolean fair){
//..省略代码
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
this.lock = new ReentrantLock(); //默认构造方法-非公平锁
...//其余代码略
}
阻塞队列原理
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
//..省略其他代码
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 1.自旋拿锁
lock.lockInterruptibly();
try {
// 2.判断队列是否满了
while (count == items.length)
// 2.1如果满了,阻塞该线程,并标记为notFull线程,
// 等待notFull的唤醒,唤醒之后继续执行while循环。
notFull.await();
// 3.如果没有满,则进入队列
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 4 唤醒一个等待的线程
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
使用场景
public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while(true){
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
while(true){
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
锁接口和类
synchronized的不足之处
- 如果临界区是只读操作,其实可以多线程一起执行,但使用synchronized的话,同一时间只能有一个线程执行。
- synchronized无法知道线程有没有成功获取到锁
- 使用synchronized,如果临界区因为IO或者sleep方法等原因阻塞了,而当前线程又没有释放锁,就会导致所有线程等待。
锁的几种分类
- 可重入锁和非可重入锁
- 公平锁与非公平锁
- 读写锁和排它锁
JDK中有关锁的一些接口和类
-
抽象类AQS/AQLS/AOS
-
接口Condition/Lock/ReadWriteLock
-
ReentrantLock
ReentrantLock是一个非抽象类,它是Lock接口的JDK默认实现,实现了锁的基本功能。
-
ReentrantReadWriteLock
// 内部结构
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 具体实现
}
static final class NonfairSync extends Sync {
// 具体实现
}
static final class FairSync extends Sync {
// 具体实现
}
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 具体实现
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 具体实现
}
// 构造方法,初始化两个锁
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 获取读锁和写锁的方法
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
- StampedLock
在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。 这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
// 写锁的使用
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp); // 释放写锁
}
}
// 乐观读锁的使用
double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead(); // 获取乐观读锁
double currentX = x, currentY = y;
if (!sl.validate(stamp)) { // //检查乐观读锁后是否有其他写锁发生,有则返回false
stamp = sl.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 悲观读锁以及读锁升级写锁的使用
void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.readLock(); // 悲观读锁
try {
while (x == 0.0 && y == 0.0) {
// 读锁尝试转换为写锁:转换成功后相当于获取了写锁,转换失败相当于有写锁被占用
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) { // 如果转换成功
stamp = ws; // 读锁的票据更新为写锁的
x = newX;
y = newY;
break;
}
else { // 如果转换失败
sl.unlockRead(stamp); // 释放读锁
stamp = sl.writeLock(); // 强制获取写锁
}
}
} finally {
sl.unlock(stamp); // 释放所有锁
}
}
}
// 用于操作state后获取stamp的值
private static final int LG_READERS = 7;
private static final long RUNIT = 1L; //0000 0000 0001
private static final long WBIT = 1L << LG_READERS; //0000 1000 0000
private static final long RBITS = WBIT - 1L; //0000 0111 1111
private static final long RFULL = RBITS - 1L; //0000 0111 1110
private static final long ABITS = RBITS | WBIT; //0000 1111 1111
private static final long SBITS = ~RBITS; //1111 1000 0000
// 初始化时state的值
private static final long ORIGIN = WBIT << 1; //0001 0000 0000
// 锁共享变量state
private transient volatile long state;
// 读锁溢出时用来存储多出的读锁
private transient int readerOverflow;
并发容器集合
同步容器与并发容器
public class TestVector {
private Vector<String> vector;
//方法一
public Object getLast(Vector vector) {
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
//方法二
public void deleteLast(Vector vector) {
int lastIndex = vector.size() - 1;
vector.remove(lastIndex);
}
//方法三
public Object getLastSysnchronized(Vector vector) {
synchronized(vector){
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
}
//方法四
public void deleteLastSysnchronized(Vector vector) {
synchronized (vector){
int lastIndex = vector.size() - 1;
vector.remove(lastIndex);
}
}
}
并发容器类
- 并发Map
-
并发Queue
很难去开发一个通用并且没有并发瓶颈的线程安全的List
-
并发Set
CopyOnWrite容器
概念
写时复制的容器 内存开辟新单元给另一个线程 形成读写分离
CopyOnWriteArrayList
public boolean add(E e) {
// ReentrantLock加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷贝原容器,长度为原容器长度加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 在新副本上执行添加操作
newElements[len] = e;
// 将原容器引用指向新副本
setArray(newElements);
return true;
} finally {
// 解锁
lock.unlock();
}
}
public E remove(int index) {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
// 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
setArray(Arrays.copyOf(elements, len - 1));
else {
// 否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
// 解锁
lock.unlock();
}
}
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}
CopyOnWrite的业务中实现
import java.util.Collection;
import java.util.Map;
import java.util.Set;
public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
private volatile Map<K, V> internalMap;
public CopyOnWriteMap() {
internalMap = new HashMap<K, V>();
}
public V put(K key, V value) {
synchronized (this) {
Map<K, V> newMap = new HashMap<K, V>(internalMap);
V val = newMap.put(key, value);
internalMap = newMap;
return val;
}
}
public V get(Object key) {
return internalMap.get(key);
}
public void putAll(Map<? extends K, ? extends V> newData) {
synchronized (this) {
Map<K, V> newMap = new HashMap<K, V>(internalMap);
newMap.putAll(newData);
internalMap = newMap;
}
}
}
// 黑名单服务
public class BlackListServiceImpl {
// 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
private static CopyOnWriteMap<String, Boolean> blackListMap =
new CopyOnWriteMap<String, Boolean>(1000);
public static boolean isBlackList(String id) {
return blackListMap.get(id) == null ? false : true;
}
public static void addBlackList(String id) {
blackListMap.put(id, Boolean.TRUE);
}
/**
* 批量添加黑名单
* (使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。
* 如使用上面代码里的addBlackList方法)
* @param ids
*/
public static void addBlackList(Map<String,Boolean> ids) {
blackListMap.putAll(ids);
}
}
通信工具类
Semaphore
-
Semaphore介绍 限制同时工作的线程数(信号量表示)
-
案例
public class SemaphoreDemo { static class MyThread implements Runnable { private int value; private Semaphore semaphore; public MyThread(int value, Semaphore semaphore) { this.value = value; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); // 获取permit System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待", value, semaphore.availablePermits(), semaphore.getQueueLength())); // 睡眠随机时间,打乱释放顺序 Random random =new Random(); Thread.sleep(random.nextInt(1000)); System.out.println(String.format("线程%d释放了资源", value)); } catch (InterruptedException e) { e.printStackTrace(); } finally{ semaphore.release(); // 释放permit } } } public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(new MyThread(i, semaphore)).start(); } } } // 忽略中断 public void acquireUninterruptibly() public void acquireUninterruptibly(int permits) // 不进入等待队列,底层使用CAS public boolean tryAcquire public boolean tryAcquire(int permits) public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException public boolean tryAcquire(long timeout, TimeUnit unit)
Exchanger
用于两个线程交换数据
public class ExchangerDemo { public static void main(String[] args) throws InterruptedException { Exchanger<String> exchanger = new Exchanger<>(); new Thread(() -> { try { System.out.println("这是线程A,得到了另一个线程的数据:" + exchanger.exchange("这是来自线程A的数据")); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); System.out.println("这个时候线程A是阻塞的,在等待线程B的数据"); Thread.sleep(1000); new Thread(() -> { try { System.out.println("这是线程B,得到了另一个线程的数据:" + exchanger.exchange("这是来自线程B的数据")); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
CountDownLatch
- 概念
代表计数递减门闩 闭锁 原子变量
// 构造方法:
public CountDownLatch(int count)
public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count
-
案例
public class CountDownLatchDemo { // 定义前置任务线程 static class PreTaskThread implements Runnable { private String task; private CountDownLatch countDownLatch; public PreTaskThread(String task, CountDownLatch countDownLatch) { this.task = task; this.countDownLatch = countDownLatch; } @Override public void run() { try { Random random = new Random(); Thread.sleep(random.nextInt(1000)); System.out.println(task + " - 任务完成"); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { // 假设有三个模块需要加载 CountDownLatch countDownLatch = new CountDownLatch(3); // 主任务 new Thread(() -> { try { System.out.println("等待数据加载..."); System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount())); countDownLatch.await(); System.out.println("数据加载完成,正式开始游戏!"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 前置任务 new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start(); new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start(); new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start(); } }
-
原理
- 计数值(count)实际上就是闭锁需要等待的线程数量。
- 这个值只能被设置一次
- CountDownLatch没有提供任何机制去重新设置这个计数值。
CyclicBarrier
CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障
public class CyclicBarrierDemo {
static class PreTaskThread implements Runnable {
private String task;
private CyclicBarrier cyclicBarrier;
public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
// 假设总共三个关卡
for (int i = 1; i < 4; i++) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d的任务%s完成", i, task));
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
cyclicBarrier.reset(); // 重置屏障
}
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本关卡所有前置任务完成,开始游戏...");
});
new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
}
}
// 构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
// 具体实现
}
private int dowait(boolean timed, long nanos)
Phaser
public class PhaserDemo {
static class PreTaskThread implements Runnable {
private String task;
private Phaser phaser;
public PreTaskThread(String task, Phaser phaser) {
this.task = task;
this.phaser = phaser;
}
@Override
public void run() {
for (int i = 1; i < 4; i++) {
try {
// 第二次关卡起不加载NPC,跳过
if (i >= 2 && "加载新手教程".equals(task)) {
continue;
}
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
i, phaser.getRegisteredParties(), task));
// 从第二个关卡起,不加载NPC
if (i == 1 && "加载新手教程".equals(task)) {
System.out.println("下次关卡移除加载【新手教程】模块");
phaser.arriveAndDeregister(); // 移除一个模块
} else {
phaser.arriveAndAwaitAdvance();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(4) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("第%d次关卡准备完成", phase + 1));
return phase == 3 || registeredParties == 0;
}
};
new Thread(new PreTaskThread("加载地图数据", phaser)).start();
new Thread(new PreTaskThread("加载人物模型", phaser)).start();
new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
new Thread(new PreTaskThread("加载新手教程", phaser)).start();
}
}
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
static final class QNode implements ForkJoinPool.ManagedBlocker {
// 实现代码
}
Fork/Join框架
概念
工作窃取算法
Fork/Join的具体实现
- ForkJoinTask
// 本文所有代码都引自Java 8
public final ForkJoinTask<V> fork() {
Thread t;
// ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
// 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是则将线程加入队列
// 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
ForkJoinPool.common.externalPush(this);
return this;
}
public final V join() {
int s;
// doJoin()方法来获取当前任务的执行状态
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 任务异常,抛出异常
reportException(s);
// 任务正常完成,获取返回值
return getRawResult();
}
/**
* doJoin()方法用来返回当前任务的执行状态
**/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
return (s = status) < 0 ? s :
// 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
// tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
// doExec()方法执行任务
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
// 如果是处于顶端并且任务执行完毕,返回结果
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
// awaitJoin():使用自旋使任务执行完成,返回结果
wt.pool.awaitJoin(w, this, 0L) :
// 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
externalAwaitDone();
}
- ForkJoinPool
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
// 任务队列
volatile WorkQueue[] workQueues
// 线程的运行状态
volatile int runState;
// 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
// 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
static final ForkJoinPool common;
// 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
// 其他构造方法都是源自于此方法
// parallelism: 并行度,
// 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory, // 工作线程工厂
UncaughtExceptionHandler handler, // 拒绝任务的handler
int mode, // 同步模式
String workerNamePrefix) { // 线程名prefix
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
}
Fork/Join的使用
public class FibonacciTest {
class Fibonacci extends RecursiveTask<Integer> {
int n;
public Fibonacci(int n) {
this.n = n;
}
// 主要的实现逻辑都在compute()里
@Override
protected Integer compute() {
// 这里先假设 n >= 0
if (n <= 1) {
return n;
} else {
// f(n-1)
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
// f(n-2)
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
// f(n) = f(n-1) + f(n-2)
return f1.join() + f2.join();
}
}
}
@Test
public void testFib() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
long start = System.currentTimeMillis();
Fibonacci fibonacci = new Fibonacci(40);
Future<Integer> future = forkJoinPool.submit(fibonacci);
System.out.println(future.get());
long end = System.currentTimeMillis();
System.out.println(String.format("耗时:%d millis", end - start));
}
}
// 普通递归,复杂度为O(2^n)
public int plainRecursion(int n) {
if (n == 1 || n == 2) {
return 1;
} else {
return plainRecursion(n -1) + plainRecursion(n - 2);
}
}
@Test
public void testPlain() {
long start = System.currentTimeMillis();
int result = plainRecursion(40);
long end = System.currentTimeMillis();
System.out.println("计算结果:" + result);
System.out.println(String.format("耗时:%d millis", end -start));
}
// 通过循环来计算,复杂度为O(n)
private int computeFibonacci(int n) {
// 假设n >= 0
if (n <= 1) {
return n;
} else {
int first = 1;
int second = 1;
int third = 0;
for (int i = 3; i <= n; i ++) {
// 第三个数是前两个数之和
third = first + second;
// 前两个数右移
first = second;
second = third;
}
return third;
}
}
@Test
public void testComputeFibonacci() {
long start = System.currentTimeMillis();
int result = computeFibonacci(40);
long end = System.currentTimeMillis();
System.out.println("计算结果:" + result);
System.out.println(String.format("耗时:%d millis", end -start));
}
Stream并行计算原理
简介
使用Stream接口以及 lambda
表达式进行“流式计算”
Stream单线程串行计算
public class StreamDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}
Stream多线程并行计算
public class StreamParallelDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.parallel()
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}
并行计算原理
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
// ReferencePipeline抽象类的reduce方法
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
// 调用evaluate方法
return evaluate(ReduceOps.makeRef(accumulator));
}
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel() // 调用isParallel()判断是否使用并行模式
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
@Override
public final boolean isParallel() {
// 根据之前在parallel()方法设置的那个flag来判断。
return sourceStage.parallel;
}
// java.util.stream.ReduceOps.ReduceOp.evaluateParallel
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
并行计算的性能提升
public class StreamParallelDemo {
public static void main(String[] args) {
System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));
// 产生100w个随机数(1 ~ 100),组成列表
Random random = new Random();
List<Integer> list = new ArrayList<>(1000_0000);
for (int i = 0; i < 1000_0000; i++) {
list.add(random.nextInt(100));
}
long prevTime = getCurrentTime();
list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));
prevTime = getCurrentTime();
list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));
}
private static long getCurrentTime() {
return System.currentTimeMillis();
}
}
计划任务
ScheduledThreadPoolExecutor + 在给定的延迟之后运行任务 + 周期性重复执行任务
案例
public class ThreadPool {
private static final ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args){
// 新建一个固定延迟时间的计划任务
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
if (haveMsgAtCurrentTime()) {
System.out.println(df.format(new Date()));
System.out.println("大家注意了,我要发消息了");
}
}
}, 1, 1, TimeUnit.SECONDS);
}
public static boolean haveMsgAtCurrentTime(){
//查询数据库,有没有当前时间需要发送的消息
//这里省略实现,直接返回true
return true;
}
}
类结构
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
//……
}
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
方法
- schedule
// delay时长后执行任务command,该任务只执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 这里的decorateTask方法仅仅返回第二个参数
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
// 延时或者周期执行任务的主要方法,稍后统一说明
delayedExecute(t);
return t;
}
// 继承Comparable接口,表示该类对象支持排序
public interface Delayed extends Comparable<Delayed> {
// 返回该对象剩余时延
long getDelay(TimeUnit unit);
}
// 仅仅继承了Delayed和Future接口,自己没有任何代码
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
// 是否是周期任务,周期任务可被调度运行多次,非周期任务只被运行一次
boolean isPeriodic();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
// 调用父类FutureTask的构造方法
super(r, result);
// time表示任务下次执行的时间
this.time = ns;
// 周期任务,正数表示按照固定速率,负数表示按照固定时延,0表示不是周期任务
this.period = period;
// 任务的编号
this.sequenceNumber = sequencer.getAndIncrement();
}
// 实现Delayed接口的getDelay方法,返回任务开始执行的剩余时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
// Comparable接口的compareTo方法,比较两个任务的”大小”。
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
// 小于0,说明当前任务的执行时间点早于other,要排在延时队列other的前面
if (diff < 0)
return -1;
// 大于0,说明当前任务的执行时间点晚于other,要排在延时队列other的后面
else if (diff > 0)
return 1;
// 如果两个任务的执行时间点一样,比较两个任务的编号,编号小的排在队列前面,编号大的排在队列后面
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 如果任务类型不是ScheduledFutureTask,通过getDelay方法比较
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
// 任务执行完后,设置下次执行的时间
private void setNextRunTime() {
long p = period;
// p > 0,说明是固定速率运行的任务
// 在原来任务开始执行时间的基础上加上p即可
if (p > 0)
time += p;
// p < 0,说明是固定时延运行的任务,
// 下次执行时间在当前时间(任务执行完成的时间)的基础上加上-p的时间
else
time = triggerTime(-p);
}
public void run() {
boolean periodic = isPeriodic();
// 如果当前状态下不能执行任务,则取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 不是周期性任务,执行一次任务即可,调用父类的run方法
else if (!periodic)
ScheduledFutureTask.super.run();
// 是周期性任务,调用FutureTask的runAndReset方法,方法执行完成后
// 重新设置任务下一次执行的时间,并将该任务重新入队,等待再次被调度
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
- scheduledAtFixedRate
// 注意,固定速率和固定时延,传入的参数都是Runnable,也就是说这种定时任务是没有返回值的
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 创建一个有初始延时和固定周期的任务
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// outerTask表示将会重新入队的任务
sft.outerTask = t;
// 稍后说明
delayedExecute(t);
return t;
}
- delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池已经关闭,调用拒绝执行处理器处理
if (isShutdown())
reject(task);
else {
// 将任务加入到等待队列
super.getQueue().add(task);
// 线程池已经关闭,且当前状态不能运行该任务,将该任务从等待队列移除并取消该任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 增加一个worker,就算corePoolSize=0也要增加一个worker
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
DelayedWorkQueue
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
// 数组用来存储定时任务,通过数组实现堆排序
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
// 当前在队首等待的线程
private Thread leader = null;
// 锁和监视器,用于leader线程
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
// 其他代码,略
}
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 取堆顶的任务,堆顶是最近要执行的任务
RunnableScheduledFuture first = queue[0];
// 堆顶为空,线程要在条件available上等待
if (first == null)
available.await();
else {
// 堆顶任务还要多长时间才能执行
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// 堆顶任务已经可以执行了,finishPoll会重新调整堆,使其满足最小堆特性,该方法设置任务在
// 堆中的index为-1并返回该任务
if (delay <= 0)
return finishPoll(first);
// 如果leader不为空,说明已经有线程成为leader并等待堆顶任务
// 到达执行时间,此时,其他线程都需要在available条件上等待
else if (leader != null)
available.await();
else {
// leader为空,当前线程成为新的leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当前线程已经成为leader了,只需要等待堆顶任务到达执行时间即可
available.awaitNanos(delay);
} finally {
// 返回堆顶元素之前将leader设置为空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 通知其他在available条件等待的线程,这些线程可以去竞争成为新的leader
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 队列元素已经大于等于数组的长度,需要扩容,新堆的容量是原来堆容量的1.5倍
if (i >= queue.length)
grow();
// 堆中元素增加1
size = i + 1;
// 调整堆
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 调整堆,使的满足最小堆,比较大小的方式就是上文提到的compareTo方法
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
// 通知其他在available条件上等待的线程,这些线程可以竞争成为新的leader
available.signal();
}
} finally {
lock.unlock();
}
return true;
}