跳到主要内容

Gateway中Body读取问题

问题背景

网关作为所有服务的入口,会对所有的流量进行处理,转发等操作。我们希望在网关中可以完成一些通用的操作,例如,记录请求和响应日志,做权限校验,登录校验等,然而,在Spring Cloud Gateway中进行请求体的读取会有一些问题,在此记录。

问题复现

现在有两个拦截器,都想要读取请求体的内容做一些操作:

@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {

@Override
public int getOrder() {
return -1; //// 设置过滤器的优先级,数字越小,优先级越高
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第一个拦截器:Request intercepted: " + exchange.getRequest().getURI());
ServerHttpRequest request = exchange.getRequest();
String body = GatewayUtils.resolveBodyFromRequest(request);
System.out.println("第一个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
}
}

@Component
public class SecondGlobalFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
return 0;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第二个拦截器:Request intercepted: " + exchange.getRequest().getURI());
ServerHttpRequest request = exchange.getRequest();
String body = GatewayUtils.resolveBodyFromRequest(request);
System.out.println("第二个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
}
}

最简单的读取方式,也就是环境搭建里用到的,我们在全局拦截器中获取:

public class GatewayUtils {
public static String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
Flux<DataBuffer> body = serverHttpRequest.getBody();
AtomicReference<String> bodyRef = new AtomicReference<>();
body.subscribe(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
bodyRef.set(charBuffer.toString());
});
return bodyRef.get();
}
}

然而这样读取请求体会有问题

运行报错reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Only one connection receive subscriber allowed.

image-20240826172319988

问题原因

spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request body,而request body只能读取一次。因此就出现了上面的错误。

  • 请求体无法正确解析:Spring WebFlux 中的请求体是一个 Flux<DataBuffer>,它是一个异步的流,不能在同步的代码块(如使用的 resolveBodyFromRequest 方法中)直接获取其内容。如果试图使用 AtomicReference 来保存请求体,但这种做法是同步的,无法正确处理异步的流。

  • 请求体只能读取一次ServerHttpRequest.getBody() 只能读取一次,读取后内容就会被消费掉,后续的过滤器将无法再次读取请求体。因此,直接通过多个过滤器读取请求体的做法是不可行的。

我们在拦截器中获取请求的一些信息

    @Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
ServerHttpRequest request = exchange.getRequest();
System.out.println("第一个拦截器:Request intercepted: " + request.getURI());
HttpHeaders headers = request.getHeaders();
MultiValueMap<String, HttpCookie> cookies = request.getCookies();
MultiValueMap<String, String> queryParams = request.getQueryParams();
Flux<DataBuffer> body = request.getBody();
return chain.filter(exchange);
}

可以看到如果是取 Header、Cookie、Query Params 都很容易,但是如果想要取body,就不容易了。这里的Body返回的是一个Flux<DataBuffer>,即一个包含 0-N 个 DataBuffer 类型元素的异步序列。如果不考虑Request Body只能读取一次的问题(可以缓存),我们可以先把这个Flux转为字符串,最简单的办法是使用:block或者subscribe

  1. 使用block()
Flux<DataBuffer> body = request.getBody();
// 通过block()方法获取请求体中的数据
String bodyStr = body.blockLast().toString();
System.out.println("第一个拦截器:Request body: " + bodyStr);
return chain.filter(exchange);

这种操作相当于把异步流转为了同步,然而Flux请求体的数据是以流的形式异步传递的。而 blockLast() 会阻塞当前线程,直到 Flux 中的所有数据都被处理完毕,并返回最后一个数据。这意味着将异步流的处理变成了同步处理会引起以下问题:

  • 阻塞性block()blockLast() 会阻塞当前线程,直到数据完全读取完毕。在响应式编程环境中,阻塞会降低系统的性能和响应性,尤其是在高并发的情况下
  • 违背响应式设计理念:Spring WebFlux 基于响应式编程模型,整个流程应当是异步和非阻塞的。引入阻塞操作违背了响应式设计理念,可能导致瓶颈和线程阻塞,无法充分利用非阻塞 I/O 的优势。

并且这种方式在运行也会报错: block()/blockFirst()/blockLast() are blocking

image-20240826204522799

  1. 使用subscirbe
        Flux<DataBuffer> body = request.getBody();
// 通过 subscribe 方法异步处理请求体
body.subscribe(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
String bodyStr = new String(bytes, StandardCharsets.UTF_8);
System.out.println("第一个拦截器:Request body: " + bodyStr);
});

这样确实可以读取到body的内容,但是如果说body的长度非常大,就会导致一次性读取不完整的情况,例如:

我这里大概有一万个a,导致一次性无法读取全部的请求体,因为请求体是流的形式。

image-20240826211835129

解决办法

异步处理(无法解决读取多次问题)

针对读取到的数据为空的情况:使用异步处理方式:将读取请求体的逻辑放在 MonoFlux 的异步流中,确保数据能够正确处理。这种方式其实和上面的subscribe差不多,只不过这里使用map

subscribe 是数据流的终端操作,触发流的执行并处理数据(类似于开始消费数据)。

map 是中间操作,用于转换数据流中的元素,返回的是一个新的数据流,不会触发流的执行。map 是惰性求值,只有当终端操作(例如 subscribe)被调用时,才会真正开始处理数据流。

