JUC并发编程与源码分析
线程基础知识复习
Java开启一个线程的源码:
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */ }
}
}
private native void start0();
可以发现调用的start0是native,底层是c++实现的,此时需要去下载Java底层源码
链接:https://github.com/openjdk/jdk8
Thread.java对应的源码就是Thread.c,start0就是JVM_StartThread,可以在jvm.h中找到声明,jvm.cpp中实现
位置:jdk/src/share/native/java/lang/Thread.c
jdk/src/share/javavm/export/jvm.h
hotspot/src/share/vm/prims/jvm.cpp
hotspot/src/share/vm/runtime/thread.cpp
jvm.cpp中
在thread.cpp中:
CompletableFuture
Future
接口是 Java 并发编程中用于表示异步计算结果的接口。它允许你提交一个任务并在将来某个时候获取任务的执行结果。Future
接口提供了一种异步获取计算结果的机制,可以在任务执行完成之前进行其他操作,避免了阻塞等待计算结果的情况。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
FutureTask
FutureTask
是 Java 并发包中的一个类,实现了 RunnableFuture
接口,而 RunnableFuture
接口又扩展自 Runnable
和 Future
接口。它是一个可取消的异步计算任务,允许在计算完成之前进行取消操作,同时也可以通过实现 Callable
接口来支持有返回值的任务。主要有两个构造函数,分别是可以传入Callable和Runnable接口。
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
* * @param callable the callable task
* @throws NullPointerException if the callable is null
*/public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion. * * @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
api调用:
public class Test {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
Thread a = new Thread(futureTask, "A");
a.start();
try {
System.out.println(futureTask.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("---come in call()");
return "Hello";
}
}
使用多线程和单个线程的区别:
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
m1();
m2();
}
private static void m2() throws InterruptedException, ExecutionException {
long startTime=System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(3);
FutureTask<String> futureTask = new FutureTask<>(()->{
try{TimeUnit.MILLISECONDS.sleep(500);}catch (InterruptedException e){e.printStackTrace();}
return "task1 end";
});
executorService.submit(futureTask);
FutureTask<String> futureTask2 = new FutureTask<>(()->{
try{TimeUnit.MILLISECONDS.sleep(300);}catch (InterruptedException e){e.printStackTrace();}
return "task2 end";
});
executorService.submit(futureTask2);
FutureTask<String> futureTask3 = new FutureTask<>(()->{
try{TimeUnit.MILLISECONDS.sleep(200);}catch (InterruptedException e){e.printStackTrace();}
return "task3 end";
});
executorService.submit(futureTask3);
System.out.println("futureTask.get() = " + futureTask.get());
System.out.println("futureTask2.get() = " + futureTask2.get());
System.out.println("futureTask3.get() = " + futureTask3.get());
executorService.shutdown();
long endTime=System.currentTimeMillis();
System.out.println("thread Time: "+(endTime-startTime));
}
private static void m1() {
//只用一个线程处理
long startTime=System.currentTimeMillis();
try{TimeUnit.MILLISECONDS.sleep(500);}catch (InterruptedException e){e.printStackTrace();}
try{TimeUnit.MILLISECONDS.sleep(300);}catch (InterruptedException e){e.printStackTrace();}
try{TimeUnit.MILLISECONDS.sleep(200);}catch (InterruptedException e){e.printStackTrace();}
long endTime=System.currentTimeMillis();
System.out.println("dont use thread Time: "+(endTime-startTime));
}
}
运行结果: 可以看到,速度快了一倍左右
问题:阻塞等待结果: 在调用 get
方法获取任务的执行结果时,如果任务尚未完成,get
方法会阻塞等待任务完成。这可能导致程序在获取结果时被阻塞,影响整体性能。为了避免这种情况,可以使用带有超时参数的 get
方法,或者结合其他机制来处理。
轮询耗费CPU:
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("Hello, World!");
return "take over";
});
Thread thread = new Thread(futureTask);
thread.start();
while (true) {
if (futureTask.isDone()) {
System.out.println(futureTask.get());
break; } else {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("waiting...");
}
}
结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。 对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。 改进:使用CompletableFuture
CompletableFuture
CompletableFuture
类是 Java 并发编程中提供的一个强大的工具,用于处理异步操作。它支持通过回调函数(观察者模式)的方式处理异步计算的结果。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能 力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些 动作。它实现了Future和CompletionStage接口,尽量不要使用new构建 创建CompletableFuture类:
- 无返回值:runAsync
- 有返回值:completedFuture
CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
System.out.println("Hello World");
});
System.out.println(completableFuture.get());
测试CompletableFuture:
如果没有ExecutorService线程池,那么程序会直接结束,不会等待异步线程
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("Thread: " + Thread.currentThread().getName() + " is running");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("after 1 second sleep,get result" + 1);
return 1;
},executorService).whenComplete((v,e)->{
if (e==null){
System.out.println("Thread: " + Thread.currentThread().getName() + " is running");
System.out.println("result is " + v);
}
}).exceptionally(e->{
e.printStackTrace();
System.out.println("exception is "+e.getCause());
return null; });
System.out.println(Thread.currentThread().getName()+"is running");
executorService.shutdown();
}
函数式接口
函数式接口 | 方法名 | 参数 | 返回类型 | 示例 |
---|---|---|---|---|
Runnable | run | 无 | void | Runnable myRunnable = () -> { /* 任务执行 */ }; |
Function<T, R> | apply | T | R | Function<Integer, String> intToString = (integer) -> "数字: " + integer; |
Consumer<T> | accept | T | void | Consumer<String> printUpperCase = (str) -> System.out.println(str.toUpperCase()); |
Supplier<T> | get | 无 | T | Supplier<Double> randomNumber = () -> Math.random(); |
BiConsumer<T, U> | accept | T, U | void | BiConsumer<Integer, String> printKeyValue = (key, value) -> System.out.println(key + ": " + value); |
真实案例:
- 需求说明
- 同一款产品,同时搜索出同款产品在各大电商平台的售价;
- 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
- 输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表,返回一个
List<String>
- 《mysql》)in jd price is88.05
- 《mysql》)in dangdang price is86.11
- 《mysql》)in taobao price is90.43
- 解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表,
- step by step,按部就班,查完京东查淘宝,查完淘宝查天猫.
- all in 万箭齐发,一口气多线程异步任务同时查询。
单步查询:
public class Test {
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("taobao"),
new NetMall("dangdang")
);
public static List<String> getPrice(List<NetMall> list, String productName) {
return list.stream()
.map(netMall -> {
return String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calPrice(productName));
}).toList();
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
System.out.println(getPrice(list, "mysql"));
System.out.println("Done in " + (System.currentTimeMillis() - start) + "ms");
}
}
class NetMall {
private String netMallName;
public NetMall(String netMallName) {
this.netMallName = netMallName;
}
public String getNetMallName() {
return netMallName;
}
public double calPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
修改: 使用CompletableFuture提高性能
public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
return list.stream().map(netMall -> {
return CompletableFuture.supplyAsync(() -> {
return String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calPrice(productName));
});
}).collect(Collectors.toList())
.stream().map(s -> s.join())
.collect(Collectors.toList());
}
结果如下:
几个获取异步结果的对比: