异步编排优化
创建线程
第一种方式:
public class ThreadDemo {
public static void main(String[] args) {
System.out.println("main .... start ....");
MyThread myThread = new MyThread();
myThread.start();
System.out.println("main .... end ....");
}
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
}
}
}
第二种方式:
public static void main(String[] args) {
System.out.println("main .... start ....");
Runnable01 runnable01 = new Runnable01();
Thread thread = new Thread(runnable01);
thread.start();
System.out.println("main .... end ....");
}
public static class Runnable01 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
}
}
第三种方式:
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main .... start ....");
FutureTask<Integer> task = new FutureTask<>(new Callable01());
new Thread(task, "A").start();
Integer i = task.get();
//get会阻塞,直到线程执行完毕
System.out.println("i = " + i);
System.out.println("main .... end ....");
}
public static class Callable01 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
return i;
}
}
第四种方式:
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main .... start ....");
service.execute(new Runnable01());
System.out.println("main .... end ....");
}
线程池
线程池构造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
(核心线程数):
- 描述:线程池中始终保持存活的线程数,即使它们处于空闲状态。
maximumPoolSize
(最大线程数):
- 描述:线程池中允许存在的最大线程数。
keepAliveTime
(线程空闲时间):
- 描述:当线程池中的线程数超过核心线程数时,多余的空闲线程在被终止之前等待新任务的最长时间。
unit
(时间单位):
- 描述:用于指定 keepAliveTime 的时间单位,可以是秒、毫秒等。
workQueue
(工作队列):
- 描述:用于保存等待执行的任务的阻塞队列。类型:BlockingQueue
<Runnable>
。threadFactory
(线程工厂):
- 描述:用于创建新线程的工厂。类型:ThreadFactory 接口的实现。
handler
(拒绝策略):
- 描述:当工作队列已满,并且无法再接受新任务时,用于处理新任务的策略。类型:RejectedExecutionHandler 接口的 实现。
面试题:一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
答案:先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在70 个被安排上了。剩下 30 个默认拒绝策略。
常见线程池:
- FixedThreadPool (固定大小线程池):
FixedThreadPool
是一个具有固定线程数量的线程池。- 在执行任务时,如果线程池中的线程都在执行任务,新任务会被放入队列中等待。
- 适用于并发任务数量可控的场景。
- CachedThreadPool (缓存线程池):
CachedThreadPool
是一个可根据需要创建新线程的线程池,线程池的大小可动态调整。- 在执行任务时,如果线程池中的线程都在执行任务,会创建新的线程来处理新任务。
- 适用于短生命周期的异步任务。
- SingleThreadExecutor (单线程线程池):
SingleThreadExecutor
是一个仅包含一个线程的线程池。- 所有提交的任务都按顺序执行,保证不会有并发执行的情况。
- 适用于需要保证任务按照顺序执行的场景。
- ScheduledThreadPool (定时任务线程池):
ScheduledThreadPool
是一个支持定时以及周期性执行任务的线程池。- 可以用于执行定时任务,例如定时执行任务、周期性执行任务等。
- 适用于需要按照一定规律执行任务的场景。
这些线程池实现都是通过 Executors
工厂类创建的,提供了方便的线程池创建和管理方式。
CompletableFuture
创建异步对象
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
runAsync没有返回值,supply有返回值
runAsync
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Void> future =CompletableFuture.runAsync(()->{
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
},service);
System.out.println("end ...");
}
supplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
return i;
}, service);
Integer i = integerCompletableFuture.get();
System.out.println("i = " + i);
System.out.println("end2 ...");
}
完成时回调
whenComplete回调
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return 10 / 2;
}, service).whenComplete((result, e) -> {
System.out.println("current thread: " + Thread.currentThread().getName());
if (e == null) {
System.out.println("result: " + result);
} else {
System.out.println("exception: " + e);
}
}).exceptionally(e -> {
System.out.println("exception: " + e);
return 0;
});
Integer i = future.get();
System.out.println("result2: " + i);
System.out.println("end ...");
}
后续处理handle:
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return 10 / 2;
}, service).handle((res, thr) -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return res * 2;
});
Integer i = future.get();
System.out.println("result2: " + i);
System.out.println("end ...");
}
总结:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(null, fn);
}
whenComplete 处理正常和异常的结果,exceptionally 处理异常情况。
whenComplete:是执行当前任务的线 程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
handle:和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
线程串行化方法
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
thenApply
: 这个方法表示当当前的CompletableFuture完成时,将执行提供的函数,并返回一个新的CompletableFuture,其结果是应用该函数的结果。thenApplyAsync
: 这是异步版本的thenApply
,它使用默认的Executor执行器执行提供的函数。thenApplyAsync
(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的函数。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
thenAccept
: 当当前的CompletableFuture完成时,将执行提供的Consumer函数,但不返回新的结果。相反,返回一个CompletableFuture<Void>
,表示这个阶段的操作不产生结果。thenAcceptAsync
: 这是异步版本的thenAccept
,使用默认的Executor执行器执行提供的Consumer函数。thenAcceptAsync
(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的Consumer函数。
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
thenRun
: 当前CompletableFuture
完成后,将执行提供的Runnable
操作,但不返回新的结果。相反,返回一个CompletableFuture<Void>
,表示这个阶段的操作不产生结果。thenRunAsync
: 这是thenRun
的异步版本,使用默认的Executor
执行器执行提供的Runnable
操作。thenRunAsync
(带有Executor
参数): 这是具有自定义Executor
执行器的异步版本,允许你指定一个特定的执行器来执行提供的Runnable
操作。
两任务组合
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(defaultExecutor(), other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(defaultExecutor(), other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(defaultExecutor(), other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
- thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
- thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
- runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个future 处理完任务后,处理该任务。
两任务组合完成一个
把上面的both换成either,当两个任务中,任意一个 future 任务完成的时候,执行任务。
多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf:等待所有任务完成
anyOf:只要有一个任务完成
商品详情-异步编排
配置网关
- id: gulimall_host_route # gulimall.com
uri: lb://gulimall-product
predicates:
- Host=gulimall.com,item.gulimall.com
配置线程池:
@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
在配置文件中输入这些值:
#config thread pool
gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10
配置线程池容器:
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
使用:
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、获取spu的销售属性组合
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
//4、获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5、获取spu的规格参数信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
// Long spuId = info.getSpuId();
// Long catalogId = info.getCatalogId();
//2、sku的图片信息 pms_sku_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
//等到所有任务都完成
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture).get();
return skuItemVo;
}