当前位置: 首页 > article >正文

J.U.C(2)

目录

  • JUC
    • 一:stamedLock
      • 乐观读
    • 二:stamphore
      • 应用
      • 原理
    • 三:countDownLatch
        • 应用
    • 四:CyclicBarrier

JUC

一:stamedLock

  • 该类从jdk8加入,目的是进一步提高读的性能,他的特点是在使用读,写锁时都要配合【戳】来进行使用;

加解读锁:

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁:

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读

  • 乐观读,stamedLock支持使用tryOptimisticRead方法进行乐观读,在使用之后还要进行检验,使用validate,如果检验成功,说明期间没有线程进行写操作,数据有效。如果检验失败,则要进行锁升级,重新获取读锁,保证数据安全
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
    // 锁升级
}

示例:

class DataContainerStamped {
    
    private int data;
    private final StampedLock lock = new StampedLock();
    
    public DataContainerStamped(int data) {
        this.data = data;
    }
    
    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        }
        // 锁升级 - 读锁
        log.debug("updating to read lock... {}", stamp);
        try {
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }
    
    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            sleep(2);
            this.data = newData;
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
    
}

读 - 读不影响并发:

public static void main(String[] args) {
    DataContainerStamped dataContainer = new DataContainerStamped(1);
    new Thread(() -> {
        dataContainer.read(1);
    }, "t1").start();
    
    sleep(0.5);
    new Thread(() -> {
        dataContainer.read(0);
    }, "t2").start();
}
15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

读-写的时候会进行锁升级:

public static void main(String[] args) {
    DataContainerStamped dataContainer = new DataContainerStamped(1);
    
    new Thread(() -> {
        dataContainer.read(1);
    }, "t1").start();
    
    sleep(0.5);
    new Thread(() -> {
        dataContainer.write(100);
    }, "t2").start();
}
15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513

虽然stamedLock在读的性能上较为优秀,但是还要几个缺点:

1:不可重入;

2:不支持条件变量;

二:stamphore

  • 中文是信号量,可以控制访问资源的线程数:
public static void main(String[] args) {
    // 1. 创建 semaphore 对象
    Semaphore semaphore = new Semaphore(3);
    // 2. 10个线程同时运行
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            // 3. 获取许可
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                log.debug("running...");
                sleep(1);
                log.debug("end...");
            } finally {
                // 4. 释放许可
                semaphore.release();
            }
        }).start();
    }
 }
  • 这里初始化semaphore的大小是3,所以一次只能有三个线程获得许可,剩余的线程陷入阻塞;

输出:

07:35:15.485 c.TestSemaphore [Thread-2] - running... 
07:35:15.485 c.TestSemaphore [Thread-1] - running... 
07:35:15.485 c.TestSemaphore [Thread-0] - running... 
07:35:16.490 c.TestSemaphore [Thread-2] - end... 
07:35:16.490 c.TestSemaphore [Thread-0] - end... 
07:35:16.490 c.TestSemaphore [Thread-1] - end... 
07:35:16.490 c.TestSemaphore [Thread-3] - running... 
07:35:16.490 c.TestSemaphore [Thread-5] - running... 
07:35:16.490 c.TestSemaphore [Thread-4] - running... 
07:35:17.490 c.TestSemaphore [Thread-5] - end... 
07:35:17.490 c.TestSemaphore [Thread-4] - end... 
07:35:17.490 c.TestSemaphore [Thread-3] - end... 
07:35:17.490 c.TestSemaphore [Thread-6] - running... 
07:35:17.490 c.TestSemaphore [Thread-7] - running... 
07:35:17.490 c.TestSemaphore [Thread-9] - running... 
07:35:18.491 c.TestSemaphore [Thread-6] - end... 
07:35:18.491 c.TestSemaphore [Thread-7] - end... 
07:35:18.491 c.TestSemaphore [Thread-9] - end... 
07:35:18.491 c.TestSemaphore [Thread-8] - running... 
07:35:19.492 c.TestSemaphore [Thread-8] - end...

