跳到主要内容

重试机制实现

什么是重试机制?

重试机制:就是当调用端发起的请求失败时,RPC框架自身可以进行重试,再重新发送请求,用户可以自行设置是否开启重试以及重试次数。

调用端在发起 RPC 调用时,会经过负载均衡,选择一个节点,之后它会向这个节点发送请求信息。当消息发送失败或收到异常消息时,我们就可以捕获异常,根据异常触发重试,重新通过负载均衡选择一个节点发送请求消息,并且记录请求的重试次数,当重试次数达到用户配置的重试次数的时候,就返回给调用端动态代理一个失败异常,否则就一直重试下去。

为什么要重试机制?

重试机制的主要原因有以下几点:

  1. 提高接口的可用性和可靠性:当远程服务调用失败时,比如网络抖动导致请求失败,重试机制可以让系统自动重新发起请求,尽量保证接口能够成功执行。

  2. 处理临时性的错误:一些临时性的错误,比如网络超时、连接异常等,可能会导致单次请求失败。重试机制可以自动处理这类临时性错误,提高成功率。

  3. 降低调用端的复杂度:如果没有重试机制,调用端需要自行捕获异常,并手动重试,这会增加调用端代码的复杂度。重试机制可以将这部分逻辑封装在RPC框架内部,降低调用端的开发难度。

  4. 实现幂等性:试机制要求被调用的服务具有幂等性,即多次执行同一个操作,不会产生副作用。这可以进一步提高系统的稳定性和可靠性。

重试机制是提高分布式系统可用性和容错性的一种有效手段,能够提高系统的整体可靠性。

https://blog.csdn.net/zhizhengguan/article/details/121451100

RPC框架的重试机制:当调用端发起的请求失败时,如果配置了异常重试机制,RPC框架会捕捉异常,对异常进行判定,符合条件的进行重试。

在重试的过程中,为了能够在约定的时间内进行安全可靠的重试,在每次触发重试之前,我们需要先判定下这个请求是否已经超时,如果超时了会直接返回超时异常,否则我们需要重置下这个请求的超时时间,防止因为多次重试而导致这个请求的处理时间超过用户配置的超时时间,从而影响到业务处理的耗时。

在发起重试、负载均衡选择节点的时候,我们应该去掉重试之前出现过问题的那个节点,这样可以提高重试的成功率,并且允许用户配置可重试异常的白名单,这样可以让RPC框架的异常重试功能变得更加友好。

另外,在使用RPC框架的重试机制时,我们要确保被调用的服务的业务逻辑是幂等的,这样才能考虑是否使用重试

重试机制有哪些?

在 RPC 系统中,常见的重试策略主要包括以下几种:

  1. 固定间隔重试策略(Fixed Interval Retry):

    • 每次重试之间固定一个时间间隔,例如 3 秒。
    • 适用于对响应时间要求不太严格的场景。
  2. 指数退避重试策略(Exponential Backoff Retry):

    • 每次重试的时间间隔呈指数增长,例如 1 秒、2 秒、4 秒、8 秒等。
    • 适用于网络波动较大的场景,避免短时间内发送大量重复请求。
  3. 线性重试策略(Linear Retry):

    • 每次重试之间的时间间隔是线性增加的,例如 1 秒、2 秒、3 秒等。
    • 介于固定间隔和指数退避之间,适用于一般的网络环境。
  4. 随机重试策略(Random Retry):

    • 每次重试的时间间隔是随机的,在一定范围内波动。
    • 适用于避免重试请求同步的场景,例如防止雪崩效应。
  5. 断路器重试策略(Circuit Breaker Retry):

    • 结合断路器模式,当服务出现多次失败时,暂时断开对该服务的调用。
    • 当服务恢复正常后,再逐步恢复对该服务的调用。
    • 适用于依赖服务不稳定的场景,可以有效防止级联故障。
  6. 重试次数限制策略(Retry Limit):

    • 设置最大重试次数,超过限制则放弃重试。
    • 与其他重试策略配合使用,防止无限重试耗尽资源。
  7. 不重试(No Retry)

    • 当远程调用失败时,直接返回失败结果,不进行任何重试。
    • 这种方式适用于对响应时间要求较高的场景,或者对于一些幂等性较强的操作。

这些重试策略各有适用场景,在实际的 RPC 系统设计中,通常会根据业务特点和性能需求,选择合适的重试策略或者组合使用多种策略。

