JUC并发编程
JUC
一.什么是JUC
二.基础知识
Java进阶八—多线程_java开启多线程处理多任务-CSDN博客
Java默认有两个线程:main,gc
并发编程的本质:并发编程的本质是充分利用cpu资源。
Java真的可以开启线程吗:Java创建Thread类调用start方法,底层是把线程放到一个组里面,然后调用一个本地方法start0;方法底层是C++;Java无法操作硬件。
三.多线程回顾
1.线程的几种状态
public enum State {
/**
* 尚未启动的线程的线程状态。新生
*/
NEW,
/**
* 可运行线程的线程状态。
* 处于可运行状态的线程,正在Java虚拟机中执行,
* 但它可能正在等待来自操作系统的其他资源,例如处理器。运行
*/
RUNNABLE,
/**
* 等待监视器锁而阻塞的线程的线程状态。
* 处于阻塞状态的线程正在等待监视器锁进入同步块/方法,或者在调用后重新进入同步块/方法。阻塞
*/
BLOCKED,
/**
* 等待线程的线程状态。
* 线程处于等待状态,因为调用了一个以下方法:
* Object.wait、Thread.join、LockSupport.park。等待,死死的等待
*/
WAITING,
/**
* 具有指定等待时间的等待线程的线程状态。
* 线程处于定时等待状态,因为调用了以下方法与指定的正等待时间:
* Thread.sleep、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos、LockSupport.parkUntil
* 超时等待
*/
TIMED_WAITING,
/**
* 终止线程的线程状态。
* 线程已完成执行。终止
*/
TERMINATED;
}
2.sleep和wait的区别
sleep是Thread类的本地方法;wait是Object类的方法。
sleep不释放锁;wait释放锁。
sleep不需要和synchronized关键字一起使用;wait必须和synchronized代码块一起使用。
sleep不需要被唤醒(时间到了自动退出阻塞);wait需要被唤醒。
sleep一般用于当前线程休眠,或者轮循暂停操作;wait则多用于多线程之间的通信。
四.Lock锁
1.串行执行
synchronized
和 Lock
都是用于控制多个线程对共享资源的访问,确保在同一时刻只有一个线程可以访问特定的代码段。这使得线程从并行执行变为串行执行,从而避免资源竞争和数据不一致的问题。
串行执行(Serial Execution)是指多个任务或指令按照顺序,一个接一个地执行,而不是同时进行。在串行执行中,一个任务必须在下一个任务开始之前完全完成。这种执行方式与并行执行(Parallel Execution)相对,后者允许多个任务同时进行。
synchronized必须在循环的里面,如果在外面会导致串行执行变成一个线程单一执行。
当一个线程进入 sale
方法时,它会持有锁,直到 sale
方法执行完毕。
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
Thread thread1 = new Thread(ticket, "A");
Thread thread2 = new Thread(ticket, "B");
thread1.start();
thread2.start();
}
}
class Ticket implements Runnable {
// 票数
private int number = 30;
@Override
public void run() {
sale();
}
public synchronized void sale() {
while (number > 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " 卖出 " + (number--) + " 票");
}
}
}
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
Thread thread1 = new Thread(ticket, "A");
Thread thread2 = new Thread(ticket, "B");
thread1.start();
thread2.start();
}
}
class Ticket implements Runnable {
// 票数
private int number = 30;
@Override
public void run() {
while (number > 0) {
sale();
}
}
public synchronized void sale() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " 卖出 " + (number--) + " 票");
}
}
如果不睡眠也会导致,只有一个线程在执行:
问题在于 run
方法中包含一个 while
循环,这个循环会不断调用 sale
方法,直到 number
变为 0。由于 sale
方法是同步的,当一个线程进入 sale
方法时,它会持有锁,直到 sale
方法执行完毕。如果 sale
方法执行得非常快,那么其他线程可能根本没有机会进入 sale
方法,因为锁一直被持有。
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
Thread thread1 = new Thread(ticket, "A");
Thread thread2 = new Thread(ticket, "B");
thread1.start();
thread2.start();
}
}
class Ticket implements Runnable {
// 票数
private int number = 30;
@Override
public void run() {
while (number > 0) {
sale();
}
}
public synchronized void sale() {
System.out.println(Thread.currentThread().getName() + " 卖出 " + (number--) + " 票");
}
}
也可以让每一个线程调用不同Runnable接口的循环,就可以解决以上问题
package JUC;
/*
* 真正的多线程开发,公司中的开发,降低耦合型
* 线程就是一个单独的资源类,没有任何附属的操作!
* 1. 属性 方法
* */
public class SaleTicket {
public static void main(String[] args) {
//并发:多线程操作同一个资源类,把资源丢入线程
Ticket ticket = new Ticket();
//@FunctionalInterface 函数式接口,jkd1.8 lambda 表达式(参数)->{代码}
new Thread(()->{
for (int i = 0; i < 30; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 30; i++) {
ticket.sale();
}
},"B").start();
}
}
//资源类OOP
class Ticket{
//属性 方法
private int number = 30;
//卖票的方式
public synchronized void sale(){
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"票,剩余"+number);
}
}
}
2.Lock锁
公平锁:十分公平,不能插队。
非公平锁:十分不公平,可以插队。(默认非公平锁)
3.Lock锁和synchronized的区别
- Synchronized是内置Java关键字;Lock是一个Java类。
- Synchronized无法判断获取锁的状态;Lock可以判断是否获取到了锁。(boolean b = lock.tryLock();)
- Synchronized会自动释放锁;Lock必须要手动释放锁,如果不释放锁,死锁。
- Synchronized线程1获得锁阻塞时,线程2会一直等待下去;Lock锁线程1获得锁阻塞时,线程2等待足够长的时间后中断等待,去做其他的事。
- Synchronized可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置)。
lock.lockInterruptibly();方法:当两个线程同时通过该方法想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。 - Synchronized适合锁少量的代码同步问题;Lock适合锁大量的同步代码。
4.虚假唤醒问题
if判断改成while判断
5.生产者和消费者问题:JUC版
public class Main {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.Producer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.Customer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.Producer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.Customer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
// 判断等待,业务,通知
class Data {
private boolean carStatus = false;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void Producer() throws InterruptedException {
try {
lock.lock();
while (carStatus == true) {
condition.await();
}
carStatus = true;
System.out.println(Thread.currentThread().getName() + "=>" + carStatus);
condition.signalAll();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public void Customer() throws InterruptedException {
try {
lock.lock();
while (carStatus == false){
condition.await();
}
carStatus = false;
System.out.println(Thread.currentThread().getName() + "=>" + carStatus);
condition.signalAll();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
6.Condition实现精准通知唤醒
public class Main {
public static void main(String[] args) {
Data01 data01 = new Data01();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data01.A();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data01.B();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data01.C();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
}
}
// 判断等待,业务,通知
//A执行完调用B,B执行完调用C,C执行完调用A
class Data01 {
private int num = 1;// 1A 2B 3C
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void A() throws InterruptedException {
lock.lock();
try {
// 业务代码,判断=>执行=>通知!
while (num!=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>AAAAA");
num = 2;
// 唤醒指定的线程,B
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void B() throws InterruptedException {
lock.lock();
try {
while (num!=2){
condition2.await();
}
num = 3;
System.out.println(Thread.currentThread().getName()+"=>BBBBB");
// 唤醒指定的线程,C
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void C() throws InterruptedException {
lock.lock();
try {
while (num!=3){
condition3.await();
}
num = 1;
System.out.println(Thread.currentThread().getName()+"=>CCCCC");
// 唤醒指定的线程,A
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
五.8锁现象(锁的八个问题)
package lock;
import java.util.concurrent.TimeUnit;
/*
* 1.标准情况下,两个线程先打印发短信还是打电话?1.发短信﹑2.打电话
* 2.sendSms延迟4秒,两个线程先打印 发短信还是 打电话?1.发短信 2.打电话
* */
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//锁的存在
new Thread(()->{
phone.sendSms();
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone{
//synchronized 锁的对象是方法的调用者
//两个方法是同一个锁,谁先拿到谁先执行
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
package lock;
import java.util.concurrent.TimeUnit;
/*
* 3.增加了一个普通方法,先发短信还是Hello
* 4.两个对象,两个同步方法,先发短信还是先打电话
* */
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
//两个对象,两个调用者,两把锁!
Phone phone = new Phone();
Phone phone2 = new Phone();
//锁的存在
new Thread(()->{
phone.sendSms();
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
},"B").start();
}
}
class Phone{
//synchronized 锁的对象是方法的调用者
//两个方法是同一个锁,谁先拿到谁先执行
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
//这里没有锁,不是同步方法,不受锁的影响
public void hello(){
System.out.println("hello");
}
}
package lock;
import java.util.concurrent.TimeUnit;
/*
* 5.增加两个静态的同步方法,只有一个对象,先打印 发短信还是打电话
* 6.两个对象!增加两个静态的同步方法, 先打印 发短信还是打电话
* */
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
//两个对象的Class类模板只有一个,static,锁的是Class
Phone phone = new Phone();
Phone phone2 = new Phone();
//锁的存在
new Thread(()->{
phone.sendSms();
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
},"B").start();
}
}
//Phone唯一的一个Class对象
class Phone{
//synchronized 锁的对象是方法的调用者
//static 静态方法
//类一加载就有了!锁的是Class
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
}
package lock;
import java.util.concurrent.TimeUnit;
/*
* 7.1个静态的同步方法,1个普通的同步方法,1个对象,先打印谁
* 8.1个静态的同步方法,1个普通的同步方法,2个对象,先打印谁
* */
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
//两个对象的Class类模板只有一个,static,锁的是Class
Phone phone = new Phone();
Phone phone2 = new Phone();
//锁的存在
new Thread(() -> {
phone.sendSms();
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
//Phone唯一的一个Class对象
class Phone {
//静态的同步方法 锁的是Class类模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
//普通的同步方法 锁的调用者
public synchronized void call() {
System.out.println("打电话");
}
}
package lock;
import java.util.concurrent.TimeUnit;
/*
* 7.1个静态的同步方法,1个普通的同步方法,1个对象,先打印谁
* 8.1个静态的同步方法,1个普通的同步方法,2个对象,先打印谁
* */
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
//两个对象的Class类模板只有一个,static,锁的是Class
Phone phone = new Phone();
Phone phone2 = new Phone();
//锁的存在
new Thread(() -> {
phone.sendSms();
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
//Phone唯一的一个Class对象
class Phone {
//静态的同步方法 锁的是Class类模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
//普通的同步方法 锁的调用者
public synchronized void call() {
System.out.println("打电话");
}
}
六.集合类不安全
1.ArryList集合
public class Test {
public static void main(String[] args) {
List<String> strings = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
strings.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(strings);
}).start();
}
}
}
多线程下不安全;可能会报错:java.util.ConcurrentModificationException(并发修改异常)
解决方案:
- List<String> list = new Vector<>();
- List<String> strings = Collections.synchronizedList(new ArrayList<>());
- List<String> strings = new CopyOnWriteArrayList<>();
概念:CopyOnWrite写入时复制,计算机程序设计语言的一种优化策略。(保证效率和性能问题)
2.HashSet集合
public class Test {
public static void main(String[] args) {
HashSet<String> strings = new HashSet<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
strings.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(strings);
}).start();
}
}
}
多线程下不安全;可能会报错:java.util.ConcurrentModificationException(并发修改异常)
解决方案:
- Set<String> strings = Collections.synchronizedSet(new HashSet<>());
- Set<String> strings = new CopyOnWriteArraySet<>();
hashset集合的底层是hashmap的key
3.HashMap集合
public class Test {
public static void main(String[] args) {
Map<String, String> map = new HashMap<>(16, 0.75F);
for (int i = 0; i < 10; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
}).start();
}
}
}
多线程下不安全;可能会报错:java.util.ConcurrentModificationException(并发修改异常)
解决方案:
- 使用Map<String, String> concurrentHashMap = new ConcurrentHashMap<>();
七.Callable接口
Callable接口类似于Runnable接口,线程第三种创建方式。
- 可以抛出异常。
- 可以有返回值。
- 方法不同与Runnable接口。Call方法
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 适配类
FutureTask futureTask = new FutureTask(new MyThread());
new Thread(futureTask,"A").start();
// 打印一个Call,结果会被缓存,提高效率
new Thread(futureTask,"B").start();
// 拿返回值,get方法可能会产生阻塞
String str = (String) futureTask.get();
System.out.println(str);
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Hello Call");
return "Call";
}
}
八.常用辅助类(AbstractQueuedSynchronizer(AQS))
1.CountDownLatch
应用场景:1.多线程任务汇总。2.多线程任务阻塞住,等待发令枪响,一起执行。
每次有线程调用,数量-1,当计数器归零,countDownLatch.await()就会被唤醒向下执行。
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要是执行任务的时候使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"=>Go Out");
countDownLatch.countDown();// 数量-1
}).start();
}
countDownLatch.await();// 等待计数器归零,然后再往下执行
System.out.println("关门");
}
2.CyclicBarrier
public static void main(String[] args) {
// 集齐七颗龙珠召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {// 如果计数器为7,线程只有6个,则会等待,不进行召唤神龙
System.out.println("召唤神龙");
});
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集" + temp + "个龙珠!");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
3.Semaphore
Semaphore:信号量
public static void main(String[] args) {
// 线程数量:停车位!限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire();// 得到
System.out.println(Thread.currentThread().getName()+"抢到车位!");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();// 释放
}
}).start();
}
}
原理:
semaphore.acquire();获得,假设已经满了则等待,等待其他线程释放。
semaphore.release();释放,会将当前的信号量释放+1,然后唤醒等待的线程
九.读写锁
ReadWriteLock接口有一个实现类ReentrantReadWriteLock类。
读可以被多个线程同时读,写的时候只能有一个线程去写
/**
* 独占锁(写锁):一次只能被一个线程占有
* 共享锁(读锁):多个线程可以同时占有
* ReentrantLock:
* 读-读:可以共存
* 读-写:不可以共存
* 写-写:不可以共存
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
// 5个线程写
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp + "", temp + "");
}, String.valueOf(i)).start();
}
// 5个线程读
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
}, String.valueOf(i)).start();
}
}
}
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap<>();
// 读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 写,同时只有一个线程写
public void put(String key, Object obj) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入");
map.put(key, obj);
System.out.println(Thread.currentThread().getName() + "写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
// 读,所有线程都可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
十.阻塞队列BlockingQueue
1.介绍
2.四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞,一直等待 | 阻塞,超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,,) |
移除 | remove() | pull() | take() | pull(,) |
检测队首元素 | element() | peek() | - | - |
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// add()方法返回boolean值
boolean flag1 = blockingQueue.add("a");
boolean flag2 = blockingQueue.add("b");
boolean flag3 = blockingQueue.add("c");
// add添加元素超过队列的长度
// boolean flag4 = blockingQueue.add("d");
// 会抛出异常java.lang.IllegalStateException: Queue full
System.out.println(blockingQueue.element());// 获得队首元素
// remove()返回本次移除的元素
Object e1 = blockingQueue.remove();
Object e2 = blockingQueue.remove();
Object e3 = blockingQueue.remove();
// 队列中没有元素仍继续移除元素
// Object e4 = blockingQueue.remove();
// 会抛出异常java.util.NoSuchElementException
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// offer返回boolean值
boolean flag1 = blockingQueue.offer("a");
boolean flag2 = blockingQueue.offer("b");
boolean flag3 = blockingQueue.offer("c");
// boolean flag4 = blockingQueue.offer("d");
// offer添加元素超过队列的长度会返回false
System.out.println(blockingQueue.peek());// 获得队首元素
// poll()返回本次移除的元素
Object poll1 = blockingQueue.poll();
Object poll2 = blockingQueue.poll();
Object poll3 = blockingQueue.poll();
// Object poll4 = blockingQueue.poll();
// 队列中没有元素仍继续移除元素会打印出null
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// put没有返回值
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("d");
// put添加元素超过队列的长度会一直等待
// take()返回本次移除的元素
Object take1 = blockingQueue.take();
Object take2 = blockingQueue.take();
Object take3 = blockingQueue.take();
Object take4 = blockingQueue.take();
// 队列中没有元素仍继续移除元素会一直等待
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// offer返回boolean值
boolean flag1 = blockingQueue.offer("a");
boolean flag2 = blockingQueue.offer("b");
boolean flag3 = blockingQueue.offer("c");
// offer添加元素超过队列的长度会返回false;并且等待指定时间后推出,向下执行
boolean flag4 = blockingQueue.offer("d", 2, TimeUnit.SECONDS);
// poll()返回本次移除的元素
Object poll1 = blockingQueue.poll();
Object poll2 = blockingQueue.poll();
Object poll3 = blockingQueue.poll();
// 队列中没有元素仍继续移除元素会打印出null,等待指定之间后退出。
Object poll4 = blockingQueue.poll(2,TimeUnit.SECONDS);
3.SynchronousQueue同步队列
进去一个元素,必须等待取出这个元素后,才能放下一个元素。put()、take()
十一.线程池
1.基本概念
程序运行的本质是占用系统资源,为了优化资源的使用,引入了池化技术
比如线程池、连接池。内存池、对象池
jdbc连接池会有一个最小的池(就是默认有几个连接的),有一个最大的池因为连接跟关闭的时候是非常消耗资源的
池化技术:事先准备好一些资源,有人要用,就来拿,用完就归还
线程池的好处:降低资源消耗,提高响应速度,方便管理线程,线程可以复用,可以控制最大并发数
2. 三大方法
ExecutorService threadPool = Executors.newSingleThreadExecutor();
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
for(int i=0;i<10;i++)
{
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();// 线程池用完,程序结束,关闭线程池
//为了保证关闭线程池,把关闭的语句放在finally里面
}
}
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
for(int i=0;i<10;i++)
{
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();// 线程池用完,程序结束,关闭线程池
//为了保证关闭线程池,把关闭的语句放在finally里面
}
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for(int i=0;i<10;i++)
{
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();// 线程池用完,程序结束,关闭线程池
//为了保证关闭线程池,把关闭的语句放在finally里面
}
3.七大参数
// newSingleThreadExecutor()源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// newFixedThreadPool()源码分析
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// newCachedThreadPool()源码分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,三个方法的底层都是new ThreadPoolExecutor
这个时候我们再回到阿里巴巴开发手册
4.四种拒绝策略
下图,银行柜台和候客区都满了,现在还有客户来,就只能拒绝了
手动创建一个线程池
模拟上面的银行业务
核心线程大小设为2:就是一直工作的窗口
最大线程设为5:就是银行最多的工作窗口
keepAliveTime设置为1小时:如果1小时都没有业务,就关闭窗口
候客区:new LinkedBlockingQueue(3),假设候客区最多3个人
线程工厂:就用默认的,Executors.defaultThreaFactory()
拒绝策略: 可以发现有4种拒绝策略,用默认的AbortPolicy()//银行满了,但是还有人进来,就不处理这个人,并抛出异常
new ThreadPoolExecutor.AbortPolicy());
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for(int i=1;i<=3;i++)
{
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
当只有3个任务的时候,没有触发最大线程,即没有额外开银行窗口
增加到6个人的时候,触发了一个额外窗口
增加到8个人的时候,触发了全部窗口,即一共5个线程
增加到10个人就抛出异常了,超过了最大承载
现在改变拒绝策略ThreadPoolExecutor.CallerRunsPolicy()
ThreadPoolExecutor.DiscardPolicy()队列满了,不抛出异常,也不处理任务可以看到,只处理了8个任务
ThreadPoolExecutor.DiscardOldestPolicy()
队列满了,就尝试和最早的任务竞争,如果竞争失败,这个任务就没了,也不会抛出异常,如果竞争成功,这个任务就可以执行
5.如何设置线程池的大小
CPU密集型:电脑的核数是几核就选择几;选择maximunPoolSize的大小
// 获取cpu 的核数
int max = Runtime.getRuntime().availableProcessors();
ExecutorService service =new ThreadPoolExecutor(
2,
max,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
I/O密集型:
在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,大约是最大I/O数的一倍到两倍之间。
十二.四大函数式接口
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口:只有一个方法的接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
1.Function 函数型接口
public class FunctionDemo {
public static void main(String[] args) {
Function<String, String> function = (str) -> {return str;};
System.out.println(function.apply("aaaaaaaaaa"));
}
}
2.Predicate 断定型接口
public class PredicateDemo {
public static void main(String[] args) {
Predicate<String> predicate = (str) -> {return str.isEmpty();};
// false
System.out.println(predicate.test("aaa"));
// true
System.out.println(predicate.test(""));
}
}
3.Consummer 消费型接口
/**
* 消费型接口 没有返回值!只有输入!
*/
public class Demo3 {
public static void main(String[] args) {
Consumer<String> consumer = (str)->{
System.out.println(str);
};
consumer.accept("abc");
}
}
4.Supplier 供给型接口
/**
* 供给型接口,只返回,不输入
*/
public class Demo4 {
public static void main(String[] args) {
Supplier<String> supplier = ()->{return "1024";};
System.out.println(supplier.get());
}
}
十三.Stream 流式计算
// 一.java.util Interface Collection<E>\
// default Stream<E> stream() 返回以此集合作为源的顺序 Stream.
// 二.java.util Interface Comparator<T>
// int compare(T o1, T o2) 比较其两个参数的顺序。
// 三.java.util.stream.Stream
// Stream<T> filter(Predicate<? super T> predicate) 返回由与此给定谓词匹配的此流的元素组成的流。
// <R> Stream<R> map(Function<? super T,? extends R> mapper) 返回由给定函数应用于此流的元素的结果组成的流。
// Stream<T> sorted(Comparator<? super T> comparator) 返回由该流的元素组成的流,根据提供的 Comparator进行排序
// Stream<T> limit(long maxSize) 返回由此流的元素组成的流,截短长度不能超过 maxSize 。
// void forEach(Consumer<? super T> action) 对此流的每个元素执行操作。
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(5,"e",25);
List<User> list = Arrays.asList(u1,u2,u3,u4,u5);
//计算交给Stream流
//lambda表达式,链式编程,函数式接口,Stream流式计算
list.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
}
}
class User{
private int id;
private String name;
private int age;
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
十四.ForkJoin
ForkJoin 在JDK1.7,并行执行任务!提高效率~。在大数据量速率会更快!
大数据中:MapReduce 核心思想->把大任务拆分为小任务!
1.特点: 工作窃取!
实现原理是:双端队列!从上面和下面都可以去拿到任务进行执行
2.如何使用ForkJoin?
-
1、通过ForkJoinPool来执行
-
2、计算任务 execute(ForkJoinTask<?> task)
-
3、计算类要去继承ForkJoinTask;
ForkJoinTask
方法摘要 | |
---|---|
ForkJoinTask < V > fork() | 在当前任务正在运行的池中异步执行此任务(如果适用) |
V get() | 等待计算完成,然后检索其结果 |
ForkJoinPool
方法摘要 | |
---|---|
< T > ForkJoinTask< T > submit(ForkJoinTask< T > task) | 提交一个ForkJoinTask来执行 |
LongStream
方法摘要 | |
---|---|
< T > ForkJoinTask< T > submit(ForkJoinTask< T > task) | 提交一个ForkJoinTask来执行 |
LongStream parallel() | 返回平行的等效流 |
static LongStream rangeClosed(long startInclusive, long endInclusive) | 返回有序顺序 LongStream从 startInclusive (含)至 endInclusive通过的递增步长(含) 1 |
long reduce(long identity, LongBinaryOperator op) | 使用提供的身份值和 associative累积功能对此流的元素执行 reduction ,并返回减小的值 |
ForkJoin 的计算类
package com.marchsoft.forkjoin;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private long star;
private long end;
/** 临界值 */
private long temp = 1000000L;
public ForkJoinDemo(long star, long end) {
this.star = star;
this.end = end;
}
/**
* 计算方法
* @return
*/
@Override
protected Long compute() {
if ((end - star) < temp) {
Long sum = 0L;
for (Long i = star; i < end; i++) {
sum += i;
}
return sum;
}else {
// 使用ForkJoin 分而治之 计算
//1 . 计算平均值
long middle = (star + end) / 2;
ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle);
// 拆分任务,把线程压入线程队列
forkJoinDemo1.fork();
ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle, end);
forkJoinDemo2.fork();
long taskSum = forkJoinDemo1.join() + forkJoinDemo2.join();
return taskSum;
}
}
}
package com.marchsoft.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class ForkJoinTest {
private static final long SUM = 20_0000_0000;
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}
/**
* 使用普通方法
*/
public static void test1() {
long star = System.currentTimeMillis();
long sum = 0L;
for (long i = 1; i < SUM ; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println(sum);
System.out.println("时间:" + (end - star));
System.out.println("----------------------");
}
/**
* 使用ForkJoin 方法
*/
public static void test2() throws ExecutionException, InterruptedException {
long star = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long along = submit.get();
System.out.println(along);
long end = System.currentTimeMillis();
System.out.println("时间:" + (end - star));
System.out.println("-----------");
}
/**
* 使用 Stream 流计算
*/
public static void test3() {
long star = System.currentTimeMillis();
long sum = LongStream.range(0L, 20_0000_0000L).parallel().reduce(0, Long::sum);
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("时间:" + (end - star));
System.out.println("-----------");
}
}
.parallel().reduce(0, Long::sum)使用一个并行流去计算整个计算,提高效率。
十五.异步回调
Future 设计的初衷:对将来的某个事件结果进行建模!
其实就是前端 --> 发送ajax异步请求给后端
但是我们平时都使用CompletableFuture
1.没有返回值的runAsync异步回调
public static void main(String[] args) throws ExecutionException, InterruptedException
{
// 发起 一个 请求
System.out.println(System.currentTimeMillis());
System.out.println("---------------------");
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
//发起一个异步任务
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+".....");
});
System.out.println(System.currentTimeMillis());
System.out.println("------------------------------");
//输出执行结果
System.out.println(future.get()); //获取执行结果
}
2.有返回值的异步回调supplyAsync
//有返回值的异步回调
CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
int i=1/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
//success 回调
System.out.println("t=>" + t); //正常的返回结果
System.out.println("u=>" + u); //抛出异常的 错误信息
}).exceptionally((e) -> {
//error回调
System.out.println(e.getMessage());
return 404;
}).get());
whenComplete: 有两个参数,一个是t 一个是u
T:是代表的 正常返回的结果;
U:是代表的 抛出异常的错误信息;
如果发生了异常,get可以获取到exceptionally返回的值;
十六.JMM
1.对Volatile 的理解
Volatile 是 Java 虚拟机提供 轻量级的同步机制
1、保证可见性
2、不保证原子性
3、禁止指令重排
如何实现可见性
volatile变量修饰的共享变量在进行写操作的时候回多出一行汇编:
0x01a3de1d:movb $0×0,0×1104800(%esi);0x01a3de24**:lock** addl $0×0,(%esp);
Lock前缀的指令在多核处理器下会引发两件事情。
1)将当前处理器缓存行的数据写回到系统内存。
2)这个写回内存的操作会使其他cpu里缓存了该内存地址的数据无效。
多处理器总线嗅探:
为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存后再进行操作,但操作不知道何时会写到内存。如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己的缓存值是不是过期了,如果处理器发现自己缓存行对应的内存地址呗修改,就会将当前处理器的缓存行设置无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据库读到处理器缓存中。
2.什么是JMM?
JMM:JAVA内存模型,不存在的东西,是一个概念,也是一个约定!
关于JMM的一些同步的约定:
1、线程解锁前,必须把共享变量立刻刷回主存;
2、线程加锁前,必须读取主存中的最新值到工作内存中;
3、加锁和解锁是同一把锁;
线程中分为 工作内存、主内存
8种操作:
Read(读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用;
load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中;
Use(使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令;
assign(赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中;
store(存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用;
write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中;
lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态;
unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;
JMM对这8种操作给了相应的规定:
不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
遇到问题:程序不知道主存中的值已经被修改过了!;
十七.volatile
1.保证可见性
public class JMMDemo01 {
// 如果不加volatile 程序会死循环
// 加了volatile是可以保证可见性的
private volatile static Integer number = 0;
public static void main(String[] args) {
//main线程
//子线程1
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//子线程2
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
number=1;
System.out.println(number);
}
}
2.不保证原子性
原子性:不可分割;
线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。
/**
* 不保证原子性
* number <=2w
*
*/
public class VDemo02 {
private static volatile int number = 0;
public static void add(){
number++;
//++ 不是一个原子性操作,是两个~3个操作
//
}
public static void main(String[] args) {
//理论上number === 20000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
如果不加lock和synchronized ,怎么样保证原子性?
使用原子类
public class VDemo02 {
private static volatile AtomicInteger number = new AtomicInteger();
public static void add(){
// number++;
number.incrementAndGet(); //底层是CAS保证的原子性
}
public static void main(String[] args) {
//理论上number === 20000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
这些类的底层都直接和操作系统挂钩!是在内存中修改值。
Unsafe类是一个很特殊的存在;
原子类为什么这么高级?
3.禁止指令重排
什么是指令重排?
我们写的程序,计算机并不是按照我们自己写的那样去执行的
源代码–>编译器优化重排–>指令并行也可能会重排–>内存系统也会重排–>执行
处理器在进行指令重排的时候,会考虑数据之间的依赖性!
int x=1; //1
int y=2; //2
x=x+5; //3
y=x*x; //4
//我们期望的执行顺序是 1_2_3_4 可能执行的顺序会变成2134 1324
//可不可能是 4123? 不可能的
1234567
可能造成的影响结果:前提:a b x y这四个值 默认都是0
线程A 线程B
x=a y=b
b=1 a=2
正常的结果: x = 0; y =0;
线程A 线程B
b=1 a=2
x=a y=b
可能在线程A中会出现,先执行b=1,然后再执行x=a;
在B线程中可能会出现,先执行a=2,然后执行y=b;
那么就有可能结果如下:x=2; y=1.
volatile可以避免指令重排:
volatile中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。
内存屏障:CPU指令。作用:
1、保证特定的操作的执行顺序;
2、可以保证某些变量的内存可见性(利用这些特性,就可以保证volatile实现的可见性)
4.总结
volatile可以保证可见性;
不能保证原子性
由于内存屏障,可以保证避免指令重排的现象产生
面试官:那么你知道在哪里用这个内存屏障用得最多呢?单例模式
十八.玩转单例模式
1.饿汉式
/**
* 饿汉式单例
*/
public class Hungry {
/**
* 可能会浪费空间
*/
private byte[] data1=new byte[1024*1024];
private byte[] data2=new byte[1024*1024];
private byte[] data3=new byte[1024*1024];
private byte[] data4=new byte[1024*1024];
private Hungry(){
}
private final static Hungry hungry = new Hungry();
public static Hungry getInstance(){
return hungry;
}
}
2.DCL懒汉式
//懒汉式单例模式
public class LazyMan {
private static boolean key = false;
private LazyMan(){
synchronized (LazyMan.class){
if (key==false){
key=true;
}
else{
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
System.out.println(Thread.currentThread().getName()+" ok");
}
private volatile static LazyMan lazyMan;
//双重检测锁模式 简称DCL懒汉式
public static LazyMan getInstance(){
//需要加锁
if(lazyMan==null){
synchronized (LazyMan.class){
if(lazyMan==null){
lazyMan=new LazyMan();
/**
* 1、分配内存空间
* 2、执行构造方法,初始化对象
* 3、把这个对象指向这个空间
*
* 就有可能出现指令重排问题
* 比如执行的顺序是1 3 2 等
* 我们就可以添加volatile保证指令重排问题
*/
}
}
}
return lazyMan;
}
//单线程下 是ok的
//但是如果是并发的
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
//Java中有反射
// LazyMan instance = LazyMan.getInstance();
Field key = LazyMan.class.getDeclaredField("key");
key.setAccessible(true);
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true); //无视了私有的构造器
LazyMan lazyMan1 = declaredConstructor.newInstance();
key.set(lazyMan1,false);
LazyMan instance = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(lazyMan1);
System.out.println(instance == lazyMan1);
}
}
3.静态内部类
//静态内部类
public class Holder {
private Holder(){
}
public static Holder getInstance(){
return InnerClass.holder;
}
public static class InnerClass{
private static final Holder holder = new Holder();
}
}
单例不安全, 因为反射
4.枚举
//enum 是什么? enum本身就是一个Class 类
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
//java.lang.NoSuchMethodException: com.ogj.single.EnumSingle.<init>()
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
使用枚举,我们就可以防止反射破坏了。
枚举类型的最终反编译源码:
public final class EnumSingle extends Enum
{
public static EnumSingle[] values()
{
return (EnumSingle[])$VALUES.clone();
}
public static EnumSingle valueOf(String name)
{
return (EnumSingle)Enum.valueOf(com/ogj/single/EnumSingle, name);
}
private EnumSingle(String s, int i)
{
super(s, i);
}
public EnumSingle getInstance()
{
return INSTANCE;
}
public static final EnumSingle INSTANCE;
private static final EnumSingle $VALUES[];
static
{
INSTANCE = new EnumSingle("INSTANCE", 0);
$VALUES = (new EnumSingle[] {
INSTANCE
});
}
}
十九.深入理解CAS
1.什么是CAS?
public class casDemo {
//CAS : compareAndSet 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//boolean compareAndSet(int expect, int update)
//期望值、更新值
//如果实际值 和 我的期望值相同,那么就更新
//如果实际值 和 我的期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
//因为期望值是2020 实际值却变成了2021 所以会修改失败
//CAS 是CPU的并发原语
atomicInteger.getAndIncrement(); //++操作
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
Unsafe 类
2.总结
CAS:比较当前工作内存中的值 和 主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环,使用的是自旋锁。
缺点:
- 循环会耗时;
- 一次性只能保证一个共享变量的原子性;
- 它会存在ABA问题
3.ABA问题?(狸猫换太子)
线程1:期望值是1,要变成2;
线程2:两个操作:
- 1、期望值是1,变成3
- 2、期望是3,变成1
所以对于线程1来说,A的值还是1,所以就出现了问题,骗过了线程1;
public class casDemo {
//CAS : compareAndSet 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
//boolean compareAndSet(int expect, int update)
//期望值、更新值
//如果实际值 和 我的期望值相同,那么就更新
//如果实际值 和 我的期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2021, 2020));
System.out.println(atomicInteger.get());
//因为期望值是2020 实际值却变成了2021 所以会修改失败
//CAS 是CPU的并发原语
// atomicInteger.getAndIncrement(); //++操作
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
二十.原子引用
解决ABA问题,对应的思想:就是使用了乐观锁~
带版本号的 原子操作!
Integer 使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间。
带版本号的原子操作
package com.marchsoft.lockdemo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* Description:
*
* @author jiaoqianjin
* Date: 2020/8/12 22:07
**/
public class CASDemo {
/**AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
* 正常在业务操作,这里面比较的都是一个个对象
*/
static AtomicStampedReference<Integer> atomicStampedReference = new
AtomicStampedReference<>(1, 1);
// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("a1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改操作时,版本号更新 + 1
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println("a2=>" + atomicStampedReference.getStamp());
// 重新把值改回去, 版本号更新 + 1
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("a3=>" + atomicStampedReference.getStamp());
}, "a").start();
// 乐观锁的原理相同!
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("b1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 3,
stamp, stamp + 1));
System.out.println("b2=>" + atomicStampedReference.getStamp());
}, "b").start();
}
}
二十一.各种锁的理解
1.公平锁,非公平锁
公平锁:非常公平,不能插队,必须先来后到
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁:非常不公平,允许插队,可以改变顺序(默认)
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
2.可重入锁
Synchonized 锁
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName()+"=> sms");
call();//这里也有一把锁
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"=> call");
}
}
Lock 锁
//lock
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone2{
Lock lock=new ReentrantLock();
public void sms(){
lock.lock(); //细节:这个是两把锁,两个钥匙
//lock锁必须配对,否则就会死锁在里面
try {
System.out.println(Thread.currentThread().getName()+"=> sms");
call();//这里也有一把锁
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=> call");
}catch (Exception e){
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
- lock锁必须配对,相当于lock和 unlock 必须数量相同;
- 在外面加的锁,也可以在里面解锁;在里面加的锁,在外面也可以解锁;
3.自旋锁
spinlock
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
自我设计自旋锁
public class SpinlockDemo {
// 默认
// int 0
//thread null
AtomicReference<Thread> atomicReference=new AtomicReference<>();
//加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName()+"===> mylock");
//自旋锁
while (!atomicReference.compareAndSet(null,thread)){
System.out.println(Thread.currentThread().getName()+" ==> 自旋中~");
}
}
//解锁
public void myUnlock(){
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"===> myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
public class TestSpinLock {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
reentrantLock.unlock();
//使用CAS实现自旋锁
SpinlockDemo spinlockDemo=new SpinlockDemo();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
},"t1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
},"t2").start();
}
}
运行结果:
t2进程必须等待t1进程Unlock后,才能Unlock,在这之前进行自旋等待。。。。
4.死锁
package com.ogj.lock;
import java.util.concurrent.TimeUnit;
public class DeadLock {
public static void main(String[] args) {
String lockA= "lockA";
String lockB= "lockB";
new Thread(new MyThread(lockA,lockB),"t1").start();
new Thread(new MyThread(lockB,lockA),"t2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+" lock"+lockA+"===>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+" lock"+lockB+"===>get"+lockA);
}
}
}
}
如何解开死锁
1、使用jps定位进程号,jdk的bin目录下: 有一个jps
命令:jps -l
2、使用jstack
进程进程号 找到死锁信息
一般情况信息在最后: