跳到主要内容

API 开放平台的系统架构优化

为了提高 API 开放平台的性能和可扩展性,需要对系统架构进行优化。本文主要介绍引入网关优化

统计用户调用接口次数

为了更好地管理用户对接口的使用,开发接口调用次数统计功能。当用户每次调用接口成功时,次数加 1。同时,我们设计用户接口关系表user_interface_info,用于存储用户与接口的调用关系,包括用户 ID、接口 ID、总调用次数、剩余调用次数、状态、创建时间、更新时间和是否删除等字段。

业务流程:

  1. 用户每次调用接口成功,次数+1
  2. 给用户分配或用户自主申请接口调用次数

用户接口关系表user_interface_info

字段类型说明
idbigint主键
userIdbigint调用用户id
interfaceInfoIdbigint接口id
totalNumint总调用次数
leftNumint剩余调用次数
statusint0-正常 1-禁用
createTimedatetime创建时间
updateTimedatetime更新时间
isDeletetinyint是否删除

调用次数增加

后端代码:

    @Override  
public boolean invokeCount(long interfaceInfoId, long userId) {
// 判断
if (interfaceInfoId <= 0 || userId <= 0) {
throw new BusinessException(ErrorCode.PARAMS_ERROR);
}
UpdateWrapper<UserInterfaceInfo> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("interfaceInfoId", interfaceInfoId);
updateWrapper.eq("userId", userId);
updateWrapper.setSql("leftNum = leftNum - 1, totalNum = totalNum + 1");
return this.update(updateWrapper);
}

那么,如何在每次调用接口时实现次数统计呢?我们有以下两种方式:

  1. 使用 AOP 切面:AOP(面向切面编程)切面可以在不修改原有接口代码的情况下,实现对接口调用次数的统计。其优点是独立于接口,能够在每个接口调用后自动统计次数加 1。然而,AOP 切面也存在一些缺点。它通常只存在于单个项目中,如果每个团队都要开发自己的模拟接口,那么每个团队都需要写一个切面,这会导致代码的重复和维护成本的增加。

  2. 使用网关:网关可以统一处理接口调用次数的统计问题。它的优势在于能够集中管理和控制接口的访问,对所有通过网关的接口调用进行统一的处理。相比 AOP 切面,网关的优势更加明显。它可以避免每个团队都要开发和维护自己的统计逻辑,减少了重复工作。同时,网关还可以处理其他与接口调用相关的问题,如路由、鉴权、限流等,提供了更全面的接口管理功能。

API网关

网关的作用

在现代分布式系统中,API 网关扮演着至关重要的角色,它是客户端与后端服务之间的桥梁,负责处理各种请求和响应。

  1. 路由:根据请求的目标地址,将请求准确地路由到相应的后端服务,确保请求能够得到正确的处理。
  2. 鉴权:对请求进行身份验证和权限验证,只有经过授权的用户才能访问特定资源,保障系统的安全性。
  3. 跨域:有效处理跨域请求,使得客户端能够从不同的源(域)访问服务端资源,增强了系统的开放性。
  4. 缓存:缓存经常被请求的数据,减少对后端服务的重复请求,从而提高系统的性能和响应速度。
  5. 流量染色:通过对流量进行标记,方便后续对流量进行分析和处理,通常是在请求头中添加新的请求头。
  6. 访问控制:根据配置的规则,对请求进行访问控制,包括允许或拒绝特定条件下的请求,以及防范 DDos 攻击等。
  7. 统一业务处理:对请求进行统一的预处理和后处理,例如对请求参数进行处理、统一响应格式等,提高了系统的一致性和可维护性。
  8. 发布控制:能够控制服务的发布,例如在上线新接口时,可以实现灰度发布,先给新接口分配 20% 的流量,老接口分配 80% 的流量,然后再慢慢调整比重,确保系统的稳定性。
  9. 负载均衡:将请求分发到多个后端服务实例上,以平衡负载,提高系统的可用性和性能。
  10. 接口保护:
    • 限制请求:对请求进行限制,防止恶意或异常请求对系统造成影响。
    • 信息脱敏:对返回给客户端的数据进行脱敏处理,保护用户的隐私。
    • 降级(熔断):当系统负载过高或出现故障时,暂时关闭或降级服务,防止系统崩溃。
    • 限流:对请求进行限流,避免过载和性能下降,常用的限流算法有令牌桶算法、漏桶算法、RedisLimitHandler 等。
    • 超时时间:设置请求的超时时间,防止长时间的请求占用资源。
  11. 统一日志:记录所有请求和响应的日志,方便进行监控和故障排查,有助于及时发现和解决问题。
  12. 统一文档:生成和管理服务的文档,为开发者提供统一的接口文档参考,提高开发效率。

