- 《架构世界》2020微服务刊:微服务分布式事务实战
- 普元信息
- 1450字
- 2020-11-18 15:22:42
.实现细节
动态路由管理
作为所有请求流量的入口,在实际生产环境中为了保证高可靠和高可用,尽量避免重启,需要实现 动态路由配置。实现动态路由其实很简单,重点在于 这个接口. 这个接口继承自两个接口,其中 是用来加载路由的.它有很多实现类,其中的 就用来实现从 中加载路由. 另一个 用来实现路由的添加与删除. 通过查看 的源码可以发现,在 . . . . . 中这么一段:
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepositoryinMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
可以看出,网关中如果没有
的 ,就会采用 做为实现.这个 有一个问题,就是数据没有持久化,网关重启之后,原来通过接口设置的路由就会丢失了.这当然是不可接受的,所以我们需要实现自已的
,来提供路由配置信息. 如使用 做为存储,来实现路由的存储.具体实现请参考文章: :// . . / / / /除此以外,每当路由更改之后,还需要通知网关刷新路由.这需要发送
来通知网关. 如下列示例:@Component
public classRouteDynamicServiceimplementsApplicationEventPublisherAware{
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher= publisher;
}
/**
*
刷新路由表
*/
() {
. ( ( ));
}
}
刷新可以通过消息通知机制来触发,当然,也可以对外接供
接口,手动触发.数据存储
如上述类图所示,
为治理数据统一存储接口. 为实现的它的抽像类,它需要依赖两个,一个是 ,用来实现 数据的存储.另一个为 ,用来为各治数对象生成对应的 . 下面则为各个治理数据存储的实现类.使用 做为持久存储时,需要注意以下几点:)为对象生成 时,建议为 添加一个命名空间(就是加一段有意义的前缀)
)在 中进行模糊搜索时,提供给 的 ,不能是一个正则的通配,它支持三种通配 *(多个),?(单个),
)如果数据量比较大,不建议使用 进行模糊查询,应该使用 方式
数据缓存
我们提供了内部缓存,它处于使用者与持久存储之间,缓存数据以提升性能. 缓存的实现主要有如下几点:
)实现了 以实现在网关启动时,自动加载数据
)内部使用了 ,保证写时的线程同步,又保证了 时的高效( 整个过程不需要加锁)
)从缓存中取数据时,如果需要懒加载,当从持久存储中加载不到数据时,建议使用空数据,或空集合占位,避免每次都去持久存储中查询.
代码示例如下:
/**
*根据appCode获取流量策略
*
*@param appCode
*@return
*/
public Set<ApplicationTrafficPolicy>getAppTrafficPolicies(StringappCode) {
//从缓存加载
Map<String, ApplicationTrafficPolicy> map = policyMap.get(appCode);
//缓存中没有
if(map ==null) {
//尝试从持久存储中加载所有此网关的流量策略
Set<ApplicationTrafficPolicy> policies = trafficPolicyRepository.fuzzyQuery();
//持久存储中没有任何流量策略,占个位置,防止缓存重复去加载
if(policies ==null|| policies.size() ==0) {
map =new ConcurrentHashMap<>();
policyMap.put(appCode, map);
}else{
//持久存储中有流量策略,放入缓存
for(ApplicationTrafficPolicy policy : policies) {
setTrafficPolicy(policy);
}
//重新从缓存中加载一次
map = policyMap.get(appCode);
//如果还是没有,使用空map占位子
if(map ==null) {
map=new ConcurrentHashMap<>();
policyMap.put(appCode, map);
}
}
}
returnmap.values().stream().collect(Collectors.toSet());
}
事件通知
事件通知,这里我们使用的是
的发布与订阅能力. 默认是不发送事件的,要让它发布事件,需要先修改它的配置文件 . ,添加一个配置:notify-keyspace-events "K$g"
上面的配置将使得
中发生数据的添加,修改或删除时,发送 或 事件.然后,我们需要配置一个
,用来订阅我们感兴趣的事件.@Bean
RedisMessageListenerContainercontainer(MessageListenerAdapter listenerAdapter) {
StringgtwReidsPattern ="__keyspace@*__:"+ GTW + keyGenerator.getGatewayCode() +"]*";
StringcofRedisPattern ="__keyspace@*__:"+ COF + cacheKey.getKeyNameSpace() + USER_NAME +"*";
log.info("Add gateway redis message listener, patternTopic is {}", gtwReidsPattern);
log.info("Add coframe redis message listener, patternTopic is {}", cofRedisPattern);
RedisMessageListenerContainer container=new RedisMessageListenerContainer();
container.setConnectionFactory(redisTemplate.getConnectionFactory());
// PatternTopic参考: http://redisdoc.com/topic/notification.html
container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic(PatternUtil.fmt(gtwReidsPattern)),new PatternTopic(PatternUtil.fmt(cofRedisPattern))));
return container;
}
当
事件订阅好了之后,每次其中我们关心的数据有变更,都会发送 或 事件.我们需要定义一个
,来接收事件:@Service(value = RedisMessageListener.REDIS_LISTENER_NAME)
public classRedisMessageListenerimplementsMessageListener {
@Override
public void onMessage(Message message,byte[] pattern) {
Stringops =new String(message.getBody());
Stringchannel =new String(message.getChannel());
Stringkey = channel.split(":")[1];
if("set".equals(ops)) {
Stringvalue = redisTemplate.opsForValue().get(key);
handleSet(key, value);
}else if("del".equals(ops)) {
handleDel(key);
}
}
...
}
接收到事件后,会调用相应的内部缓存,更新内部缓存中的数据,以实现治理数据变更的及时生效。
推荐阅读
关于作者:将晓渔,现任普元信息云计算架构师。曾在 ,云计算,数据备份,移动互联相关领域公司工作,十年以上 工作经验。国内 云计算的早期尝鲜者,容器技术专家,微服务架构的践行者。