应用

  • semaphore可以用来限流,在访问高峰的时候,将请求阻塞住,等高峰过去之后再将线程释放,但只支持单机,并且这里限制的线程数,而不是资源数;

  • semaphore也可以实现简单的连接池,替代awit,signal,提高了性能和可读性;(线程数和数据库连接数相同)

@Slf4j(topic = "c.Pool")
class Pool {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;
    private Semaphore semaphore;
    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i+1));
        }
    }
    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if(states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }
    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }
    
}

原理

  • aquire

aquire方法->acquireSharedInterruptibly方法->tryAcquireShared方法->nonfairTryAcquireShared方法

我们之前提起过,tryAcquireShared,-1是加锁失败,0成功,正数还剩多少state可以获取

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();//获取state
        int remaining = available - acquires;//state-1
        if (remaining < 0 ||//如果减了之后等于负数,直接返会负数。如果没有呢,就进行cas赋值
            compareAndSetState(available, remaining))
            return remaining;
    }
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

当tryAcquireShared返回值为负数时进入doAcquireSharedInterruptibly方法;

因为没有位置可以使用了,就像之前的reentrantLock一样进入等待队列:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
  • 加入队列后,再尝试获取锁,然后就会将前驱节点的值设为-1之后就park阻塞住;

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 释放锁的原理

release方法->releaseShared方法->tryReleaseShared方法

tryReleaseShared方法中,会让state+1;

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

加一之后进入if块执行doReleaseShared方法:

  • 将AQS队列中的头节点的状态设为0,然后唤醒后继节点,然后要判断自己是否是老二节点,如果是就去尝试获取锁,就是让state减一,如果返回结果大于等于0,就将自己的节点作为头节点,然后再判断后面的节点是否是share类型的,如果是的话也会唤醒,尝试获取锁失败还是会陷入阻塞
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

三:countDownLatch

  • countDownLatch用于多线程的同步协作,等待全部线程倒计时
  • 构造参数是初始化的等待计数值,await方法使线程阻塞等待计数器的值归0,countDown用来将计数值减一;

基本用法示例:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(3);
    new Thread(() -> {
        log.debug("start...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("end.....");
        latch.countDown();
    }, "t1").start();
    new Thread(() -> {
        log.debug("start...");
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("end.....");
        latch.countDown();
    }, "t2").start();
    new Thread(() -> {
        log.debug("start...");
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("end.....");
        latch.countDown();
    }, "t3").start();
    log.debug("wait....");
    latch.await();
    log.debug("ok");
}

结果:

19:42:26.333 [t3] DEBUG com.hbu.aqs.testCount - start...
19:42:26.333 [t1] DEBUG com.hbu.aqs.testCount - start...
19:42:26.333 [t2] DEBUG com.hbu.aqs.testCount - start...
19:42:26.333 [main] DEBUG com.hbu.aqs.testCount - wait....
19:42:26.841 [t2] DEBUG com.hbu.aqs.testCount - end.....
19:42:27.340 [t1] DEBUG com.hbu.aqs.testCount - end.....
19:42:27.845 [t3] DEBUG com.hbu.aqs.testCount - end.....
19:42:27.846 [main] DEBUG com.hbu.aqs.testCount - ok

等待所有线程运行结束,即计数器减为0时,主线程才停止等待继续运行;

应用

等待所有线程运行完毕:

public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    CountDownLatch latch=new CountDownLatch(10);
    Random random=new Random();
    String[]all=new String[10];
    for (int j = 0; j < 10; j++) {
        int k = j;
        pool.submit(()->{
            for (int i = 0; i <= 100; i++) {
                all[k]=i+"%";
                try {
                    Thread.sleep(random.nextInt(80));
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.print("\r"+Arrays.toString(all));
            }
            latch.countDown();
        });
    }
    pool.shutdown();
    latch.await();
    System.out.println("\n"+"欢迎来到王者荣耀!");
}

