JUC并发编程详解
JUC
线程和进程
进程:一个程序,QQ.exe Music.exe程序的集合
一个进程可包含多个线程,至少包含一个
Java默认有两个线程,main,GC
线程:开了一个进程Typora,写字,自动保存(线程负责)
对于java而言:Thread,Runable ,Callable
Java可以开启一个线程吗?不能
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//本地方法,调用底层c++,Java无法直接操作硬件
private native void start0();
并发,并行
并发(多个线程操作同一个资源)
并行(多个人一同行走)
-
CPU多核,多个线程可同时执行
public class Test1 {
public static void main(String[] args) {
//获取cpu核数
//cpu密集型,IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并发编程的本质:充分利用CPU资源
线程状态
//线程新生
NEW,
//运行
RUNNABLE,
//阻塞
BLOCKED,
//等待 ,死等
WAITING,
//超时等待,过时不候
TIMED_WAITING,
//终止
TERMINATED;
}
wait(),sleep()区别
-
来自不同的类 wait=>object
sleep=>Thread
-
关于锁的释放 wait会释放锁,sleep不会
-
使用范围不同 wait只能在同步代码块中 sleep任何地方都可以
-
是否需要捕获异常 wait不需要捕获异常 sleep需要捕获异常
Lock锁
sysnchronized
公平锁:十分公平,先来后到
非公平锁:十分不公平,可以插队(默认)
package com.dahan.demo1;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
* 线程是一个单独的资源类,没有任何附属操作
* 1. 属性,方法
* */
public class Test1 {
public static void main(String[] args) {
//并发:多线程操作同一个资源
final Ticket ticket = new Ticket();
new Thread(() ->{
for (int i = 0; i <60 ; i++) {
ticket.sale();
}
},"a").start();
new Thread(() ->{
for (int i = 0; i <60 ; i++) {
ticket.sale();
}
},"b").start();
new Thread(() ->{
for (int i = 0; i <60 ; i++) {
ticket.sale();
}
},"c").start();
}
}
class Ticket{
//属性,方法
private int number = 50;
Lock lock = new ReentrantLock();
//买票的方式
public void sale(){
lock.lock();
try {
}catch (Exception e){
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"票,剩余"+number);
}
}finally {
lock.unlock();
}
}
}
synchronized和lock的区别
-
synchronized内置的java关键字,lock是一个Java类
-
synchronized无法判断获取的状态,lock可以判断是否获取到了锁
-
synchronized可自动释放锁,lock必须手动释放锁,如果不释放会造成死锁
-
synchronized线程1(获得锁,阻塞),线程2(等待);lock锁不一定会等下去
-
synchronized可重入(可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁)锁,不可终端的,非公平,lock可重入,可判断锁,非公平(可自己设置)
-
synchronized适合锁少量的代码同步问题,lock锁适合锁大量的同步代码
锁是什么,如何判断锁的是谁
生产者和消费者问题
package com.dahan.pc;
import java.util.Date;
/*
*
* */
public class Test2 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
data.increment();
}catch (Exception e){
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
data.decrement();
}catch (Exception e){
e.printStackTrace();
}
}
},"B").start();
}
}
//判断等待,业务,通知
class Data{//数字,资源类
private int number=0;
//+1
public synchronized void increment() throws InterruptedException {
if (number!=0){
this.wait();
}
number++;
// 通知其他线程,我+1完毕
this.notify();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number==0){
//等待
this.wait();
}
number--;
// 通知其他线程,我-1完毕
this.notify();
}
}
问题存在 A,B,C,D4个线程!虚假唤醒
if只会判断一次,应使用while
线程可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒。等待应该总是出现在循环中 如:
synchronized(obj){
while(<condition does not hold>)
obj.wait(timeout);
}
if 改为while判断
package com.dahan.pc;
import java.util.Date;
/*
*
* */
public class Test2 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
data.increment();
}catch (Exception e){
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
data.decrement();
}catch (Exception e){
e.printStackTrace();
}
}
},"B").start();
}
}
//判断等待,业务,通知
class Data{//数字,资源类
private int number=0;
//+1
public synchronized void increment() throws InterruptedException {
while (number!=0){
this.wait();
}
number++;
// 通知其他线程,我+1完毕
this.notify();
}
//-1
public synchronized void decrement() throws InterruptedException {
while (number==0){
//等待
this.wait();
}
number--;
// 通知其他线程,我-1完毕
this.notify();
}
}
lock版生产者与消费者问题
package com.dahan.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
}catch (Exception e){
e.printStackTrace();
}
}
},"a").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
}catch (Exception e){
e.printStackTrace();
}
}
},"b").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
}catch (Exception e){
e.printStackTrace();
}
}
},"c").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
}catch (Exception e){
e.printStackTrace();
}
}
},"d").start();
}
}
//判断等待,业务,通知
class Data2{//数字,资源类
private int number=0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//+1
public void increment() throws InterruptedException {
// condition.await();//等待
// condition.signalAll();//唤醒全部
lock.lock();
try{
while (number!=0){
//等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number==0){
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
condition实现精准唤醒
package com.dahan.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//a执行完调用b,b执行完调用c,c执行完调用a
public class TestContidion {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.print1();
}catch (Exception e){
e.printStackTrace();
}
}
},"a").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.print2();
}catch (Exception e){
e.printStackTrace();
}
}
},"b").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.print3();
}catch (Exception e){
e.printStackTrace();
}
}
},"c").start();
}
}
//判断等待,业务,通知
class Data3{//数字,资源类
private int number=1;//1执行a,2b,3c
private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
//+1
public void print1() throws InterruptedException {
lock.lock();
try{
while (number!=1){
//等待
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>"+number);
//唤醒指定的人
number=2;
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
//-1
public void print2() throws InterruptedException {
lock.lock();
try {
while (number!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"=>"+number);
number=3;
condition3.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print3() throws InterruptedException {
lock.lock();
try {
while (number!=3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"=>"+number);
number=1;
condition1.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
集合不安全
list不安全
package com.dahan.pc;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
//java.util.ConcurrentModificationException 并发修改异常
public class unsafe {
public static void main(String[] args) {
//并发下ArrayList不安全
/*
解决方案
1.List<String> list = new Vector<>();
2. List<String> list = Collections.synchronizedList(new ArrayList<>());
*/
//CopyOnWrite 写入时复制
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
set不安全
package com.dahan.unsagfe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
/*
java.util.ConcurrentModificationException
1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
2.Set<String> set = new CopyOnWriteArraySet<>();
*/
public class TestSet {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 20; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
hashset的底层是什么
public HashSet() {
map = new HashMap<>();
}
//add set 本质就是map key是无法重复的
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object();//不变的值
HashMap
package com.dahan.unsagfe;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class TestMap {
public static void main(String[] args) {
// Map<String, String> map = new HashMap<>();
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
Callable
Callable
Callable接口类似Runnable,因为他们都是为其实例可能由另一个线程执行的类设计的,然而Runnable不返回结果,也不抛出异常
-
有返回值
-
可以抛出异常
-
方法不同,run()/call()
代码测试
package com.dahan.CallAble;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Mythread()).start();
new Thread().start();
Mythread mythread = new Mythread();
FutureTask futureTask = new FutureTask(mythread);//适配类
new Thread(futureTask,"a").start();
new Thread(futureTask,"b").start();//执行两个线程时,结果会被缓存,效率高
Integer o = (Integer)futureTask.get();//获取callable的返回结果,get()方法可能会产生阻塞,应把它放在最后
System.out.println(o);
}
}
class Mythread implements Callable<Integer> {
@Override
public Integer call(){
System.out.println("call()");
return 1024;
}
}
常用辅助类
8.1CountDownlatch
package com.dahan.add;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//倒计时
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"Go");
countDownLatch.countDown();//数量-1
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("关门");
}
}
countDownLatch.countDown()//数量-1
countDownLatch.await()//等待计数器归零,然后再向下执行
8.2 CycliBarrier
加法计数器
package com.dahan.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CycliBarrierDemo {
public static void main(String[] args) {
//召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->System.out.println("召唤神龙成功"));
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
}
}).start();
}
}
}
8.3 Semaphore
package com.dahan.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//acquire 获得
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"获得车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// release 释放
semaphore.release();
}
}).start();
}
}
}
原理
semaphore.acquire();获得,假设已经满了,等待,等待被释放
semaphore.release();释放,会将当前的信号量释放+1 然后唤醒等待的线程
作用:多个共享资源互斥的使用,并发限流,控制最大的线程数
读写锁
ReadWriteLock
读可以被多个线程同时读,写的时候只能有一个线程去写
package com.dahan.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDeom {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
/*
加锁的
*/
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
//读写锁 更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,写的时候,只希望有一个线程写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//读,取,所有人都可以读
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
/*
自定义缓存
*/
//class MyCache{
// private volatile Map<String,Object> map = new HashMap<>();
//
// //存,写
// public void put(String key,Object value){
// System.out.println(Thread.currentThread().getName()+"写入"+key);
// map.put(key, value);
// System.out.println(Thread.currentThread().getName()+"写入ok");
// }
// //读,取
// public void get(String key){
// System.out.println(Thread.currentThread().getName()+"读取"+key);
// Object o = map.get(key);
// System.out.println(Thread.currentThread().getName()+"读取ok");
// }
//}
阻塞队列
四组API
方式 | 抛出异常 | 不会抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add(底层调用的还是offer) | offer | put | offer(,,) |
移除 | remove | poll | take | poll(,) |
检测队首元素 | element | peek |
-
抛出异常
-
不会抛出异常
-
阻塞等待
-
超时等待
同步队列
SynchronousQueue同步队列
没有容量,进去一个元素,必须等待取出来之后才能再往里面放一个元素
put take
package com.dahan.dq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put1");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put1");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"a").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
线程池
线程池的好处:
-
降低资源的消耗
-
提高响应速度
-
方便管理 线程复用,可以控制最大并发数,管理线程
线程池:三大方法
package com.dahan.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Executors 工具类 3大方法
public class Tset01 {
public static void main(String[] args) {
Executors.newSingleThreadExecutor();//单个线程
Executors.newFixedThreadPool(5);//创建一个固定的线程池的大小
ExecutorService service =Executors.newCachedThreadPool();//可伸缩的,遇强则强
try {
for (int i = 0; i <10 ; i++) {
//使用线程池来创建线程
service.execute(()->{
System.out.println(Thread.currentThread().getName()+"ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
service.shutdown();
}
}
}
七大参数
方法源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//本质 ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时了没有人调用就会释放
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue) {//阻塞队列
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(),//线程工厂,创建线程的,一般不用
defaultHandler);//拒绝策略
}
手动创建一个线程池,四种拒绝策略
package com.dahan.pool;
import java.util.concurrent.*;
public class Tset01 {
public static void main(String[] args) {
//自定义线程工作
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy()//银行满了 还有人进,不处理这个人
// new ThreadPoolExecutor.CallerRunsPolicy()//哪来的去哪里
// new ThreadPoolExecutor.DiscardPolicy()//队列满了 不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了 尝试和最早的竞争,也不会抛出异常
);
try {
//超出最大 抛出 RejectedExecutionException
for (int i = 0; i <10 ; i++) {
//使用线程池来创建线程
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPoolExecutor.shutdown();
}
}
}
小结 和拓展
//最大线程如何定义 //1. CPU 密集型 几核就是几,可以保持CPU的效率最高 //2。 IO 密集型 > 判断程序中十分耗io的线程 // 程序 15个大型任务,io十分占用资源
四大函数式接口
Function接口
package com.dahan.function;
import java.util.function.Function;
//函数型接口
//只要是函数式接口 可以用lambda表达式简化
public class Demo01 {
public static void main(String[] args) {
// Function function = new Function<String,String>(){
// @Override
// public String apply(String str){
// return str;
// }
// };
Function function = (str)->{return str};
System.out.println(function.apply("asd"));
}
//工具类 输出输入的值
}
Predicate 接口
package com.dahan.function;
import java.util.function.Predicate;
//断定型接口 有一个输入参数 返回值只能是布尔值
public class Demo02 {
public static void main(String[] args) {
// Predicate predicate = new Predicate<String>() {
// @Override
// public boolean test(String o) {
// if (o.isEmpty());
// return false;
// }
// };
Predicate<String> predicate = (o)->{return o.isEmpty();};
System.out.println(predicate.test(""));
}
}
Consumer消费性接口
package com.dahan.function;
import java.util.function.Consumer;
//消费性接口 只有输入没有返回值
public class Demo03 {
public static void main(String[] args) {
// Consumer consumer = new Consumer<String>() {
// @Override
// public void accept(String o) {
// System.out.println(o);
// }
// };
Consumer<String> consumer = (o)->{
System.out.println(o);
};
consumer.accept("dssd");
}
}
Supplier 供给型接口
package com.dahan.function;
import java.util.function.Supplier;
//Supplier 供给型接口,没有参数,只有返回值
public class Demo04 {
public static void main(String[] args) {
// Supplier<String> objectSupplier = new Supplier<String>(){
//
// @Override
// public String get() {
// System.out.println("get()");
// return "1024";
// }
// };
Supplier<String> objectSupplier = ()->{return "1024";};
System.out.println(objectSupplier.get());
}
}
Stream流式计算
什么是Stream流式计算
package com.dahan.Stream;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
/*
1. id必须是偶数
2. 年龄必须大于23
3. 用户名转化为大写
4. 用户名字母倒着排序
5. 只输出一个用户
*/
public class Test {
public static void main(String[] args) {
User user1 = new User(1,"a",21);
User user2 = new User(2,"b",22);
User user3 = new User(3,"c",23);
User user4 = new User(4,"d",24);
User user5 = new User(5,"e",25);
//集合就是管理存储
List<User> list = Arrays.asList(user1,user2,user3,user4,user5);
//计算交给Stream流
list.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((u1,u2)->{return u2.compareTo(u1);})
.limit(1)
.forEach(System.out::println);
}
}
Forkjoin
什么是Forkjoin
并行执行任务,提高效率,大数据量
把一个大任务拆成多个小任务并行执行
ForkJoinPool
线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask
或RecursiveAction
。
使用Fork/Join模式可以进行并行计算以提高效率。
Forkjion特点:工作窃取
public class Main {
public static void main(String[] args) throws Exception {
// 创建2000个随机数组成的数组:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static Random random = new Random(0);
static long random() {
return random.nextInt(10000);
}
}
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢计算速度:
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}
异步回调
Future 设计的初衷:对将来的某个事件的结果进行建模
package com.dahan.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
// });
// System.out.println("111");
// completableFuture.get();//获取阻塞执行结果
//有返回值的supplyAsync异步回调
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
return 1024;
});
completableFuture.whenComplete((t,u)->{
System.out.println("t:"+t);//正常的返回结果
System.out.println("u:"+u);//错误信息
}).exceptionally((e)->{
e.printStackTrace();
return 233;//可以获取到错误的返回结果
});
}
}
JMM
对volatile的理解
volatile是java虚拟机提供轻量级的同步机制
-
保证可见性
-
不保证原子性
-
禁止指令重排
什么是JMM
JMM: Java内存模型,不存在的东西,是一个概念
关于JMM的一些同步的约定:
-
线程解锁前,必须把共享变量立刻刷回主存
-
线程加锁前,必须读取主存中的最新值到工作内存中
-
加锁和解锁是同一把锁
线程,工作内存,主内存
八种操作
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可再分的
-
-
lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态
-
unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
-
read(读取):作用于主内存变量,他把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
-
load(载入):作用于工作内存的变量,他把read操作 从主内存中变量放入工作内存中
-
use(使用):作用于工作内存的变量,他把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
-
assign(赋值):作用于工作内存的变量,把一个从执行引擎中接受到的值放入工作内存的变量副本中
-
store(存储):作用于主内存中的变量,他把一个冲工作内存中一个变量的值传送到主内存中,以便后续的write使用
-
write(写入):作用于主内存中的变量,他把store操作从工作内存中得到的变量的放入主内存的变量中
-
-
JMM对这八种指令的使用,制定了如下规则
Volatile
保证可见性
package com.dahan.future;
import java.util.concurrent.TimeUnit;
public class JMMDemo {
//加volatile可以保证可见性
private static volatile int number = 0;
public static void main(String[] args) {
new Thread(()->{//线程1
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
number=1;
System.out.println(number);
}
}
不保证原子性
原子性:不可分割
线程A在执行任务时不能被打扰,也不能被分割,要么同时成功要么同时失败
package com.dahan.future;
//不保证原子性
public class VDemo1 {
private static volatile int number=0;
public static void add(){
number++;//不是一个原子性操作
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){//Java中默认main gc两个线程一直在运行 则线程数大于2时
Thread.yield();
}
System.out.println(number);
}
}
如果不加lock和synchronized 怎么保证原子性
使用原子类 解决原子性问题
package com.dahan.future;
import java.util.concurrent.atomic.AtomicInteger;
public class VDemo1 {
private static volatile AtomicInteger number=new AtomicInteger();
public static void add(){
number.getAndIncrement();//+1方法 底层用的CAS
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){//Java中默认main gc两个线程一直在运行 则线程数大于2时
Thread.yield();
}
System.out.println(number);
}
}
这些类的底层都直接和操作系统挂钩,在内存中修改值,Unsafe类是一个特殊存在
指令重排
什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的
处理器在进行指令重排的时候,考虑:数据之间的依赖性
线程A | 线程B |
---|---|
x=a | y=b |
b=1 | a=2 |
正常的结果:x=0 b=0,但是可能由于指令重排
线程A | 线程B |
---|---|
b=1 | a=2 |
x=a | y=b |
指令重排导致的诡异结果;x=2;y=1;
volatile可以避免指令重排
内存屏障,CPU指令,作用:
-
保持特定的操作的执行顺序
-
可以保持某些变量的内存可见性(利用这些特性volatile实现了可见性)
volatile可以保证可见性,不能保证原子性。由于内存屏障,可以保证避免指令重排的现象产生
单例模式
单例模式属于创建型模式,它提供了一种创建对象的最佳方式。
这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建,这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要实例化该类的对象
注意:
-
单例类只能有一个实例
-
单例类必须自己创建自己的唯一实例
-
单例类必须给其他对象提供这一实例
优点:
-
在内存里只有一个实例,减少了内存的开销,尤其是频繁的创建和销毁实例
-
避免对资源的多重占用
缺点:没有接口,不能继承,与单一职责原则冲突,一个类应该只关心内部逻辑,而不关心外面怎么样来实例化
getInstance()方法:调用getInstance方法来得到对象,而getInstance保证了每次调用都返回相同的对象。
懒汉式
是否Lazy初始化:是
是否多线程安全:是
实现难度:易
描述:这种方式具备很好的lazy loading,能够在多线程中很好的工作,但是效率低
有点:第一次调用才初始化,避免内存浪费
缺点:必须加锁synchronized才能保证单例,但加锁会影响效率
getInstance()的性能对应用程序不是很关键(该方法使用不太频繁)
单例的实现主要是通过以下两个步骤:
-
将该类的构造方法定义为私有方法,这样其他处的代码就无法通过调用该类的构造方法来实例化该类的对象,只有通过该类提供的静态方法来得到该类的唯一实例;
-
在该类内提供一个静态方法,当我们调用这个方法时,如果类持有的引用不为空就返回这个引用,如果类保持的引用为空就创建该类的实例并将实例的引用赋予该类保持的引用。
public class Singleton{
// 指向自己实例的私有静态引用
private static Singleton instance;
// 私有的构造方法
private Singletion(){}
// 以自己实例为返回值的静态的公有方法,静态工厂方法
public static synchronized Singletion getInstance(){
// 被动创建,在真正需要使用时才去创建
if(instance==null){
instance = new Singletion();
}
return instance;
}
}
饿汉式
是否Lazy初始化:否
是否多线程安全:是
实现难度:易
描述:这种方式比较常用,但容易产生垃圾对象
优点:没有加锁,执行效率会提高。
缺点:类加载时就初始化,浪费内存
它基于 classloader 机制避免了多线程的同步问题,不过,instance 在类装载时就实例化,虽然导致类装载的原因有很多种,在单例模式中大多数都是调用 getInstance 方法, 但是也不能确定有其他的方式(或者其他的静态方法)导致类装载,这时候初始化 instance 显然没有达到 lazy loading 的效果。
public class Singleton{
// 指向自己实例的私有静态引用,主动创建
private static Singleton instance = new Singleton();
private Singleton(){}
public static Singleton getInstance(){
return instance;
}
}
双重校验锁(DCL)
是否Lazy初始化:是
是否多线程安全:是
实现难度:较复杂
描述:采用双锁机制,安全且在多线程情况下能保持高性能
getInstance()的性能对应用程序很关键
public class Singleton {
//程序运行时创建一个静态只读的进程辅助对象
private volatile static Singleton singleton;
private Singleton (){}
public static Singleton getSingleton() {
//先判断是否存在,不存在再加锁处理
if (singleton == null) {
//在同一个时刻加了锁的那部分程序只有一个线程可以进入
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}
登记式/静态内部类
是否 Lazy 初始化:是
是否多线程安全:是
实现难度:一般
描述:这种方式能达到双检锁方式一样的功效,但实现更简单。对静态域使用延迟初始化,应使用这种方式而不是双检锁方式。这种方式只适用于静态域的情况,双检锁方式可在实例域需要延迟初始化时使用。 这种方式同样利用了 classloader 机制来保证初始化 instance 时只有一个线程,它跟第 3 种方式不同的是:第 3 种方式只要 Singleton 类被装载了,那么 instance 就会被实例化(没有达到 lazy loading 效果),而这种方式是 Singleton 类被装载了,instance 不一定被初始化。因为 SingletonHolder 类没有被主动使用,只有通过显式调用 getInstance 方法时,才会显式装载 SingletonHolder 类,从而实例化 instance。想象一下,如果实例化 instance 很消耗资源,所以想让它延迟加载,另外一方面,又不希望在 Singleton 类加载时就实例化,因为不能确保 Singleton 类还可能在其他的地方被主动使用从而被加载,那么这个时候实例化 instance 显然是不合适的。这个时候,这种方式相比第 3 种方式就显得很合理。
public class Singleton {
private static class SingletonHolder {
//在第一次引用类的任何成员时创建实例,公共语言运行库负责处理变量初始化
private static final Singleton INSTANCE = new Singleton();
}
private Singleton (){}
public static final Singleton getInstance() {
return SingletonHolder.INSTANCE;
}
}
深入理解CAS
什么是CAS
package com.dahan.cas;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemp {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//期望,更新
//public final boolean compareAndSet(int expect, int update)
//如果我期望的值达到了 那么就更新 否则 就不更新 CAS是CPU的并发原语
System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回true
System.out.println(atomicInteger.get());
atomicInteger.getAndIncrement();
System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回false
System.out.println(atomicInteger.get());
}
}
CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作,如果不是就一直循环
缺点:
-
循环会耗时
-
一次性只能保证一个共享变量的原子性
-
ABA问题
CAS:ANA问题(狸猫换太子)
package com.dahan.cas;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemp {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//期望,更新
//public final boolean compareAndSet(int expect, int update)
//如果我期望的值达到了 那么就更新 否则 就不更新 CAS是CPU的并发原语
//===========捣乱的线程============
System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回true
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2020));//返回true
System.out.println(atomicInteger.get());
//===========期望的线程============
System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回false
System.out.println(atomicInteger.get());
}
}
原子引用
带版本号的原子操作
package com.dahan.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASDemp {
public static void main(String[] args) {
AtomicStampedReference<Integer> atomic = new AtomicStampedReference<>(123, 1);
new Thread(()->{
int stamp = atomic.getStamp();//获得版本号
System.out.println("a1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomic.compareAndSet(123,111,atomic.getStamp(),atomic.getStamp()+1);
System.out.println("a2=>"+atomic.getStamp());
atomic.compareAndSet(111,123,atomic.getStamp(),atomic.getStamp()+1);
System.out.println("a3=>"+atomic.getStamp());
},"a").start();
new Thread(()->{
int stamp = atomic.getStamp();//获得版本号
System.out.println("b1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomic.compareAndSet(123,121,stamp,atomic.getStamp());
System.out.println("b2=>"+atomic.getStamp());
},"b").start();
}
}
注意
各种锁的理解
公平锁 非公平锁
公平锁:不能插队,必须先来后到
非公平锁:可以插队,默认就是非公平锁
public ReentrantLock(){
sync = new NonfairSync();
}
public ReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁
可重入锁(递归锁)
自旋锁
自定义一个锁
package com.dahan.lock;
import java.util.concurrent.atomic.AtomicReference;
//自旋锁
public class SpinlockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void mylock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==>mylock");
//自旋锁
while (!atomicReference.compareAndSet(null,thread)){
}
}
//解锁
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"=>myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
测试
package com.dahan.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestSpinLock {
public static void main(String[] args) throws InterruptedException {
// ReentrantLock reentrantLock = new ReentrantLock();
// reentrantLock.lock();
// reentrantLock.unlock();
SpinlockDemo spinlockDemo = new SpinlockDemo();
new Thread(()->{
spinlockDemo.mylock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
},"a").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
spinlockDemo.mylock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
},"b").start();
}
}