当前位置: 首页 > article >正文

java实验6 J.U.C并发编程

实验6  J.U.C并发编程

要求:

1)严禁上网抄袭、互相抄袭和各种形式的抄袭(如代码抄袭,运行截图一图多用),一旦发现单次作业按零分处理

2)课程报告正文内容基本格式为:宋体,小五号,1.5 倍行距。

3)作业报告请务必保持排版的整洁规范,排版混乱者将直接判为不及格。

4)为避免办公软件兼容性导致的显示差异问题,要求在提交课程报告 WORD 文件的同

时提交相应的 PDF 版本。

5)在平台上传文件统一命名形如:实验{编号}-班级-2位数学号-姓名,例如:实验1-计算机231-01-张三.DOCX”和“实验1-计算机231-01-张三.PDF”。文件命名不合规范者,单次作业直接判为不及格。

一、实验目的

1. 掌握 Java 中的并发编程技术,包括线程创建、线程同步、线程池等。

2. 掌握原子操作、 synchronized、Lock、CountDownLatch、CyclicBarrier、ExecutorService 等并发工具的使用。

  • 实验内容

  1. 线程的创建与启动及线程池的使用。

要求:

1)使用两种方式创建并启动线程。

2)通过继承 Thread 类创建线程、通过实现 Runnable 接口创建线程。

3)线程池的使用。使用 ExecutorService 创建一个固定大小的线程池,并提交多个任务执行。

代码:import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

class MyThread extends Thread {

    @Override

    public void run() {

        System.out.println("Thread " + Thread.currentThread().getName() + " is running");

    }

}

class MyRunnable implements Runnable {

    @Override

    public void run() {

        System.out.println("Thread " + Thread.currentThread().getName() + " is running");

    }

}

public class Main {

    public static void main(String[] args) {

        // 使用继承 Thread 类的方式创建并启动线程

        MyThread myThread1 = new MyThread();

        myThread1.start();

        // 使用实现 Runnable 接口的方式创建并启动线程

        Thread myThread2 = new Thread(new MyRunnable());

        myThread2.start();

        // 创建一个固定大小为 3 的线程池

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 提交多个任务到线程池执行

        for (int i = 0; i < 5; i++) {

            executorService.submit(new Runnable() {

                @Override

                public void run() {

                    System.out.println("Thread from pool " + Thread.currentThread().getName() + " is running");

                }

            });

        }

        // 关闭线程池

        executorService.shutdown();

    }

}

运行截图:

  1. CountDownLatch 和 CyclicBarrier的使用。

要求:

1)使用 CountDownLatch 实现多个线程同时开始执行。

2)CyclicBarrier 实现多个线程在同一时刻执行某个任务。

代码:// CountDownLatchExample.java

import java.util.concurrent.CountDownLatch;

class Worker extends Thread {

    private CountDownLatch startSignal;

    Worker(CountDownLatch startSignal) {

        this.startSignal = startSignal;

    }

    @Override

    public void run() {

        try {

            startSignal.await();  // 等待开始信号

            System.out.println(Thread.currentThread().getName() + " is running");

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {

        int threadCount = 5;

        CountDownLatch startSignal = new CountDownLatch(1);

        Worker[] workers = new Worker[threadCount];

        for (int i = 0; i < threadCount; i++) {

            workers[i] = new Worker(startSignal);

            workers[i].start();

        }

        System.out.println("All threads are ready. Starting all threads at once.");

        startSignal.countDown();  // 释放开始信号,所有等待的线程同时开始执行

    }

}

  import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

class Task extends Thread {

    private CyclicBarrier barrier;

    Task(CyclicBarrier barrier) {

        this.barrier = barrier;

    }

    @Override

    public void run() {

        try {

            System.out.println(Thread.currentThread().getName() + " is waiting at the barrier");

            barrier.await();  // 等待所有线程到达屏障

            System.out.println(Thread.currentThread().getName() + " has crossed the barrier");

        } catch (InterruptedException | BrokenBarrierException e) {

            e.printStackTrace();

        }

    }

}

public class CyclicBarrierExample {

    public static void main(String[] args) {

        int threadCount = 5;

        CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {

            @Override

            public void run() {

                System.out.println("All threads have reached the barrier. Now proceeding...");

            }

        });

        Task[] tasks = new Task[threadCount];

        for (int i = 0; i < threadCount; i++) {

            tasks[i] = new Task(barrier);

            tasks[i].start();

        }

    }

}

运行截图:

  1. 通过socket实现文件的分片上传与合并

要求:

1)使用多线程技术,实现在本地对文件进行分片,通过client将文件上传到server端。

2)server端需要实现同步机制,等接收到client端的所有分片后对文件进行合并。

代码:import java.io.*;

