SpringCloudGateway源码分析
ReadBodyRoutePredicateFactory源码分析
doOnNext(objectValue -> ...)
:在读取完请求体后,使用 doOnNext()
方法将 objectValue
(即请求体内容)缓存到 ServerWebExchange
的属性中。
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue)
通过这个语句,将读取到的请求体对象存储在ServerWebExchange
的属性中,使用键CACHE_REQUEST_BODY_OBJECT_KEY
进行存储。- 这样,后续的操作可以从该缓存中直接获取请求体对象,而不需要再次从流中读取。
缓存的目的是:
- 避免多次读取请求体,因为在 Reactive 流程中,请求体(
request body
)只能读取一次,之后流会关闭,不能再次读取。 - 通过缓存机制,保证即使多次调用、多个过滤器也可以复用相同的请求体内容。
ModifyRequestBodyGatewayFilterFactory源码分析
ModifyRequestBodyGatewayFilterFactory主要功能是 修改请求体,并确保修改后的请求体可以继续被其他过滤器或后续操作读取
具体分析:
- 获取请求体:
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
Mono<?> modifiedBody = serverRequest.bodyToMono(inClass)
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody));
通过 ServerRequest.create()
方法来创建一个 ServerRequest
对象,结合 messageReaders
用于解析请求体。它使用了 bodyToMono()
来将请求体转换为一个 Mono
对象。这意味着请求体将会被读取为一个对象,且支持异步处理。
- 修改请求体:
flatMap()
用于异步处理,config.getRewriteFunction()
是对请求体进行处理和修改的函数,它返回修改后的请求体。
- 重构后的请求体插入到请求中:
BodyInserters.fromPublisher()
将修改后的 Mono
转换为 BodyInserter
,并且将其写入到 CachedBodyOutputMessage
中。
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, config.getOutClass());
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
bodyInserter.insert(outputMessage, new BodyInserterContext())
CachedBodyOutputMessage
是缓存请求体的核心组件,确保在写入修改后的请求体时,后续的过滤器或处理器仍然可以读取。
- 封装新的请求
这里通过 ServerHttpRequestDecorator 来封装修改后的请求,并将其插入到 ServerWebExchange 中,保证后续操作可以使用。
ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
}
else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}
自己实现读取请求体
本想着参考源码,然后写一个可以读取请求体的方法,但是发现CachedBodyOutputMessage的访问权限变了
但实际上我只有知道decorate和release方法在干嘛 就可以做到了:
此时可以实现缓存请求体,修改请求体的功能,以及自己去对请求体做一些参数校验,登录等功能
完整代码
@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
return -1; // 优先级高的过滤器
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
MediaType contentType = exchange.getRequest().getHeaders().getContentType();
// read & modify request
Mono<String> modifyBody = serverRequest.bodyToMono(String.class)
.flatMap(body -> {
if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
System.out.println("FirstGlobalFilter: " + body);
// todo 对请求体进行修改/校验
String newBody = body; // 修改后的请求体
return Mono.just(newBody);
}
return Mono.empty();
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifyBody, String.class);
// the new content type will be computed by bodyInserter
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
// if the body is changing content types, set it here, to the bodyInserter
// will know about it
if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
headers.setContentType(MediaType.APPLICATION_JSON);
}
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
// ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}))
.onErrorResume(t -> {
// todo if an error happens, we can use the original body
return chain.filter(exchange);
});
}
}