基础知识
1.进程和线程
线程是最小的调度单位,进程是最小的资源分配单位
进程:当程序从磁盘加载到内存中这时就开启了一个进程,进程可视为程序的一个实例。大部分程序可以同时运行多个实例。
线程:线程是进程的一个子集,是一个指令流,并且将指令流中的指令按顺寻交给cpu执行
进程大多相互独立,线程存于进程内部。
进程拥有共享的资源供其内部线程共享
进程的通信复杂:同一机器进程通信的IPC和不同机器通信的HTTP等
线程通信简单,如共享一个变量。
线程量级更小,上下文切换成本较低。
并行与并发
并发:多个线程轮流使用cpu一个核心的做法称为并发(串行)
并行:多个指令同时运行称为并行。
同步与异步调用
定义:
同步——需要等待结果返回才能运行就是同步。
异步——不需要等待结果返回就可以运行。
场景:
异步:类似于应对长时间的操作并且结果不是必须时我们需要开启一个新线程来处理,避免长时间阻塞主线程。
例如:tomcat中的异步servlet,让用户线程处理耗时长的操作,从而避免阻塞tomcat工作线程,提高吞吐量。
关于异步和并发并行的关系:可见的是异步确实是开启了一个子线程去处理,对于cpu来说没有什么子主之分,都是单独的线程,不存在主线程处理完子线程就一定会over。在多核CPU时他们时可以并行执行的,在单核CPU时他们时可以并发执行的。
这里引入前台线程和后台线程:通常只有手动将线程设为后台线程才会让线程变为后台。
前台线程:主线程或者说时程序执行完后依然不会结束。
后台线程(守护线程):主线程执行完后就跟着结束。
线程的创建和运行
方法1:利用Thread
// 创建线程对象
Thread t = new Thread() {
public void run() {
// 要执行的任务
}
};
// 启动线程
t.start();
例如
// 构造方法的参数是给线程指定名字,推荐
Thread t1 = new Thread("t1") {
@Override
// run 方法内实现了要执行的任务
public void run() {
log.debug("hello");
}
};
t1.start()
输出为:
19:19:00 [t1] c.ThreadStarter – hello
方法2:利用runnable分离代码和线程创建。
这种方法源码最终还是调用Thread的run方法,只不过发现你有runnable对象之后用你runnable的run()方法。
推荐第二种方法,突出组合关系,使得代码更加灵活
例如:
// 创建任务对象
Runnable task2 = new Runnable() {
@Override
public void run() {
log.debug("hello");
}
};
// 参数1 是任务对象; 参数2 是线程名字,推荐
Thread t2 = new Thread(task2, "t2");
t2.start();
输出为:
19:19:00 [t2] c.ThreadStarter – hello
关于lambda表达式简介化代码
在JDK中,会把只有一个抽象方法的接口注解为@FunctionalInterface,这种接口可以被lambda简化
看了源码会直到你用第二种方法时必须传递target否则start()是无法启动新线程的。
// 创建任务对象
Runnable task2 = () -> log.debug("hello");
// 参数1 是任务对象; 参数2 是线程名字,推荐
Thread t2 = new Thread(task2, "t2");
t2.start();
方法3:FutureTask
FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况。
这个类实际上式间接实现了Runnable结构。
// 创建任务对象
FutureTask<Integer> task3 = new FutureTask<>(() -> {
log.debug("hello");
return 100;
});
// 参数1 是任务对象; 参数2 是线程名字,推荐
new Thread(task3, "t3").start();
// 主线程阻塞,同步等待 task 执行完毕的结果
Integer result = task3.get();
log.debug("结果是:{}", result);
输出:
19:22:27 [t3] c.ThreadStarter – hello
19:22:27 [main] c.ThreadStarter – 结果是:100
进程查看和杀死。
windows下
tasklist 查看进程
taskkill 杀死进程
同时配合JAVA指令
jps 命令查看所有 Java 进程
jstack 查看某个 Java 进程(PID)的所有线程状态
jconsole 来查看某个 Java 进程中线程的运行情况(图形界面)
linux下
ps -fe 查看所有进程
ps -fT -p 查看某个进程(PID)的所有线程
kill 杀死进程
top 按大写 H 切换是否显示线程
top -H -p 查看某个进程(PID)的所有线程
Jconsole下查看进程
java -Djava.rmi.server.hostname=
ip地址
-Dcom.sun.management.jmxremote –
Dcom.sun.management.jmxremote.port=连接端口
-Dcom.sun.management.jmxremote.ssl=是否安全连接 –
Dcom.sun.management.jmxremote.authenticate=是否认证 java类
修改 /etc/hosts 文件将 127.0.0.1 映射至主机名
如果要认证访问,还需要做如下步骤
复制 jmxremote.password 文件 修改 jmxremote.password 和 jmxremote.access 文件的权限为 600 即文件所有者可读写
连接时填入 controlRole(用户名),R&D(密码)
线程的运行与原理
栈帧的运行
JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟
机就会为其分配一块栈内存。
每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
我们加载这个类的字节码之后会在方法去列出对应的几个方法
堆中用来存放对象
如图是主线程运行到调用method2时的栈堆方法区的状态。
接下来会让对应的生成了object n之后就取消methord2的栈空间。
多线程运行时
在多线程运行时,每个线程有独立的栈空间,不会互相干扰,不会说主线程释放了method2导致子线程的method2空间也被释放了。
线程的上下文切换( Context Switch )
线程的上下文切换就是指CPU不执行当前线程转而执行其他线程:线程调度
有如下几种可能
线程的 cpu 时间片用完
垃圾回收
有更高优先级的线程需要运行 线程自己调用了
sleep、yield、wait、join、park、synchronized、lock 等方法
-
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念
-
就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
-
状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
-
Context Switch 频繁发生会影响性能(抖动)
常见的线程方法
start与run
直接调用t.run()是主线程去执行,没有新的线程产生。需要调用start()方法才能产生新的线程,通过新的线程间接执行 run 中的代码。
sleep
通常用于主动让出CPU的使用权
- 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)可以使用对象名.getstate()查看
- 其它线程可以使用正在Timed Waiting线程的 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
- 睡眠结束后的线程未必会立刻得到执行,可能CPU还正在执行其他进程
- 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
关于第四点的对比如下
TimeUnit.SECONDS.sleep(1);
Thread.sleep(1000);
yield
1.主动放弃,从Running进入Runnable状态(就绪)。
2.具体结果依赖于OS的任务调度器。如果没有其他线程需要执行,或者其他线程优先级都太低了,会yield不出去。
setPriority
可以设置提醒信息的线程优先级,但无法左右任务调度器的决定。
join
主动等待,底层原理就是wait。
两个线程有共同变量,A线程想要拿到B线程的某个结果后再执行,就可以再A线程的RUN代码中调用B.join()。
当Join(Long n)传入n时,就是等待多长时间后就不等了继续执行自身线程任务。
interrupt
打断阻塞态上面已经说了不再赘述。
打断正常态线程;
我们调用想要打断的interrupt(),如果该进程正在正常运行,那么不会立即终止,而是会把对象内部的isInterrupted标记状态改为true。相当于只是告诉它我要打断你。
进程对象的isInterrupted()方法可以获取自身的标记,如果发现为true,那么就可以做出一些改变。
private static void test2() throws InterruptedException {
Thread t2 = new Thread(()->{
while(true) {
Thread current = Thread.currentThread();
boolean interrupted = current.isInterrupted();
if(interrupted) {
log.debug(" 打断状态: {}", interrupted);
break;
}
}
}, "t2");
t2.start();
sleep(0.5);
t2.interrupt();
}
打断正常进程——二阶段终止。
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Test3 {
public static void main(String[] args) throws InterruptedException {
TwoPhase twoPhase = new TwoPhase();
twoPhase.start();
Thread.sleep(3000);
twoPhase.stop();
}
}
@Slf4j(topic = "123")
class TwoPhase{
private Thread moniter;
public void start(){
moniter = new Thread(()->{
Thread thread = Thread.currentThread();
while(true) {
if (thread.isInterrupted()) {
log.debug("被打断了");
break;
}
log.debug("监控");
try {
thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
//打断sleep中的进程会清空打断的状态,所以需要下面再调用一下
thread.interrupt();
}
}
}
);
moniter.start();
}
public void stop() {
moniter.interrupt();
}
}
打断park线程:打断处于park状态中的线程可以让线程继续执行。
常见的线程状态
-
【初始状态】
仅是在语言层面创建了线程对象,还未与操作系统线程关联 -
【可运行状态】(就绪状态)
指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行 -
【运行状态】
指获取了 CPU 时间片运行中的状态,当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换 -
【阻塞状态】
- 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】
- 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
- 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们。
-
【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态。
再JAVA的Thread类中
初始状态——new
就绪状态和运行状态——Runnable
阻塞状态——BLOCKED-WAITING-TIMED_WAITING
终止状态——TERMINATED
管程——悲观锁
查看如下代码
static int counter = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter++;
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter--;
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",counter);
}
实际运行结果counter则是不为0的。
在java中,会把counter++转换为如下字节码
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
iadd // 自增
putstatic i // 将修改后的值存入静态变量i
不同线程访问共享变量时,临界区内java代码虽然看上去只有一条,但是转换为执行的字节码的时候则是分了多步,在写后写就会造成这种问题。
synchronized
基本语法如下:
synchronized(对象) // 线程1, 线程2(blocked)
{
临界区
}
解决如上问题
static int counter = 0;
static final Object room = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
counter++;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
counter--;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",counter);
}
synchronized会用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。
面向对象改进,锁this对象
class Room {
int value = 0;
public void increment() {
synchronized (this) {
value++;
}
}
public void decrement() {
synchronized (this) {
value--;
}
}
public int get() {
synchronized (this) {
return value;
}
}
}
@Slf4j
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int j = 0; j < 5000; j++) {
room.increment();
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int j = 0; j < 5000; j++) {
room.decrement();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("count: {}" , room.get());
}
}
将synchronized加在方法上。
对象锁
对象锁
对象锁会将所有锁住的临界区变为串行,但是注意必须是同一个实例才行
class Test{
public synchronized void test() {
}
}
等价于
class Test{
public void test() {
synchronized(this) {
}
}
}
类对象锁(类锁)
类锁会锁住类对象,而类对象只有一个。所以会只允许一个线程访问被类锁锁住的方法。
class Test{
public synchronized static void test() {
}
}
等价于
class Test{
public static void test() {
synchronized(Test.class) {
}
}
}
特别注意:
1.注意synchronized只能锁对象
2.synchronized锁住的临界区即便有主动让出代码的动作也无法让其他线程夺取CPU
3.被类锁锁住的方法不特指某个方法,而是被类锁锁住的全部方法。比如线程1要方位a方法,线程2要访问b方法,那么a和b都是被类锁锁住的,这时线程1和线程2也是串行的。
线程安全分析
局部变量
public static void test1() {
int i = 10;
i++;
}
如上代码在多个线程并发时会产生线程安全吗?
答案是不会的,不同线程执行时所用的栈空间相对独立,这里的i并不是共享变量,所以没有涉及线程安全。
成员变量
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args) {
ThreadUnsafe test = new ThreadUnsafe();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
test.method1(LOOP_NUMBER);
}, "Thread" + i).start();
}
}
class ThreadUnsafe {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
// { 临界区, 会产生竞态条件
method2();
method3();
// } 临界区
}
}
private void method2() {
list.add("1");
}
private void method3() {
list.remove(0);
}
}
这时是会产生线程安全问题的,因为成员变量的对象会在堆里,可以被多个线程共享,这时就会发生RR等情况导致list无法正确得到写。
常见的线程安全类
- String
- Integer
- StringBuffer
- Random
- Vector
- Hashtable
- java.util.concurrent 包下的类
注意,每一个都是原子的,但是多个组合在一起就不一定了
String和Integer的安全是基于不可变的,所以他们的方法都是线程安全的。
而不可变并不是无法改变,只是不会改变,比如String的replace是会先复制原本的字符串底层的char数组然后再去实现的replace,实际上是又创建了一个对象。
PS:这里建议去看一下每个类的源码。
对象头
对象分为对象头+对象体
32位虚拟机JAVA中的对象头是8个字节。
前四个字节:hashcode是每个对象都有的哈希码,age是每个对象的分代年龄用于垃圾回收,后面是是否获得偏向锁,获取锁状态。
再往下有轻重锁的指针存放,用于存放moniter的指针。
后四个字节:后四个字节是一个指针,指向对象对应的类型(类对象)
包装类和基本类型对比:Integer 8+4字节,int 4字节
Moniter
Moniter是操作系统中的一个对象,在多线程并发时起到管程作用,这里我们利用synchronized锁对象时,
1.被锁对象根据自身的markword尝试获取系统中的monitor,
2.如果获取成功线程变为owner,
3.失败则进入阻塞态(BLOCKED or WAITING)。
4.线程执行完后解锁
synchronized锁升级
synchronized对于锁对象加锁的流程是:无锁或匿名偏向锁——偏向锁——轻量锁——重量锁。
注意:对象默认开偏向锁,但是可以关掉。
锁对象markword的锁指向
- 偏向锁:MarkWord存储的是偏向的线程ID。
- 轻量锁:MarkWord存储的是指向线程栈中Lock Record的指针。
- 重量锁:MarkWord存储的是指向堆中的monitor对象的指针
CAS
CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。
CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。当且仅当旧的预期值A和内存值V相同时,将
内存值V修改为B,否则什么都不做或重试。
重试的这种行为称为自旋,原理有点类似乐观锁,修改带版本号。
偏向锁
为什么要引入偏向锁
大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,直接使用轻量级锁会有一定的性能损耗,因为轻量级锁加锁或者解锁都需要一次 CAS 操作,而偏向锁解锁时不需要修改对象头的 Mark Word 就会少一次 CAS操作。为了让线程获取锁的代价更低而引入了偏向锁。
加锁过程
当synchronized尝试锁对象时发现该对象处于匿名偏向状态或无所则会直接在锁对象 mark word 高位内存储当前线程内存地址,这个部分是使用 CAS 比较并替换完成的,如果设置成功,那该锁就是偏向当前线程的锁。
一次cas
不主动释放锁与锁重入
JVM处理释放锁的第一步是将当前线程栈内与当前锁对象相关的锁记录全部拿到,然后将最后一条锁记录释放掉。
通过检查 lock 对象的 markword,如果当前锁对象是偏向锁状态,就啥也不做,也就是说,就算线程退出同步代码块了,该锁对象仍然保留了偏向当前线程的偏向锁。这样下次同一进程重入只需比对偏向锁是否偏向自己。
轻量级锁
轻量级锁的使用场景:如果一个对象虽然有多线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以
使用轻量级锁来优化。
加锁过程
线程1尝试获取轻量级锁,如果锁对象并没有被获取,那么线程1将用cas替换掉锁对象中的锁状态信息,并且把锁对象的状态信息作为取值,获取轻量级锁成功。
一次cas
锁重入
如果发现是锁对象的轻量级锁是自己先前获取的轻量级锁,那么则再添加一条取值为null的锁记录用于记录重入次数。
无cas
释放锁过程
释放锁时JVM在线程栈中找到最后一条锁记录,如果取值字段为null,则无需cas,直接删除这条锁记录。否则cas把原本的取值替换还给锁对象。
一次cas
重量级锁
获取锁,释放锁,锁重入参考上述moniter。
锁升级过程
假设目前锁偏向A,B来竞争。
- 1.B向JVM的任务队列提交一条撤销锁指令,
JVM在安全点状态下执行。 - 2.执行时JVM检查偏向进程A是否存活,若否,将锁对象改为 无锁状态 或者 匿名偏向锁状态 ,若是,则判断是否仍在同步代码块内,不在等同于线程死亡,若在则升级轻量锁。
- 3.修改锁对象状态为轻量锁,
并且保留持锁线程A的地址以便比对后序请求释放锁的线程是否为A。 - 4.线程B将锁对象升级为轻量级锁之后自旋,自旋后仍无法获取锁,则为锁对象获取moniter升级重量级锁,让锁对象指向重量级锁地址,B进入自旋(循环等待几次,而不是直接进入moniter的阻塞队列,避免上下文切换)。
简单来说就是:无锁-偏向锁(处于同步代码内)->轻量级锁-(自旋,升级)>重量级锁(自旋,阻塞)
以上就是锁升级。
5.线程A前来释放锁,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程。
注意自旋这种方式只有在线程并行也就是多核cpu时是有意义的。
wait/notify
- obj.wait() 让进入 object 监视器的线程到 waitSet 等待
- obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
- obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒
sleep和wait
- sleep 是 Thread 方法,而 wait 是 Object 的方法
- sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
- sleep 在睡眠的同时,不会释放对象锁的(yield也是),但 wait 在等待的时候会释放对象锁
- 它们状态 TIMED_WAITING
使用规范性上和C++的信号量机制基本一样。
Park & Unpark
暂停当前线程:LockSupport.park();
恢复某个线程的运行:LockSupport.unpark(暂停线程对象)
park将让进程进入wait状态。
park和unpark只是会影响线程对象内部一个counter值,所以先unpark和park也会唤醒。
unpark多次只会让counter加一。
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
生产者-消费者模式(很像我之前写C++的信号量机制)
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
注意点:
1.需要获取的锁对象为消息队列。
2.消费者消费结束后唤醒生产者。
package 生产者消费者;
import javax.swing.*;
import java.util.LinkedList;
public class Test {
}
class MessageQueue{
private LinkedList<Message> list = new LinkedList<>();//双向队列
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public Message take(){
synchronized (list){
while (list.isEmpty()){
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
list.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (list){
while (list.size()==capacity){
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
}
}
}
final class Message{//严格开闭原则,final禁止子类
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
}
锁的活跃性
死锁
多个进程互相等待对方手中的“资源”,导致谁都无法开始。
死锁的定位
1.JPS定位
cmd > jps
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
12320 Jps
22816 KotlinCompileDaemon
33200 TestDeadLock // JVM 进程
11508 Main
28468 Launcher
这个指令在控制台输入后可以显示被阻塞(死锁)的进程,等待锁对象的信息。
类似于下面说的Thread-1进入死锁。
"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry
[0x000000001f54f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
- locked <0x000000076b5bf1d0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束
例如下面这两个线程。
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
饥饿
由于线程优先级相对较低总是拿不到资源,导致既无法结束又无法开始。长时间处于此状态会导致线程饿死。
ReentrantLock
相对于 synchronized 它具备如下特点
- 可被其他线程打断(lock.lockInterruptibly();)
- 可以设置超时时间(lock.tryLock(1, TimeUnit.SECONDS))
- 可以设置为公平锁(ReentrantLock 默认是不公平的,改为:ReentrantLock lock = new ReentrantLock(true);才能实现FIFO)
- 支持多个条件变量(根据不满足条件的不同,可以放入不同的waitset)
与synchronized相同,Reentrantlock也支持锁重入。
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可打断
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally{
lock.unlock();
}
锁超时
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
公平锁
可将下图的
ReentrantLock lock = new ReentrantLock(false)
改为
ReentrantLock lock = new ReentrantLock(true)
实现公平锁。
多个条件变量
在拥有一个新的reentrantlock对象后,可以通过Condition waitCigaretteQueue = lock.newCondition();来申请对于不同的条件变量的等待集合。
通过对应的Condition对象的await方法将不同线程送入不同的集合中。
通过对应的Condition对象的singal方法将不同线程送入不同的集合中。
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette){
try {
waitCigaretteQueue.await();
} catch (InterruptedException e){
e.printStackTrace();
}
}
log.debug("等到了它的烟");
}finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast){
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
JMM
- 有序性 – 保证指令不会受 cpu 指令并行优化的影响
可见性
可见性 – 一个线程对变量的修改对于别的线程是可见的,保证指令不会受 cpu 缓存的影响。
static boolean run = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()->{
while(run){
// ....
}
});
t.start();
sleep(1);
run = false; // 线程t不会如预想的停下来
}
如上代码运行时,线程t总会频繁访问run,所以java自动缓存了一份,导致主线程修改run之后t线程却看不到run被修改了
解决方法:volatile关键字,synchronized加锁
我们在变量前加上volatile的时候可以保证可见性。
原子性
原子性 – 保证指令不会受到线程上下文切换的影响。
就是最开始搞加锁是讨论的线程1进行i++,线程2进行i–,i++这种实际上是四个指令的集合,就有可能让i++这条代码的指令受到线程切换的影响。
volatile虽然可以保证可见性,但是volatile无法像synchronized一样保证原子性,所以适用于不会发生写冲突的时候使用,synchronized虽然能保证原子性但是之前学过synchronized的加锁操作较重,性能较低,需要根据不同的业务场景适当选择。
有序性
java会在自以为保证正确性的情况下来重新对指令排序。
这种正确性的保证对于单线程是没有问题的,但是对于多线程或者有流水线实现指令级并行的cpu而言就不可行了。
如下代码就会出现最终r.r1=0 4 1三种情况的出现。
@JCStressTest
@Outcome(id = {"1", "4"}, expect = Expect.ACCEPTABLE, desc = "ok")
@Outcome(id = "0", expect = Expect.ACCEPTABLE_INTERESTING, desc = "!!!!")
@State
public class ConcurrencyTest {
int num = 0;
boolean ready = false;
@Actor
public void actor1(I_Result r) {
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
@Actor
public void actor2(I_Result r) {
num = 2;
ready = true;
}
}
对于这种问题我们同样可以用volatile解决,volatile可以保证线程内添加volatile修饰符的变量在指令执行到它之前没有禁用指令重排序。
volatile原理
Memory Barrier 读屏障和写屏障
写屏障:
1.保证在该屏障之前的对共享变量的改动都同步到主存(可见性)。
2.确保不会将写屏障之前的代码排在写屏障之后(有序性)。
读屏障:
1.保证在该屏障之后读取的共享变量是主存的最新数据(可见性)
2.读屏障会确保指令重排序时不会将屏障后的代码排序到屏障前(有序性)。
为什么不能解决原子性也可见了
1.写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证读跑到它前面去
2.有序性的保证也只是保证了本线程内相关代码不被重排序
DCL问题——有序性实例
单例模式为例,前些天我寻思复习复习,正好复习了这玩意儿真巧啊
从前有个人觉得如果直接把synchronized锁住单例的类对象,会让性能下滑幅度较大,所以他这么写了一下。
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if(INSTANCE == null) { // t2
// 首次访问会同步,而之后的使用没有 synchronized
synchronized(Singleton.class) {
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
其实我们可以看到,他把获取实例的部分锁住了,但是问题在于我们刚才说的有序性,synchronized固然能保证内部的有序性,但上述代码synchronized的作用于较小,无法保证return不会再获取实例之前发生。
这就会导致:线程1还没构造完实例,这时候线程2过来发现实例非空,线程2过来就拿走一个没构造完成的实例,这就是有序性导致的并发问题(或者你让构造函数是个原子操作?这显然有些强人所难)。
我们为了确保有序性可以使用volatile来启动读写屏障保证不让指令跨屏障执行。
public final class Singleton {
private Singleton() { }
private static volatile Singleton INSTANCE = null;
public static Singleton getInstance() {
// 实例没创建,才会进入内部的 synchronized代码块
if (INSTANCE == null) {
synchronized (Singleton.class) { // t2
// 也许有其它线程已经创建实例,所以再判断一次
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
volatile使用场景:单写多读保证可见性,dcl保证同步代码块外的代码有序性。
happens-before规则
happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结
- 线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见。
- 线程对 volatile 变量的写,对接下来其它线程对该变量的读可见。
- 线程 start 前对变量的写,对该线程开始后对该变量的读可见。
- 线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待
它结束)。 - 线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过
t2.interrupted 或 t2.isInterrupted)。 - 对变量默认值(0,false,null)的写,对其它线程对该变量的读可见具有传递性,如果 x hb-> y 并且 y hb-> z 那么有 x hb-> z ,配合 volatile 的防指令重排,有下面的例子。
线程安全习题
// 问题1:为什么加 final
//加入final后可以防止类被修改,符合开闭原则
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
//添加readResolve,序列化就是将对象变二进制流,反序列化就是二进制流变对象,添加这个方法后会优先用你返回的对象而不是反序列化一个
public final class Singleton implements Serializable {
// 问题3:为什么设置为私有? 是否能防止反射创建新的实例?
//不能,xml中只要定好了类的位置,并且把方法的accessible设置为true,就可以暴力获取
private Singleton() {}
// 问题4:这样初始化是否能保证单例对象创建时的线程安全?
//可以,静态成员变量的初始化由未加载阶段完成,jvm可以保证安全性
private static final Singleton INSTANCE = new Singleton();
// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由
//方便之后的拓展,可以在后期通过函数使用泛型,提供更好的封装性等。
public static Singleton getInstance() {
return INSTANCE;
}
public Object readResolve() {
return INSTANCE;
}
}
无锁——乐观锁
CAS-Volatile实现无锁线程安全
我们之前在synchronized锁升级的时候就学到了CAS,是一种基于对比旧值成功后替换新值的修改方式,这种修改是乐观锁的修改方式。
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果。
@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next)) {
break;
}
}
// 可以简化为下面的方法
// balance.addAndGet(-1 * amount);
}
如下是两个线程用cas和原子整数对共享变量进行修改。
为什么无锁快?
-
无锁情况下,当线程数小于CPU核心数,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
-
但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,如果没有足够的CPU核心,虽然线程不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
所以最使用的场景就是:线程数<=核心数,并且能够有值给你比对。
CAS 的特点
CAS 是基于乐观锁的思想:线程安全问题不常发生,重试不常发生。
synchronized 是基于悲观锁的思想:线程安全问题发生非常频繁,得防着其它线程来修改共享变量,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
原子类型
原子整数
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
原子引用
AtomicReference
需要保护原子性的的共享数据不一定是整数类型,这时可以选择使用原子引用来实现。
AtomicReference ref = new AtomicReference<>(bigDecimal);
ref.compareAndSet(prev, next)
class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> ref;
public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return ref.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}
AtomicStampedReference
ABA问题:当我们把原本的值A改变为B,然后又把B改变为A,这显然是不会让CAS返回对比的false,还是会成功的去修改。
当我们使用AtomicStampedReference就可以获取版本号这样的版本号就可以帮我们解决需要杜绝ABA的场景。
static AtomicStampedReference ref = new AtomicStampedReference<>(“A”, 0);
ref.compareAndSet(prev, “C”, stamp, stamp + 1)
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.debug("版本 {}", stamp);
// 如果中间有其它线程干扰,发生了 ABA 现象
other();
sleep(1);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
sleep(0.5);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}
AtomicMarkableReference
但是上述的AtomicStampedReference规定的太死了,有时候我们只关心值是否被修改了而不是被修改了几次。
// 静态内部类,封装了 变量引用 和 版本号
private static class Pair<T> {
final T reference; // 变量引用
final boolean mark; // 修改标识
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}
private volatile Pair<V> pair;
/**
*
初始化,构造成一个 Pair 对象,由于 pair 是用 volatile 修饰的所以在构造是线程安全的
* @param initialRef 初始化变量引用
* @param initialMark 修改标识
*/
public AtomicMarkableReference(V initialRef, boolean initialMark) {
pair = Pair.of(initialRef, initialMark);
}
// 构造函数,初始化引用和标记值
public AtomicMarkableReference(V initialRef, boolean initialMark)
// 以原子方式获取当前引用值
public V getReference()
// 以原子方式获取当前标记值
public int isMarked()
// 以原子方式获取当前引用值和标记值
public V get(boolean[] markHolder)
// 以原子的方式同时更新引用值和标记值
// 当期望引用值不等于当前引用值时,操作失败,返回false
// 当期望标记值不等于当前标记值时,操作失败,返回false
// 在期望引用值和期望标记值同时等于当前值的前提下
// 当期望引用值和期望标记值同时等于当前时,不更新,直接返回true
// 当期望引用值和期望标记值不等于当前值时,同时设置新的引用值和新的标记值,返回true
public boolean weakCompareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
// 以原子的方式同时更新引用值和标记值
// 当期望引用值不等于当前引用值时,操作失败,返回false
// 当期望标记值不等于当前标记值时,操作失败,返回false
// 在期望引用值和期望标记值同时等于当前值的前提下
// 当新的引用值和新的标记值同时等于当前值时,不更新,直接返回true
// 当新的引用值和新的标记值不同时等于当前值时,同时设置新的引用值和新的标记值,返回true
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
// 以原子方式设置引用的当前值为新值newReference
// 同时,以原子方式设置标记值的当前值为新值newMark
// 新引用值和新标记值只要有一个跟当前值不一样,就进行更新
public void set(V newReference, boolean newMark)
// 以原子方式设置标记值为新的值
// 前提:引用值保持不变
// 当期望的引用值与当前引用值不相同时,操作失败,返回fasle
// 当期望的引用值与当前引用值相同时,操作成功,返回true
public boolean attemptMark(V expectedReference, boolean newMark)
// 使用`sun.misc.Unsafe`类原子地交换两个对象
private boolean casPair(Pair<V> cmp, Pair<V> val)
这里去理解这个东西CAS,是在标记值和期望值都与原本不同时才会修改。
@Slf4j
public class TestABAAtomicMarkableReference {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生的线程 start...");
bag.setDesc("空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {}
log.debug(bag.toString());
}).start();
Thread.sleep(1000);
log.debug("主线程想换一只新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
原子数组
当我们想要修改的东西不是对象本身而是对象内部元素,例如数组时,我们需要用到原子数组来保证安全性。
/**
参数1,提供数组、可以是线程不安全数组或线程安全数组
参数2,获取数组长度的方法
参数3,自增方法,回传 array, index
参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer ) {
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {
// 每个线程对数组作 10000 次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j%length);
}
}));
}
ts.forEach(t -> t.start()); // 启动所有线程
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}); // 等所有线程结束
printConsumer.accept(array);
}
如下是正常数组和原子数组的使用函数式接口提供值的对比。
demo(
()->new int[10],
(array)->array.length,
(array, index) -> array[index]++,
array-> System.out.println(Arrays.toString(array))
);
demo(
()-> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);
原子更新器
当我们尝试去更新一个类对象的成员变量的时候,更改的操作需要确保原子性,这个时候就有了原子更新器。
AtomicReferenceFieldUpdater
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, “field”);
或者
AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(类对象,成员变量类对象, 成员变量名);
public class Test5 {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
Test5 test5 = new Test5();
fieldUpdater.compareAndSet(test5, 0, 10);
// 修改成功 field = 10
System.out.println(test5.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(test5, 10, 20);
System.out.println(test5.field);
// 修改失败 field = 20
fieldUpdater.compareAndSet(test5, 10, 30);
System.out.println(test5.field);
}
}
原子累加器
LongAdder
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}
main方法里验证如下代码
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
原子累加器对于数字的累加性能会大大提升。
原理在于累加器对于不同线程的累加提供了多个“累加单元”,而这个累加单元当所有线程都调用完成后就会将结果汇总,这样就避免了多个线程争抢一个公共变量,从而造成多次CAS的失败和重试,导致性能的降低。
LongAdder加锁——cas锁
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
这里的cellsBusy加锁是怎么做的呢?
实现原理如下。当我们把加锁标记置为1的时候,我们下面lock就会一直CAS比对失败,导致线程一直重试,直到unlock把cellsbusy置为0才继续操作别的指令。
这里的cellsBusy主要用于cells数组的创建的扩容保证线程安全。不要用于自己的生产。
缓存行伪共享
由于要提高IO效率引入了缓存,数据在缓存中是一行一行的,cells是一个数组,所以多个数据就会在一起。
当一个CPU核心修改了cell一个内存数据的时候,所在缓存行内所有数据都会失效,就需要重新读内存去完成CAS,而度内存就比读缓存慢得多。
但是显然这就不合理,一个数据修改了其他数据也要连坐,这时有一个@sun.misc.Contended注释可以让每个数值在缓存行存储时前面加上128字节的空白,从而造成一个数据一个缓存行的效果,就可以不用连坐啦。
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
add
public void add(long x) {
// as 为累加单元数组
// b 为基础值
// x 为累加值
Cell[] as; long b, v; int m; Cell a;
// 1. as 有值, 表示已经发生过竞争, 进入 if
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 表示 cell 没有竞争
boolean uncontended = true;
// as 还没有创建
if (as == null || (m = as.length - 1) < 0 ||
// 当前线程对应的 cell 还没有
(a = as[getProbe() & m]) == null ||
// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
!(uncontended = a.cas(v = a.value, v + x))
){// 进入 cell 数组创建、cell 创建的流程
longAccumulate(x, null, uncontended);
}
}
}
Unsafe类
unsafe是提供了直接操作内存的一系列方法,我们上述学的synchronized,reentrantlock,cas等都是通过调用这个类的放来实现的。
由于Unsafe的对象是私有的,我们获取unsafe对象只能通过暴力反射获取。
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
//Field是一个类,位于Java.lang.reflect包下,在Java反射中Field用于获取某个类的属性或该属性的属性值。
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);//这一步便是真的获取了成员变量
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
不可变类型
在对可变类对象操作时会出现线程安全问题,同时有很多不可变类型,使用这种类型的对象不会出现这种问题
- String
- Integer
- StringBuffer
- Random
- Vector
- Hashtable
- java.util.concurrent 包下的类
不可变类的设计
主要的思想有两个:
1.不可更改类内容
2.不共享对象
以String为例
String里的类加了大量final修饰,代表无子类,不可变。
- 属性用 final 修饰保证了该属性是只读的,不能修改
- 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
// ...
}
当我们对String的对象进行修改的时候会发现String的这个方法实际上是又返回了一个新的对象。
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
public String(char value[], int offset, int count) {
if (offset < 0) {
throw new StringIndexOutOfBoundsException(offset);
}
if (count <= 0) {
if (count < 0) {
throw new StringIndexOutOfBoundsException(count);
}
if (offset <= value.length) {
this.value = "".value;
return;
}
}
if (offset > value.length - count) {
throw new StringIndexOutOfBoundsException(offset + count);
}
this.value = Arrays.copyOfRange(value, offset, offset+count);
}
final
final 对于final变量,当给final变量赋值的时候,会在赋值后插入写屏障确保有序性和可见性。
赋值时
样在putfield指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况
获取时
获取时会有一个小优化。
final变量在获取时,以int类型为例,如果大小小于短整型的最大数值,那么会直接copy一份到类的栈内存,否则会使用LDC复制到类的常量池中。这样做较快,而且避免了共享。
不加final的成员变量在获取时则会从堆中获取。
线程池
ThreadPoolExecutor
如下是ThreadPoolExecutor的类图
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。
构造方法如下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数目 (最多保留的线程数)
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间 – 针对救急线程
- unit 时间单位 – 针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂 – 可以为线程创建时起个好名字
- handler 拒绝策略
以上参数,中corePoolSize和maximumPoolSize的差值就是救急线程数目。
救急线程
救急线程是当你采用了有界的阻塞队列,如果等待任务数大于队列容量,那么则会开启救急线程来处理多出的任务,当救急线程将任务处理完毕keepAliveTime和unit将会规定救急线程还能存活多久。
注意,只有使用有界队列才会有救急线程。
工作过程
- 懒汉创建:线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize – corePoolSize 数目的线程来救急。
- 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现。
- 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由
keepAliveTime 和 unit 来控制。
拒绝测略
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
便定位问题 - Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
Executors
对于ThreadPoolExecutor的创建,JDK提供了许多的工厂方法。
固定大小线程池—— newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知,相对耗时的任务
带缓冲线程池—— newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
- 全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交
货)
适合任务数比较密集,但每个任务执行时间较短的情况
单线程线程池——newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 相比于自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作。
- Executors.newFixedThreadPool(1) 初始时为1,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调setCorePoolSize 等方法进行修改。
- Executors.newSingleThreadExecutor() 线程个数始终为1不能修改,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。
适用于希望多个任务串行执行
ThreadPoolExecutor方法
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
submit
invokeall
注意,这里的return是为了变成callable对象然后配合future使用。
不加return就是runnable无法配合future使用。
而future可以正确获取任务的异常,推荐使用future。
@Slf4j
public class Test {
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(3);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
()->{
log.debug("begin");
Thread.sleep(1000);
return "2";
}
));
}
}
invokeAny
@Slf4j
public class Test {
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(3);
String result = pool.invokeAny(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin");
Thread.sleep(1000);
return "2";
}
));
log.debug("{}",result);
}
}
线程池关闭
shutdown
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行,除非调用此线程池shutdown方法的线程又使用了awaitTermination(time,unit)
源码如下
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等,让他们执行完所有任务后就自己结束)
tryTerminate();
}
shutdownNow
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
源码如下
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();//这里应该式必然会结束的,因为上面已经拒绝任务并且打断所有线程了
return tasks;
}
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
任务调度线程池——ScheduledThreadPoolExecutor
TImer
Timer是早期实现任务计划性执行的功能类。
Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor延时执行任务。
@Slf4j
public class Test {
public static void main(String[] args){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
pool.schedule(()->{
log.debug("task1");
},1,TimeUnit.SECONDS);
pool.schedule(()->{
log.debug("task2");
},1,TimeUnit.SECONDS);
}
}
定时执行任务。
@Slf4j
public class Test {
public static void main(String[] args){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
pool.scheduleAtFixedRate(()->{
log.debug("task1");
},1,1,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(()->{
log.debug("task2");
},1,1,TimeUnit.SECONDS);
}
}
scheduleAtFixedRate:delay的时间是从上一次任务开始的时候计时。
scheduleWithFixedDelay:delay的时间是从上一次任务结束的时候计时。
Tomcat线程池
以下是五个不同tomcat线程的分工:
- LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
- Acceptor 只负责【接收新的 socket 连接】
- Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
- 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
- Executor 线程池中的工作线程最终负责【处理请求】
而Tomcat线程池与ThreadPoolExecutor稍有不同
- 如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常。
Connector配置:
acceptor一个线程足够了,这种线程活很少。
poller一个也够了,poller采用多路复用,一个线程就能监听多个channel
当使用executor时会覆盖minSpareThreads和maxThreads
Executor 线程配置.
注意这里默认守护线程。
tomcat的阻塞队列
Fork/Join线程池——分治线程池
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
@Slf4j
class Test extends RecursiveTask<Integer> {
int begin;
int end;
public Test(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public String toString() {
return "{" + begin + "," + end + '}';
}
@Override
protected Integer compute() {
// 5, 5
if (begin == end) {
log.debug("join() {}", begin);
return begin;
}
// 4, 5
if (end - begin == 1) {
log.debug("join() {} + {} = {}", begin, end, end + begin);
return end + begin;
}
// 1 5
int mid = (end + begin) / 2; // 3
Test t1 = new Test(begin, mid); // 1,3
t1.fork();
Test t2 = new Test(mid + 1, end); // 4,5
t2.fork();
log.debug("fork() {} + {} = ?", t1, t2);
int result = t1.join() + t2.join();
log.debug("join() {} + {} = {}", t1, t2, result);
return result;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new Test(1, 10)));
}
}
[ForkJoinPool-1-worker-0] – join() 1 + 2 = 3
[ForkJoinPool-1-worker-3] – join() 4 + 5 = 9
[ForkJoinPool-1-worker-0] – join() 3
[ForkJoinPool-1-worker-1] – fork() {1,3} + {4,5} = ?
[ForkJoinPool-1-worker-2] – fork() {1,2} + {3,3} = ?
[ForkJoinPool-1-worker-2] – join() {1,2} + {3,3} = 6
[ForkJoinPool-1-worker-1] – join() {1,3} + {4,5} = 15
15
原文地址:https://blog.csdn.net/qq_35866893/article/details/135814234
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_64109.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!