跳到主要内容

IP归属地设计

我们希望聊天的时候可以展示用户的IP归属地

IP获取

如果是HTTP请求,那么很简单,我们可以写一个拦截器 ,从请求头中获取对应的IP信息,可以借助Hutool工具类实现这一功能,之前也已经做了

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
RequestInfo info = new RequestInfo();
info.setUid(Optional.ofNullable(request.getAttribute(TokenInterceptor.ATTRIBUTE_UID))
.map(Object::toString).map(Long::parseLong).orElse(null));
info.setIp(ServletUtil.getClientIP(request));
RequestHolder.set(info);
return true;
}

如果是nginx做了代理,那么要在nginx中保存用户真实ip到X-Real-IP,否则拿到的就是nginx的ip

location /{
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}

如果是WebSocket请求,我们知道第一次发送的是HTTP请求,然后进行协议的升级为Websocket,之后就再也获取不到了,所以只能在升级协议之前获取这个IP

public class MyHeaderollectHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
UrlBuilder urlBuilder = UrlBuilder.ofHttp(request.getUri());
Optional<String> tokenOptional = Optional.of(urlBuilder).map(UrlBuilder::getQuery)
.map(k -> k.get("token"))
.map(CharSequence::toString);
// 如果有token,就保存到channel中
tokenOptional.ifPresent(s -> NettyUtils.setAttr(ctx.channel(), NettyUtils.TOKEN, s));
// 去掉token
request.setUri(urlBuilder.getPath().toString());
// 获取用户IP
String ip = request.headers().get("X-Real-IP");
if (StringUtils.isBlank(ip)) {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
ip = address.getAddress().getHostAddress();
}
// 保存到channel中
NettyUtils.setAttr(ctx.channel(), NettyUtils.IP, ip);
// 处理器只需要用一次,处理完就移除
ctx.pipeline().remove(this);
}
// 触发责任链传递
ctx.fireChannelRead(msg);
}
}

IP更新

我们不可能每次用户请求都去更新用户的IP,那样就太消耗服务器资源了,只有在用户登录或者认证的时候才会去更新IP信息

  • 登录:用户扫码登录
  • 认证:用户带着token认证

IP保存

IP信息比较复杂,我们保存JSON格式到数据库

IpInfo类 :

@Data
public class IpInfo implements Serializable {

private static final long serialVersionUID = 1L;
//注册时的ip
private String createIp;
//注册时的ip详情
private IpDetail createIpDetail;
//最新登录的ip
private String updateIp;
//最新登录的ip详情
private IpDetail updateIpDetail;

public void refreshIp(String ip) {
if (StringUtils.isEmpty(ip)) {
return;
}
updateIp = ip;
if (createIp == null) {
createIp = ip;
}
}

/**
* 需要刷新的ip,这里判断更新ip就够,初始化的时候ip也是相同的,只需要设置的时候多设置进去就行
*
* @return
*/
public String needRefreshIp() {
boolean notNeedRefresh = Optional.ofNullable(updateIpDetail)
.map(IpDetail::getIp)
.filter(ip -> Objects.equals(ip, updateIp))
.isPresent();
return notNeedRefresh ? null : updateIp;
}

public void refreshIpDetail(IpDetail ipDetail) {
if (Objects.equals(createIp, ipDetail.getIp())) {
createIpDetail = ipDetail;
}
if (Objects.equals(updateIp, ipDetail.getIp())) {
updateIpDetail = ipDetail;
}
}
}

详细的IP信息:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IpDetail implements Serializable {

private static final long serialVersionUID = 1L;
//注册时的ip
private String ip;
//最新登录的ip
private String isp;
private String isp_id;
private String city;
private String city_id;
private String country;
private String country_id;
private String region;//省
private String region_id;
}

我们的ip_info在数据库中就是json类型的,这个mysql已经支持了

我们希望查询出结果的时候,可以自动解析为我们的实体类 ,那么就需要做以下配置:

  1. 设置@TableName(value = "user", autoResultMap = true)
@TableName(value = "user", autoResultMap = true)
public class User implements Serializable {
  1. 设置@TableField(value = "ip_info", typeHandler = JacksonTypeHandler.class)
/**
* ip信息
*/
@TableField(value = "ip_info", typeHandler = JacksonTypeHandler.class)
private IpInfo ipInfo;

在登录成功的时候,我们去保存IP信息:

    private void loginSuccess(Channel channel, User user, String token) {
// 保存channel对应的uid
WSChannelExtraDTO extra = ONLINE_WS_MAP.get(channel);
extra.setUid(user.getId());
// 推送成功消息
sendMsg(channel, WSAdapter.buildLoginSuccessResp(user, token));
user.setLastOptTime(new Date());
user.refreshIp(NettyUtils.getAttr(channel, NettyUtils.IP));
applicationEventPublisher.publishEvent(new UserOnlineEvent(this, user));
}

这里具体的操作我们放到了User类里面,有点像领域模型设计了 。

User类加如下操作,也就是User类不仅保存信息 ,还有一些具体的操作

public void refreshIp(String ip) {
if (ipInfo == null) {
ipInfo = new IpInfo();
}
ipInfo.refreshIp(ip);
}

IpInfo的刷新操作:

public void refreshIp(String ip) {
if (StringUtils.isEmpty(ip)) {
return;
}
updateIp = ip;
if (createIp == null) {
createIp = ip;
}
}

在监听的时候 UserOnlineEvent做的事情是解析ip,并且保存到数据库中,可以 异步操作。

@Component
public class UserOnlineListener {

@Resource
private IpService ipService;
@Resource
private UserService userService;

@Async
@TransactionalEventListener(classes = UserRegisterEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void saveDB(UserRegisterEvent event) {
User user = event.getUser();
User update = new User();
update.setId(user.getId());
update.setLastOptTime(user.getLastOptTime());
update.setIpInfo(user.getIpInfo());
update.setActiveStatus(UserActiveStatusEnum.ONLINE.getStatus());
userService.updateById(update);
// 用户ip详情的解析
ipService.refreshIpDetailAsync(user.getId());
}
}

接下来我们具体看IP解析的操作

IP解析

我们解析IP采用的是调用淘宝的接口来进行解析

例如:GET请求

https://ip.taobao.com/outGetIpInfo?ip=121.42.166.230&accessKey=alibaba-inc

得到结果:

{
"data": {
"area": "",
"country": "中国",
"isp_id": "1000323",
"queryIp": "121.42.166.230",
"city": "青岛",
"ip": "121.42.166.230",
"isp": "阿里云",
"county": "",
"region_id": "370000",
"area_id": "",
"county_id": null,
"region": "山东",
"country_id": "CN",
"city_id": "370200"
},
"msg": "query success",
"code": 0
}

但是使用别人的接口解析也会存在问题,淘宝为了防止盗刷接口,对接口做了限流操作,如果我们请求过于频繁,就会被封,因此 我们 需要去限制请求的频率,这里使用核心线程数为1的多线程池做,因为有以下好处

  • 排队,解析IP就是一个一个的任务,存入阻塞队列里,然后从队列里一个一个取出来进行解析,不会出现过大的并发
  • 重试机制:如果解析失败了,可以一段时间后进行重试,并且可以设置重试的次数
  • 异步操作,不会影响主进程执行

具体操作的代码如下

/**
* @author houyunfei
*/
@Slf4j
public class IpServiceImpl implements IpService {
private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500), new NamedThreadFactory("refresh-ipDetail", false));

@Resource
private UserService userService;

@Override
public void refreshIpDetailAsync(Long uid) {
executorService.execute(() -> {
User user = userService.getById(uid);
IpInfo ipInfo = user.getIpInfo();
if (Objects.isNull(ipInfo)) {
return;
}
String ip = ipInfo.needRefreshIp();
if (StringUtils.isBlank(ip)) {
return;
}
IpDetail ipDetail = tryGetIpDetailOrNullThreeTimes(ip);
if (Objects.nonNull(ipDetail)) {
ipInfo.refreshIpDetail(ipDetail);
User update = new User();
update.setId(uid);
update.setIpInfo(ipInfo);
userService.updateById(update);
}
});


}

private static IpDetail tryGetIpDetailOrNullThreeTimes(String ip) {
for (int i = 0; i < 3; i++) {
IpDetail ipDetail = getIpDetailOrNull(ip);
if (Objects.nonNull(ipDetail)) {
return ipDetail;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("tryGetIpDetailOrNullThreeTimes error", e);
}
}
return null;
}

private static IpDetail getIpDetailOrNull(String ip) {
String URL = String.format("https://ip.taobao.com/outGetIpInfo?ip=%s&accessKey=alibaba-inc", ip);
// 调用淘宝接口获取ip详情
String data = HttpUtil.get(URL);
ApiResult<IpDetail> result = JsonUtils.toObj(data, new TypeReference<ApiResult<IpDetail>>() {
});
IpDetail detail = result.getData();
return detail;
}
}

整体的流程如下

ip.svg

问题:

在解析的时候可能会报错:

image-20240519182428500

这是解析JSON的时候出现了问题,返回结果中有些字段出现,但是我们的实体类中没有这些字段就出现了报错,显然这太严格了,我们需要做一些设置:

@JsonIgnoreProperties(ignoreUnknown = true)
public class ApiResult<T> {
@JsonIgnoreProperties(ignoreUnknown = true)
public class IpDetail implements Serializable {

线程池缺点:

线程池的等待队列都是保存在内存中,不可靠,而且用的是自己的线程池,需要去设置优雅停机。

可以去实现DisposableBean接口,重写里面的destroy方法:

public class IpServiceImpl implements IpService, DisposableBean {
private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500), new NamedThreadFactory("refresh-ipDetail", false));
@Override
public void destroy() throws Exception {
executorService.shutdown();
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
// 30s后强制关闭
executorService.shutdownNow();
log.error("IpServiceImpl destroy timeout");
}
}
}