import java.net.Socket;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class FileUploadClient {

    private static final int THREAD_COUNT = 5;

    private static final String SERVER_ADDRESS = "localhost";

    private static final int SERVER_PORT = 8082;

    private static final String FILE_PATH = System.getProperty("user.home") + "/Documents/input_file.txt"; // 用户文档目录下的文件

    public static void main(String[] args) throws IOException {

        File file = new File(FILE_PATH);

        if (!file.exists()) {

            System.err.println("File not found: " + FILE_PATH);

            return;

        }

        long fileSize = file.length();

        long chunkSize = fileSize / THREAD_COUNT;

        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);

        for (int i = 0; i < THREAD_COUNT; i++) {

            long start = i * chunkSize;

            long end = (i == THREAD_COUNT - 1) ? fileSize : start + chunkSize;

            executorService.execute(new FileUploadTask(SERVER_ADDRESS, SERVER_PORT, FILE_PATH, start, end, i));

        }

        executorService.shutdown();

    }

}

class FileUploadTask implements Runnable {

    private String serverAddress;

    private int serverPort;

    private String filePath;

    private long start;

    private long end;

    private int partNumber;

    public FileUploadTask(String serverAddress, int serverPort, String filePath, long start, long end, int partNumber) {

        this.serverAddress = serverAddress;

        this.serverPort = serverPort;

        this.filePath = filePath;

        this.start = start;

        this.end = end;

        this.partNumber = partNumber;

    }

    @Override