负载均衡与发布控制

负载均衡可以通过将 uri 从固定地址改成lb:xxx来实现,这样可以更好地分配请求到不同的后端服务实例上。

发布控制方面,灰度发布是一种常用的策略,例如上线新接口时,先给新接口分配 20% 的流量,老接口分配 80% 的流量,然后逐渐调整比重,以确保新接口的稳定性和可靠性。

网关分类

  1. 全局网关(接入层网关):主要作用是实现负载均衡、请求日志等功能,不与具体的业务逻辑绑定。
  2. 业务网关(微服务网关):会包含一些业务逻辑,其主要作用是将请求转发到不同的业务、项目、接口或服务。

实现方式

  • Nginx(全局网关)、Kong 网关(API 网关)。
  • Spring Cloud Gateway(取代了 Zuul):性能高,可使用 Java 代码写逻辑。

Spring Cloud Gateway

Spring Cloud Gateway 是基于 Spring Framework 5、Project Reactor 和 Spring Boot 2.0 构建的网关,它提供了一种简单而有效的方式来路由和过滤 HTTP 请求。

官网地址:Spring Cloud Gateway:https://spring.io/projects/spring-cloud-gatewa

核心概念

路由(根据什么条件,转发请求到哪里) 断言:一组规则、条件,用来确定如何转发路由 过滤器:对请求进行一系列的处理,比如添加请求头、添加请求参数

处理流程

  1. 客户端发起请求:客户端向 API 网关发送请求。
  2. Handler Mapping:根据断言,将请求转发到对应的路由。
  3. Web Handler:处理请求,通过一层层的过滤器对请求进行处理。
  4. 实际调用:最后将请求转发到后端服务进行实际的处理。

配置方式

它有两种配置方式:编程式和配置式。我们通过配置断言和过滤器,实现了前缀匹配路由、添加请求头和参数、降级等功能。

网关日志开启:

logging:  
level:
org:
springframework:
cloud:
gateway: trace

断言类型

断言类型包括:

  1. After 在 x 时间之后。
  2. Before 在 x 时间之前。
  3. Between 在 x 时间之间。
  4. 请求类别。
  5. 请求头(包含 Cookie)。
  6. 查询参数。
  7. 客户端地址。
  8. 权重。

过滤器功能

过滤器的基本功能包括对请求头、请求参数、响应头的增删改查,具体功能如下:

  1. 添加请求头。
  2. 添加请求参数。
  3. 添加响应头。
  4. 降级。
  5. 限流。
  6. 重试。

具体实现

