Skip to content

JUC入门

基本概念

Thread 类和 Runnable 接口

// Runnable 接口
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}


// Demo 类
public class Demo {
    public static class MyThread implements Runnable {
        @Override
        public void run() {
            System.out.println("MyThread");
        }
    }

    public static void main(String[] args) {

        new Thread(new MyThread()).start();

        // Java 8 函数式编程,可以省略MyThread类
        new Thread(() -> {
            System.out.println("Java 8 匿名内部类");
        }).start();
    }
}

Callable、Future与FutureTask

// 自定义Callable
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        // 模拟计算需要一秒
        Thread.sleep(1000);
        return 2;
    }
    public static void main(String args[]) throws Exception {
        // 使用
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        // 注意调用get方法会阻塞当前线程,直到得到结果。
        // 所以实际编码中建议使用可以设置超时时间的重载get方法。
        System.out.println(result.get());
    }
}

// Future
public abstract interface Future<V> {
    public abstract boolean cancel(boolean paramBoolean);
    public abstract boolean isCancelled();
    public abstract boolean isDone();
    public abstract V get() throws InterruptedException, ExecutionException;
    public abstract V get(long paramLong, TimeUnit paramTimeUnit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

// FutureTask
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        // 模拟计算需要一秒
        Thread.sleep(1000);
        return 2;
    }
    public static void main(String args[]) throws Exception {
        // 使用
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<Integer> futureTask = new FutureTask<>(new Task());
        executor.submit(futureTask);
        System.out.println(futureTask.get());
    }
}

线程组和线程优先级

线程组(ThreadGroup)

public class Demo {
    public static void main(String[] args) {
        Thread testThread = new Thread(() -> {
            System.out.println("testThread当前线程组名字:" +
                    Thread.currentThread().getThreadGroup().getName());
            System.out.println("testThread线程名字:" +
                    Thread.currentThread().getName());
        });

        testThread.start();
    System.out.println("执行main所在线程的线程组名字: " + Thread.currentThread().getThreadGroup().getName());
        System.out.println("执行main方法线程名字:" + Thread.currentThread().getName());
    }
}

线程优先级

public class Demo {
    public static void main(String[] args) {
        Thread a = new Thread();
        System.out.println("我是默认线程优先级:"+a.getPriority());
        Thread b = new Thread();
        b.setPriority(10);
        System.out.println("我是设置过的线程优先级:"+b.getPriority());
    }
}

Java程序中对线程所设置的优先级只是给操作系统一个建议,操作系统不一定会采纳。而真正的调用顺序,是由操作系统的线程调度算法决定的。

public class Demo {
    public static class T1 extends Thread {
        @Override
        public void run() {
            super.run();
            System.out.println(String.format("当前执行的线程是:%s,优先级:%d",
                    Thread.currentThread().getName(),
                    Thread.currentThread().getPriority()));
        }
    }

    public static void main(String[] args) {
        IntStream.range(1, 10).forEach(i -> {
            Thread thread = new Thread(new T1());
            thread.setPriority(i);
            thread.start();
        });
    }
}

如果某个线程优先级大于线程所在线程组的最大优先级,那么该线程的优先级将会失效,取而代之的是线程组的最大优先级。

线程组的常用方法及数据结构

常用方法

// 获取当前的线程组
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
// 复制一个线程组到一个线程数组(获取Thread信息)
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);

// 线程组统一异常处理
package com.func.axc.threadgroup;

public class ThreadGroupDemo {
    public static void main(String[] args) {
        ThreadGroup threadGroup1 = new ThreadGroup("group1") {
            // 继承ThreadGroup并重新定义以下方法
            // 在线程成员抛出unchecked exception
            // 会执行此方法
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println(t.getName() + ": " + e.getMessage());
            }
        };

        // 这个线程是threadGroup1的一员
        Thread thread1 = new Thread(threadGroup1, new Runnable() {
            public void run() {
                // 抛出unchecked异常
                throw new RuntimeException("测试异常");
            }
        });

        thread1.start();
    }
}

数据结构

// 线程组数据结构
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
    private final ThreadGroup parent; // 父亲ThreadGroup
    String name; // ThreadGroupr 的名称
    int maxPriority; // 线程最大优先级
    boolean destroyed; // 是否被销毁
    boolean daemon; // 是否守护线程
    boolean vmAllowSuspension; // 是否可以中断

    int nUnstartedThreads = 0; // 还未启动的线程
    int nthreads; // ThreadGroup中线程数目
    Thread threads[]; // ThreadGroup中的线程

    int ngroups; // 线程组数目
    ThreadGroup groups[]; // 线程组数组

    // 私有构造函数
    private ThreadGroup() {
        this.name = "system";
        this.maxPriority = Thread.MAX_PRIORITY;
        this.parent = null;
    }

    // 默认是以当前ThreadGroup传入作为parent  ThreadGroup,新线程组的父线程组是目前正在运行线程的线程组。
    public ThreadGroup(String name) {
        this(Thread.currentThread().getThreadGroup(), name);
    }

    // 构造函数
    public ThreadGroup(ThreadGroup parent, String name) {
        this(checkParentAccess(parent), parent, name);
    }

    // 私有构造函数,主要的构造函数
    private ThreadGroup(void unused, ThreadGroup parent, String name) {
        this.name = name;
        this.maxPriority = parent.maxPriority;
        this.daemon = parent.daemon;
        this.vmAllowSuspension = parent.vmAllowSuspension;
        this.parent = parent;
        parent.add(this);
    }

    // 检查parent ThreadGroup
    private static void checkParentAccess(ThreadGroup parent) {
        parent.checkAccess();
        return null;
    }

    // 判断当前运行的线程是否具有修改线程组的权限
    public final void checkAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkAccess(this);
        }
    }
}

线程状态转换

转换图

操作系统线程主要有以下三个状态: + 就绪状态(ready) + 线程正在等待使用CPU,经调度程序调用之后可进入running状态。 + 执行状态(running) + 线程正在使用CPU。 + 等待状态(waiting) + 线程经过等待事件的调用或者正在等待其他资源(如I/O)。

Java 线程有6个状态

// Thread.State 源码
public enum State {
    NEW,
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED;
}

转换图

  • NEW
    • 创建了线程而并没有调用start()方法
  • RUNNABLE
    • Java线程的RUNNABLE状态包括了传统操作系统线程的ready和running两个状态
  • BLOCKED
    • 处于阻塞状态的线程正等待锁的释放以进入同步区
  • WAITING
    • 处于等待状态的线程变成RUNNABLE状态需要其他线程唤醒
  • TIMED_WAITING
    • 规定等待时间过后自动唤醒
    • 之后拥有了去争夺锁的资格
  • TERMINATED
    • 线程已执行完毕

BLOCKED与RUNNABLE状态的转换

@Test
public void blockedTest() {

    Thread a = new Thread(new Runnable() {
        @Override
        public void run() {
            testMethod();
        }
    }, "a");
    Thread b = new Thread(new Runnable() {
        @Override
        public void run() {
            testMethod();
        }
    }, "b");

    a.start();
    b.start();
    System.out.println(a.getName() + ":" + a.getState()); // 输出?
    System.out.println(b.getName() + ":" + b.getState()); // 输出?
}

// 同步方法争夺锁
private synchronized void testMethod() {
    try {
        Thread.sleep(2000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

WAITING状态与RUNNABLE状态的转换

public void blockedTest() {
    ······
    a.start();
    a.join();
    b.start();
    System.out.println(a.getName() + ":" + a.getState()); // 输出 TERMINATED
    System.out.println(b.getName() + ":" + b.getState());
}

TIMED_WAITING与RUNNABLE状态转换

public void blockedTest() {
    ······
    a.start();
    a.join(1000L);
    b.start();
    System.out.println(a.getName() + ":" + a.getState()); // 输出 TIEMD_WAITING
    System.out.println(b.getName() + ":" + b.getState());
}

线程中断

  • Thread.interrupt()
    • 中断线程。
    • 这里的中断线程并不会立即停止线程,而是设置线程的中断状态为true(默认是flase)
  • Thread.interrupted()
    • 测试当前线程是否被中断。
    • 线程的中断状态受这个方法的影响,意思是调用一次使线程中断状态设置为true,连续调用两次会使得这个线程的中断状态重新转为false
  • Thread.isInterrupted()
    • 测试当前线程是否被中断。
    • 与上面方法不同的是调用这个方法并不会影响线程的中断状态。

线程间的通信

锁与同步

  1. 无锁
public class NoneLock {

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("Thread A " + i);
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("Thread B " + i);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ThreadA()).start();
        new Thread(new ThreadB()).start();
    }
}
  1. 有锁
public class ObjectLock {
    private static Object lock = new Object();

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread A " + i);
                }
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread B " + i);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(10);
        new Thread(new ThreadB()).start();
    }
}

等待/通知机制

Java多线程的等待/通知机制是基于Object类的wait()方法和notify(), notifyAll()方法来实现的。

观察者模式

public class WaitAndNotify {
    private static Object lock = new Object();

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadA: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadB: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000);
        new Thread(new ThreadB()).start();
    }
}

信号量

volatile关键字能够保证内存的可见性。

如果用volatile关键字声明了一个变量, 在一个线程里面改变了这个变量的值, 那其它线程是立马可见更改后的值的。 这里的信号量和操作系统里的有所区别,更像一种享元。

public class Signal {
    private static volatile int signal = 0;

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 0) {
                    System.out.println("threadA: " + signal);
                    signal++;
                }
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 1) {
                    System.out.println("threadB: " + signal);
                    signal = signal + 1;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000);
        new Thread(new ThreadB()).start();
    }
}

管道

一个线程需要先另一个线程发送一个信息

线程的责任链模式

public class Pipe {
    static class ReaderThread implements Runnable {
        private PipedReader reader;

        public ReaderThread(PipedReader reader) {
            this.reader = reader;
        }

        @Override
        public void run() {
            System.out.println("this is reader");
            int receive = 0;
            try {
                while ((receive = reader.read()) != -1) {
                    System.out.print((char)receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class WriterThread implements Runnable {

        private PipedWriter writer;

        public WriterThread(PipedWriter writer) {
            this.writer = writer;
        }

        @Override
        public void run() {
            System.out.println("this is writer");
            int receive = 0;
            try {
                writer.write("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        writer.connect(reader); // 这里注意一定要连接,才能通信

        new Thread(new ReaderThread(reader)).start();
        Thread.sleep(1000);
        new Thread(new WriterThread(writer)).start();
    }
}

其它通信相关

  1. join方法

让当前线程陷入“等待”状态

public class Join {
    static class ThreadA implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println("我是子线程,我先睡一秒");
                Thread.sleep(1000);
                System.out.println("我是子线程,我睡完了一秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new ThreadA());
        thread.start();
        thread.join();
        System.out.println("如果不加join方法,我会先被打出来,加了就不一样了");
    }
}
  1. sleep方法

让当前线程睡眠一段时间 sleep方法是不会释放当前的锁的,而wait方法会

  • wait可以指定时间,也可以不指定;而sleep必须指定时间。
  • wait释放cpu资源,同时释放锁;sleep释放cpu资源,但是不释放锁,所以易死锁。
  • wait必须放在同步块或同步方法中,而sleep可以在任意位置

  • ThreadLocal类

ThreadLocal类并不属于多线程间的通信,而是让每个线程有自己”独立“的变量

public class ThreadLocalDemo {
    static class ThreadA implements Runnable {
        private ThreadLocal<String> threadLocal;

        public ThreadA(ThreadLocal<String> threadLocal) {
            this.threadLocal = threadLocal;
        }

        @Override
        public void run() {
            threadLocal.set("A");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ThreadA输出:" + threadLocal.get());
        }

        static class ThreadB implements Runnable {
            private ThreadLocal<String> threadLocal;

            public ThreadB(ThreadLocal<String> threadLocal) {
                this.threadLocal = threadLocal;
            }

            @Override
            public void run() {
                threadLocal.set("B");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("ThreadB输出:" + threadLocal.get());
            }
        }

        public static void main(String[] args) {
            ThreadLocal<String> threadLocal = new ThreadLocal<>();
            new Thread(new ThreadA(threadLocal)).start();
            new Thread(new ThreadB(threadLocal)).start();
        }
    }
}

原理构造

Java 内存模型基础知识

两个关键问题

  • 线程间如何通信
    • 线程之间以何种机制来交换信息
  • 线程间如何同步
    • 线程以何种机制来控制不同线程间操作发生的相对顺序

有两种并发模型可以解决这两个问题 消息传递并发模型 共享内存并发模型

并发模型比较

内存模型的抽象结构

  1. 运行时内存的划分

对于每一个线程来说,栈都是私有的,而堆是共有的。

Java运行时

  1. 内存不可见问题

计算机为了高效,往往会在高速缓存区中缓存共享变量

Java运行时

线程A无法直接访问线程B的工作内存,线程间通信必须经过主内存。 线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取。

  1. 内存区域划分的区别与联系

  2. 区别

    • 两者是不同的概念层次。
    • JMM是抽象的,他是用来描述一组规则,通过这个规则来控制各个变量的访问方式,围绕原子性、有序性、可见性等展开的。
    • Java运行时内存的划分是具体的,是JVM运行Java程序时,必要的内存划分。
  3. 联系

    • 都存在私有数据区域和共享数据区域。
    • JMM中的主内存属于共享数据区域,他是包含了堆和方法区;
    • JMM中的本地内存属于私有数据区域,包含了程序计数器、本地方法栈、虚拟机栈。

重排序与happens-before

重排序的意义

避免流水线中断将运算符放到后面 编译成更快的机器指令 解释器模式

顺序一致性

  1. 数据竞争 当程序未正确同步的时候,就可能存在数据竞争。那么运行的结果往往充满了不确定性,比如读发生在了写之前,可能就会读到错误的值

  2. 顺序一致性模型

  3. 一个线程中的所有操作必须按照程序的顺序(即Java代码的顺序)来执行。
  4. 不管程序是否同步,所有线程都只能看到一个单一的操作执行顺序。即在顺序一致性模型中,每个操作必须是原子性的,且立刻对所有线程可见。

happens-before

  • 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
  • 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么JMM也允许这样的重排序。

happens-before关系

  • 程序顺序规则
    • 一个线程中的每一个操作,happens-before于该线程中的任意后续操作。
  • 监视器锁规则
    • 对一个锁的解锁,happens-before于随后对这个锁的加锁。
  • volatile变量规则
    • 对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
  • 传递性
    • 如果A happens-before B,且B happens-before C,那么A happens-before C。
  • start规则
    • 如果线程A执行操作ThreadB.start()启动线程B,那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作、
  • join规则
    • 如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。

volatile

内存可见性

指的是线程之间的可见性,当一个线程修改了共享变量时,另一个线程可以读取到这个修改后的值。

public class VolatileExample {
    int a = 0;
    volatile boolean flag = false;

    public void writer() {
        a = 1; // step 1
        flag = true; // step 2
    }

    public void reader() {
        if (flag) { // step 3
            System.out.println(a); // step 4
        }
    }
}

内存示意图

禁止重排序

编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。

内存屏障

// 声明变量
int a = 0; // 声明普通变量
volatile boolean flag = false; // 声明volatile变量

// 以下两个变量的读操作是可以重排序的
int i = a; // 普通变量读
boolean j = flag; // volatile变量读

volatile的用途

在功能上,锁比volatile更强大;在性能上,volatile更有优势。

public class Singleton {

    private static Singleton instance; // 不使用volatile关键字

    // 双重锁检验
    public static Singleton getInstance() {
        if (instance == null) { // 第7行
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton(); // 第10行
                }
            }
        }
        return instance;
    }
}

instance = new Singleton(); // 第10行

// 可以分解为以下三个步骤
1 memory=allocate();// 分配内存 相当于c的malloc
2 ctorInstanc(memory) //初始化对象
3 s=memory //设置s指向刚分配的地址

// 上述三个步骤可能会被重排序为 1-3-2,也就是:
1 memory=allocate();// 分配内存 相当于c的malloc
3 s=memory //设置s指向刚分配的地址
2 ctorInstanc(memory) //初始化对象

synchronized与锁

Java类只有一个Class对象(可以有多个实例对象,多个实例共享这个Class对象),而Class对象也是特殊的Java对象。 所以我们常说的类锁,其实就是Class对象的锁。

Synchronized关键字

// 关键字在实例方法上,锁为当前实例
public synchronized void instanceLock() {
    // code
}

// 关键字在静态方法上,锁为当前Class对象
public static synchronized void classLock() {
    // code
}

// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
    Object o = new Object();
    synchronized (o) {
        // code
    }
}

几种锁

  • 无锁状态
  • 偏向锁状态
  • 轻量级锁状态
  • 重量级锁状态

偏向锁

  1. 为何会引入

锁不仅不存在多线程竞争,而且总是由同一线程多次获得

偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。

也就是说,偏向锁在资源无竞争情况下消除了同步语句,连CAS操作都不做了,提高了程序的运行性能。

流程图

  1. 获得与撤消

流程图

  1. 轻量级锁 流程图

  2. 重量级锁

    重量级锁依赖于操作系统的互斥量(mutex) 实现

  3. 优缺点对比 | 锁 | 优点 | 缺点 | 适用场景 | |----------|--------------------------------------------------------------------|--------------------------------------------------|--------------------------------------| | 偏向锁 | 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 | 适用于只有一个线程访问同步块场景。 | | 轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度。 | 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 | 追求响应时间。同步块执行速度非常快。 | | 重量级锁 | 线程竞争不使用自旋,不会消耗CPU。 | 线程阻塞,响应时间缓慢。 | 追求吞吐量。同步块执行时间较长。 |

CAS与原子操作

乐观锁与悲观锁

  • 悲观锁

    • 悲观锁就是我们常说的锁。
    • 对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,所以必须对每次数据操作加上锁,
    • 以保证临界区的程序同一时间只能有一个线程在执行。
  • 乐观锁

    • 乐观锁又称为“无锁”,顾名思义,它是乐观派。
    • 乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。
    • 而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。

CAS的概念

  • CAS的全称是 比较并交换(Compare And Swap)。
  • 在CAS中,有这样三个值
    • V:要更新的变量(var)
    • E:预期值(expected)
    • N:新值(new)

当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败, 但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

Java实现CAS

使用Unsafe类,它在sun.misc包中。它里面是一些native方法,其中就有几个关于CAS的:

boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);

原子操作

JDK提供了一些用于原子操作的类,在java.util.concurrent.atomic包下面 原子类

三个问题

  1. ABA 问题 一个值原来是A,变成了B,又变回了A。这个时候使用CAS是检查不出变化的,但实际上却被更新了两次。
// 使用版本号或者时间戳
// class AtomicStampedReference
public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}
  1. 循环时间长开销大 CAS多与自旋结合。如果自旋CAS长时间不成功,会占用大量的CPU资源。 解决思路是让JVM支持处理器提供的 pause 指令。

  2. 只能保证一个共享变量的原子操作

  3. 使用 AtomicReference
  4. 使用锁

AQS

简介

  • AQS是AbstractQueuedSynchronizer的简称,即抽象队列同步器,从字面意思上理解:
    • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
    • 队列:使用先进先出(FIFO)队列存储数据;
    • 同步:实现了同步的功能。

数据结构

结构图

资源共享模式

  • 两种共享模式(两种同步方式)
    • 独占模式(Exclusive) > 资源是独占的,一次只能一个线程获取。 > 如ReentrantLock。
    • 共享模式(Share) > 同时可以被多个线程获取,具体的资源个数可以通过参数指定。 > 如Semaphore/CountDownLatch。
static final class Node {
    // 标记一个结点(对应的线程)在共享模式下等待
    static final Node SHARED = new Node();
    // 标记一个结点(对应的线程)在独占模式下等待
    static final Node EXCLUSIVE = null;

    // waitStatus的值,表示该结点(对应的线程)已被取消
    static final int CANCELLED = 1;
    // waitStatus的值,表示后继结点(对应的线程)需要被唤醒
    static final int SIGNAL = -1;
    // waitStatus的值,表示该结点(对应的线程)在等待某一条件
    static final int CONDITION = -2;
    /*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
    static final int PROPAGATE = -3;

    // 等待状态,取值范围,-3,-2,-1,0,1
    volatile int waitStatus;
    volatile Node prev; // 前驱结点
    volatile Node next; // 后继结点
    volatile Thread thread; // 结点对应的线程
    Node nextWaiter; // 等待队列里下一个等待条件的结点


    // 判断共享模式的方法
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 其它方法忽略,可以参考具体的源码
}

// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
    // 使用了Node的这个构造函数
    Node node = new Node(Thread.currentThread(), mode);
    // 其它代码省略
}

源码解析

AQS的设计基于模板方法模式,它有一些方法必须要子类去实现的。

  • isHeldExclusively()
    • 该线程是否正在独占资源。
    • 只有用到condition才需要去实现它。
  • tryAcquire(int)
    • 独占方式。
    • 尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int)
    • 独占方式。
    • 尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int)
    • 共享方式。
    • 尝试获取资源,负数表示失败;
    • 0表示成功,但没有剩余可用资源;
    • 正数表示成功,且有剩余资源。
  • tryReleaseShared(int)
    • 共享方式。
    • 尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

获取资源

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    // 生成该线程对应的Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将Node插入队列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 使用CAS尝试,如果成功就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果等待队列为空或者上述CAS失败,再自旋CAS插入
    enq(node);
    return node;
}

// 自旋CAS插入等待队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
            if (p == head && tryAcquire(arg)) {
                // 拿到资源后,将head指向该结点。
                // 所以head所指的结点,就是当前获取到资源的那个结点或null。
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果自己可以休息了,就进入waiting状态,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

结点进入等待队列后,是调用park使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。

流程图

释放资源

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // 如果状态是负数,尝试把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 得到头结点的后继结点head.next
    Node s = node.next;
    // 如果这个后继结点为空或者状态大于0
    // 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 等待队列中所有还有用的结点,都向前移动
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继结点不为空,
    if (s != null)
        LockSupport.unpark(s.thread);
}

工具类

线程池原理

为什么要使用线程池

  • 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
  • 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
  • 可以对线程做统一管理。

线程池原理与实现接口

Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。

  1. ThreadPoolExecutor 结构
// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)


static class DefaultThreadFactory implements ThreadFactory {
    // 省略属性
    // 构造函数
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    // 省略
}

2. ThreadPoolExecutor的策略
ThreadPoolExecutor类中定义了一个volatile int变量runState来表示线程池的状态 
分别为`RUNNING`、`SHUTDOWN`、`STOP`、`TIDYING` 、`TERMINATED`。

+ 线程池创建后处于RUNNING状态
+ 调用shutdown()方法后处于SHUTDOWN状态
    + 线程池不能接受新的任务清除一些空闲worker,会等待阻塞队列的任务完成
+ 调用shutdownNow()方法后处于STOP状态
    + 线程池不能接受新的任务中断所有线程阻塞队列中没有被执行的任务全部丢弃
    + 此时poolsize=0,阻塞队列的size也为0
+ 当所有的任务已终止ctl记录的任务数量为0线程池会变为TIDYING状态
    + 接着会执行terminated()函数
  1. 主要的任务处理流程
// JDK 1.8 
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();   
    int c = ctl.get();
    // 1.当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
    if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
    }
    // 2.如果不小于corePoolSize,则将任务添加到workQueue队列。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
        if (! isRunning(recheck) && remove(command))
            reject(command);
            // 2.2 线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.如果放入workQueue失败,则创建非核心线程执行任务,
    // 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
    else if (!addWorker(command, false))
         reject(command);
}

流程图

  1. 多路复用
// ThreadPoolExecutor.addWorker方法源码上半部分
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // 1.如果core是ture,证明需要创建的线程为核心线程,则先判断当前线程是否大于核心线程
                // 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数
                // 如果不小于,则返回false
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

// ThreadPoolExecutor.addWorker方法源码下半部分
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 1.创建一个worker对象
        w = new Worker(firstTask);
        // 2.实例化一个Thread对象
        final Thread t = w.thread;
        if (t != null) {
            // 3.线程池全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 4.启动这个线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
            runWorker(this);
    }
    //其余代码略...
}

// Worker.runWorker方法源代码
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 1.线程启动之后,通过unlock方法释放锁
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 2.Worker执行firstTask或从workQueue中获取任务,如果getTask方法不返回null,循环不退出
        while (task != null || (task = getTask()) != null) {
            // 2.1进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 2.2检查线程池状态,倘若线程池处于中断状态,当前线程将中断。 
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 2.3执行beforeExecute 
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 2.4执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 2.5执行afterExecute方法 
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 2.6解锁操作
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

// Worker.getTask方法源码
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁
        // 如果为true,核心线程在keepAliveTime内仍空闲则会被销毁。 
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 2.如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。 
     // 如果有设置允许线程超时或者线程数量超过了核心线程数量,
        // 并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 3.如果timed为true(想想哪些情况下timed为true),则会调用workQueue的poll方法获取任务.
            // 超时时间是keepAliveTime。如果超过keepAliveTime时长,
            // poll返回了null,上边提到的while循序就会退出,线程也就执行完了。
            // 如果timed为false(allowCoreThreadTimeOut为falsefalse
            // 且wc > corePoolSize为false),则会调用workQueue的take方法阻塞在当前。
            // 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

四种常见的线程池

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

阻塞队列

  • 在生产者-消费者模式中,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁。
  • 阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。

实现类

public ArrayBlockingQueue(int capacity, boolean fair){
    //..省略代码
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    this.lock = new ReentrantLock(); //默认构造方法-非公平锁
    ...//其余代码略
}

阻塞队列原理

//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
private final Condition notFull;  

public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他代码
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}


public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.自旋拿锁
    lock.lockInterruptibly();
    try {
        // 2.判断队列是否满了
        while (count == items.length)
            // 2.1如果满了,阻塞该线程,并标记为notFull线程,
            // 等待notFull的唤醒,唤醒之后继续执行while循环。
            notFull.await();
        // 3.如果没有满,则进入队列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 唤醒一个等待的线程
    notEmpty.signal();
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

使用场景

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread{

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

锁接口和类

synchronized的不足之处

  • 如果临界区是只读操作,其实可以多线程一起执行,但使用synchronized的话,同一时间只能有一个线程执行。
  • synchronized无法知道线程有没有成功获取到锁
  • 使用synchronized,如果临界区因为IO或者sleep方法等原因阻塞了,而当前线程又没有释放锁,就会导致所有线程等待。

锁的几种分类

  • 可重入锁和非可重入锁
  • 公平锁与非公平锁
  • 读写锁和排它锁

JDK中有关锁的一些接口和类

  1. 抽象类AQS/AQLS/AOS

    // 独占模式,锁的持有者  
    private transient Thread exclusiveOwnerThread;  
    
    // 设置锁持有者  
    protected final void setExclusiveOwnerThread(Thread t) {  
        exclusiveOwnerThread = t;  
    }  
    
    // 获取锁的持有线程  
    protected final Thread getExclusiveOwnerThread() {  
        return exclusiveOwnerThread;  
    }
    

  2. 接口Condition/Lock/ReadWriteLock

    public interface ReadWriteLock {
        Lock readLock();
        Lock writeLock();
    }
    
    Condition newCondition();
    

  3. ReentrantLock

    ReentrantLock是一个非抽象类,它是Lock接口的JDK默认实现,实现了锁的基本功能。

  4. ReentrantReadWriteLock

// 内部结构
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 具体实现
}
static final class NonfairSync extends Sync {
    // 具体实现
}
static final class FairSync extends Sync {
    // 具体实现
}
public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
    }
    // 具体实现
}
public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;
    protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
    }
    // 具体实现
}

// 构造方法,初始化两个锁
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

// 获取读锁和写锁的方法
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
  1. StampedLock

    在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。 这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();

   // 写锁的使用
   void move(double deltaX, double deltaY) {
     long stamp = sl.writeLock(); // 获取写锁
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp); // 释放写锁
     }
   }

   // 乐观读锁的使用
   double distanceFromOrigin() {
     long stamp = sl.tryOptimisticRead(); // 获取乐观读锁
     double currentX = x, currentY = y;
     if (!sl.validate(stamp)) { // //检查乐观读锁后是否有其他写锁发生,有则返回false
        stamp = sl.readLock(); // 获取一个悲观读锁
        try {
          currentX = x;
          currentY = y;
        } finally {
           sl.unlockRead(stamp); // 释放悲观读锁
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }

   // 悲观读锁以及读锁升级写锁的使用
   void moveIfAtOrigin(double newX, double newY) {
     long stamp = sl.readLock(); // 悲观读锁
     try {
       while (x == 0.0 && y == 0.0) {
         // 读锁尝试转换为写锁:转换成功后相当于获取了写锁,转换失败相当于有写锁被占用
         long ws = sl.tryConvertToWriteLock(stamp); 

         if (ws != 0L) { // 如果转换成功
           stamp = ws; // 读锁的票据更新为写锁的
           x = newX;
           y = newY;
           break;
         }
         else { // 如果转换失败
           sl.unlockRead(stamp); // 释放读锁
           stamp = sl.writeLock(); // 强制获取写锁
         }
       }
     } finally {
       sl.unlock(stamp); // 释放所有锁
     }
   }
}

// 用于操作state后获取stamp的值
private static final int LG_READERS = 7;
private static final long RUNIT = 1L;               //0000 0000 0001
private static final long WBIT  = 1L << LG_READERS; //0000 1000 0000
private static final long RBITS = WBIT - 1L;        //0000 0111 1111
private static final long RFULL = RBITS - 1L;       //0000 0111 1110
private static final long ABITS = RBITS | WBIT;     //0000 1111 1111
private static final long SBITS = ~RBITS;           //1111 1000 0000

// 初始化时state的值
private static final long ORIGIN = WBIT << 1;       //0001 0000 0000

// 锁共享变量state
private transient volatile long state;
// 读锁溢出时用来存储多出的读锁
private transient int readerOverflow;

并发容器集合

同步容器与并发容器

public class TestVector {
    private Vector<String> vector;

    //方法一
    public  Object getLast(Vector vector) {
        int lastIndex = vector.size() - 1;
        return vector.get(lastIndex);
    }

    //方法二
    public  void deleteLast(Vector vector) {
        int lastIndex = vector.size() - 1;
        vector.remove(lastIndex);
    }

    //方法三
    public  Object getLastSysnchronized(Vector vector) {
        synchronized(vector){
            int lastIndex = vector.size() - 1;
            return vector.get(lastIndex);
        }
    }

    //方法四
    public  void deleteLastSysnchronized(Vector vector) {
        synchronized (vector){
            int lastIndex = vector.size() - 1;
            vector.remove(lastIndex);
        }
    }

}

并发容器类

架构图

  1. 并发Map
    public interface ConcurrentMap<K, V> extends Map<K, V> {
    
        //插入元素
        V putIfAbsent(K key, V value);
    
        //移除元素
        boolean remove(Object key, Object value);
    
        //替换元素
        boolean replace(K key, V oldValue, V newValue);
    
        //替换元素
        V replace(K key, V value);
    
    }
    

结构图

  1. 并发Queue

    很难去开发一个通用并且没有并发瓶颈的线程安全的List

  2. 并发Set

    Set<String> s = Sets.newConcurrentHashSet();
    

CopyOnWrite容器

概念

写时复制的容器 内存开辟新单元给另一个线程 形成读写分离

CopyOnWriteArrayList

public boolean add(E e) {

    // ReentrantLock加锁,保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝原容器,长度为原容器长度加一
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 在新副本上执行添加操作
        newElements[len] = e;
        // 将原容器引用指向新副本
        setArray(newElements);
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}

public E remove(int index) {

        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                // 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                // 否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            // 解锁
            lock.unlock();
        }
    }


public E get(int index) {
    return get(getArray(), index);
}
private E get(Object[] a, int index) {
    return (E) a[index];
}

CopyOnWrite的业务中实现

import java.util.Collection;
import java.util.Map;
import java.util.Set;

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    private volatile Map<K, V> internalMap;

    public CopyOnWriteMap() {
        internalMap = new HashMap<K, V>();
    }

    public V put(K key, V value) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }

    public V get(Object key) {
        return internalMap.get(key);
    }

    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}

// 黑名单服务
public class BlackListServiceImpl {
    // 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
    private static CopyOnWriteMap<String, Boolean> blackListMap = 
        new CopyOnWriteMap<String, Boolean>(1000);

    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }

    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }

    /**
     * 批量添加黑名单
     * (使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。
     * 如使用上面代码里的addBlackList方法)
     * @param ids
     */
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }

}

通信工具类

Semaphore

  1. Semaphore介绍 限制同时工作的线程数(信号量表示)

    // 默认情况下使用非公平
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    

  2. 案例

    public class SemaphoreDemo {
        static class MyThread implements Runnable {
    
            private int value;
            private Semaphore semaphore;
    
            public MyThread(int value, Semaphore semaphore) {
                this.value = value;
                this.semaphore = semaphore;
            }
    
            @Override
            public void run() {
                try {
                    semaphore.acquire(); // 获取permit
                    System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
                            value, semaphore.availablePermits(), semaphore.getQueueLength()));
                    // 睡眠随机时间,打乱释放顺序
                    Random random =new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("线程%d释放了资源", value));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    semaphore.release(); // 释放permit
                }
            }
        }
    
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < 10; i++) {
                new Thread(new MyThread(i, semaphore)).start();
            }
        }
    }
    
    
    // 忽略中断
    public void acquireUninterruptibly()
    public void acquireUninterruptibly(int permits)
    
    // 不进入等待队列,底层使用CAS
    public boolean tryAcquire
    public boolean tryAcquire(int permits)
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException
    public boolean tryAcquire(long timeout, TimeUnit unit)
    

Exchanger

用于两个线程交换数据

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                System.out.println("这是线程A,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程A的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
        Thread.sleep(1000);

        new Thread(() -> {
            try {
                System.out.println("这是线程B,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程B的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

CountDownLatch

  1. 概念

    代表计数递减门闩 闭锁 原子变量

// 构造方法:
public CountDownLatch(int count)

public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count
  1. 案例

    public class CountDownLatchDemo {
        // 定义前置任务线程
        static class PreTaskThread implements Runnable {
    
            private String task;
            private CountDownLatch countDownLatch;
    
            public PreTaskThread(String task, CountDownLatch countDownLatch) {
                this.task = task;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(task + " - 任务完成");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            // 假设有三个模块需要加载
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            // 主任务
            new Thread(() -> {
                try {
                    System.out.println("等待数据加载...");
                    System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
                    countDownLatch.await();
                    System.out.println("数据加载完成,正式开始游戏!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            // 前置任务
            new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
            new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
            new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
        }
    }
    

  2. 原理

  3. 计数值(count)实际上就是闭锁需要等待的线程数量。
  4. 这个值只能被设置一次
  5. CountDownLatch没有提供任何机制去重新设置这个计数值。

CyclicBarrier

CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {

        private String task;
        private CyclicBarrier cyclicBarrier;

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            // 假设总共三个关卡
            for (int i = 1; i < 4; i++) {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d的任务%s完成", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                cyclicBarrier.reset(); // 重置屏障
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("本关卡所有前置任务完成,开始游戏...");
        });

        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
    }
}


// 构造方法
public CyclicBarrier(int parties) {
    this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 具体实现
}


private int dowait(boolean timed, long nanos)

Phaser

public class PhaserDemo {
    static class PreTaskThread implements Runnable {

        private String task;
        private Phaser phaser;

        public PreTaskThread(String task, Phaser phaser) {
            this.task = task;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            for (int i = 1; i < 4; i++) {
                try {
                    // 第二次关卡起不加载NPC,跳过
                    if (i >= 2 && "加载新手教程".equals(task)) {
                        continue;
                    }
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
                            i, phaser.getRegisteredParties(), task));

                    // 从第二个关卡起,不加载NPC
                    if (i == 1 && "加载新手教程".equals(task)) {
                        System.out.println("下次关卡移除加载【新手教程】模块");
                        phaser.arriveAndDeregister(); // 移除一个模块
                    } else {
                        phaser.arriveAndAwaitAdvance();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(4) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println(String.format("第%d次关卡准备完成", phase + 1));
                return phase == 3 || registeredParties == 0;
            }
        };

        new Thread(new PreTaskThread("加载地图数据", phaser)).start();
        new Thread(new PreTaskThread("加载人物模型", phaser)).start();
        new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
        new Thread(new PreTaskThread("加载新手教程", phaser)).start();
    }
}


private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

static final class QNode implements ForkJoinPool.ManagedBlocker {
    // 实现代码
}

Fork/Join框架

概念

结构图

工作窃取算法

结构图

Fork/Join的具体实现

  1. ForkJoinTask
// 本文所有代码都引自Java 8
public final ForkJoinTask<V> fork() {
    Thread t;
    // ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
    // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
         // 如果不是则将线程加入队列
        // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
        ForkJoinPool.common.externalPush(this);
    return this;
}


public final V join() {
    int s;
    // doJoin()方法来获取当前任务的执行状态
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        // 任务异常,抛出异常
        reportException(s);
    // 任务正常完成,获取返回值
    return getRawResult();
}

/**
 * doJoin()方法用来返回当前任务的执行状态
 **/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
    return (s = status) < 0 ? s :
    // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
    ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
        // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
        // doExec()方法执行任务
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        // 如果是处于顶端并且任务执行完毕,返回结果
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
        // awaitJoin():使用自旋使任务执行完成,返回结果
        wt.pool.awaitJoin(w, this, 0L) :
    // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
    externalAwaitDone();
}

流程图

  1. ForkJoinPool
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
    // 任务队列
    volatile WorkQueue[] workQueues

    // 线程的运行状态
    volatile int runState;

    // 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
    public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

    // 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
    static final ForkJoinPool common;

    // 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
    // 其他构造方法都是源自于此方法
    // parallelism: 并行度,
    // 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory, // 工作线程工厂
                         UncaughtExceptionHandler handler, // 拒绝任务的handler
                         int mode, // 同步模式
                         String workerNamePrefix) { // 线程名prefix
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

}

Fork/Join的使用

public class FibonacciTest {

    class Fibonacci extends RecursiveTask<Integer> {

        int n;

        public Fibonacci(int n) {
            this.n = n;
        }

        // 主要的实现逻辑都在compute()里
        @Override
        protected Integer compute() {
            // 这里先假设 n >= 0
            if (n <= 1) {
                return n;
            } else {
                // f(n-1)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                // f(n-2)
                Fibonacci f2 = new Fibonacci(n - 2);
                f2.fork();
                // f(n) = f(n-1) + f(n-2)
                return f1.join() + f2.join();
            }
        }
    }

    @Test
    public void testFib() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }


}


// 普通递归,复杂度为O(2^n)
public int plainRecursion(int n) {
    if (n == 1 || n == 2) {
        return 1;
    } else {
        return plainRecursion(n -1) + plainRecursion(n - 2);
    }
}

@Test
public void testPlain() {
    long start = System.currentTimeMillis();
    int result = plainRecursion(40);
    long end = System.currentTimeMillis();
    System.out.println("计算结果:" + result);
    System.out.println(String.format("耗时:%d millis",  end -start));
}


// 通过循环来计算,复杂度为O(n)
private int computeFibonacci(int n) {
    // 假设n >= 0
    if (n <= 1) {
        return n;
    } else {
        int first = 1;
        int second = 1;
        int third = 0;
        for (int i = 3; i <= n; i ++) {
            // 第三个数是前两个数之和
            third = first + second;
            // 前两个数右移
            first = second;
            second = third;
        }
        return third;
    }
}

@Test
public void testComputeFibonacci() {
    long start = System.currentTimeMillis();
    int result = computeFibonacci(40);
    long end = System.currentTimeMillis();
    System.out.println("计算结果:" + result);
    System.out.println(String.format("耗时:%d millis",  end -start));
}

Stream并行计算原理

简介

使用Stream接口以及 lambda 表达式进行“流式计算”

Stream单线程串行计算

public class StreamDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .reduce((a, b) -> {
                    System.out.println(String.format("%s: %d + %d = %d",
                            Thread.currentThread().getName(), a, b, a + b));
                    return a + b;
                })
                .ifPresent(System.out::println);
    }
}

Stream多线程并行计算

public class StreamParallelDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .parallel()
                .reduce((a, b) -> {
                    System.out.println(String.format("%s: %d + %d = %d",
                            Thread.currentThread().getName(), a, b, a + b));
                    return a + b;
                })
                .ifPresent(System.out::println);
    }
}

并行计算原理

public final S parallel() {
    sourceStage.parallel = true;
    return (S) this;
}

// ReferencePipeline抽象类的reduce方法
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    // 调用evaluate方法
    return evaluate(ReduceOps.makeRef(accumulator));
}

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel() // 调用isParallel()判断是否使用并行模式
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

