Reference: notes on the [遇见狂神说] JUC video course
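What is JUC
JUC refers to the java.util.concurrent package and its subpackages (java.util.concurrent.atomic and java.util.concurrent.locks), the standard-library utilities for multithreaded development.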
Threads and processes
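Can Java start a thread by itself? (No)
Java code cannot create an operating-system thread on its own. Thread.start() delegates to the native method start0(), which is implemented in the JVM's underlying C++ code.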

    public synchronized void start() {
        // A thread can be started only once; a non-zero status means it is no longer NEW.
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        group.add(this);
        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
            }
        }
    }

    // Native method: the actual thread is created outside of Java code.
    private native void start0();

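The threadStatus check at the top of start() means a Thread object can be started only once. A minimal sketch of that behaviour follows; the class and method names are made up for illustration and are not part of the notes.

```java
class StartTwiceDemo {
    // Hypothetical helper: returns true if the second start() call is rejected.
    static boolean secondStartRejected() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.start(); // the thread is no longer NEW, so this must fail
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start() rejected: " + secondStartRejected());
    }
}
```

To run the same work again, create a new Thread object instead of restarting the old one.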
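Concurrency and parallelism
Concurrent programming is essentially about making full use of CPU resources.
Concurrency: multiple threads operate on the same resource.
On a single core the CPU simulates several threads by switching between them very quickly.
Parallelism: several tasks really run at the same time (several people walking side by side), which requires multiple cores.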

    package com.yangl.study.juc;

    public class Demo01 {
        public static void main(String[] args) {
            // Number of logical processors available to the JVM
            System.out.println(Runtime.getRuntime().availableProcessors());
        }
    }

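Thread states

    public enum State {
        NEW,            // created but not yet started
        RUNNABLE,       // running or ready to run
        BLOCKED,        // waiting to acquire a monitor lock
        WAITING,        // waiting indefinitely for another thread
        TIMED_WAITING,  // waiting with a timeout
        TERMINATED;     // finished
    }
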
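Two of these states can be observed deterministically with Thread.getState(): NEW before start() and TERMINATED after join(). The sketch below is only an illustration with a hypothetical class name; the intermediate states depend on timing and are left out.

```java
class ThreadStateDemo {
    // Hypothetical helper: returns the state before start() and after join().
    static Thread.State[] observe() {
        Thread t = new Thread(() -> { });
        Thread.State before = t.getState(); // not started yet
        t.start();
        try {
            t.join(); // wait until the thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Thread.State after = t.getState();
        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) {
        Thread.State[] states = observe();
        System.out.println(states[0] + " -> " + states[1]);
    }
}
```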
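Differences between wait and sleep

| Aspect | wait | sleep |
| --- | --- | --- |
| Declared in | Object | Thread |
| Lock release | Releases the lock | Sleeps while holding the lock; does not release it |
| Where it can be used | Only inside a synchronized block or method | Anywhere |
| Exception handling | Must handle InterruptedException | Must handle InterruptedException |
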
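The "where it can be used" row is easy to check: calling wait() without holding the object's monitor fails with IllegalMonitorStateException, while sleep() has no such requirement. A small sketch under that assumption, with hypothetical names:

```java
class WaitNeedsMonitorDemo {
    // Calls wait() outside of any synchronized block; returns true if the JVM rejects it.
    static boolean waitWithoutLockFails() {
        Object monitor = new Object();
        try {
            monitor.wait(10);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true; // the current thread does not own the monitor
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // sleep() needs no lock at all; returns true if it completed normally.
    static boolean sleepWithoutLockWorks() {
        try {
            Thread.sleep(10);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("wait without lock fails: " + waitWithoutLockFails());
        System.out.println("sleep without lock works: " + sleepWithoutLockWorks());
    }
}
```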
Lock (key topic)
The traditional way: synchronized

    public class Demo02Synchronized {
        public static void main(String[] args) {
            // Three threads share one resource object
            Ticket ticket = new Ticket();
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    ticket.sale();
                }
            }, "thread-A").start();
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    ticket.sale();
                }
            }, "thread-B").start();
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    ticket.sale();
                }
            }, "thread-C").start();
        }
    }

    class Ticket {
        private int number = 100;

        // synchronized makes the check-and-decrement atomic
        public synchronized void sale() {
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + ": selling ticket " + number--);
            }
        }
    }

The Lock interface
Fair lock: strictly first come, first served.
Unfair lock: threads may jump the queue (the default for ReentrantLock).

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class Demo03Lock {
        public static void main(String[] args) {
            Ticket2 ticket = new Ticket2();
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    ticket.sale();
                }
            }, "thread-A").start();
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    ticket.sale();
                }
            }, "thread-B").start();
            new Thread(() -> {
                for (int i = 0; i < 50; i++) {
                    ticket.sale();
                }
            }, "thread-C").start();
        }
    }

    class Ticket2 {
        private int number = 100;
        Lock lock = new ReentrantLock();

        public void sale() {
            lock.lock(); // acquire
            try {
                if (number > 0) {
                    System.out.println(Thread.currentThread().getName() + ": selling ticket " + number--);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock(); // always release in finally
            }
        }
    }

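Fairness is chosen through the ReentrantLock constructor: the no-argument constructor creates an unfair lock, and passing true requests a fair one. The sketch below only inspects the setting with isFair(); the class name is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // Default constructor: unfair lock.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true asks for a fair (first come, first served) lock.
    static boolean explicitIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair?  " + defaultIsFair());
        System.out.println("explicit fair? " + explicitIsFair());
    }
}
```

Fair locks avoid starvation but usually cost throughput, which is one reason unfair is the default.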
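Differences between synchronized and Lock
synchronized is a built-in Java keyword; Lock is a Java interface (ReentrantLock is the usual implementation).
synchronized cannot report whether the lock was acquired; with Lock you can check, for example through the return value of tryLock().
synchronized releases the lock automatically; a Lock must be released manually with unlock(). If it is never released, the other threads wait forever (deadlock).
With synchronized, if thread 1 holds the lock and blocks, thread 2 simply keeps waiting. A Lock does not have to wait indefinitely: tryLock() attempts to acquire the lock and returns instead of blocking for a long time.
synchronized is reentrant, cannot be interrupted while waiting, and is unfair. Lock (ReentrantLock) is reentrant, can be queried for its state, and is unfair by default but can be configured as fair.
synchronized suits small, simple critical sections; Lock suits larger or more complex synchronization that needs this extra control.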
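The tryLock() point can be sketched as follows: while one thread holds the lock, a second thread's tryLock() returns false immediately instead of blocking, and succeeds once the lock is free. The class and method names are assumptions made for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Starts a second thread that tries the lock once and reports whether it got it.
    static boolean otherThreadCanLock(ReentrantLock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            if (lock.tryLock()) { // never blocks
                try {
                    acquired.set(true);
                } finally {
                    lock.unlock();
                }
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    // Returns { result while the caller holds the lock, result after release }.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean whileHeld;
        lock.lock();
        try {
            whileHeld = otherThreadCanLock(lock);
        } finally {
            lock.unlock();
        }
        boolean afterRelease = otherThreadCanLock(lock);
        return new boolean[] { whileHeld, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("while held: " + r[0] + ", after release: " + r[1]);
    }
}
```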
The producer-consumer problem
synchronized version

    package com.yangl.study.juc;

    public class Demo04pcSynchronized {
        public static void main(String[] args) {
            Data01 data = new Data01();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-A").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-B").start();
        }
    }

    // Pattern: check and wait, do the work, notify
    class Data01 {
        private int number = 0;

        public synchronized void increment() throws InterruptedException {
            if (number != 0) {
                this.wait(); // wait until the value has been consumed
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            this.notifyAll(); // tell the other threads that +1 is done
        }

        public synchronized void decrement() throws InterruptedException {
            if (number == 0) {
                this.wait(); // wait until a value has been produced
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            this.notifyAll(); // tell the other threads that -1 is done
        }
    }

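Problem: spurious wakeups
Fix: replace the if check with a while loop.
Conclusion: with if, a woken thread resumes right after wait() and does not re-check the condition; it goes straight on to the code after the if block. With while, the thread also resumes after wait(), but the loop condition is evaluated again: if it still holds the thread waits again, otherwise it continues past the loop.
That is why while is used instead of if: a woken thread continues from the point right after wait(). With only one producer and one consumer the bug rarely shows; it becomes visible once several producers or consumers share the same object.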

    package com.yangl.study.juc;

    public class Demo04pcSynchronized {
        public static void main(String[] args) {
            Data01 data = new Data01();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-A").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-B").start();
        }
    }

    class Data01 {
        private int number = 0;

        public synchronized void increment() throws InterruptedException {
            while (number != 0) { // re-checked after every wakeup
                this.wait();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            this.notifyAll();
        }

        public synchronized void decrement() throws InterruptedException {
            while (number == 0) { // re-checked after every wakeup
                this.wait();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            this.notifyAll();
        }
    }

JUC version

    package com.yangl.study.juc;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class Demo05pcJuc {
        public static void main(String[] args) {
            Data02 data = new Data02();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-A").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-B").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-C").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        data.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-D").start();
        }
    }

    class Data02 {
        private int number;
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        public void increment() throws InterruptedException {
            lock.lock();
            try {
                while (number != 0) {
                    condition.await(); // counterpart of Object.wait()
                }
                number++;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                condition.signalAll(); // counterpart of Object.notifyAll()
            } finally {
                lock.unlock();
            }
        }

        public void decrement() throws InterruptedException {
            lock.lock();
            try {
                while (number == 0) {
                    // Must be await(), not wait(): wait() would need the Condition object's
                    // own monitor and throw IllegalMonitorStateException here.
                    condition.await();
                }
                number--;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

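Advantages of Condition
Precise notification: a specific waiting thread can be woken up.
What if we need to control which thread runs next? Use one Condition per thread and signal exactly the one that should proceed.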

    package com.yangl.study.juc;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class Demo06pcCondition {
        public static void main(String[] args) {
            Data03 data = new Data03();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    data.printA();
                }
            }, "Thread-A").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    data.printB();
                }
            }, "Thread-B").start();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    data.printC();
                }
            }, "Thread-C").start();
        }
    }

    class Data03 {
        // 1: A's turn, 2: B's turn, 3: C's turn
        private int number = 1;
        Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        Condition condition3 = lock.newCondition();

        public void printA() {
            lock.lock();
            try {
                while (number != 1) {
                    condition1.await();
                }
                number++;
                System.out.println(Thread.currentThread().getName() + "-> AAAA");
                condition2.signal(); // wake up B only
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public void printB() {
            lock.lock();
            try {
                while (number != 2) {
                    condition2.await();
                }
                number++;
                System.out.println(Thread.currentThread().getName() + "-> BBBB");
                condition3.signal(); // wake up C only
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public void printC() {
            lock.lock();
            try {
                while (number != 3) {
                    condition3.await();
                }
                number = 1;
                System.out.println(Thread.currentThread().getName() + "-> CCCC");
                condition1.signal(); // wake up A only
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

Output: AAAA, BBBB and CCCC are printed in strict rotation, ten rounds in total.
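The eight lock cases
How do you tell what is being locked? Always know which lock is involved and which object it locks.
These eight questions build a solid understanding of locks.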
Case 1: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone = new Phone();
            new Thread(() -> { phone.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        public synchronized void sendSms() {
            System.out.println("send SMS");
        }

        public synchronized void call() {
            System.out.println("call");
        }
    }

Output
In the standard case, which is printed first: "send SMS" or "call"?
send SMS
call
Case 2: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone = new Phone();
            new Thread(() -> { phone.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        public synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4); // sleeps while holding the lock
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        public synchronized void call() {
            System.out.println("call");
        }
    }

Output
sendSms() is delayed by 4 seconds. Which is printed first: "send SMS" or "call"?
send SMS
call
Summary
A synchronized instance method locks the object the method is called on.
Both methods use the same lock, so whoever acquires it first runs first.
Case 3: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone = new Phone();
            new Thread(() -> { phone.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone.hello(); }, "Thread-B").start();
        }
    }

    class Phone {
        public synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        public synchronized void call() {
            System.out.println("call");
        }

        // Not synchronized: does not need the lock
        public void hello() {
            System.out.println("hello");
        }
    }

Output
What happens after adding an ordinary method without synchronized?
hello
send SMS
Summary
The new method has no lock. It is not a synchronized method, so the lock does not affect it and it prints first.
Case 4: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            // Two objects, therefore two separate locks
            Phone phone1 = new Phone();
            Phone phone2 = new Phone();
            new Thread(() -> { phone1.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone2.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        public synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        public synchronized void call() {
            System.out.println("call");
        }
    }

Output
Two objects, two synchronized methods: "send SMS" or "call" first?
call
send SMS
Summary
Two objects mean two different locks, so call() does not wait for sendSms().
Case 5: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone = new Phone();
            new Thread(() -> { phone.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        public static synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        public static synchronized void call() {
            System.out.println("call");
        }
    }

Output
Both synchronized methods are now static and there is only one object. Which is printed first: "send SMS" or "call"?
send SMS
call
Case 6: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone1 = new Phone();
            Phone phone2 = new Phone();
            new Thread(() -> { phone1.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone2.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        public static synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        public static synchronized void call() {
            System.out.println("call");
        }
    }

Output
Two objects, two static synchronized methods. Which is printed first: "send SMS" or "call"?
send SMS
call
Summary
For an instance method, synchronized locks the object the method is called on.
A static method belongs to the class and exists as soon as the class is loaded, so static synchronized locks the Class object. There is only one Class object, so both Phone instances share the same lock.
Case 7: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone = new Phone();
            new Thread(() -> { phone.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        // Instance method: locks the Phone object
        public synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        // Static method: locks Phone.class
        public static synchronized void call() {
            System.out.println("call");
        }
    }

Output
One static synchronized method, one ordinary synchronized method, one object. Which is printed first: "send SMS" or "call"?
call
send SMS
Summary
The static method locks the Class object and the instance method locks the object itself. These are two different locks, so call() does not wait for sendSms().
Case 8: code

    package com.yangl.study.lock8;

    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            Phone phone1 = new Phone();
            Phone phone2 = new Phone();
            new Thread(() -> { phone1.sendSms(); }, "Thread-A").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> { phone2.call(); }, "Thread-B").start();
        }
    }

    class Phone {
        public synchronized void sendSms() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send SMS");
        }

        public static synchronized void call() {
            System.out.println("call");
        }
    }

Output
One static synchronized method, one ordinary synchronized method, two objects. Which is printed first: "send SMS" or "call"?
call
send SMS
Summary
As in case 7, the lock on phone1 and the lock on Phone.class are different, so the two methods do not wait for each other.
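Summary of the eight lock cases
Threads only wait for each other when they compete for the same lock.
static synchronized: locks the Class object.
Non-static synchronized: locks the instance (this).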
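Which lock a method holds can be checked directly with Thread.holdsLock(). The following sketch is not from the notes and uses a hypothetical class; it reports, from inside an instance method and a static method, whether the current thread owns the instance monitor and the Class monitor.

```java
class LockOwnerDemo {
    // Instance synchronized method: returns { holds this, holds LockOwnerDemo.class }.
    synchronized boolean[] instanceMethod() {
        return new boolean[] {
            Thread.holdsLock(this),
            Thread.holdsLock(LockOwnerDemo.class)
        };
    }

    // Static synchronized method: returns { holds the given instance, holds LockOwnerDemo.class }.
    static synchronized boolean[] staticMethod(LockOwnerDemo instance) {
        return new boolean[] {
            Thread.holdsLock(instance),
            Thread.holdsLock(LockOwnerDemo.class)
        };
    }

    public static void main(String[] args) {
        LockOwnerDemo demo = new LockOwnerDemo();
        boolean[] inInstance = demo.instanceMethod();
        boolean[] inStatic = staticMethod(demo);
        System.out.println("instance method: this=" + inInstance[0] + ", class=" + inInstance[1]);
        System.out.println("static method:   this=" + inStatic[0] + ", class=" + inStatic[1]);
    }
}
```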
Collections are not thread-safe
List is not thread-safe

    package com.yangl.study.unsafe;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class Demo01List {
        public static void main(String[] args) {
            List<String> arrayList = new ArrayList<>();
            for (int i = 1; i <= 10; i++) {
                new Thread(() -> {
                    // Unsynchronized writes and reads from ten threads
                    arrayList.add(UUID.randomUUID().toString().substring(0, 5));
                    System.out.println(arrayList);
                }, String.valueOf(i)).start();
            }
        }
    }

Running this can throw java.util.ConcurrentModificationException. The failure is timing-dependent, so it does not appear on every run.
Solutions
List<String> list = new Vector<>();
List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();

    package com.yangl.study.unsafe;

    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class Demo01List {
        public static void main(String[] args) {
            List<String> arrayList = new CopyOnWriteArrayList<>();
            for (int i = 1; i <= 10; i++) {
                new Thread(() -> {
                    arrayList.add(UUID.randomUUID().toString().substring(0, 5));
                    System.out.println(arrayList);
                }, String.valueOf(i)).start();
            }
        }
    }

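Summary
CopyOnWriteArrayList: copy-on-write (COW), an optimization strategy from computer programming. Every write copies the underlying array, while readers keep using the array they already see.
Vector synchronizes every method, reads included, with the synchronized keyword, which makes it slow under contention.
CopyOnWriteArrayList locks only its writes (with a ReentrantLock in JDK 8; newer JDK versions use synchronized internally) and reads take no lock at all, so it is more efficient when reads dominate.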
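The copy-on-write behaviour can be seen even in a single thread: an ArrayList iterator fails fast when the list changes during iteration, while a CopyOnWriteArrayList iterator keeps walking the snapshot it started with. The helper below is a hypothetical illustration, not code from the notes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {
    // Appends one element per visited element while iterating.
    // Returns the final size, or -1 if the iterator failed fast.
    static int appendWhileIterating(List<Integer> list) {
        try {
            for (Integer value : list) {
                list.add(value * 10);
            }
            return list.size();
        } catch (ConcurrentModificationException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        List<Integer> plain = new ArrayList<>(Arrays.asList(1, 2, 3));
        List<Integer> cow = new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println("ArrayList:            " + appendWhileIterating(plain));
        System.out.println("CopyOnWriteArrayList: " + appendWhileIterating(cow));
    }
}
```

The price of this safety is a full array copy on every write, so the class fits read-mostly data best.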
Set is not thread-safe

    package com.yangl.study.unsafe;

    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    public class Demo02Set {
        public static void main(String[] args) {
            Set<String> set = new HashSet<>();
            for (int i = 1; i <= 30; i++) {
                new Thread(() -> {
                    set.add(UUID.randomUUID().toString().substring(0, 5));
                    System.out.println(set);
                }, String.valueOf(i)).start();
            }
        }
    }

Solutions
Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();

    package com.yangl.study.unsafe;

    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.CopyOnWriteArraySet;

    public class Demo02Set {
        public static void main(String[] args) {
            Set<String> set = new CopyOnWriteArraySet<>();
            for (int i = 1; i <= 30; i++) {
                new Thread(() -> {
                    set.add(UUID.randomUUID().toString().substring(0, 5));
                    System.out.println(set);
                }, String.valueOf(i)).start();
            }
        }
    }

Map is not thread-safe
Understanding the ConcurrentHashMap source code
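ConcurrentHashMap is the thread-safe counterpart of HashMap in java.util.concurrent. As a usage sketch only (it does not walk through the source code), several threads below update one counter through merge(), which ConcurrentHashMap performs atomically. The class name and the "hits" key are illustrative assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentMapDemo {
    // Each of `threads` workers increments the same key `perThread` times (perThread > 0).
    static int countWith(int threads, int perThread) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counts.merge("hits", 1, Integer::sum); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts.get("hits");
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 1000));
    }
}
```

With a plain HashMap the same loop could lose updates or corrupt the map.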
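Callable
Compared with Runnable, a Callable:
can return a value;
can throw checked exceptions;
uses a different method: call() instead of run().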

    package com.yangl.study.callable;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;

    public class Demo01Callable {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            for (int i = 0; i < 10; i++) {
                MyThread myThread = new MyThread();
                // Thread only accepts a Runnable; FutureTask adapts the Callable
                FutureTask<Integer> futureTask = new FutureTask<>(myThread);
                new Thread(futureTask, String.valueOf(i)).start();
                int a = futureTask.get(); // blocks until call() has returned
                System.out.println(a);
            }
        }
    }

    class MyThread implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " is called");
            return Integer.parseInt(Thread.currentThread().getName());
        }
    }

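Details: 1. The result is cached: a FutureTask runs its Callable only once, even when it is handed to several threads. 2. get() may have to wait for the result, so it blocks.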
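The caching detail can be sketched like this: one FutureTask is given to two threads, yet call() runs a single time and both see the same result. The counter and class name are assumptions added for the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class FutureTaskOnceDemo {
    // Returns { how many times the callable ran, the value returned by get() }.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> {
            calls.incrementAndGet();
            return 1024;
        });
        Thread a = new Thread(task, "A");
        Thread b = new Thread(task, "B"); // same task object, second thread
        a.start();
        b.start();
        try {
            a.join();
            b.join();
            return new int[] { calls.get(), task.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("calls=" + r[0] + ", result=" + r[1]);
    }
}
```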
Common helper classes (must know)
CountDownLatch

    package com.yangl.study.common;

    import java.util.concurrent.CountDownLatch;

    public class Demo01CountDownLatch {
        public static void main(String[] args) throws InterruptedException {
            // The count is 6: use it when something must wait for several tasks
            CountDownLatch countDownLatch = new CountDownLatch(6);
            for (int i = 0; i < 6; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + ":Go out");
                    countDownLatch.countDown(); // count - 1
                }, String.valueOf(i)).start();
            }
            countDownLatch.await(); // wait until the count reaches zero
            System.out.println("Close Door");
        }
    }

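Explanation (a counter that counts down)
Every call to countDown() decreases the count by 1. Once the count reaches 0, the thread blocked in countDownLatch.await() is woken up and continues.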
CyclicBarrier

    package com.yangl.study.common;

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class Demo02CyclicBarrier {
        public static void main(String[] args) {
            // The action runs once all seven threads have reached the barrier
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
                System.out.println("Summon the dragon!");
            });
            for (int i = 0; i < 7; i++) {
                final int tmp = i + 1;
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + " collected dragon ball " + tmp);
                    try {
                        cyclicBarrier.await(); // wait for the other threads
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }

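Explanation (a counter that counts up)
Each thread that calls await() is counted; when the count reaches the number given to the constructor, the barrier action runs and all waiting threads are released.
Semaphore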

    package com.yangl.study.common;

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class Demo03Semaphore {
        public static void main(String[] args) {
            // Three permits: three parking spaces
            Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < 6; i++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // take a permit, blocking until one is free
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return; // no permit was acquired, so there is nothing to release
                    }
                    try {
                        System.out.println(Thread.currentThread().getName() + " got a parking space");
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println(Thread.currentThread().getName() + " left the parking space");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release(); // return the permit
                    }
                }, String.valueOf(i)).start();
            }
        }
    }

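Explanation (semaphore)
Purpose: mutually exclusive use of a limited set of shared resources, and rate limiting by capping the number of threads that run at the same time.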
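For rate limiting it is often more convenient not to block at all. Semaphore.tryAcquire() returns false immediately when no permit is left. A small single-threaded sketch with a hypothetical class name:

```java
import java.util.concurrent.Semaphore;

class TryAcquireDemo {
    // Returns the results of four tryAcquire() calls on a semaphore with two permits,
    // with one release() before the last call.
    static boolean[] run() {
        Semaphore semaphore = new Semaphore(2);
        boolean first = semaphore.tryAcquire();   // permit 1 of 2
        boolean second = semaphore.tryAcquire();  // permit 2 of 2
        boolean third = semaphore.tryAcquire();   // nothing left, returns at once
        semaphore.release();                      // give one permit back
        boolean fourth = semaphore.tryAcquire();  // succeeds again
        return new boolean[] { first, second, third, fourth };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```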
Read-write lock

    package com.yangl.study.rw;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class Demo01ReadWriteLock {
        public static void main(String[] args) {
            Cache2 cache = new Cache2();
            int num = 5;
            for (int i = 0; i < num; i++) {
                int finalI = i;
                new Thread(() -> {
                    cache.write(String.valueOf(finalI), String.valueOf(finalI));
                }).start();
            }
            for (int i = 0; i < num; i++) {
                int finalI = i;
                new Thread(() -> {
                    cache.read(String.valueOf(finalI));
                }).start();
            }
        }
    }

    // Without locks: writes from different threads can interleave
    class Cache1 {
        private volatile Map<String, String> cacheMap = new HashMap<>();

        public void write(String key, String value) {
            System.out.println(Thread.currentThread().getName() + ": start writing");
            cacheMap.put(key, value);
            System.out.println(Thread.currentThread().getName() + ": write OK");
        }

        public void read(String key) {
            System.out.println(Thread.currentThread().getName() + ": start reading");
            String value = cacheMap.get(key);
            System.out.println(Thread.currentThread().getName() + ": read OK");
        }
    }

    // With a read-write lock: one writer at a time, many readers at once
    class Cache2 {
        private volatile Map<String, String> cacheMap = new HashMap<>();
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

        public void write(String key, String value) {
            readWriteLock.writeLock().lock(); // exclusive
            try {
                System.out.println(Thread.currentThread().getName() + ": start writing");
                cacheMap.put(key, value);
                System.out.println(Thread.currentThread().getName() + ": write OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }

        public void read(String key) {
            readWriteLock.readLock().lock(); // shared
            try {
                System.out.println(Thread.currentThread().getName() + ": start reading");
                String value = cacheMap.get(key);
                System.out.println(Thread.currentThread().getName() + ": read OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
            }
        }
    }

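Blocking queues: BlockingQueue
Blocking: a put has to wait while the queue is full, and a take has to wait while the queue is empty.
Queue: first in, first out (FIFO).
When do we use a blocking queue? Concurrent processing across threads, and thread pools.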
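The four groups of BlockingQueue APIs

| Operation | Throws an exception | Returns a special value (no exception) | Blocks | Waits with a timeout |
| --- | --- | --- | --- | --- |
| Add | add | offer | put | offer(e, timeout, unit) |
| Remove | remove | poll | take | poll(timeout, unit) |
| Inspect the head | element | peek | - | - |
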

    package com.yangl.study.queue;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class Demo01BlockingQueue {
        public static void main(String[] args) throws InterruptedException {
            test4();
        }

        // Group 1: throws an exception
        public static void test1() {
            ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
            System.out.println(blockingQueue.add("d")); // queue is full: IllegalStateException
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove()); // on an empty queue: NoSuchElementException
        }

        // Group 2: returns a special value, no exception
        public static void test2() {
            ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
            System.out.println(blockingQueue.offer("d")); // queue is full: false
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll()); // queue is empty: null
        }

        // Group 3: blocks
        public static void test3() throws InterruptedException {
            ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            blockingQueue.put("a");
            blockingQueue.put("b");
            blockingQueue.put("c");
            blockingQueue.put("d"); // queue is full: blocks until space is free (here: forever)
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
        }

        // Group 4: waits with a timeout
        public static void test4() throws InterruptedException {
            ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            blockingQueue.offer("a");
            blockingQueue.offer("b");
            blockingQueue.offer("c");
            System.out.println("start waiting");
            System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS)); // false after 2 s
            System.out.println("done waiting");
            System.out.println("=========== take ===========");
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println("start waiting");
            System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS)); // null after 2 s
            System.out.println("done waiting");
        }
    }

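Synchronous queue: SynchronousQueue
It has no capacity: after putting one element, the producer must wait until it has been taken before another element can be put.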

    package com.yangl.study.queue;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;

    public class Demo02SynchronousQueue {
        public static void main(String[] args) {
            BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + ": put A");
                    blockingQueue.put("A");
                    System.out.println(Thread.currentThread().getName() + ": put B");
                    blockingQueue.put("B");
                    System.out.println(Thread.currentThread().getName() + ": put C");
                    blockingQueue.put("C");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "Thread-A").start();
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + ": get " + blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + ": get " + blockingQueue.take());
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + ": get " + blockingQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "Thread-B").start();
        }
    }

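Because a SynchronousQueue stores nothing, a non-blocking offer() only succeeds if a consumer is already waiting, and put() is a direct hand-off to take(). The following is a minimal sketch with hypothetical names, separate from the timed demo above:

```java
import java.util.concurrent.SynchronousQueue;

class HandoffDemo {
    // With no consumer waiting, offer() cannot hand the element over and returns false.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("A");
    }

    // A producer thread blocks in put() until this thread calls take().
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("A");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String value = queue.take();
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("handed off: " + handOff());
    }
}
```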
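Thread pools
Pooling
Running a program essentially means consuming system resources; pooling optimizes how they are used.
Thread pools, connection pools, memory pools, object pools and so on: creating and destroying these resources is expensive.
Pooling: prepare some resources in advance; whoever needs one borrows it and gives it back afterwards.
Benefits of a thread pool (thread reuse, a cap on maximum concurrency, managed threads):
Lower resource consumption
Faster response
Easier management
The three factory methods: Executors.newSingleThreadExecutor(), Executors.newFixedThreadPool(int) and Executors.newCachedThreadPool()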

    package com.yangl.study.pool;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Demo01ThreadPoolCreate {
        public static void main(String[] args) {
            // Grows and shrinks with the load
            ExecutorService threadPool = Executors.newCachedThreadPool();
            try {
                for (int i = 0; i < 10; i++) {
                    // Submit work to the pool instead of calling new Thread()
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " OK");
                    });
                }
            } finally {
                // Shut the pool down once all work has been submitted
                threadPool.shutdown();
            }
        }
    }

The seven parameters
Source analysis (the factory methods are built on ThreadPoolExecutor)

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

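
| Parameter | Meaning |
| --- | --- |
| int corePoolSize | Core pool size |
| int maximumPoolSize | Maximum pool size |
| long keepAliveTime | How long an idle thread beyond the core size is kept before it is released |
| TimeUnit unit | Unit of keepAliveTime |
| BlockingQueue<Runnable> workQueue | Blocking queue that holds tasks waiting for a thread |
| ThreadFactory threadFactory | Thread factory: creates the threads, usually left unchanged |
| RejectedExecutionHandler handler | Rejection policy |
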
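The four rejection policies
new ThreadPoolExecutor.AbortPolicy(): // the bank is full and another customer arrives: the new task is not handled and a RejectedExecutionException is thrown
The maximum load is queue capacity + maximumPoolSize; anything beyond that is handed to the rejection policy.
new ThreadPoolExecutor.CallerRunsPolicy(): // "go back to where you came from": the task runs in the thread that submitted it, for example the main thread
new ThreadPoolExecutor.DiscardPolicy(): // when the queue is full the new task is dropped silently; no exception is thrown
new ThreadPoolExecutor.DiscardOldestPolicy(): // when the queue is full the oldest queued task is discarded and the new task is submitted again; no exception is thrown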
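The "queue capacity + maximumPoolSize" limit can be checked with a hand-built pool. The sketch below assumes example sizes (core 2, maximum 5, queue capacity 3) and a hypothetical class name; every task blocks on a latch so that none finishes while tasks are still being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {
    // Submits blocking tasks until AbortPolicy rejects one; returns how many were accepted.
    static int acceptedBeforeRejection() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                              // corePoolSize (example value)
                5,                              // maximumPoolSize (example value)
                3, TimeUnit.SECONDS,            // keepAliveTime and its unit
                new ArrayBlockingQueue<>(3),    // bounded work queue (example capacity)
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < 20; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Thrown by AbortPolicy once the threads and the queue are both full
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted tasks: " + acceptedBeforeRejection());
    }
}
```

With these sizes the pool accepts 5 running tasks plus 3 queued ones, so the ninth submission is rejected.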
The four core functional interfaces (must master)
Function: takes an argument and returns a result

    package com.yangl.study.function;

    import java.util.function.Function;

    public class Demo01Function {
        public static void main(String[] args) {
            // Anonymous class form
            Function<String, String> function = new Function<String, String>() {
                @Override
                public String apply(String s) {
                    return s;
                }
            };
            System.out.println(function.apply("function"));

            // Lambda form
            Function<String, String> function1 = (str) -> {
                return str;
            };
            System.out.println(function1.apply("function1"));
        }
    }

Predicate: takes an argument and returns a boolean

    package com.yangl.study.function;

    import java.util.function.Predicate;

    public class Demo02Predicate {
        public static void main(String[] args) {
            Predicate<String> predicate = new Predicate<String>() {
                @Override
                public boolean test(String s) {
                    return s.isEmpty();
                }
            };
            System.out.println(predicate.test("hello"));

            Predicate<String> predicate1 = (str) -> {
                return str.isEmpty();
            };
            System.out.println(predicate1.test("yangl"));
        }
    }

Consumer: takes an argument and returns nothing

    package com.yangl.study.function;

    import java.util.function.Consumer;

    public class Demo03Consummer {
        public static void main(String[] args) {
            Consumer<String> consumer = new Consumer<String>() {
                @Override
                public void accept(String s) {
                    System.out.println(s);
                }
            };
            consumer.accept("test");

            Consumer<String> consumer1 = (str) -> {
                System.out.println(str);
            };
            consumer1.accept("yangl");
        }
    }

Supplier: takes no argument and returns a value

    package com.yangl.study.function;

    import java.util.function.Supplier;

    public class Demo04Supplier {
        public static void main(String[] args) {
            Supplier<Integer> supplier = new Supplier<Integer>() {
                @Override
                public Integer get() {
                    return 1024;
                }
            };
            System.out.println(supplier.get());

            Supplier<Integer> supplier1 = () -> {
                return 1024;
            };
            System.out.println(supplier1.get());
        }
    }

Stream processing
Big data = storage + computation
Collections and MySQL are essentially for storing data;
the computation should be handed to streams.

    package com.yangl.study.stream;

    import java.util.Arrays;
    import java.util.List;

    public class Demo01Stream {
        public static void main(String[] args) {
            User u1 = new User(1, "a", 21);
            User u2 = new User(2, "b", 22);
            User u3 = new User(3, "c", 23);
            User u4 = new User(4, "d", 24);
            User u5 = new User(5, "e", 25);
            User u6 = new User(6, "f", 26);
            List<User> list = Arrays.asList(u1, u2, u3, u4, u5, u6);
            list.stream()
                    .filter(user -> { return user.getId() % 2 == 0; })   // even ids only
                    .filter(user -> { return user.getAge() > 23; })      // older than 23
                    .map(user -> { return user.getName().toUpperCase(); })
                    .sorted((user1, user2) -> { return user2.compareToIgnoreCase(user1); }) // reverse order
                    .limit(1)                                            // first element only
                    .forEach(System.out::println);
        }
    }

    class User {
        private Integer id;
        private String name;
        private Integer age;

        public User(Integer id, String name, Integer age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        public Integer getId() { return id; }

        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }

        public Integer getAge() { return age; }

        public void setAge(Integer age) { this.age = age; }
    }

ForkJoin
Key feature of ForkJoin: work stealing (an idle worker thread takes pending tasks from the queue of a busy one).
ForkJoin
package com.yangl.study.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Demo01Forkjoin {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("************ plain loop *********");
        test1();
        System.out.println("************ ForkJoin *********");
        test2();
        System.out.println("************ Stream *********");
        test3();
    }

    // Baseline: a sequential loop. The boxed Long adds autoboxing overhead on every iteration.
    public static void test1() {
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 0L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        System.out.println("sum=" + sum + " time: " + (System.currentTimeMillis() - start));
    }

    // ForkJoin: submit a task to a ForkJoinPool and block on its result.
    // ForkJoinHelper is a ForkJoinTask<Long> subclass that is not shown in this section.
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinHelper(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();
        System.out.println("sum=" + sum + " time: " + (System.currentTimeMillis() - start));
    }

    // Parallel stream: the same sum expressed as a parallel reduction.
    public static void test3() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        System.out.println("sum=" + sum + " time: " + (System.currentTimeMillis() - start));
    }
}
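The ForkJoinHelper class used in test2 does not appear in this section. As a minimal sketch of what such a task might look like, the following splits the range in half until it is small enough to sum directly. The class name RangeSumTask, the THRESHOLD value, and the splitting strategy are all assumptions made for illustration, not the original helper.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical stand-in for ForkJoinHelper: sums every long in [start, end].
class RangeSumTask extends RecursiveTask<Long> {
    // Assumed cut-off: ranges no longer than this are summed sequentially.
    private static final long THRESHOLD = 10_000L;

    private final long start;
    private final long end;

    RangeSumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: plain loop with primitive longs.
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // Too large: split into two halves.
        long middle = start + (end - start) / 2;
        RangeSumTask left = new RangeSumTask(start, middle);
        RangeSumTask right = new RangeSumTask(middle + 1, end);
        left.fork();                      // queue the left half; idle workers may steal it
        long rightSum = right.compute();  // compute the right half in the current thread
        return left.join() + rightSum;    // wait for the left half and combine
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long sum = pool.invoke(new RangeSumTask(0L, 1_000_000L));
        System.out.println("sum=" + sum); // prints sum=500000500000
        pool.shutdown();
    }
}
```

Forking one half and computing the other in the current thread keeps the calling worker busy instead of leaving it blocked in join right away.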
Asynchronous callbacks
Future was designed to model the result of an event that will complete at some point in the future.
package com.yangl.study.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Demo01CompletableFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(test2());
    }

    // runAsync: asynchronous task with no return value.
    public static void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " start: runAsync=>Void");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " end: runAsync=>Void");
        });
        // Block until the asynchronous task has finished.
        completableFuture.get();
    }

    // supplyAsync: asynchronous task with a return value.
    public static Integer test2() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " start: supplyAsync=>Integer");
            int i = 10 / 0; // deliberately throws ArithmeticException
            return 1024;
        });
        return completableFuture.whenComplete((t, u) -> {
            // t is the normal result (null on failure); u is the exception (null on success).
            System.out.println("normal result=>" + t);
            System.out.println("exception=>" + u);
        }).exceptionally((e) -> {
            // Invoked only on failure; its return value becomes the result.
            System.out.println("exception message: " + e.getMessage());
            return 404;
        }).get();
    }
}
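The example above observes the outcome with whenComplete and recovers with exceptionally. As a small sketch of how the success and failure paths separate, the following chains thenApply (success only) before exceptionally (failure only). The helper name safeDivide and the fallback value 404 are hypothetical choices for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureRecoverySketch {
    // Hypothetical helper: divides asynchronously, doubles the result on success,
    // and falls back to 404 if any earlier stage failed.
    static CompletableFuture<Integer> safeDivide(int dividend, int divisor) {
        return CompletableFuture
                .supplyAsync(() -> dividend / divisor) // may throw ArithmeticException
                .thenApply(result -> result * 2)       // skipped when the previous stage failed
                .exceptionally(e -> 404);              // runs only when an earlier stage failed
    }

    public static void main(String[] args) {
        System.out.println(safeDivide(10, 2).join()); // prints 10
        System.out.println(safeDivide(10, 0).join()); // prints 404
    }
}
```

join is used here instead of get only because it throws unchecked exceptions, which keeps the sketch short.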
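JMM
Describe your understanding of volatile.
What is the JMM?
The Java Memory Model is not a physical thing. It is a concept, a set of conventions.
Some of the JMM's synchronization conventions:
Before a thread releases a lock, it must flush its shared variables back to main memory.
Before a thread acquires a lock, it must read the latest values from main memory into its working memory.
Locking and unlocking must use the same lock.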
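A minimal sketch of these conventions in practice: a plain (non-volatile) flag that is read and written only while holding the same lock is still seen by the other thread. The class, field, and method names below are hypothetical, chosen for illustration.

```java
class LockVisibilitySketch {
    private static final Object LOCK = new Object();
    private static boolean ready = false; // deliberately NOT volatile

    // Acquiring LOCK before the read means the latest value is observed.
    static boolean isReady() {
        synchronized (LOCK) {
            return ready;
        }
    }

    // Releasing LOCK after the write makes the new value visible to later lockers.
    static void markReady() {
        synchronized (LOCK) {
            ready = true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread reader = new Thread(() -> {
            while (!isReady()) {
                Thread.yield();
            }
            System.out.println("reader saw ready=true");
        });
        reader.start();
        Thread.sleep(100);
        markReady();   // same lock as the reader, so the reader's loop ends
        reader.join();
    }
}
```

If the reader checked the field without the lock (and without volatile), nothing would guarantee that it ever observes the update.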
volatile guarantees visibility
package com.yangl.study.jmm;

import java.util.concurrent.TimeUnit;

public class Demo01Volatile {
    // Without volatile, the reader thread below may never see the update and can loop forever.
    private volatile static int num = 0;

    public static void main(String[] args) {
        new Thread(() -> {
            // Spin until the main thread's write becomes visible.
            while (num == 0) {
            }
            System.out.println("value of num: " + num);
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        num = 1;
        System.out.println("num has been set");
        System.out.println(num);
    }
}
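volatile does not guarantee atomicity
Atomicity means indivisibility.
While thread A executes a task, it must not be interrupted or split up: the steps either all succeed or all fail together.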
package com.yangl.study.jmm;

import java.util.concurrent.TimeUnit;

public class Demo02Volatile {
    private volatile static int num = 0;

    // num++ is a read, an add and a write, so concurrent calls can lose updates even with volatile.
    public static void add() {
        num++;
    }

    public static void main(String[] args) {
        // 20 threads x 1000 increments: the expected total is 20000, but the printed value is often lower.
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 1000; j++) {
                    add();
                }
            }).start();
        }

        // Wait for the workers to finish. The threshold of 2 assumes one extra live thread
        // besides main; joining the worker threads would be more robust.
        while (Thread.activeCount() > 2) {
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName() + ",num=" + num);
    }
}
How can atomicity be guaranteed without Lock or synchronized? Use the atomic classes in java.util.concurrent.atomic:
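package com.yangl.study.jmm;

import java.util.concurrent.atomic.AtomicInteger;

public class Demo03Volatile {
    // AtomicInteger provides atomic updates on its own; volatile on the reference is kept
    // from the original notes but is not what makes the increment atomic.
    private volatile static AtomicInteger num = new AtomicInteger();

    public static void add() {
        num.incrementAndGet(); // atomic increment, no lock needed
    }

    public static void main(String[] args) {
        // 20 threads x 1000 increments: the total is 20000 once all workers have finished.
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 1000; j++) {
                    add();
                }
            }).start();
        }

        // Wait for the workers to finish. The threshold of 2 assumes one extra live thread
        // besides main; joining the worker threads would be more robust.
        while (Thread.activeCount() > 2) {
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName() + ",num=" + num);
    }
}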
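AtomicInteger avoids locks by relying on compare-and-set (CAS). As a rough sketch of the idea, the retry loop below does by hand what incrementAndGet provides. The helper name casIncrement is hypothetical, and the JDK's own implementation may differ from this loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasIncrementSketch {
    private static final AtomicInteger counter = new AtomicInteger();

    // Hypothetical helper: increment by retrying compareAndSet until it succeeds.
    static int casIncrement(AtomicInteger value) {
        while (true) {
            int current = value.get();   // read the current value
            int next = current + 1;      // compute the new value
            // Write only if nobody changed the value in between; otherwise retry.
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] workers = new Thread[20];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    casIncrement(counter);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join(); // wait for every worker before reading the total
        }
        System.out.println("counter=" + counter.get()); // prints counter=20000
    }
}
```

A failed compareAndSet means another thread won the race, so the loop simply rereads and tries again; no update is lost.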
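Instruction reordering
What is instruction reordering? The computer does not necessarily execute your program in the order you wrote it.
Source code -> reordering by compiler optimizations -> possible reordering from instruction-level parallelism -> reordering by the memory system -> execution
volatile can prevent instruction reordering
volatile guarantees visibility but not atomicity; because of memory barriers, it also prevents instruction reordering.
Memory barriers are CPU instructions. They do two things: 1. guarantee the execution order of specific operations; 2. guarantee memory visibility for certain variables (volatile uses these properties to provide visibility).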
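One practical consequence: an ordinary write that comes before a volatile write cannot be moved after it, so a volatile flag can safely publish data written earlier. A minimal sketch, with hypothetical names payload and published chosen for illustration:

```java
class VolatilePublishSketch {
    private static int payload = 0;                    // plain field
    private static volatile boolean published = false; // volatile flag

    static void publish(int value) {
        payload = value;   // (1) ordinary write
        published = true;  // (2) volatile write: (1) cannot be reordered after it
    }

    static int awaitPayload() {
        while (!published) { // volatile read: sees the write in (2) once it happens
            Thread.yield();
        }
        return payload;      // guaranteed to see the value written in (1)
    }

    public static void main(String[] args) throws InterruptedException {
        Thread reader = new Thread(() ->
                System.out.println("payload=" + awaitPayload())); // prints payload=42
        reader.start();
        publish(42);
        reader.join();
    }
}
```

If published were a plain field, the reader could spin forever or return a stale payload, because nothing would order the two writes for it.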
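Mastering the singleton pattern
Eager initialization
DCL lazy initialization
Static inner class
Singletons are not safe: reflection
Enum
Understanding CAS in depth
Atomic references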