Gateway中Body读取问题
问题背景
网关作为所有服务的入口,会对所有的流量进行处理,转发等操作。我们希望在网关中可以完成一些通用的操作,例如,记录请求和响应日志,做权限校验,登录校验等,然而,在Spring Cloud Gateway中进行请求体的读取会有一些问题,在此记录。
问题复现
现在有两个拦截器,都想要读取请求体的内容做一些操作:
@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
return -1; //// 设置过滤器的优先级,数字越小,优先级越高
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第一个拦截器:Request intercepted: " + exchange.getRequest().getURI());
ServerHttpRequest request = exchange.getRequest();
String body = GatewayUtils.resolveBodyFromRequest(request);
System.out.println("第一个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
}
}
@Component
public class SecondGlobalFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
return 0;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第二个拦截器:Request intercepted: " + exchange.getRequest().getURI());
ServerHttpRequest request = exchange.getRequest();
String body = GatewayUtils.resolveBodyFromRequest(request);
System.out.println("第二个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
}
}
最简单的读取方式,也就是环境搭建里用到的,我们在全局拦截器中获取:
public class GatewayUtils {
public static String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
Flux<DataBuffer> body = serverHttpRequest.getBody();
AtomicReference<String> bodyRef = new AtomicReference<>();
body.subscribe(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
bodyRef.set(charBuffer.toString());
});
return bodyRef.get();
}
}
然而这样读取请求体会有问题
运行报错reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
:
问题原因
spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request body,而request body只能读取一次。因此就 出现了上面的错误。
请求体无法正确解析:Spring WebFlux 中的请求体是一个
Flux<DataBuffer>
,它是一个异步的流,不能在同步的代码块(如使用的resolveBodyFromRequest
方法中)直接获取其内容。如果试图使用AtomicReference
来保存请求体,但这种做法是同步的,无法正确处理异步的流。请求体只能读取一次:
ServerHttpRequest.getBody()
只能读取一次,读取后内容就会被消费掉,后续的过滤器将无法再次读取请求体。因此,直接通过多个过滤器读取请求体的做法是不可行的。
我们在拦截器中获取请求的一些信息
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
ServerHttpRequest request = exchange.getRequest();
System.out.println("第一个拦截器:Request intercepted: " + request.getURI());
HttpHeaders headers = request.getHeaders();
MultiValueMap<String, HttpCookie> cookies = request.getCookies();
MultiValueMap<String, String> queryParams = request.getQueryParams();
Flux<DataBuffer> body = request.getBody();
return chain.filter(exchange);
}
可以看到如果是取 Header、Cookie、Query Params 都很容易,但是如果想要取body,就不容易了。这里的Body返回的是一个Flux<DataBuffer>
,即一个包含 0-N 个 DataBuffer
类型元素的异步序列。如果不考虑Request Body只能读取一次的问题(可以缓存),我们可以先把这个Flux转为字符串,最简单的办法是使用:block
或者subscribe
- 使用
block()
:
Flux<DataBuffer> body = request.getBody();
// 通过block()方法获取请求体中的数据
String bodyStr = body.blockLast().toString();
System.out.println("第一个拦截器:Request body: " + bodyStr);
return chain.filter(exchange);
这种操作相当于把异步流转为了同步,然而Flux
请求体的数据是以流的形式异步传递的。而 blockLast()
会阻塞当前线程,直到 Flux
中的所有数据都被处理完毕,并返回最后一个数据。这意味着将异步流的处理变成了同步处理会引起以下问题:
- 阻塞性:
block()
或blockLast()
会阻塞当前线程,直到数据完全读取完毕。在响应式编程环境中,阻塞会降低系统的性能和响应性,尤其是在高并发的情况下 - 违背响应式设计理念:Spring WebFlux 基于响应式编程模型,整个流程应当是异步和非阻塞的。引入阻塞操作违背了响应式设计理念,可能导致瓶颈和线程阻塞,无法充分利用非阻塞 I/O 的优势。
并且这种方式在运行也会报错: block()/blockFirst()/blockLast() are blocking
- 使用subscirbe
Flux<DataBuffer> body = request.getBody();
// 通过 subscribe 方法异步处理请求体
body.subscribe(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
String bodyStr = new String(bytes, StandardCharsets.UTF_8);
System.out.println("第一个拦截器:Request body: " + bodyStr);
});
这样确实可以读取到body的内容,但是如果说body的长度非常大,就会导致一次性读取不完整的情况,例如:
我这里大概有一万个a,导致一次性无法读取全部的请求体,因为请求体是流的形式。
解决办法
异步处理(无法解决读取多次问题)
针对读取到的数据为空的情况:使用异步处理方式:将读取请求体的逻辑放在 Mono
或 Flux
的异步流中,确保数据能够正确处理。这种方式其实和上面的subscribe差不多,只不过这里使用map
subscribe
是数据流的终端操作,触发流的执行并处理数据(类似于开始消费数据)。
map
是中间操作,用于转换数据流中的元素,返回的是一个新的数据流,不会触发流的执行。map
是惰性求值,只有当终端操作(例如subscribe
)被调用时,才会真正开始处理数据流。
public class GatewayUtils {
// 异步解析请求体
public static Mono<String> resolveBodyFromRequest(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return DataBufferUtils.join(request.getBody())
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return new String(bytes, StandardCharsets.UTF_8);
});
}
}
@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第一个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第一个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
});
}
}
@Component
public class SecondGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第二个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第二个拦截器:Request body: " + body);
// 继续执行过滤器链
return chain.filter(exchange);
});
}
}
运行结果:这种方式可以读取到请求体,但是依然不可以保证多次读取
包装请求(可行)
使用 ServerHttpRequestDecorator
来重新包装请求,使请求体可以再次读取,确保后续过滤器能够继续处理。先在全局过滤器中获取,然后再把request重新包装,继续向下传递传递
具体代码如下:
public class GatewayUtils {
// 装饰请求以确保后续过滤器可以读取请求体
public static ServerHttpRequest decorateRequest(ServerWebExchange exchange, String body) {
ServerHttpRequest request = exchange.getRequest();
return new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
DataBuffer buffer = new DefaultDataBufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
return Flux.just(buffer);
}
};
}
}
@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第一个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第一个拦截器:Request body: " + body);
// 继续执行过滤器链
ServerHttpRequest decoratedRequest = GatewayUtils.decorateRequest(exchange, body);
return chain.filter(exchange.mutate().request(decoratedRequest).build());
});
}
}
@Component
public class SecondGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 在这里拦截请求,执行一些逻辑,例如日志记录
System.out.println("第二个拦截器:Request intercepted: " + exchange.getRequest().getURI());
return GatewayUtils.resolveBodyFromRequest(exchange)
.flatMap(body -> {
System.out.println("第二个拦截器:Request body: " + body);
// 包装请求,确保后续过滤器能继续读取请求体
ServerHttpRequest decoratedRequest = GatewayUtils.decorateRequest(exchange, body);
return chain.filter(exchange.mutate().request(decoratedRequest).build());
});
}
}