例如,在一个对响应时间要求较高的 RPC 系统中,可以采用指数退避重试策略,并设置重试次数限制,以兼顾服务的可靠性和性能。而在一个对可用性要求更高的系统中,则可以考虑使用断路器重试策略。

重试策略实现

我们需要搞清楚几个问题:

  1. 什么时候,什么条件重试
  2. 重试时间,下一次重试时间
  3. 什么时候,什么条件停止重试
  4. 重试之后要做什么?

重试策略接口定义

代码如下:

/**
* 重试策略接口
*/
public interface RetryStrategy {
/**
* 重试
* @param callable 重试的方法 代表一个任务
* @return
* @throws Exception
*/
RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception;
}

解释:

RetryStrategy 接口定义了重试策略的标准,包括以下方法:doRetry(Callable<RpcResponse> callable):

  • 该方法接受一个 Callable 对象作为参数,表示需要重试的方法调用。
  • 方法实现需要根据具体的重试策略,决定是否需要重试,并执行重试操作。
  • 如果重试成功,则返回调用结果 RpcResponse。如果重试失败,则抛出异常。

引入Google的Guava-Retrying库

<!--        重试策略-->
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>

不重试策略

实现最简单,我们直接返回即可

@Slf4j
public class NoRetryStrategy implements RetryStrategy {
/**
* 重试
*
* @param callable 重试的方法 代表一个任务
* @return
* @throws Exception
*/
@Override
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
return callable.call();
}
}

固定重试间隔策略

我们使用了 google-guava 提供的 Retryer 工具来实现重试逻辑。具体实现如下:

  1. doRetry() 方法是 RetryStrategy 接口的实现,它接收一个 Callable 作为需要重试的任务。
  2. 在方法中,首先创建了一个 Retryer 对象,并配置了以下重试策略:
    • retryIfExceptionOfType(Exception.class): 遇到任何异常类型都进行重试。
    • withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS)): 每次重试之间固定间隔 3 秒。
    • withStopStrategy(StopStrategies.stopAfterAttempt(3)): 最多重试 3 次,超过则停止重试。
    • withRetryListener(new RetryListener() { ... }): 添加了一个重试监听器,在每次重试时打印当前重试次数。
  3. 最后调用 retryer.call(callable) 执行重试操作,并返回最终的调用结果 RpcResponse

这个重试策略的特点如下:

  1. 固定时间间隔: 每次重试之间都有 3 秒的固定时间间隔,这种策略适用于对响应时间要求不太严格的场景。

  2. 有限重试次数: 最多重试 3 次,超过则停止重试。这可以避免无限重试导致资源耗尽的问题。

  3. 异常捕获: 对任何异常类型都进行捕获并重试,适用于比较通用的重试场景。

    1. 重试监听: 添加了重试监听器,可以在每次重试时打印日志,方便问题排查。
/**
* @author houyunfei
* 固定时间间隔重试策略
*/
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy {

/**
* 重试
* @param callable 重试的方法 代表一个任务
* @return
* @throws Exception
*/
@Override
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
.retryIfExceptionOfType(Exception.class)
.withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("重试第{}次", attempt.getAttemptNumber());
}
}).build();
return retryer.call(callable);
}
}

指数退避重试策略

我们可以实现一个指数退避重试策略。主要步骤如下:

  1. 定义最大重试次数 MAX_RETRY_TIMES 为 5 次。
  2. doRetry() 方法中,使用 Stopwatch 来记录每次重试的耗时。
  3. 在每次重试时,先调用 callable.call() 执行远程调用。
  4. 如果出现异常,则进行重试处理:
    • 记录当前重试次数 retryTimes
    • 计算本次重试的退避时间 sleepTime。初始退避时间为 100 毫秒,每次重试时退避时间翻倍。
    • 如果 sleepTime 大于 0,则通过 Thread.sleep() 进行退避延迟。
    • 如果重试次数达到上限,则抛出异常。
  5. 如果重试成功,则直接返回结果 RpcResponse

这种指数退避重试策略可以有效应对网络抖动和服务短暂不可用的情况。它会逐步增加重试间隔,避免在短时间内大量重复请求,从而降低系统负载。

import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ExponentialBackoffRetryStrategy implements RetryStrategy {
private static final int MAX_RETRY_TIMES = 5;
private static final long INITIAL_BACKOFF_INTERVAL = 100; // 初始退避时间100毫秒

@Override
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
int retryTimes = 0;
long backoffInterval = INITIAL_BACKOFF_INTERVAL;
Stopwatch stopwatch = Stopwatch.createUnstarted();

while (retryTimes < MAX_RETRY_TIMES) {
try {
stopwatch.start();
return callable.call();
} catch (Exception e) {
retryTimes++;
log.warn("RPC call failed, retrying... Current retry times: {}", retryTimes, e);

stopwatch.stop();
long elapsedTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
long sleepTime = Math.min(backoffInterval, Math.max(0, backoffInterval - elapsedTime));
stopwatch.reset();

if (sleepTime > 0) {
log.info("Backing off for {} ms before next retry.", sleepTime);
Thread.sleep(sleepTime);
}

backoffInterval *= 2; // 指数退避
}
}

throw new Exception("Maximum retry times exceeded, giving up.");
}
}

callable.call() 是 Java 中 Callable 接口的一个方法,用于执行一个可以返回结果的任务。

在 Java 中, Callable 是一个函数式接口,它包含一个名为 call() 的方法,该方法声明为 throws Exception。这意味着 call() 方法可能会抛出任何类型的异常。

以下是 Callable 接口的定义:

@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

在我们的 ExponentialBackoffRetryStrategy 类中, callable.call() 用于执行需要重试的远程调用任务。具体工作流程如下:

  1. doRetry() 方法被调用时,会传入一个 Callable<RpcResponse> 对象作为参数。这个 Callable 对象代表了需要执行的远程调用任务。
  2. doRetry() 方法内部,我们会调用 callable.call() 来执行这个任务。
  3. 如果 call() 方法执行成功,则直接返回结果 RpcResponse
  4. 如果 call() 方法抛出异常,则进入重试流程。

使用 Callable 的好处是:

  1. 返回结果: Callable 可以返回一个计算结果,而 Runnable 只能执行一个任务,无法返回结果。
  2. 异常处理: Callablecall() 方法可以声明抛出异常,而 Runnablerun() 方法不能抛出异常。这使得我们可以更好地处理任务执行过程中的异常情况。
  3. 灵活性: Callable 是一个函数式接口,可以方便地使用 lambda 表达式或方法引用来创建任务对象。

线性重试策略

这个实现与之前的指数退避重试策略非常相似,主要区别在于退避时间的计算方式:

  1. 在每次重试时,我们将退避时间 backoffInterval 线性增加,初始值为 1 秒。
  2. 具体计算方式为 backoffInterval += INITIAL_BACKOFF_INTERVAL。这样每次重试时,退避时间都会增加 1 秒。
  3. 其他部分,如最大重试次数、异常处理、日志记录等,与指数退避重试策略保持一致。

这种线性重试策略适用于网络环境相对较为稳定的场景,对响应时间要求也不太严格。它能够提供一个平滑的重试过程,不会像指数退避那样导致重试间隔时间过长。

与指数退避相比,线性重试的优点是:

  1. 响应时间更短:每次重试的时间间隔增长较缓慢,可以更快地得到服务响应。
  2. 更加稳定:重试间隔变化平缓,不会出现大幅波动。

缺点是:

  1. 对网络抖动不太敏感:当网络环境较差时,线性重试可能无法有效地抑制重试请求。

如果对响应时间要求较高,且网络环境较为稳定,线性重试策略是一个不错的选择。

import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

@Slf4j
public class LinearRetryStrategy implements RetryStrategy {
private static final int MAX_RETRY_TIMES = 5;
private static final long INITIAL_BACKOFF_INTERVAL = 1000; // 初始退避时间1秒

@Override
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
int retryTimes = 0;
long backoffInterval = INITIAL_BACKOFF_INTERVAL;
Stopwatch stopwatch = Stopwatch.createUnstarted();

while (retryTimes < MAX_RETRY_TIMES) {
try {
stopwatch.start();
return callable.call();
} catch (Exception e) {
retryTimes++;
log.warn("RPC call failed, retrying... Current retry times: {}", retryTimes, e);

stopwatch.stop();
long elapsedTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
long sleepTime = Math.min(backoffInterval, Math.max(0, backoffInterval - elapsedTime));
stopwatch.reset();

if (sleepTime > 0) {
log.info("Backing off for {} ms before next retry.", sleepTime);
Thread.sleep(sleepTime);
}

backoffInterval += INITIAL_BACKOFF_INTERVAL; // 线性增加退避时间
}
}

throw new Exception("Maximum retry times exceeded, giving up.");
}
}