java实验6 J.U.C并发编程
实验6 J.U.C并发编程
要求:
1)严禁上网抄袭、互相抄袭和各种形式的抄袭(如代码抄袭,运行截图一图多用),一旦发现单次作业按零分处理!
2)课程报告正文内容基本格式为:宋体,小五号,1.5 倍行距。
3)作业报告请务必保持排版的整洁规范,排版混乱者将直接判为不及格。
4)为避免办公软件兼容性导致的显示差异问题,要求在提交课程报告 WORD 文件的同
时提交相应的 PDF 版本。
5)在平台上传文件统一命名形如:实验{编号}-班级-2位数学号-姓名,例如:“实验1-计算机231-01-张三.DOCX”和“实验1-计算机231-01-张三.PDF”。文件命名不合规范者,单次作业直接判为不及格。
一、实验目的
1. 掌握 Java 中的并发编程技术,包括线程创建、线程同步、线程池等。
2. 掌握原子操作、 synchronized、Lock、CountDownLatch、CyclicBarrier、ExecutorService 等并发工具的使用。
- 实验内容
- 线程的创建与启动及线程池的使用。
要求:
1)使用两种方式创建并启动线程。
2)通过继承 Thread 类创建线程、通过实现 Runnable 接口创建线程。
3)线程池的使用。使用 ExecutorService 创建一个固定大小的线程池,并提交多个任务执行。
代码:import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread " + Thread.currentThread().getName() + " is running");
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Thread " + Thread.currentThread().getName() + " is running");
}
}
public class Main {
public static void main(String[] args) {
// 使用继承 Thread 类的方式创建并启动线程
MyThread myThread1 = new MyThread();
myThread1.start();
// 使用实现 Runnable 接口的方式创建并启动线程
Thread myThread2 = new Thread(new MyRunnable());
myThread2.start();
// 创建一个固定大小为 3 的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 提交多个任务到线程池执行
for (int i = 0; i < 5; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("Thread from pool " + Thread.currentThread().getName() + " is running");
}
});
}
// 关闭线程池
executorService.shutdown();
}
}
运行截图:
- CountDownLatch 和 CyclicBarrier的使用。
要求:
1)使用 CountDownLatch 实现多个线程同时开始执行。
2)CyclicBarrier 实现多个线程在同一时刻执行某个任务。
代码:// CountDownLatchExample.java
import java.util.concurrent.CountDownLatch;
class Worker extends Thread {
private CountDownLatch startSignal;
Worker(CountDownLatch startSignal) {
this.startSignal = startSignal;
}
@Override
public void run() {
try {
startSignal.await(); // 等待开始信号
System.out.println(Thread.currentThread().getName() + " is running");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch startSignal = new CountDownLatch(1);
Worker[] workers = new Worker[threadCount];
for (int i = 0; i < threadCount; i++) {
workers[i] = new Worker(startSignal);
workers[i].start();
}
System.out.println("All threads are ready. Starting all threads at once.");
startSignal.countDown(); // 释放开始信号,所有等待的线程同时开始执行
}
}
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Task extends Thread {
private CyclicBarrier barrier;
Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " is waiting at the barrier");
barrier.await(); // 等待所有线程到达屏障
System.out.println(Thread.currentThread().getName() + " has crossed the barrier");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 5;
CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {
@Override
public void run() {
System.out.println("All threads have reached the barrier. Now proceeding...");
}
});
Task[] tasks = new Task[threadCount];
for (int i = 0; i < threadCount; i++) {
tasks[i] = new Task(barrier);
tasks[i].start();
}
}
}
运行截图:
- 通过socket实现文件的分片上传与合并
要求:
1)使用多线程技术,实现在本地对文件进行分片,通过client将文件上传到server端。
2)server端需要实现同步机制,等接收到client端的所有分片后对文件进行合并。
代码:import java.io.*;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FileUploadClient {
private static final int THREAD_COUNT = 5;
private static final String SERVER_ADDRESS = "localhost";
private static final int SERVER_PORT = 8082;
private static final String FILE_PATH = System.getProperty("user.home") + "/Documents/input_file.txt"; // 用户文档目录下的文件
public static void main(String[] args) throws IOException {
File file = new File(FILE_PATH);
if (!file.exists()) {
System.err.println("File not found: " + FILE_PATH);
return;
}
long fileSize = file.length();
long chunkSize = fileSize / THREAD_COUNT;
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
long start = i * chunkSize;
long end = (i == THREAD_COUNT - 1) ? fileSize : start + chunkSize;
executorService.execute(new FileUploadTask(SERVER_ADDRESS, SERVER_PORT, FILE_PATH, start, end, i));
}
executorService.shutdown();
}
}
class FileUploadTask implements Runnable {
private String serverAddress;
private int serverPort;
private String filePath;
private long start;
private long end;
private int partNumber;
public FileUploadTask(String serverAddress, int serverPort, String filePath, long start, long end, int partNumber) {
this.serverAddress = serverAddress;
this.serverPort = serverPort;
this.filePath = filePath;
this.start = start;
this.end = end;
this.partNumber = partNumber;
}
@Override
public void run() {
try (Socket socket = new Socket(serverAddress, serverPort);
RandomAccessFile file = new RandomAccessFile(filePath, "r");
OutputStream out = socket.getOutputStream();
DataOutputStream dataOut = new DataOutputStream(out)) {
dataOut.writeInt(partNumber);
dataOut.writeLong(end - start);
file.seek(start);
byte[] buffer = new byte[1024];
long bytesToRead = end - start;
int bytesRead;
while (bytesToRead > 0 && (bytesRead = file.read(buffer, 0, (int) Math.min(buffer.length, bytesToRead))) != -1) {
out.write(buffer, 0, bytesRead);
bytesToRead -= bytesRead;
}
System.out.println("Part " + partNumber + " uploaded.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class FileUploadServer {
private static final int PORT = 8082;
private static final int PART_COUNT = 5;
private static final String OUTPUT_FILE_PATH = System.getProperty("user.home") + "/Documents/output_file.txt"; // 用户文档目录下的输出文件
private static ConcurrentHashMap<Integer, byte[]> fileParts = new ConcurrentHashMap<>();
private static CountDownLatch latch = new CountDownLatch(PART_COUNT);
public static void main(String[] args) {
try (ServerSocket serverSocket = new ServerSocket(PORT)) {
System.out.println("Server is running...");
while (true) {
Socket socket = serverSocket.accept();
new Thread(new FileReceiveTask(socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class FileReceiveTask implements Runnable {
private Socket socket;
public FileReceiveTask(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (InputStream in = socket.getInputStream();
DataInputStream dataIn = new DataInputStream(in)) {
int partNumber = dataIn.readInt();
long partSize = dataIn.readLong();
byte[] buffer = new byte[(int) partSize];
dataIn.readFully(buffer);
fileParts.put(partNumber, buffer);
latch.countDown();
System.out.println("Received part " + partNumber);
if (latch.getCount() == 0) {
mergeFileParts();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void mergeFileParts() {
try (FileOutputStream fos = new FileOutputStream(OUTPUT_FILE_PATH)) {
for (int i = 0; i < PART_COUNT; i++) {
fos.write(fileParts.get(i));
}
System.out.println("File merge completed.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
运行截图:
- 通过socket模拟实现Tensorflow分布式计算框架(基于PS架构)
要求:
1)实现两个Master节点,用于模拟计算任务的下发。
2)实现三个Worker节点,用于模拟实际计算任务的执行。
3)需要设计同步机制,当Master接收到所有Worker回传的计算结果后,进行结果统计;然后再下发任务
4)Master需要给Worker下发N个计算任务(N可以自由设置)
代码:import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
public class MasterNode {
private static final int PORT = 8084;
private static final int WORKER_COUNT = 3;
private static final int TASK_COUNT = 10;
private static CountDownLatch latch;
private static List<Integer> results = Collections.synchronizedList(new ArrayList<>());
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocket serverSocket = new ServerSocket(PORT);
ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT);
System.out.println("Master node is running...");
for (int i = 0; i < WORKER_COUNT; i++) {
Socket socket = serverSocket.accept();
executorService.execute(new WorkerHandler(socket));
}
for (int i = 0; i < TASK_COUNT; i++) {
System.out.println("Dispatching task batch " + (i + 1));
latch = new CountDownLatch(WORKER_COUNT);
results.clear();
dispatchTasks();
latch.await(); // 等待所有Worker节点返回结果
System.out.println("All results received: " + results);
}
executorService.shutdown();
serverSocket.close();
}
private static void dispatchTasks() throws IOException {
for (int i = 0; i < WORKER_COUNT; i++) {
Socket socket = new Socket("localhost", 8081 + i);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
out.println("Task " + (i + 1));
socket.close();
}
}
private static class WorkerHandler implements Runnable {
private Socket socket;
public WorkerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
String result;
while ((result = in.readLine()) != null) {
results.add(Integer.parseInt(result));
latch.countDown();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
import java.io.*;
import java.net.*;
public class WorkerNode {
private static final int MASTER_PORT = 8084;
private static final int PORT = 8083; // 每个 Worker 节点端口不同
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(PORT);
System.out.println("Worker node is running on port " + PORT);
while (true) {
Socket masterSocket = serverSocket.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
String task = in.readLine();
System.out.println("Received task: " + task);
// 模拟计算任务
int result = performTask(task);
// 将结果发送回 Master
Socket resultSocket = new Socket("localhost", MASTER_PORT);
PrintWriter out = new PrintWriter(resultSocket.getOutputStream(), true);
out.println(result);
resultSocket.close();
}
}
private static int performTask(String task) {
// 模拟计算,返回一个随机结果
return new java.util.Random().nextInt(100);
}
}
运行截图:
三、思考题
1. 简述锁和原子操作实现同步机制的区别。
锁和原子操作是实现同步机制的两种方式。锁是通过加锁和解锁操作来控制多个线程对共享资源的访问,保证数据的一致性和完整性。而原子操作则是通过不可中断的操作来保证数据在多线程环境下的安全性,避免了因线程切换导致的数据不一致问题。总的来说,锁是一种更通用的同步机制,而原子操作则适用于一些简单的操作。
- 简述并发编程中各种队列(如同步队列、延时队列)的区别。
并发编程中常见的队列包括同步队列和延时队列。同步队列通过锁和条件变量实现线程同步,确保多个线程对共享资源的互斥访问。延时队列则允许元素在指定的延迟时间后才被取出,常用于实现定时任务。总的来说,同步队列主要用于实现线程间的同步和互斥,而延时队列则适用于需要延时处理的场景。
- 简述Java中哪些机制通过乐观锁实现?
Java中通过乐观锁实现的机制包括:
1. AtomicInteger:提供了原子操作,用于整数类型的自增和自减。
2. AtomicLong:提供了原子操作,用于长整型的自增和自减。
3. AtomicReference:提供了原子操作,用于引用类型的更新。
4. AtomicBoolean:提供了原子操作,用于布尔类型的更新。
5. AtomicStampedReference:提供了带有版本号的原子操作,用于引用类型的更新。
6. AtomicMarkableReference:提供了带有标记的原子操作,用于引用类型的更新。
7. LongAdder:提供了高并发下的计数器,适用于大量线程同时更新的场景。
8. ConcurrentHashMap:提供了线程安全的哈希表,支持并发读写操作。