@Override
public final boolean isParallel() {
    // 根据之前在parallel()方法设置的那个flag来判断。
    return sourceStage.parallel;
}

// java.util.stream.ReduceOps.ReduceOp.evaluateParallel
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

并行计算的性能提升

public class StreamParallelDemo {
    public static void main(String[] args) {
        System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));

        // 产生100w个随机数(1 ~ 100),组成列表
        Random random = new Random();
        List<Integer> list = new ArrayList<>(1000_0000);

        for (int i = 0; i < 1000_0000; i++) {
            list.add(random.nextInt(100));
        }

        long prevTime = getCurrentTime();
        list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
        System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));

        prevTime = getCurrentTime();
        list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
        System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));

    }

    private static long getCurrentTime() {
        return System.currentTimeMillis();
    }
}

计划任务

ScheduledThreadPoolExecutor + 在给定的延迟之后运行任务 + 周期性重复执行任务

案例

public class ThreadPool {

    private static final ScheduledExecutorService executor = new
        ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());

    private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args){
        // 新建一个固定延迟时间的计划任务
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (haveMsgAtCurrentTime()) {
                    System.out.println(df.format(new Date()));
                    System.out.println("大家注意了,我要发消息了");
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    public static boolean haveMsgAtCurrentTime(){
        //查询数据库,有没有当前时间需要发送的消息
        //这里省略实现,直接返回true
        return true;
    }
}

类结构

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
    implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    //……
}

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

