跳到主要内容

SpringCloudGateway源码分析

ReadBodyRoutePredicateFactory源码分析

源码地址

image-20240826224545102

doOnNext(objectValue -> ...):在读取完请求体后,使用 doOnNext() 方法将 objectValue(即请求体内容)缓存到 ServerWebExchange 的属性中。

  • exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue) 通过这个语句,将读取到的请求体对象存储在 ServerWebExchange 的属性中,使用键 CACHE_REQUEST_BODY_OBJECT_KEY 进行存储。
  • 这样,后续的操作可以从该缓存中直接获取请求体对象,而不需要再次从流中读取。

缓存的目的是:

  • 避免多次读取请求体,因为在 Reactive 流程中,请求体(request body)只能读取一次,之后流会关闭,不能再次读取。
  • 通过缓存机制,保证即使多次调用、多个过滤器也可以复用相同的请求体内容。

ModifyRequestBodyGatewayFilterFactory源码分析

源码地址

ModifyRequestBodyGatewayFilterFactory主要功能是 修改请求体,并确保修改后的请求体可以继续被其他过滤器或后续操作读取

image-20240826220347645

具体分析:

  1. 获取请求体:
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
Mono<?> modifiedBody = serverRequest.bodyToMono(inClass)
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody));

通过 ServerRequest.create() 方法来创建一个 ServerRequest 对象,结合 messageReaders 用于解析请求体。它使用了 bodyToMono() 来将请求体转换为一个 Mono 对象。这意味着请求体将会被读取为一个对象,且支持异步处理。

  1. 修改请求体:

flatMap() 用于异步处理,config.getRewriteFunction() 是对请求体进行处理和修改的函数,它返回修改后的请求体。

  1. 重构后的请求体插入到请求中:

BodyInserters.fromPublisher() 将修改后的 Mono 转换为 BodyInserter,并且将其写入到 CachedBodyOutputMessage 中。

BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, config.getOutClass());
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
bodyInserter.insert(outputMessage, new BodyInserterContext())

CachedBodyOutputMessage 是缓存请求体的核心组件,确保在写入修改后的请求体时,后续的过滤器或处理器仍然可以读取。

  1. 封装新的请求

这里通过 ServerHttpRequestDecorator 来封装修改后的请求,并将其插入到 ServerWebExchange 中,保证后续操作可以使用。

ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);


ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
}
else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}

@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}

自己实现读取请求体

本想着参考源码,然后写一个可以读取请求体的方法,但是发现CachedBodyOutputMessage的访问权限变了

image-20240826222404128

但实际上我只有知道decorate和release方法在干嘛就可以做到了:

image-20240826222905408

此时可以实现缓存请求体,修改请求体的功能,以及自己去对请求体做一些参数校验,登录等功能

完整代码

@Component
public class FirstGlobalFilter implements GlobalFilter, Ordered {

@Override
public int getOrder() {
return -1; // 优先级高的过滤器
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
MediaType contentType = exchange.getRequest().getHeaders().getContentType();
// read & modify request
Mono<String> modifyBody = serverRequest.bodyToMono(String.class)
.flatMap(body -> {
if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
System.out.println("FirstGlobalFilter: " + body);
// todo 对请求体进行修改/校验
String newBody = body; // 修改后的请求体
return Mono.just(newBody);
}
return Mono.empty();
});

BodyInserter bodyInserter = BodyInserters.fromPublisher(modifyBody, String.class);
// the new content type will be computed by bodyInserter
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
// if the body is changing content types, set it here, to the bodyInserter
// will know about it
if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
headers.setContentType(MediaType.APPLICATION_JSON);
}
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
// ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}

@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}))
.onErrorResume(t -> {
// todo if an error happens, we can use the original body
return chain.filter(exchange);
});

}

}