跳到主要内容

异步编排优化

创建线程

第一种方式:

public class ThreadDemo {
public static void main(String[] args) {
System.out.println("main .... start ....");
MyThread myThread = new MyThread();
myThread.start();
System.out.println("main .... end ....");
}

public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
}
}
}

第二种方式:

public static void main(String[] args) {
System.out.println("main .... start ....");
Runnable01 runnable01 = new Runnable01();
Thread thread = new Thread(runnable01);
thread.start();
System.out.println("main .... end ....");
}

public static class Runnable01 implements Runnable {

@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
}
}

第三种方式:

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main .... start ....");
FutureTask<Integer> task = new FutureTask<>(new Callable01());
new Thread(task, "A").start();
Integer i = task.get();
//get会阻塞,直到线程执行完毕
System.out.println("i = " + i);
System.out.println("main .... end ....");
}

public static class Callable01 implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
return i;
}
}

第四种方式:

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main .... start ....");
service.execute(new Runnable01());
System.out.println("main .... end ....");
}

线程池

线程池构造器:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  1. corePoolSize(核心线程数):
    • 描述:线程池中始终保持存活的线程数,即使它们处于空闲状态。
  2. maximumPoolSize(最大线程数):
    • 描述:线程池中允许存在的最大线程数。
  3. keepAliveTime(线程空闲时间):
    • 描述:当线程池中的线程数超过核心线程数时,多余的空闲线程在被终止之前等待新任务的最长时间。
  4. unit(时间单位):
    • 描述:用于指定 keepAliveTime 的时间单位,可以是秒、毫秒等。
  5. workQueue(工作队列):
    • 描述:用于保存等待执行的任务的阻塞队列。类型:BlockingQueue<Runnable>
  6. threadFactory(线程工厂):
    • 描述:用于创建新线程的工厂。类型:ThreadFactory 接口的实现。
  7. handler(拒绝策略):
    • 描述:当工作队列已满,并且无法再接受新任务时,用于处理新任务的策略。类型:RejectedExecutionHandler 接口的实现。

面试题:一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;

答案:先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在70 个被安排上了。剩下 30 个默认拒绝策略。

常见线程池:

  1. FixedThreadPool (固定大小线程池):
    • FixedThreadPool 是一个具有固定线程数量的线程池。
    • 在执行任务时,如果线程池中的线程都在执行任务,新任务会被放入队列中等待。
    • 适用于并发任务数量可控的场景。
  2. CachedThreadPool (缓存线程池):
    • CachedThreadPool 是一个可根据需要创建新线程的线程池,线程池的大小可动态调整。
    • 在执行任务时,如果线程池中的线程都在执行任务,会创建新的线程来处理新任务。
    • 适用于短生命周期的异步任务。
  3. SingleThreadExecutor (单线程线程池):
    • SingleThreadExecutor 是一个仅包含一个线程的线程池。
    • 所有提交的任务都按顺序执行,保证不会有并发执行的情况。
    • 适用于需要保证任务按照顺序执行的场景。
  4. ScheduledThreadPool (定时任务线程池):
    • ScheduledThreadPool 是一个支持定时以及周期性执行任务的线程池。
    • 可以用于执行定时任务,例如定时执行任务、周期性执行任务等。
    • 适用于需要按照一定规律执行任务的场景。

这些线程池实现都是通过 Executors 工厂类创建的,提供了方便的线程池创建和管理方式。

CompletableFuture

创建异步对象

public static CompletableFuture<Void> runAsync(Runnable runnable) 
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

runAsync没有返回值,supply有返回值

runAsync

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Void> future =CompletableFuture.runAsync(()->{
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
},service);
System.out.println("end ...");
}

supplyAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
return i;
}, service);
Integer i = integerCompletableFuture.get();
System.out.println("i = " + i);
System.out.println("end2 ...");
}

完成时回调

whenComplete回调

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return 10 / 2;
}, service).whenComplete((result, e) -> {
System.out.println("current thread: " + Thread.currentThread().getName());
if (e == null) {
System.out.println("result: " + result);
} else {
System.out.println("exception: " + e);
}
}).exceptionally(e -> {
System.out.println("exception: " + e);
return 0;
});
Integer i = future.get();
System.out.println("result2: " + i);
System.out.println("end ...");
}

后续处理handle:

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return 10 / 2;
}, service).handle((res, thr) -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return res * 2;
});
Integer i = future.get();
System.out.println("result2: " + i);
System.out.println("end ...");
}

总结:

public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}

public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(null, fn);
}

whenComplete 处理正常和异常的结果,exceptionally 处理异常情况。

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行

handle:和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

线程串行化方法

    public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}

public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}

  • thenApply: 这个方法表示当当前的CompletableFuture完成时,将执行提供的函数,并返回一个新的CompletableFuture,其结果是应用该函数的结果。
  • thenApplyAsync: 这是异步版本的thenApply,它使用默认的Executor执行器执行提供的函数。
  • thenApplyAsync(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的函数。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
  • thenAccept: 当当前的CompletableFuture完成时,将执行提供的Consumer函数,但不返回新的结果。相反,返回一个CompletableFuture<Void>,表示这个阶段的操作不产生结果。
  • thenAcceptAsync: 这是异步版本的thenAccept,使用默认的Executor执行器执行提供的Consumer函数。
  • thenAcceptAsync(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的Consumer函数。
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
  • thenRun: 当前CompletableFuture完成后,将执行提供的Runnable操作,但不返回新的结果。相反,返回一个CompletableFuture<Void>,表示这个阶段的操作不产生结果。
  • thenRunAsync: 这是thenRun的异步版本,使用默认的Executor执行器执行提供的Runnable操作。
  • thenRunAsync(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的Runnable操作。

两任务组合

public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(defaultExecutor(), other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}

public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(defaultExecutor(), other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(defaultExecutor(), other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
  1. thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
  2. thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
  3. runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个future 处理完任务后,处理该任务。

两任务组合完成一个

把上面的both换成either,当两个任务中,任意一个 future 任务完成的时候,执行任务。

多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf:等待所有任务完成

anyOf:只要有一个任务完成

商品详情-异步编排

配置网关

- id: gulimall_host_route # gulimall.com
uri: lb://gulimall-product
predicates:
- Host=gulimall.com,item.gulimall.com

配置线程池:

@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {

private Integer coreSize;

private Integer maxSize;

private Integer keepAliveTime;
}

在配置文件中输入这些值:

#config thread pool
gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10

配置线程池容器:

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}

使用:

@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {

SkuItemVo skuItemVo = new SkuItemVo();

CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);


CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、获取spu的销售属性组合
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);


CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
//4、获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);


CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5、获取spu的规格参数信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);


// Long spuId = info.getSpuId();
// Long catalogId = info.getCatalogId();

//2、sku的图片信息 pms_sku_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
//等到所有任务都完成
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture).get();

return skuItemVo;
}