方法

  1. schedule
// delay时长后执行任务command,该任务只执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // 这里的decorateTask方法仅仅返回第二个参数
    RunnableScheduledFuture<?> t = decorateTask(command,
                                           new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
    // 延时或者周期执行任务的主要方法,稍后统一说明
    delayedExecute(t);
    return t;
}


// 继承Comparable接口,表示该类对象支持排序
public interface Delayed extends Comparable<Delayed> {
    // 返回该对象剩余时延
    long getDelay(TimeUnit unit);
}

// 仅仅继承了Delayed和Future接口,自己没有任何代码
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {      
    // 是否是周期任务,周期任务可被调度运行多次,非周期任务只被运行一次   
    boolean isPeriodic();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    // 调用父类FutureTask的构造方法
    super(r, result);
    // time表示任务下次执行的时间
    this.time = ns;
    // 周期任务,正数表示按照固定速率,负数表示按照固定时延,0表示不是周期任务
    this.period = period;
    // 任务的编号
    this.sequenceNumber = sequencer.getAndIncrement();
}

// 实现Delayed接口的getDelay方法,返回任务开始执行的剩余时间
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

// Comparable接口的compareTo方法,比较两个任务的”大小”。
public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      // 小于0,说明当前任务的执行时间点早于other,要排在延时队列other的前面
      if (diff < 0)
        return -1;
      // 大于0,说明当前任务的执行时间点晚于other,要排在延时队列other的后面
      else if (diff > 0)
        return 1;
      // 如果两个任务的执行时间点一样,比较两个任务的编号,编号小的排在队列前面,编号大的排在队列后面
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    // 如果任务类型不是ScheduledFutureTask,通过getDelay方法比较
    long d = (getDelay(TimeUnit.NANOSECONDS) -
              other.getDelay(TimeUnit.NANOSECONDS));
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

// 任务执行完后,设置下次执行的时间
private void setNextRunTime() {
    long p = period;
    // p > 0,说明是固定速率运行的任务
    // 在原来任务开始执行时间的基础上加上p即可
    if (p > 0)
      time += p;
    // p < 0,说明是固定时延运行的任务,
    // 下次执行时间在当前时间(任务执行完成的时间)的基础上加上-p的时间
    else
      time = triggerTime(-p);
}

public void run() {
    boolean periodic = isPeriodic();
    // 如果当前状态下不能执行任务,则取消任务
    if (!canRunInCurrentRunState(periodic))
      cancel(false);
    // 不是周期性任务,执行一次任务即可,调用父类的run方法
    else if (!periodic)
      ScheduledFutureTask.super.run();
    // 是周期性任务,调用FutureTask的runAndReset方法,方法执行完成后
    // 重新设置任务下一次执行的时间,并将该任务重新入队,等待再次被调度
    else if (ScheduledFutureTask.super.runAndReset()) {
      setNextRunTime();
      reExecutePeriodic(outerTask);
    }
}

类图

  1. scheduledAtFixedRate
// 注意,固定速率和固定时延,传入的参数都是Runnable,也就是说这种定时任务是没有返回值的
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    if (period <= 0)
      throw new IllegalArgumentException();
    // 创建一个有初始延时和固定周期的任务
    ScheduledFutureTask<Void> sft =
      new ScheduledFutureTask<Void>(command,
                                    null,
                                    triggerTime(initialDelay, unit),
                                    unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // outerTask表示将会重新入队的任务
    sft.outerTask = t;
    // 稍后说明
    delayedExecute(t);
    return t;
}
  1. delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池已经关闭,调用拒绝执行处理器处理
    if (isShutdown())
      reject(task);
    else {
      // 将任务加入到等待队列
      super.getQueue().add(task);
      // 线程池已经关闭,且当前状态不能运行该任务,将该任务从等待队列移除并取消该任务
      if (isShutdown() &&
          !canRunInCurrentRunState(task.isPeriodic()) &&
          remove(task))
        task.cancel(false);
      else
        // 增加一个worker,就算corePoolSize=0也要增加一个worker
        ensurePrestart();
    }
}