    public void run() {

        try (Socket socket = new Socket(serverAddress, serverPort);

             RandomAccessFile file = new RandomAccessFile(filePath, "r");

             OutputStream out = socket.getOutputStream();

             DataOutputStream dataOut = new DataOutputStream(out)) {

            dataOut.writeInt(partNumber);

            dataOut.writeLong(end - start);

            file.seek(start);

            byte[] buffer = new byte[1024];

            long bytesToRead = end - start;

            int bytesRead;

            while (bytesToRead > 0 && (bytesRead = file.read(buffer, 0, (int) Math.min(buffer.length, bytesToRead))) != -1) {

                out.write(buffer, 0, bytesRead);

                bytesToRead -= bytesRead;

            }

            System.out.println("Part " + partNumber + " uploaded.");

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

}

import java.io.*;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.CountDownLatch;

public class FileUploadServer {

    private static final int PORT = 8082;

    private static final int PART_COUNT = 5;

    private static final String OUTPUT_FILE_PATH = System.getProperty("user.home") + "/Documents/output_file.txt"; // 用户文档目录下的输出文件

    private static ConcurrentHashMap<Integer, byte[]> fileParts = new ConcurrentHashMap<>();

    private static CountDownLatch latch = new CountDownLatch(PART_COUNT);

    public static void main(String[] args) {

        try (ServerSocket serverSocket = new ServerSocket(PORT)) {

            System.out.println("Server is running...");

            while (true) {

                Socket socket = serverSocket.accept();

                new Thread(new FileReceiveTask(socket)).start();

            }

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    static class FileReceiveTask implements Runnable {

        private Socket socket;

        public FileReceiveTask(Socket socket) {

            this.socket = socket;

        }

        @Override

        public void run() {

            try (InputStream in = socket.getInputStream();

                 DataInputStream dataIn = new DataInputStream(in)) {

                int partNumber = dataIn.readInt();

                long partSize = dataIn.readLong();

                byte[] buffer = new byte[(int) partSize];

                dataIn.readFully(buffer);

                fileParts.put(partNumber, buffer);

                latch.countDown();

                System.out.println("Received part " + partNumber);

                if (latch.getCount() == 0) {

                    mergeFileParts();

                }

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

        private void mergeFileParts() {

            try (FileOutputStream fos = new FileOutputStream(OUTPUT_FILE_PATH)) {

                for (int i = 0; i < PART_COUNT; i++) {

                    fos.write(fileParts.get(i));

                }

                System.out.println("File merge completed.");

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

}

运行截图:

  1. 通过socket模拟实现Tensorflow分布式计算框架(基于PS架构)

要求:

1)实现两个Master节点,用于模拟计算任务的下发。

2)实现三个Worker节点,用于模拟实际计算任务的执行。

3)需要设计同步机制,当Master接收到所有Worker回传的计算结果后,进行结果统计;然后再下发任务

4)Master需要给Worker下发N个计算任务(N可以自由设置)

代码:import java.io.*;

import java.net.*;

import java.util.*;

import java.util.concurrent.*;

public class MasterNode {

    private static final int PORT = 8084;

    private static final int WORKER_COUNT = 3;

    private static final int TASK_COUNT = 10;

    private static CountDownLatch latch;

    private static List<Integer> results = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) throws IOException, InterruptedException {

        ServerSocket serverSocket = new ServerSocket(PORT);

        ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT);

        System.out.println("Master node is running...");

        for (int i = 0; i < WORKER_COUNT; i++) {

            Socket socket = serverSocket.accept();

            executorService.execute(new WorkerHandler(socket));

        }

        for (int i = 0; i < TASK_COUNT; i++) {

            System.out.println("Dispatching task batch " + (i + 1));

            latch = new CountDownLatch(WORKER_COUNT);

            results.clear();

            dispatchTasks();

            latch.await(); // 等待所有Worker节点返回结果

            System.out.println("All results received: " + results);

        }

        executorService.shutdown();

        serverSocket.close();

    }

    private static void dispatchTasks() throws IOException {

        for (int i = 0; i < WORKER_COUNT; i++) {

            Socket socket = new Socket("localhost", 8081 + i);

            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

            out.println("Task " + (i + 1));

            socket.close();

        }

    }

    private static class WorkerHandler implements Runnable {

        private Socket socket;

        public WorkerHandler(Socket socket) {

            this.socket = socket;

        }

        @Override

        public void run() {

            try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

                String result;

                while ((result = in.readLine()) != null) {

                    results.add(Integer.parseInt(result));

                    latch.countDown();

                }

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

}

import java.io.*;

import java.net.*;

public class WorkerNode {

    private static final int MASTER_PORT = 8084;

    private static final int PORT = 8083; // 每个 Worker 节点端口不同

    public static void main(String[] args) throws IOException {

        ServerSocket serverSocket = new ServerSocket(PORT);

        System.out.println("Worker node is running on port " + PORT);

        while (true) {

            Socket masterSocket = serverSocket.accept();

            BufferedReader in = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));

            String task = in.readLine();

            System.out.println("Received task: " + task);

            // 模拟计算任务

            int result = performTask(task);

            // 将结果发送回 Master

            Socket resultSocket = new Socket("localhost", MASTER_PORT);

            PrintWriter out = new PrintWriter(resultSocket.getOutputStream(), true);

            out.println(result);

            resultSocket.close();

        }

    }

    private static int performTask(String task) {

        // 模拟计算,返回一个随机结果

        return new java.util.Random().nextInt(100);

    }

}

运行截图:

三、思考题

1. 简述锁和原子操作实现同步机制的区别。

    锁和原子操作是实现同步机制的两种方式。锁是通过加锁和解锁操作来控制多个线程对共享资源的访问,保证数据的一致性和完整性。而原子操作则是通过不可中断的操作来保证数据在多线程环境下的安全性,避免了因线程切换导致的数据不一致问题。总的来说,锁是一种更通用的同步机制,而原子操作则适用于一些简单的操作。                                     

  1. 简述并发编程中各种队列(如同步队列、延时队列)的区别。

     并发编程中常见的队列包括同步队列和延时队列。同步队列通过锁和条件变量实现线程同步,确保多个线程对共享资源的互斥访问。延时队列则允许元素在指定的延迟时间后才被取出,常用于实现定时任务。总的来说,同步队列主要用于实现线程间的同步和互斥,而延时队列则适用于需要延时处理的场景。                                    

  1. 简述Java中哪些机制通过乐观锁实现?

    Java中通过乐观锁实现的机制包括:

1. AtomicInteger:提供了原子操作,用于整数类型的自增和自减。

2. AtomicLong:提供了原子操作,用于长整型的自增和自减。

3. AtomicReference:提供了原子操作,用于引用类型的更新。

4. AtomicBoolean:提供了原子操作,用于布尔类型的更新。

5. AtomicStampedReference:提供了带有版本号的原子操作,用于引用类型的更新。

6. AtomicMarkableReference:提供了带有标记的原子操作,用于引用类型的更新。

7. LongAdder:提供了高并发下的计数器,适用于大量线程同时更新的场景。

8. ConcurrentHashMap:提供了线程安全的哈希表,支持并发读写操作。        


http://www.kler.cn/a/470998.html

相关文章:

  • HCIA-Access V2.5_8_2_EPON基本架构和关键参数
  • 均值聚类算法
  • 【CPU】页表项和叶子表项(个人草稿)
  • Golang开发-案例整理汇总
  • C# 检查一个字符串是否是科学计数法格式字符串 如 1.229266E+01
  • 企业网络性能监控
  • jEasyUI 创建页脚摘要
  • Linux驱动开发 gpio_get_value读取输出io的电平返回值一直为0的问题
  • 咖啡馆系统|Java|SSM|JSP|
  • [Unity Shader] 【游戏开发】Unity基础光照1-光照模型原理
  • QT中如何限制 限制QLineEdit只能输入字母,或数字,或某个范围内数字等限制约束?
  • 26考研资料分享 百度网盘
  • Chrome 浏览器下载安装教程,保姆级教程
  • linux系统(ubuntu,uos等)连接鸿蒙next(mate60)设备
  • 【prometheus】Pushgateway安装和使用
  • Devart dotConnect发布全新版本,支持EF Core 9、完全兼容 .NET 9 等!
  • Ubuntu24.04.1 LTS+Win11双系统安装记录
  • node.js之---内置模块
  • 信号处理-消除趋势项
  • VulnHub-Acid(1/100)
  • 前端面试题-(webpack基础)
  • 计算机网络常见面试题及解答
  • 在Linux中,zabbix如何监控脑裂?
  • 接口开发完后,个人对于接下来接口优化的一些思考
  • iOS - 自旋锁
  • Unity【Colliders碰撞器】和【Rigibody刚体】的应用——小球反弹效果