这里面“\r”和随机睡眠时间是进度条显示的关键;

  • 应用之同步等待多个远程调用结束

每次调用都开一个线程,每个调用结束都countDown,最后所有的调用结束,主线程结束等待;

@RestController
public class TestCountDownlatchController {
    @GetMapping("/order/{id}")
    public Map<String, Object> order(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("total", "2300.00");
        sleep(2000);
        return map;
    }
    
    @GetMapping("/product/{id}")
    public Map<String, Object> product(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        if (id == 1) {
            map.put("name", "小爱音箱");
            map.put("price", 300);
        } else if (id == 2) {
            map.put("name", "小米手机");
            map.put("price", 2000);
        }
        map.put("id", id);
        sleep(1000);
        return map;
    }
    
    @GetMapping("/logistics/{id}")
    public Map<String, Object> logistics(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("name", "中通快递");
        sleep(2500);
        return map;
    }
    
    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

可以使用future阻塞接收有返回值的submit。

RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);

Future<Map<String,Object>> f1 = service.submit(() -> {
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
    latch.countDown();
    return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
    latch.countDown();
    return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
    latch.countDown();
    return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {
    Map<String, Object> r =
        restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
    latch.countDown();
    return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());

latch.await();
log.debug("执行完毕");
service.shutdown();
19:51:39.711 c.TestCountDownLatch [main] - begin 
{total=2300.00, id=1} 
{price=300, name=小爱音箱, id=1} 
{price=2000, name=小米手机, id=2} 
{name=中通快递, id=1} 
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

四:CyclicBarrier

[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.

调用await方法减一,直到计数器为0,才会调用构造参数中的runnable方法,然后调用完了还会重置计数器的参数;

计数器的初始值要与线程数保持一致;

public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
            log.debug("task 1.task2 running again...");
        });
        for (int i = 0; i < 3; i++) {
            pool.submit(()->{
                log.debug("task1 ..begin");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                try {
                    log.debug("task1..end");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }

            },"t1");
            pool.submit(()->{
                log.debug("task2 ..begin");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                try {
                    log.debug("task2..end");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }

            },"t2");
        }

        pool.shutdown();
    }

http://www.kler.cn/a/504226.html

相关文章:

  • 智能物流升级利器——SAIL-RK3576核心板AI边缘计算网关设计方案(一)
  • 【绝对无坑】Mongodb获取集合的字段以及数据类型信息
  • 如何提高自动化测试覆盖率和效率
  • 龙蜥Linux系统部署docker21.1.3版本
  • Web前端界面开发
  • ElasticSearch 同义词匹配
  • JSON简介与使用
  • golang 环境变量配置
  • CSS语言的计算机基础
  • 在 Azure 100 学生订阅中新建 Ubuntu VPS 并通过 Docker 部署 Mastodon 服务器
  • C#委托(Delegate)基本用法
  • GD32F470Z外部晶振不起振
  • 【Hive】新增字段(column)后,旧分区无法更新数据问题
  • 1.13 多线程编程
  • 社群团购项目运营策略的深度剖析:融合链动2+1模式、AI智能名片与S2B2C商城小程序的综合应用
  • EasyExcel - 行合并策略(二级列表)
  • Elasticsearch:向量数据库基础设施类别的兴衰
  • 易我视频照片修复EaseUS Fixo Technician
  • 简聊MySQL并发事务中幻读、虚读问题的解决方案
  • GPU算力平台|在GPU算力平台部署Qwen-2通义千问大模型的教程
  • sniffer 日志分析吞吐问题
  • 结合night compute分析 利用tensor core 优化K值较大的矩阵乘(超过cublas50%)
  • vue的KeepAlive应用(针对全部页面及单一页面进行缓存)
  • 精通Python (10)
  • 【Linux】8.Linux基础开发工具使用(2)
  • React中的key有什么作用?