跳到主要内容

Gateway中Feign调用问题

问题背景

我们希望在Gateway中做一些登录鉴权,记录日志等操作,这些操作的实现可能不在Gateway中,可能需要Gateway自己去通过HTTP来调用其他服务以实现这些功能,但是在Gateway是基于Reactor模式的响应式编程,而Feign调用是阻塞式调用,这两者相矛盾,并且在官方文档里面Reactive-Support指出,由于OpenFeign 项目目前不支持响应式客户端,例如Spring WebClient ,Spring Cloud OpenFeign 也不支持。一旦核心项目可用,我们将在此处添加对其的支持。在此之前,我们建议使用feign-reactive来支持 Spring WebClient。

image-20240826233731386

问题复现

简单写一个FeignClient

@FeignClient(name = "testClient", url = "http://localhost:8080") // 请将URL替换为实际服务地址
public interface TestFeignClient {

@GetMapping("/test/get")
R get(@RequestParam("name") String name, @RequestParam("age") String age);

@PostMapping("/test/post")
R post(@RequestBody User user);
}

在过滤器中随便调用一个,模拟鉴权等操作

@Component
public class FirstFilter implements GlobalFilter {

@Autowired
private TestFeignClient testFeignClient;

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
User user = new User();
user.setAge(1);
user.setName("测试Feign调用");
System.out.println("线程:" + Thread.currentThread().getName() + ",调用Feign");
R post = testFeignClient.post(user);
System.out.println("线程:" + Thread.currentThread().getName() + ",调用结果:" + post);
return chain.filter(exchange);
}
}

报错:No qualifying bean of type 'org.springframework.boot.autoconfigure.http.HttpMessageConverters' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}

image-20240826234812541

解决办法:

@Configuration
public class FeignConfig {

@Bean
public HttpMessageConverters messageConverters() {
List<HttpMessageConverter<?>> converters = new ArrayList<>();
converters.add(new MappingJackson2HttpMessageConverter()); // 根据需要添加其他的HttpMessageConverter
return new HttpMessageConverters(converters);
}
}

此时可以实现调用:

image-20240826235420017

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
User user = new User();
user.setAge(1);
user.setName("测试Feign调用");
System.out.println("线程:" + Thread.currentThread().getName() + ",调用Feign");
R post = testFeignClient.post(user);
System.out.println("线程:" + Thread.currentThread().getName() + ",调用结果:" + post);
return chain.filter(exchange);
}

也就是这样其实不会失败,为什么呢?

因为现在相当于直接进行HTTP调用,并没有负载均衡等操作,现在加上,为了更加真实,应该是从Nacos中拉取服务,然后自动负载选择一个实例进行调用:

<!--openFeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

<!-- nacos配置中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- nacos服务发现-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

此时再去调用就会发现报错了:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

image-20240827092537302

问题原因

在微服务场景下,服务间的调用可以使用Feign的方式,但是网关是Reactor模式,即异步调用模式,而Feign调用为同步方式,这里使用Feign调用,所以这两者之间就会出现矛盾。

解决办法

直接解决报错(不推荐)

自定义一个BlockingLoadBalancerClient.java Bean覆盖原有Bean

public class CustomBlockingLoadBalancerClient extends BlockingLoadBalancerClient {
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

public CustomBlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory, LoadBalancerProperties properties) {
super(loadBalancerClientFactory, properties);
this.loadBalancerClientFactory = loadBalancerClientFactory;
}

@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
CompletableFuture<Response<ServiceInstance>> f = CompletableFuture.supplyAsync(() -> {
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
return loadBalancerResponse;
});
Response<ServiceInstance> loadBalancerResponse = null;


try {
loadBalancerResponse = f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
}

创建BlockingLoadBalancerClient类将自定义CustomBlockingLoadBalancerClient注入到容器中

@Configuration
public class BlockingLoadBalancerClientConfig {

@Autowired
LoadBalancerClientFactory loadBalancerClientFactory;

@Autowired
LoadBalancerProperties properties;

@Bean
public LoadBalancerClient BlockingLoadBalancerClient() {
return new CustomBlockingLoadBalancerClient(loadBalancerClientFactory, properties);
}
}

测试结果:

image-20240827105238963

这种方式可以解决报错的问题。但是,在gateway网关中强行使用feignClient,同步调用,其实是有风险的一个事情。假设feignClient的下游服务,由于某些原因导致性能变慢。而gateway是同步阻塞式的调用。那么gateway的主线程也会被阻塞。由于gateway底层实际上就是netty的线程池,有两个线程池(主从多线程模型)这种模型使用一个独立的线程池来处理连接请求(Acceptor),而I/O操作则由另一个线程池处理。这种方式可以减少连接请求处理对I/O操作的干扰,提高系统并发性能。

image-20240827105328436

线程池转异步(不推荐)

我们使用线程池来将Feign同步调用转为异步调用:

private final ExecutorService executorService = Executors.newFixedThreadPool(10);  // 创建线程池

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
User user = new User();
user.setAge(1);
user.setName("测试Feign调用");
Future<R> future = executorService.submit(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + ",调用Feign");
return testFeignClient.post(user);
});
try {
R post = future.get();
System.out.println("线程:" + Thread.currentThread().getName() + ",调用结果:" + post);
boolean flag = (Boolean) post.getData();
if (!flag) {
// 鉴权失败
return onError(exchange, "访问拒绝");
}
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest modifyRequest = request.mutate().header("FLAG", "true").build();
return chain.filter(exchange.mutate().request(modifyRequest).build());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

这种方式确实可行:

image-20240827093835656

但是这种方案并不是真正意义上的异步调用,只不过通过线程池强行提交了feign调用,而且获取feign调用返回结果的future.get()方法也是同步的;二是此种方式实在算不上优雅。

使用WebClient(推荐)

WebClient 是 Spring Reactor 的非阻塞、异步 HTTP 客户端,非常适合在 Spring WebFlux(Gateway 基于 WebFlux)应用中使用。它支持响应式编程模型,可以直接返回 MonoFlux,不需要额外处理阻塞问题。

配置WebClient,注意如果不加LoadBalanced,那么是不可以通过服务名来调用的,也就是不能配合Nacos的服务发现等功能

/**
* @author houyunfei
*/
@Configuration
public class WebClientConfig {
/**
* LoadBalanced 如果不添加,无法通过服务名进行调用,只能通过ip调用
*/
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
}

进行调用:

@Autowired
private WebClient.Builder webClientBuilder; // 自动注入带有负载均衡的 WebClient.Builder


@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
User user = new User();
user.setAge(1);
user.setName("测试Feign调用");

// 使用 WebClient 异步调用远程服务
System.out.println("开始调用,线程id:" + Thread.currentThread().getId() + "线程名:" + Thread.currentThread().getName());
// 使用服务名称调用,并由负载均衡器处理
return webClientBuilder.build()
.post()
.uri("http://web-demo/test/post") // 使用服务名称
.bodyValue(user)
.retrieve()
.bodyToMono(R.class)
.flatMap(post -> {
System.out.println("调用结束,线程id:" + Thread.currentThread().getId() + "线程名:" + Thread.currentThread().getName());
System.out.println("调用结果:" + post);
boolean flag = (Boolean) post.getData();
if (!flag) {
// 鉴权失败
return onError(exchange, "访问拒绝");
}
// 修改请求头并继续执行过滤器链
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest modifyRequest = request.mutate().header("FLAG", "true").build();
return chain.filter(exchange.mutate().request(modifyRequest).build());
});
}

测试结果:

可以看到这样调用是可行的,并且都是由Reactor的线程来完成工作

image-20240827100048957

Feign-Reactive(推荐)

这是一个社区版的项目:https://github.com/PlaytikaOSS/feign-reactive,本质上就是通过WebClient来实现Feign的功能

Feign-Reactive 是 Feign 的一个扩展版本,允许你使用响应式编程模型来进行远程调用,返回 MonoFlux 对象,特别适合与 Spring WebFlux 一起使用。

<dependency>
<groupId>com.playtika.reactivefeign</groupId>
<artifactId>feign-reactor-spring-cloud-starter</artifactId>
<version>4.0.3</version>
</dependency>

如果是Macos的M系列芯片,可能会遇到以下问题:

是使用 Netty 库时遇到的 DNS 解析错误;此错误源于 Netty 无法访问本机 MacOS DNS 解析器。这样做的后果是可能出现不正确的 DNS 解析,这可能会导致应用程序中出现大量与网络相关的问题;

Netty 使用本机代码与系统的 DNS 解析器交互以获得最佳性能。在 MacOS 上,尤其是使用 M1 芯片的系统上,可能会缺少所需的本机库,从而导致上述错误。

解决方案是显式提供此本机库。

 Unable to load io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS. Check whether you have a dependency on 'io.netty:netty-resolver-dns-native-macos'

解决办法:

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns-native-macos</artifactId>
<version>4.1.72.Final</version>
<classifier>osx-aarch_64</classifier>
</dependency>

开启EnableReactiveFeignClients

@SpringBootApplication
@EnableReactiveFeignClients
public class GatewayDemoApplication {

public static void main(String[] args) {
SpringApplication.run(GatewayDemoApplication.class, args);
}

}

定义需要调用的远程接口

@ReactiveFeignClient(name = "web-demo")  // 指定服务名称或服务ID
public interface ReactiveTestFeignClient {

@PostMapping("/test/post")
Mono<Object> post(@RequestBody User user); // 返回响应式类型 Mono
}

使用:

@Autowired
private ReactiveTestFeignClient reactiveTestFeignClient; // 注入 Feign 客户端

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
User user = new User();
user.setAge(1);
user.setName("测试Feign调用");

// 使用 WebClient 异步调用远程服务
System.out.println("开始调用,线程id:" + Thread.currentThread().getId() + "线程名:" + Thread.currentThread().getName());
// 使用服务名称调用,并由负载均衡器处理
// 使用 Reactive Feign 进行异步调用
return reactiveTestFeignClient.post(user)
.flatMap(post -> {
System.out.println("调用结束,线程id:" + Thread.currentThread().getId() + "线程名:" + Thread.currentThread().getName());
System.out.println("调用结果:" + post);
boolean flag = true;
if (!flag) {
// 鉴权失败
return onError(exchange, "访问拒绝");
}
// 修改请求头并继续执行过滤器链
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest modifyRequest = request.mutate().header("FLAG", "true").build();
return chain.filter(exchange.mutate().request(modifyRequest).build());
});
}

测试结果:

image-20240827103941295

使用注意点:

  • Reactive-feign和openFeign和我们平时的写法一样
  • 返回值需要用Mono封装。在处理请求结果时,也要用reactive的写法。
  • 通过这种reactive的写法,当我们下游feign调用的微服务变慢,并不会影响gateway的主线程,并不会拖垮网关

总结

Spring Cloud Gateway采用WebFlux响应式框架实现全异步处理,但由于其响应式编程代码难以理解,且直接调用同步的feign会出现问题。通过线程池将feign调用强制转为异步调用并非最佳方案,合格的程序员应深入探究并找到根本解决方案。使用ReactiveFeign可以优雅地解决Spring Cloud Gateway中feign同步调用的问题。

参考资料