public class GatewayUtils {
// 异步解析请求体
public static Mono<String> resolveBodyFromRequest(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return DataBufferUtils.join(request.getBody())
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return new String(bytes, StandardCharsets.UTF_8);
});
}
}


@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第一个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第一个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
});
}
}

@Component
public class SecondGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第二个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第二个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
});
}
}

运行结果:这种方式可以读取到请求体,但是依然不可以保证多次读取

image-20240826173634358

包装请求(可行)

使用 ServerHttpRequestDecorator 来重新包装请求,使请求体可以再次读取,确保后续过滤器能够继续处理。先在全局过滤器中获取,然后再把request重新包装,继续向下传递传递

具体代码如下:

public class GatewayUtils {
// 装饰请求以确保后续过滤器可以读取请求体
public static ServerHttpRequest decorateRequest(ServerWebExchange exchange, String body) {
ServerHttpRequest request = exchange.getRequest();
return new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
DataBuffer buffer = new DefaultDataBufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
return Flux.just(buffer);
}
};
}
}

@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第一个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第一个拦截器:Request body: " + body);
// 继续执行过滤器链
ServerHttpRequest decoratedRequest = GatewayUtils.decorateRequest(exchange, body);
return chain.filter(exchange.mutate().request(decoratedRequest).build());
});
}
}

@Component
public class SecondGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第二个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第二个拦截器:Request body: " + body);
// 包装请求,确保后续过滤器能继续读取请求体
ServerHttpRequest decoratedRequest = GatewayUtils.decorateRequest(exchange, body);
return chain.filter(exchange.mutate().request(decoratedRequest).build());
});
}
}

运行结果,这种方式可以多次读取:

image-20240826173935826

为什么这种方式可以拿到完整的请求体,而上面subscribe的方式拿到的是不完整的?

在Spring WebFlux中,subscribe()flatMap() 的行为存在明显的区别,这也是为什么在 subscribe() 里拿不到完整的请求体,而使用 flatMap() 可以获取到完整的请求体。 DataBufferUtils.join()request.getBody()的区别如下:

  • DataBufferUtils.join(): 它会将请求体的所有 DataBuffer 拼接成一个完整的 DataBuffer,然后可以从这个 DataBuffer 中一次性读取到整个请求体。这使得 flatMap() 可以处理整个请求体,而不是每次处理一块。
  • request.getBody(): 这个方法返回的是一个 Flux<DataBuffer>,表示请求体可能是分块的,即多个 DataBuffer 组成的流。使用 subscribe() 时,每个块会分别触发 onNext,而不是处理整个请求体。因此,你每次只能拿到一部分数据,而不是整个请求体。

为什么 flatMap() 可以拿到完整的请求体?

因为:flatMap() 依赖于 MonoFlux 的链式操作。使用 DataBufferUtils.join() 将请求体的多个 DataBuffer 拼接成一个 Mono<DataBuffer>,再通过 flatMap() 处理完整的 DataBuffer。这是因为 join() 已经帮你收集了完整的请求体数据。而request.getBody() + subscribe() 只能处理每个到达的 DataBuffer,并且每次都处理一部分数据,这导致你无法在 subscribe() 中获取完整的请求体。

更为简便的方法(可行)

首先定义一个全局的过滤器来缓存起来body

@Slf4j
@Component
public class CacheBodyGlobalFilter implements Ordered, GatewayFilter, GlobalFilter {

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return ServerWebExchangeUtils
.cacheRequestBody(
exchange,
(serverHttpRequest) -> chain.filter(
exchange.mutate().request(serverHttpRequest).build()));
}


@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}

}

然后提供一个工具类,在需要用到的地方直接调用就能获取到body了

public class GatewayHttpUtil {
public static String getBodyContent(ServerWebExchange exchange) {
Object attribute = exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR);
if (attribute == null || !(attribute instanceof NettyDataBuffer)) {
return null;
}
MediaType contentType = exchange.getRequest().getHeaders().getContentType();
if (contentType != null && contentType.toString().contains(MediaType.MULTIPART_FORM_DATA_VALUE)) {
return null;
}
NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) attribute;
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(nettyDataBuffer.asByteBuffer());
return charBuffer.toString();
}
}

使用:

@Component
public class SecondFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String bodyContent = GatewayHttpUtil.getBodyContent(exchange);
System.out.println("SecondFilter bodyContent: " + bodyContent);
return chain.filter(exchange);
}

@Override
public int getOrder() {
return 0;
}
}

对于响应体的封装:

@Slf4j
@Component
public class RespBodyLogFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {
// 获取响应 ContentType
String responseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
// 记录 JSON 格式数据的响应体
boolean flag = !StringUtil.isEmpty(responseContentType) && (MediaType.APPLICATION_JSON_VALUE.equals(responseContentType) || MediaType.TEXT_PLAIN_VALUE.equals(responseContentType));
if (flag) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
DataBufferUtils.release(join);
String responseData = new String(content, StandardCharsets.UTF_8);
// 响应体
System.out.println("响应体: " + responseData);
return bufferFactory.wrap(content);
}));
}
}
return super.writeWith(body);
}
};
return chain.filter(exchange.mutate().response(responseDecorator).build());
}


@Override
public int getOrder() {
return -1100;
}


}

参考源码实现(可行)

ModifyRequestBodyGatewayFilterFactory源码分析

参考资料