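I. Background
By tagging each request with a group label, requests flow within the matching gray (canary) service group. When the microservice call chain has no downstream service in that gray group, the corresponding microservice in the mainline group serves the request.
1. Application scenarios
(1) A/B Testing
Run A/B testing in the production environment: a release is first verified in production by internal test users and only then rolled out to all users.
(2) Multi-version development, testing, and debugging
When several versions are developed in parallel, each version needs a complete development environment, and with many versions the environment cost becomes very high. Group isolation greatly reduces the resource cost of multi-version development and testing, and lets a developer machine join the test environment to debug local code.
2. Problems to solve
Existing gray release tools can route synchronous call-chain traffic within the corresponding service group according to the request tag, but two asynchronous call-chain problems prevent a gray request from completing entirely inside the gray environment:
(1) Asynchronous threads
When the chain calls a downstream service from an asynchronous thread, the gray group tag in the request is lost. The gray request is then routed to the mainline group, and the gray group never receives requests issued by asynchronous threads.
(2) Asynchronous messages
When a request in the chain produces an MQ message, the consumers in the gray group and in the mainline group listen on the same queue, so message routing becomes confused. Typical problems are incompatible message-handling logic and message loss (the subscription rules for the same queue may be inconsistent within one subscription group).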
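Problem (1) can be reproduced with the JDK alone. The following is a minimal sketch, not code from the project: `ThreadLocalLossDemo` and `GRAY_TAG` are hypothetical names, and a plain `ThreadLocal` stands in for wherever the gray tag is kept. It shows that a value set on the request thread is not visible to a thread-pool worker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical holder for the gray group tag, backed by a plain ThreadLocal.
    static final ThreadLocal<String> GRAY_TAG = new ThreadLocal<>();

    /** Sets the tag on the calling thread and returns what a pool thread reads. */
    static String tagSeenByPoolThread(String tag) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            GRAY_TAG.set(tag);
            Callable<String> task = GRAY_TAG::get; // executed on the pool thread
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            GRAY_TAG.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The pool thread has its own, empty ThreadLocal slot, so the tag is lost.
        System.out.println(tagSeenByPoolThread("gray-a"));
    }
}
```

Any downstream call made from such a worker thread therefore carries no gray tag and is load-balanced as ordinary mainline traffic.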
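II. Implementation
Prerequisites: the project uses Nacos, Spring Cloud OpenFeign, Spring Cloud Gateway, and RocketMQ.
1. Custom Spring MVC interceptor
Write the gray group tag from the HTTP request into a ThreadLocal on the current thread. The ThreadLocal is Alibaba's open-source TransmittableThreadLocal, which addresses the asynchronous-thread problem: when the current request calls a downstream service from an asynchronous thread, the gray group tag would otherwise be lost and the gray request would be handled by the mainline group.
(1) Interceptor implementation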
package com.easyhome.common.feign;
import com.easyhome.common.utils.GrayscaleConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
/**
* Interceptor that captures the request group parameters
* @author wangshufeng
*/
@Slf4j
public class TransmitHeaderPrintLogHanlerInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
Map<String,String> param=new HashMap<>(8);
// Copy every gray header value into the ThreadLocal so it can be passed downstream
for (GrayHeaderParam item:GrayHeaderParam.values()) {
String hParam = request.getHeader(item.getValue());
if(!StringUtils.isEmpty(hParam)){
param.put(item.getValue(), hParam);
}
}
GrayParamHolder.putValues(param);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
@Nullable Exception ex) throws Exception {
// Clear the gray ThreadLocal
GrayParamHolder.clearValue();
}
}
(2) Enhanced ThreadLocal utility class
package com.easyhome.common.feign;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.easyhome.common.utils.GrayUtil;
import com.easyhome.common.utils.GrayscaleConstant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Passes parameters between asynchronous threads
*
* @author wangshufeng
*/
public class GrayParamHolder {
/**
* Add to the Java startup arguments: -javaagent:path/to/transmittable-thread-local-2.x.y.jar.
* <p>
* Note:
* <p>
* If you rename the downloaded TTL jar (transmittable-thread-local-2.x.y.jar), you must configure it explicitly with the -Xbootclasspath JVM argument.
* For example, if the file is renamed to ttl-foo-name-changed.jar, also add the startup argument: -Xbootclasspath/a:path/to/ttl-foo-name-changed.jar.
* Versions before v2.6.0 (such as v2.5.1) also require the explicit -Xbootclasspath JVM argument, as earlier TTL releases did.
* Add the startup argument: -Xbootclasspath/a:path/to/transmittable-thread-local-2.5.1.jar.
*/
private static final ThreadLocal<Map<String, String>> paramLocal = new TransmittableThreadLocal<>();
/**
* Get a single parameter value
*
* @param key
* @return
*/
public static String getValue(String key) {
Map<String, String> paramMap = GrayParamHolder.paramLocal.get();
if (Objects.nonNull(paramMap) && !paramMap.isEmpty()) {
return paramMap.get(key);
}
return null;
}
/**
* Get all parameters
*
* @return
*/
public static Map<String, String> getGrayMap() {
Map<String, String> paramMap = GrayParamHolder.paramLocal.get();
if(paramMap==null){
paramMap=new HashMap<>(8);
if(GrayUtil.isGrayPod()){
paramMap.put(GrayscaleConstant.HEADER_KEY, GrayscaleConstant.HEADER_VALUE);
paramMap.put(GrayscaleConstant.PRINT_HEADER_LOG_KEY, GrayscaleConstant.STR_BOOLEAN_TRUE);
GrayParamHolder.paramLocal.set(paramMap);
}
}
return paramMap;
}
/**
* Set a single parameter
*
* @param key
* @param value
*/
public static void putValue(String key, String value) {
Map<String, String> paramMap = GrayParamHolder.paramLocal.get();
if (Objects.isNull(paramMap) || paramMap.isEmpty()) {
paramMap = new HashMap<>(6);
GrayParamHolder.paramLocal.set(paramMap);
}
paramMap.put(key, value);
}
/**
* Set multiple parameters
*
* @param map
*/
public static void putValues(Map<String,String> map) {
Map<String, String> paramMap = GrayParamHolder.paramLocal.get();
if (Objects.isNull(paramMap) || paramMap.isEmpty()) {
paramMap = new HashMap<>(6);
GrayParamHolder.paramLocal.set(paramMap);
}
if(Objects.nonNull(map)&&!map.isEmpty()){
for (Map.Entry<String,String> item:map.entrySet()){
paramMap.put(item.getKey(),item.getValue());
}
}
}
/**
* Clear the parameters of the current thread
*/
public static void clearValue() {
GrayParamHolder.paramLocal.remove();
}
}
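To make the idea behind the holder concrete, here is a JDK-only sketch of capture-and-replay context propagation: the context is copied when the task is created and restored around the task on the worker thread. This is an illustration of the general technique only, not how TransmittableThreadLocal is implemented (with the Java agent, TTL does this transparently for thread pools). `ContextCaptureSketch`, `CTX`, `wrap`, and the `gray-group` key are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextCaptureSketch {
    // Hypothetical per-thread context, a plain ThreadLocal.
    static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();

    /** Captures the caller's context now and replays it around the task later. */
    static <T> Callable<T> wrap(Callable<T> task) {
        Map<String, String> current = CTX.get();
        Map<String, String> captured = current == null ? null : new HashMap<>(current);
        return () -> {
            Map<String, String> backup = CTX.get(); // whatever the worker held before
            if (captured == null) CTX.remove(); else CTX.set(captured);
            try {
                return task.call();
            } finally {
                // Restore the worker's previous state so pooled threads do not leak tags.
                if (backup == null) CTX.remove(); else CTX.set(backup);
            }
        };
    }

    /** Sets the tag on the caller and returns what a wrapped pool task reads. */
    static String runInPool(String tag) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Map<String, String> ctx = new HashMap<>();
            ctx.put("gray-group", tag);
            CTX.set(ctx);
            Callable<String> task = () -> {
                Map<String, String> seen = CTX.get();
                return seen == null ? null : seen.get("gray-group");
            };
            return pool.submit(wrap(task)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CTX.remove();
            pool.shutdown();
        }
    }
}
```

The restore step in `finally` matters as much as the capture: pooled threads are reused, so a tag left behind would mark an unrelated later task as gray.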
(3) Register the interceptor at startup
package com.easyhome.common.feign;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* Configuration that registers the request group parameter interceptor
* @author wangshufeng
*/
@Configuration
public class TransmitHeaderPrintLogConfig implements WebMvcConfigurer {
/**
* Configure the interception rules and register the interceptor
* @param registry
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
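// addPathPatterns adds an interception rule; /** matches every path, including static resources
// excludePathPatterns removes paths from interception; use it if static resources should bypass the interceptor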
registry.addInterceptor(new TransmitHeaderPrintLogHanlerInterceptor())
.addPathPatterns("/**");
}
}
2. Custom Feign interceptor
Pass the gray group tag that the custom Spring MVC interceptor stored in the ThreadLocal on to downstream services.
package com.easyhome.common.feign;
import com.easyhome.common.utils.GrayscaleConstant;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Objects;
/**
* Feign interceptor that propagates request headers
*
* @author wangshufeng
*/
@Slf4j
@Configuration
public class FeignTransmitHeadersRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
Map<String,String> attributes=GrayParamHolder.getGrayMap();
if (Objects.nonNull(attributes)) {
// Propagate the gray tag
String version = attributes.get(GrayscaleConstant.HEADER_KEY);
if(!StringUtils.isEmpty(version)){
requestTemplate.header(GrayscaleConstant.HEADER_KEY, version);
}
// Common parameters that must travel along the whole call chain
// Propagate userId
String userId = attributes.get(GrayscaleConstant.USER_ID);
if(!StringUtils.isEmpty(userId)){
requestTemplate.header(GrayscaleConstant.USER_ID, userId);
}
String dwLang = attributes.get(GrayscaleConstant.DW_LANG);
if(!StringUtils.isEmpty(dwLang)){
requestTemplate.header(GrayscaleConstant.DW_LANG, dwLang);
}
String deviceOs = attributes.get(GrayscaleConstant.DEVICE_OS);
if(!StringUtils.isEmpty(deviceOs)){
requestTemplate.header(GrayscaleConstant.DEVICE_OS, deviceOs);
}
}
}
}
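The propagation rule in the interceptor (forward a fixed set of keys, skip empty values) can be expressed as a loop over a whitelist. The sketch below uses only the JDK and plain maps in place of Feign's `RequestTemplate`; the header names are hypothetical, since the real values are defined in `GrayscaleConstant`, which is not shown here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HeaderPropagationSketch {
    // Hypothetical header names; the real ones come from GrayscaleConstant.
    static final List<String> PROPAGATED =
            Arrays.asList("gray-group", "user-id", "dw-lang", "device-os");

    /** Returns the headers to add to an outgoing request for the given context. */
    static Map<String, String> outgoingHeaders(Map<String, String> context) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (context == null) {
            return headers;
        }
        for (String name : PROPAGATED) {
            String value = context.get(name);
            // Forward only whitelisted keys that actually carry a value.
            if (value != null && !value.isEmpty()) {
                headers.put(name, value);
            }
        }
        return headers;
    }
}
```

Keeping the whitelist in one place means a new chain-wide parameter needs one list entry rather than another copy of the `if` block.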
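3. Custom load-balancing rule
(1) Rule implementation
Use the group tag in the request to select the instance list of the matching group, so that requests flow within the gray service group. When no live downstream instance exists in that group, the corresponding microservice in the mainline group serves the request.
The rule is rewritten from com.alibaba.cloud.nacos.ribbon.NacosRule.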
package com.easyhome.common.nacos.ribbon;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.ribbon.ExtendBalancer;
import com.alibaba.cloud.nacos.ribbon.NacosServer;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.easyhome.common.utils.GrayUtil;
import com.easyhome.common.utils.GrayscaleConstant;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
import com.netflix.loadbalancer.Server;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Custom Nacos load-balancing rule
*
* @author wangshufeng
*/
@Slf4j
public class NacosRule extends AbstractLoadBalancerRule {
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Override
public Server choose(Object key) {
try {
String clusterName = this.nacosDiscoveryProperties.getClusterName();
DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
String name = loadBalancer.getName();
NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
List<Instance> instances = namingService.selectInstances(name, true);
instances = this.getGrayFilterInstances(instances, key);
if (CollectionUtils.isEmpty(instances)) {
log.warn("no instance in service {}", name);
return null;
}
List<Instance> instancesToChoose = instances;
if (StringUtils.isNotBlank(clusterName)) {
List<Instance> sameClusterInstances = instances.stream()
.filter(instance -> Objects.equals(clusterName, instance.getClusterName()))
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(sameClusterInstances)) {
instancesToChoose = sameClusterInstances;
} else {
log.warn(
"A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}",
name, clusterName, instances);
}
}
Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);
return new NacosServer(instance);
} catch (Exception e) {
log.warn("NacosRule error", e);
return null;
}
}
/**
* Filter the instance list according to whether the current request is a gray request
*
* @param instances
* @return List<Instance>
*/
private List<Instance> getGrayFilterInstances(List<Instance> instances, Object key) {
if (CollectionUtils.isEmpty(instances)) {
return instances;
} else {
// Whether this is a gray request
Boolean isGrayRequest;
String grayGroup=GrayscaleConstant.HEADER_VALUE;
// Compatible with how the gateway passes the value: the gateway is NIO-based, so the target group arrives through key
if (Objects.nonNull(key) && !GrayscaleConstant.DEFAULT.equals(key)) {
isGrayRequest = true;
grayGroup = String.valueOf(key);
} else {
isGrayRequest = GrayUtil.isGrayRequest();
if(isGrayRequest){
grayGroup=GrayUtil.requestGroup();
}
}
List<Instance> prodInstance=new ArrayList<>();
List<Instance> grayInstance=new ArrayList<>();
for(Instance item:instances){
Map<String, String> metadata = item.getMetadata();
if (metadata.isEmpty() || !GrayscaleConstant.STR_BOOLEAN_TRUE.equals(metadata.get(GrayscaleConstant.POD_GRAY))) {
prodInstance.add(item);
}
if (isGrayRequest) {
if (!metadata.isEmpty() && GrayscaleConstant.STR_BOOLEAN_TRUE.equals(metadata.get(GrayscaleConstant.POD_GRAY))) {
if(Objects.equals(grayGroup,metadata.get(GrayscaleConstant.GRAY_GROUP))){
grayInstance.add(item);
}
}
}
}
if(!isGrayRequest||CollectionUtils.isEmpty(grayInstance)){
return prodInstance;
}
return grayInstance;
}
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
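The selection rule in getGrayFilterInstances (prefer instances of the requested gray group, otherwise fall back to mainline instances) can be exercised without Nacos or Ribbon. In this sketch each instance is reduced to its metadata map; the metadata keys `pod.gray` and `gray.group` and the helper names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GrayFilterSketch {
    static final String POD_GRAY = "pod.gray";     // assumed metadata key
    static final String GRAY_GROUP = "gray.group"; // assumed metadata key

    /** Builds the metadata map of a hypothetical instance. */
    static Map<String, String> instance(String id, boolean gray, String group) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("id", id);
        metadata.put(POD_GRAY, String.valueOf(gray));
        if (group != null) {
            metadata.put(GRAY_GROUP, group);
        }
        return metadata;
    }

    /** requestGroup is null for a mainline request, otherwise the gray group name. */
    static List<Map<String, String>> filter(List<Map<String, String>> instances,
                                            String requestGroup) {
        List<Map<String, String>> mainline = new ArrayList<>();
        List<Map<String, String>> gray = new ArrayList<>();
        for (Map<String, String> metadata : instances) {
            if (!"true".equals(metadata.get(POD_GRAY))) {
                mainline.add(metadata);
            } else if (requestGroup != null
                    && requestGroup.equals(metadata.get(GRAY_GROUP))) {
                gray.add(metadata);
            }
        }
        // Fall back to mainline when the request is not gray or its group has no instance.
        return gray.isEmpty() ? mainline : gray;
    }
}
```

Note that gray instances of other groups are never returned: a request for group-b with only group-a deployed goes to mainline, not to group-a.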
(2) Register the load-balancing rule at startup
package com.easyhome.common.nacos;
import com.easyhome.common.nacos.ribbon.NacosRule;
import com.netflix.loadbalancer.IRule;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* Gray load-balancing rule configuration
* @author wangshufeng
*/
@Configuration
public class BalancerRuleConfig {
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public IRule getRule(){
return new NacosRule();
}
}
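4. Add metadata when registering the service
At startup, the service registers the group of the current instance with the registry. The custom load-balancing rule then selects instances by reading the group information from the instance metadata.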
package com.easyhome.common.nacos;
import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.discovery.NacosWatch;
import com.easyhome.common.utils.GrayUtil;
import com.easyhome.common.utils.GrayscaleConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.CommonsClientAutoConfiguration;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Objects;
/**
* Adds metadata when the service registers
*
* @author wangshufeng
*/
@Slf4j
@Configuration
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class})
public class NacosMetadataConfig {
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = {"spring.cloud.nacos.discovery.watch.enabled"}, matchIfMissing = true)
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
String grayFlg = GrayUtil.isGrayPod().toString();
log.info("Adding registration metadata: current instance is gray = {}", grayFlg);
nacosDiscoveryProperties.getMetadata().put(GrayscaleConstant.POD_GRAY, grayFlg);
if(Objects.equals(grayFlg,GrayscaleConstant.STR_BOOLEAN_TRUE)){
String groupFlg = GrayUtil.podGroup();
nacosDiscoveryProperties.getMetadata().put(GrayscaleConstant.GRAY_GROUP, groupFlg);
}
return new NacosWatch(nacosDiscoveryProperties);
}
}
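GrayUtil is not listed here, so how the gray flag and group are obtained is not shown. As a rough sketch under the assumption that they come from the JVM flags `-Dpod.gray` and `-Dgray.group`, the registration metadata could be derived as follows. The metadata keys and the class name are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class GrayMetadataSketch {
    /** Builds registration metadata from the assumed system properties. */
    static Map<String, String> registrationMetadata() {
        // Assumption: the gray flag is read from -Dpod.gray and defaults to false.
        boolean gray = Boolean.parseBoolean(System.getProperty("pod.gray", "false"));
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put("pod.gray", String.valueOf(gray));
        if (gray) {
            // Only gray instances carry a group name.
            String group = System.getProperty("gray.group");
            if (group != null && !group.isEmpty()) {
                metadata.put("gray.group", group);
            }
        }
        return metadata;
    }
}
```

Unlike the configuration class above, this sketch skips an unset group instead of registering a null value; which behaviour is preferable depends on how the load-balancing rule should treat an unnamed gray instance.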
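5. Asynchronous message handling
Two queues isolate message flow. The consumer identifies which queue a message came from and, when it calls downstream services, adds the service group information so that the chain is routed correctly.
When the consumer's gray group has running instances: mainline instances listen only on the production queue and gray instances listen only on the gray queue.
When the consumer's gray group instances are offline: mainline instances listen on both the production queue and the gray queue.
(1) Custom gray MQ message listener
After receiving a message from the gray queue, the listener adds the gray traffic group tag to the current thread, so that downstream calls made by the message-handling logic stay within the corresponding group.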
package com.easyhome.common.rocketmq;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.easyhome.common.feign.GrayParamHolder;
import com.easyhome.common.utils.GrayscaleConstant;
import lombok.extern.slf4j.Slf4j;
/**
* Gray MQ message listener
* Decides whether the traffic is gray from the topic suffix
* @author wangshufeng
*/
@Slf4j
public final class GrayMessageListener implements MessageListener {
private MessageListener messageListener;
public GrayMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
if(message.getTopic().endsWith(GrayscaleConstant.GRAY_TOPIC_SUFFIX)){
GrayParamHolder.putValue(GrayscaleConstant.HEADER_KEY, GrayscaleConstant.HEADER_VALUE);
GrayParamHolder.putValue(GrayscaleConstant.PRINT_HEADER_LOG_KEY, GrayscaleConstant.STR_BOOLEAN_TRUE);
log.info("Set the gray tag for the current MQ message.");
}
return messageListener.consume(message,context);
} finally {
// Always clear the ThreadLocal, even if the delegate throws, so the tag cannot leak into the next message
GrayParamHolder.clearValue();
}
}
}
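The listener relies on a naming convention: the gray queue is the production topic plus a suffix. A small sketch of that convention follows. The suffix value `_gray` is an assumption (the real value is `GrayscaleConstant.GRAY_TOPIC_SUFFIX`), and `targetTopic` illustrates a producer-side choice that is not shown in the code above.

```java
class GrayTopicSketch {
    static final String GRAY_TOPIC_SUFFIX = "_gray"; // assumed suffix value

    /** Maps a production topic to its gray counterpart (idempotent). */
    static String grayName(String topic) {
        return topic.endsWith(GRAY_TOPIC_SUFFIX) ? topic : topic + GRAY_TOPIC_SUFFIX;
    }

    /** Consumer side: a message is gray traffic if its topic carries the suffix. */
    static boolean isGrayTopic(String topic) {
        return topic != null && topic.endsWith(GRAY_TOPIC_SUFFIX);
    }

    /** Producer side (assumption): gray requests publish to the gray topic. */
    static String targetTopic(String topic, boolean grayRequest) {
        return grayRequest ? grayName(topic) : topic;
    }
}
```

Because the group is inferred from the suffix alone, a production topic whose own name ends with the suffix would be misclassified, so the suffix should be reserved for gray topics.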
(2) Custom Spring gray environment change event
package com.easyhome.common.event;
import com.easyhome.common.rocketmq.ListenerStateEnum;
import org.springframework.context.ApplicationEvent;
/**
* Gray environment change event
* @author wangshufeng
*/
public class GrayEventChangeEvent extends ApplicationEvent {
/**
* Create a new {@code ApplicationEvent}.
*
* @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null})
*/
public GrayEventChangeEvent(ListenerStateEnum source) {
super(source);
}
}
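(3) Base class for handling gray instance online/offline events
Define an abstract class that handles the Spring gray environment change event in one place. RocketMQ consumers extend this class, so that the current service instance listens for the Spring event and switches automatically between the production queue and the gray queue.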
package com.easyhome.common.rocketmq;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.easyhome.common.event.GrayEventChangeEvent;
import com.easyhome.common.utils.GrayUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.util.StringUtils;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* Base class for handling gray instance online/offline events
*
* @author wangshufeng
*/
@Slf4j
public abstract class AbstractGrayEventListener implements ApplicationListener<GrayEventChangeEvent> {
private Consumer consumer;
private Consumer consumerGray;
/**
* Default tag subscription expression
*/
private static final String DEFAULT_SUB_EXPRESSION = "*";
private List<SubscriptionData> subscribes = new ArrayList<>();
private ListenerStateEnum currentState;
private Properties mqProperties;
@Resource
private ApplicationContext applicationContext;
/**
* Initialize the consumer instance
*/
public void initConsumer() {
if (GrayUtil.isGrayPod()) {
initConsumerGray();
} else {
initConsumerProduction();
}
}
/**
* Initialize the production consumer instance
*/
private void initConsumerProduction() {
if (consumer == null) {
synchronized (this) {
if (consumer == null) {
if (Objects.isNull(mqProperties)) {
throw new NullPointerException("RocketMQ configuration properties are not set");
} else {
consumer = ONSFactory.createConsumer(mqProperties);
consumer.start();
}
}
}
}
}
/**
* Initialize the gray consumer instance
*/
private void initConsumerGray() {
if (consumerGray == null) {
synchronized (this) {
if (consumerGray == null) {
if (Objects.isNull(mqProperties)) {
throw new NullPointerException("RocketMQ configuration properties are not set");
} else {
Properties grayProperties = new Properties();
grayProperties.putAll(mqProperties);
grayProperties.setProperty(PropertyKeyConst.GROUP_ID, GrayUtil.topicGrayName(grayProperties.getProperty(PropertyKeyConst.GROUP_ID)));
consumerGray = ONSFactory.createConsumer(grayProperties);
consumerGray.start();
}
}
}
}
}
@Override
public void onApplicationEvent(GrayEventChangeEvent event) {
ListenerStateEnum listenerStateEnum = (ListenerStateEnum) event.getSource();
log.info("{} gray environment changed: {}", this.getClass().getName(), listenerStateEnum.getValue());
currentState = listenerStateEnum;
if (ListenerStateEnum.PRODUCTION.equals(listenerStateEnum)) {
initConsumerProduction();
for (SubscriptionData item : subscribes) {
if (Objects.nonNull(consumer)) {
consumer.subscribe(item.getTopic(), item.getSubExpression(), item.getListener());
}
}
shutdownConsumerGray();
}
if (ListenerStateEnum.TOGETHER.equals(listenerStateEnum)) {
initConsumerProduction();
initConsumerGray();
for (SubscriptionData item : subscribes) {
if (Objects.nonNull(consumer)) {
consumer.subscribe(item.getTopic(), item.getSubExpression(), item.getListener());
}
if (Objects.nonNull(consumerGray)) {
consumerGray.subscribe(GrayUtil.topicGrayName(item.getTopic()), item.getSubExpression(), item.getListener());
}
}
}
if (ListenerStateEnum.GRAYSCALE.equals(listenerStateEnum)) {
initConsumerGray();
for (SubscriptionData item : subscribes) {
if (Objects.nonNull(consumerGray)) {
consumerGray.subscribe(GrayUtil.topicGrayName(item.getTopic()), item.getSubExpression(), item.getListener());
}
}
shutdownConsumerProduction();
}
}
/**
* Add a subscription
*
* @param topic topic
* @param listenerClass class of the listener that handles the messages
* @return AbstractGrayEventListener
*/
public AbstractGrayEventListener subscribe(String topic, Class<? extends MessageListener> listenerClass) {
return this.subscribe(topic, DEFAULT_SUB_EXPRESSION, listenerClass);
}
/**
* Add a subscription
*
* @param topic topic
* @param subExpression tag subscription expression
* @param listenerClass class of the listener that handles the messages
* @return AbstractGrayEventListener
*/
public AbstractGrayEventListener subscribe(String topic, String subExpression, Class<? extends MessageListener> listenerClass) {
if (Objects.isNull(listenerClass)) {
throw new NullPointerException("listenerClass is not set");
}
MessageListener listener = applicationContext.getBean(listenerClass);
if (Objects.isNull(listener)) {
throw new NullPointerException(listenerClass.getName().concat(" has no bean instance"));
}
return this.subscribe(topic, subExpression, listener);
}
/**
* Add a subscription
*
* @param topic topic
* @param listener listener that handles the messages
* @return AbstractGrayEventListener
*/
public AbstractGrayEventListener subscribe(String topic, MessageListener listener) {
return this.subscribe(topic, DEFAULT_SUB_EXPRESSION, listener);
}
/**
* Add a subscription
*
* @param topic topic
* @param subExpression tag subscription expression
* @param listener listener that handles the messages
* @return AbstractGrayEventListener
*/
public AbstractGrayEventListener subscribe(String topic, String subExpression, MessageListener listener) {
if (StringUtils.isEmpty(topic)) {
throw new NullPointerException("topic is not set");
}
if (StringUtils.isEmpty(subExpression)) {
throw new NullPointerException("subExpression is not set");
}
if (Objects.isNull(listener)) {
throw new NullPointerException("listener is not set");
}
if (listener instanceof GrayMessageListener) {
subscribes.add(new SubscriptionData(topic, subExpression, listener));
} else {
subscribes.add(new SubscriptionData(topic, subExpression, new GrayMessageListener(listener)));
}
return this;
}
/**
* Set the RocketMQ configuration properties
*
* @param mqProperties configuration properties
* @return AbstractGrayEventListener
*/
public AbstractGrayEventListener setMqProperties(Properties mqProperties) {
this.mqProperties = mqProperties;
return this;
}
/**
* Destroy method
*/
@PreDestroy
public void shutdown() {
shutdownConsumerProduction();
shutdownConsumerGray();
}
/**
* Shut down the production consumer instance
*/
private void shutdownConsumerProduction() {
if (Objects.nonNull(consumer)) {
consumer.shutdown();
consumer = null;
}
}
/**
* Shut down the gray consumer instance
*/
private void shutdownConsumerGray() {
if (Objects.nonNull(consumerGray)) {
consumerGray.shutdown();
consumerGray = null;
}
}
}
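(4) Listener for Nacos service list change events
Listen for changes to the service list in the Nacos registry, determine which type of message queue the current instance should listen on, and publish the Spring gray environment change event so that all MQ consumers switch queues.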
package com.easyhome.common.nacos;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.easyhome.common.event.GrayEventChangeEvent;
import com.easyhome.common.rocketmq.ListenerStateEnum;
import com.easyhome.common.utils.GrayUtil;
import com.easyhome.common.utils.GrayscaleConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* Custom Nacos listener implementation
*
* @author wangshufeng
*/
@Slf4j
@Component
public class NacosEventListener implements EventListener {
@Resource
private ApplicationEventPublisher publisher;
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
this.mqInit(((NamingEvent) event).getInstances());
}
}
/**
* Current MQ listening state
*/
private static ListenerStateEnum listenerMqState;
public synchronized void mqInit(List<Instance> instances) {
ListenerStateEnum newState;
// The current instance is a gray instance
if (GrayUtil.isGrayPod()) {
newState = ListenerStateEnum.GRAYSCALE;
} else {
// Check whether the current service has any gray instance
if (this.isHaveGray(instances)) {
newState = ListenerStateEnum.PRODUCTION;
} else {
newState = ListenerStateEnum.TOGETHER;
}
}
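log.info("Current instance is gray: {}", GrayUtil.isGrayPod());
log.info("MQ queue listening state of the current instance: {}", newState.getValue());
// Avoid re-initializing the MQ queue listeners when the state has not changed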
if (!newState.equals(listenerMqState)) {
listenerMqState = newState;
publisher.publishEvent(new GrayEventChangeEvent(listenerMqState));
}
}
/**
* Whether any gray instance exists
*
* @return
*/
private boolean isHaveGray(List<Instance> instances) {
if (!CollectionUtils.isEmpty(instances)) {
for (Instance instance : instances) {
if (GrayscaleConstant.STR_BOOLEAN_TRUE.equals(instance.getMetadata().get(GrayscaleConstant.POD_GRAY))) {
return true;
}
}
}
return false;
}
}
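The decision made in mqInit, together with the subscriptions it leads to in AbstractGrayEventListener, fits in a small truth table. The sketch below restates it with the JDK only; the enum mirrors the three ListenerStateEnum values used above, and the `_gray` topic suffix is an assumed value.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerStateSketch {
    enum State { PRODUCTION, GRAYSCALE, TOGETHER }

    /** Chooses the listening state of the current instance. */
    static State decide(boolean currentIsGray, boolean serviceHasGrayInstances) {
        if (currentIsGray) {
            return State.GRAYSCALE;
        }
        // A mainline instance also drains the gray queue when no gray consumer is online.
        return serviceHasGrayInstances ? State.PRODUCTION : State.TOGETHER;
    }

    /** Lists the topics an instance in the given state subscribes to. */
    static List<String> topicsFor(State state, String topic) {
        String grayTopic = topic + "_gray"; // assumed suffix
        List<String> topics = new ArrayList<>();
        if (state != State.GRAYSCALE) {
            topics.add(topic);
        }
        if (state != State.PRODUCTION) {
            topics.add(grayTopic);
        }
        return topics;
    }
}
```

The TOGETHER state is what keeps gray messages from piling up unconsumed after the last gray consumer goes offline.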
(5) Load the custom Nacos listener
package com.easyhome.common.nacos;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* Configures the custom Nacos listener
* @author wangshufeng
*/
@Configuration
@Slf4j
public class NacosListenerConfig {
@Resource
NacosDiscoveryProperties nacosDiscoveryProperties;
@Resource
NacosEventListener nacosEventListener;
@PostConstruct
public void subscribe() {
try {
NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr());
namingService.subscribe(nacosDiscoveryProperties.getService(),nacosDiscoveryProperties.getGroup(), nacosEventListener);
log.info("Custom Nacos listener configured");
} catch (NacosException e) {
log.error("Failed to configure the custom Nacos listener", e);
}
}
}
III. Usage
1. Add easyhome-common-gray.jar to the project
<dependency>
<groupId>com.easyhome</groupId>
<artifactId>easyhome-common-gray</artifactId>
<version>1.0.2-RELEASE</version>
</dependency>
2. Add the scan base packages on the Spring Boot startup class
@SpringBootApplication(scanBasePackages = {"com.easyhome.*" })
3. When defining a RocketMQ consumer, extend AbstractGrayEventListener, as in the following example
/**
* Goods event consumer
* @author wangshufeng
*/
@Component
@Slf4j
public class GoodsChangeEventConsumer extends AbstractGrayEventListener {
@Resource
private MqGoodsConfig mqConfig;
@Resource
private MqMarketingConfig mqMarketingConfig;
/**
* Message subscriptions
*/
@PostConstruct
public void consume() {
this.subscribe(mqConfig.getGoodsEventTopic(), "*", GoodsChangeMessageListener.class)
.subscribe(mqConfig.getShopEventTopic(), "*", ShopChangeMessageListener.class)
.subscribe(this.mqMarketingConfig.getChangeTopic(), this.mqMarketingConfig.getChangeTag(), MarketingChangeMessageListener.class)
.subscribe(mqConfig.getCategoryEventTopic(),"*", CategoryChangeMessageListener.class)
.setMqProperties(mqConfig.getGoodsEventMsgMqProperties()).initConsumer();
}
}
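4. Add the following JVM startup arguments
When -Dpod.gray is false, the instance starts as a mainline group instance and -Dgray.group does not need to be set. When -Dpod.gray is true, the instance starts as a gray group instance and -Dgray.group must be set to the group the instance belongs to.
-javaagent:/home/easyhome/transmittable-thread-local-2.13.2.jar
-Dpod.gray=true -Dgray.group=<custom group name>
IV. Known issues
Messages currently support only two queues, the mainline queue and the gray queue. With multiple gray groups, gray messages are not isolated per group. This will be addressed in a later version.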