void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

DelayedWorkQueue

static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
    // 队列初始容量
    private static final int INITIAL_CAPACITY = 16;
    // 数组用来存储定时任务,通过数组实现堆排序
    private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
    // 当前在队首等待的线程
    private Thread leader = null;
    // 锁和监视器,用于leader线程
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    // 其他代码,略
}


public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        // 取堆顶的任务,堆顶是最近要执行的任务
        RunnableScheduledFuture first = queue[0];
        // 堆顶为空,线程要在条件available上等待
        if (first == null)
          available.await();
        else {
          // 堆顶任务还要多长时间才能执行
          long delay = first.getDelay(TimeUnit.NANOSECONDS);
          // 堆顶任务已经可以执行了,finishPoll会重新调整堆,使其满足最小堆特性,该方法设置任务在
          // 堆中的index为-1并返回该任务
          if (delay <= 0)
            return finishPoll(first);
          // 如果leader不为空,说明已经有线程成为leader并等待堆顶任务
          // 到达执行时间,此时,其他线程都需要在available条件上等待
          else if (leader != null)
            available.await();
          else {
            // leader为空,当前线程成为新的leader
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              // 当前线程已经成为leader了,只需要等待堆顶任务到达执行时间即可
              available.awaitNanos(delay);
            } finally {
              // 返回堆顶元素之前将leader设置为空
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      // 通知其他在available条件等待的线程,这些线程可以去竞争成为新的leader
      if (leader == null && queue[0] != null)
        available.signal();
      lock.unlock();
    }
}


public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      // 队列元素已经大于等于数组的长度,需要扩容,新堆的容量是原来堆容量的1.5倍
      if (i >= queue.length)
        grow();
      // 堆中元素增加1
      size = i + 1;
      // 调整堆
      if (i == 0) {
        queue[0] = e;
        setIndex(e, 0);
      } else {
          // 调整堆,使的满足最小堆,比较大小的方式就是上文提到的compareTo方法
        siftUp(i, e);
      }
      if (queue[0] == e) {
        leader = null;
        // 通知其他在available条件上等待的线程,这些线程可以竞争成为新的leader
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
}