前缀匹配路由可以实现将所有路径为/api/**的请求进行转发,例如,将网关请求http://localhost:10001/api/name/get?name=cxk转发到http://localhost:10002/api/name/get?name=cxk(假设 interface 后端端口为 10002,网关后端端口为 10001)。

routes:  
- id: api_route
uri: http://localhost:10002
predicates:
- Path=/api/**
filters:
- AddRequestHeader=yunfei, swag
- AddRequestParameter=name, dog
- name: CircuitBreaker
args:
name: myCircuitBreaker
fallbackUri: forward:/fallback
@GetMapping("/get")  
public String getNameByGet(String name, HttpServletRequest request) {
System.out.println(request.getHeader("yunfei"));
String name1 = request.getParameter("name");
System.out.println("name1="+name1);
return "GET 你的名字是" + name;
}

降级:

    <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>

并使用 GlobalFilter 全局拦截处理。

业务逻辑

  1. 用户发送请求到 API 网关。
  2. 记录请求日志。
  3. 进行黑白名单验证。
  4. 用户鉴权(判断 ak、sk 是否合法)。
  5. 检查请求的模拟接口是否存在。
  6. 请求转发,调用模拟接口。
  7. 记录响应日志。
  8. 调用成功,接口调用次数 + 1。
  9. 调用失败,返回一个规范的错误码。

前缀匹配路由将所有路径为/api/的请求进行转发,例如,将网关请求http://localhost:10001/api/name/get?name=cxk转发到http://localhost:10002/api/name/get?name=cxk

gateway:  
default-filters:
- AddResponseHeader=source, yunfei
routes:
- id: api_route
uri: http://localhost:10002
predicates:
- Path=/api/**

网关代码:

/**  
* 全局过滤
*
* * */@Slf4j
@Component
public class CustomGlobalFilter implements GlobalFilter, Ordered {

@DubboReference
private InnerUserService innerUserService;

@DubboReference
private InnerInterfaceInfoService innerInterfaceInfoService;

@DubboReference
private InnerUserInterfaceInfoService innerUserInterfaceInfoService;

private static final List<String> IP_WHITE_LIST = Arrays.asList("127.0.0.1","0:0:0:0:0:0:0:1%0");

private static final String INTERFACE_HOST = "http://localhost:10002";

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1. 请求日志
ServerHttpRequest request = exchange.getRequest();
String path = INTERFACE_HOST + request.getPath().value();
String method = request.getMethod().toString();
log.info("请求唯一标识:" + request.getId());
log.info("请求路径:" + path);
log.info("请求方法:" + method);
log.info("请求参数:" + request.getQueryParams());
String sourceAddress = request.getLocalAddress().getHostString();
log.info("请求来源地址:" + sourceAddress);
log.info("请求来源地址:" + request.getRemoteAddress());
ServerHttpResponse response = exchange.getResponse();
// 2. 访问控制 - 黑白名单
if (!IP_WHITE_LIST.contains(sourceAddress)) {
response.setStatusCode(HttpStatus.FORBIDDEN);
return response.setComplete();
}
// 3. 用户鉴权(判断 ak、sk 是否合法)
HttpHeaders headers = request.getHeaders();
String accessKey = headers.getFirst("accessKey");
String nonce = headers.getFirst("nonce");
String timestamp = headers.getFirst("timestamp");
String sign = headers.getFirst("sign");
String body = headers.getFirst("body");
// todo 实际情况应该是去数据库中查是否已分配给用户
User invokeUser = null;
try {
invokeUser = innerUserService.getInvokeUser(accessKey);
} catch (Exception e) {
log.error("getInvokeUser error", e);
}
if (invokeUser == null) {
return handleNoAuth(response);
}
// if (!"yunfei".equals(accessKey)) {
// return handleNoAuth(response);
// }
if (Long.parseLong(nonce) > 10000L) {
return handleNoAuth(response);
}
// 时间和当前时间不能超过 5 分钟
Long currentTime = System.currentTimeMillis() / 1000;
final Long FIVE_MINUTES = 60 * 5L;
if ((currentTime - Long.parseLong(timestamp)) >= FIVE_MINUTES) {
return handleNoAuth(response);
}
// 实际情况中是从数据库中查出 secretKey String secretKey = invokeUser.getSecretKey();
String serverSign = SignUtils.genSign(body, secretKey);
if (sign == null || !sign.equals(serverSign)) {
return handleNoAuth(response);
}
// 4. 请求的模拟接口是否存在,以及请求方法是否匹配
InterfaceInfo interfaceInfo = null;
try {
interfaceInfo = innerInterfaceInfoService.getInterfaceInfo(path, method);
} catch (Exception e) {
log.error("getInterfaceInfo error", e);
}
if (interfaceInfo == null) {
return handleNoAuth(response);
}
}

/**
* 处理响应
*
* @param exchange
* @param chain
* @return
*/
public Mono<Void> handleResponse(ServerWebExchange exchange, GatewayFilterChain chain, long interfaceInfoId, long userId) {
try {
ServerHttpResponse originalResponse = exchange.getResponse();
// 缓存数据的工厂
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
// 拿到响应码
HttpStatus statusCode = originalResponse.getStatusCode();
if (statusCode == HttpStatus.OK) {
// 装饰,增强能力
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
// 等调用完转发的接口后才会执行
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
log.info("body instanceof Flux: {}", (body instanceof Flux));
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
// 往返回值里写数据
// 拼接字符串
return super.writeWith(
fluxBody.map(dataBuffer -> {
// 7. 调用成功,接口调用次数 + 1 invokeCount try {
innerUserInterfaceInfoService.invokeCount(interfaceInfoId, userId);
} catch (Exception e) {
log.error("invokeCount error", e);
}
byte[] content = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(content);
DataBufferUtils.release(dataBuffer);//释放掉内存
// 构建日志
StringBuilder sb2 = new StringBuilder(200);
List<Object> rspArgs = new ArrayList<>();
rspArgs.add(originalResponse.getStatusCode());
String data = new String(content, StandardCharsets.UTF_8); //data
sb2.append(data);
// 打印日志
log.info("响应结果:" + data);
return bufferFactory.wrap(content);
}));
} else {
// 8. 调用失败,返回一个规范的错误码
log.error("<--- {} 响应code异常", getStatusCode());
}
return super.writeWith(body);
}
};
// 设置 response 对象为装饰过的
return chain.filter(exchange.mutate().response(decoratedResponse).build());
}
return chain.filter(exchange); // 降级处理返回数据
} catch (Exception e) {
log.error("网关处理响应异常" + e);
return chain.filter(exchange);
}
}

@Override
public int getOrder() {
return -1;
}

public Mono<Void> handleNoAuth(ServerHttpResponse response) {
response.setStatusCode(HttpStatus.FORBIDDEN);
return response.setComplete();
}
}