背景:
如今的网络服务大多都是分布式的集群部署,采用的架构也是面向服务的架构(SOA),大家都在服务治理领域奋斗,这不,今天要说的Nacos也是一个服务治理领域的产物。Nacos是一个优秀的注册和配置中心。Nacos的介绍推荐,借个图:https://www.jianshu.com/p/39ade28c150d

介绍:
本篇主要以介绍Nacos的客户端为主。Nacos如何获取到服务器的配置,并做到实时更新。
例子:
@Test
public void testNacos() throws NacosException {
String serverAddr = "10.199.150.216:8848";
String namespace = "8ba3b4f6-1c6a-4c7d-99b0-f444002d526d";
String dataId = "cxtj.properties";
String group = "cxtj";
Properties properties = new Properties();
properties.put("serverAddr",serverAddr);
properties.put("namespace",namespace);
//1、创建Nacos配置服务
ConfigService configService = NacosFactory.createConfigService(properties);
//2、从服务中获取配置信息
String content = configService.getConfig(dataId,group,3000);
System.out.println("receive" + content);
}
打印出配置文件的信息:

从这里开始:
//1、创建Nacos配置服务
ConfigService configService = NacosFactory.createConfigService(properties);
//2、从服务中获取配置信息
String content = configService.getConfig(dataId,group,3000);
首先通过前面的配置属性进行创建配置服务:
public static ConfigService createConfigService(Properties properties) throws NacosException {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
}
可见这里是通过反射,获取到NacosConfigService类的构造方法,来实例化对象。(问题1:难道直接new 一个对象不好吗?最后说答案)
看一下构造方法:
public NacosConfigService(Properties properties) throws NacosException {
//初始化编码格式
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
//初始化命名空间
initNamespace(properties);
//创建服务端的代理
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
//启动代理
agent.start();
//创建客户端干活的
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
前面的编码和namespace的初始化略过,MetricsHttpAgent是在ServerHttpAgent的基础上,新增了监控功能,记录请求的时间等信息。主要还是ServerHttpAgent,看下ServerHttpAgent的构造方法。
public ServerHttpAgent(Properties properties) throws NacosException {
//创建服务列表管理器
serverListMgr = new ServerListManager(properties);
//初始化
init(properties);
}
ServerListManager中的服务列表一般会维护这两个概念,一个是isStarted(服务是否启动),一个是isFixed(服务地址是否固定,例子中的String serverAddr = "10.199.150.216:8848";就是固定地址),服务列表ServerListManager保存着服务器端的一些基本信息,如服务地址、服务器名称、endpoint等。
public ServerListManager(Properties properties) throws NacosException {
isStarted = false;
serverAddrsStr = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
String namespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
//初始化endpoint、contextPath、clusterName
initParam(properties);
//如果serverAddrsStr是有值的,说明服务端地址是固定的
if (StringUtils.isNotEmpty(serverAddrsStr)) {
isFixed = true;
List<String> serverAddrs = new ArrayList<String>();
//获取服务地址
String[] serverAddrsArr = serverAddrsStr.split(",");
for (String serverAddr: serverAddrsArr) {
if (serverAddr.startsWith(HTTPS) || serverAddr.startsWith(HTTP)) {
serverAddrs.add(serverAddr);
} else {
String[] serverAddrArr = serverAddr.split(":");
if (serverAddrArr.length == 1) {
serverAddrs.add(HTTP + serverAddrArr[0] + ":" + ParamUtil.getDefaultServerPort());
} else {
serverAddrs.add(HTTP + serverAddr);
}
}
}
serverUrls = serverAddrs;
//命名空间选择
if (StringUtils.isBlank(namespace)) {
name = FIXED_NAME + "-" + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()]));
} else {
this.namespace = namespace;
this.tenant = namespace;
name = FIXED_NAME + "-" + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()])) + "-"
+ namespace;
}
//如果是endpoint方式连接
} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
isFixed = false;
if (StringUtils.isBlank(namespace)) {
name = endpoint;
addressServerUrl = String.format("http://%s:%d/%s/%s", endpoint, endpointPort, contentPath,
serverListName);
} else {
this.namespace = namespace;
this.tenant = namespace;
name = endpoint + "-" + namespace;
addressServerUrl = String.format("http://%s:%d/%s/%s?namespace=%s", endpoint, endpointPort,
contentPath, serverListName, namespace);
}
}
}
初始化方法:
private void init(Properties properties) {
//初始化编码格式,默认utf-8
initEncode(properties);
//初始化accessKey、secretKey,非对称加密用的
initAkSk(properties);
//初始化最大重试次数,默认是3次
initMaxRetry(properties);
}
AK/SK的相关知识,参考https://blog.csdn.net/makenothing/article/details/81158481
代理创建完成之后,启动代理。该代理的作用是 动态获取服务器的地址列表。
//启动代理
agent.start();
代理的start执行的是serverListMgr的start方法,也就是我们前面创建出的agent中的serverListManager
@Override
public synchronized void start() throws NacosException {
serverListMgr.start();
}
以例子中固定的服务端地址,start就直接return了,无需动态获取服务端的地址,暂时不说。
public synchronized void start() throws NacosException {
//如果像例子中的serverAddr是固定的,所以直接return了
if (isStarted || isFixed) {
return;
}
//这里做的操作是 动态的获取服务端地址
GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
getServersTask.run();
try {
this.wait((i + 1) * 100L);
} catch (Exception e) {
LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
}
}
TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
isStarted = true;
}
接下来就是ClientWorker了,他主要做的就是实时同步服务器的配置。
//客户端实时获取服务器端的配置
worker = new ClientWorker(agent, configFilterChainManager, properties);
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
//赋值一下服务器列表的代理、配置的过滤器
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
//创建一个守护线程池,一个线程,用来启动(守护线程是与用户线程同生共死的线程)
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
//创建另一个守护线程池,数量为当前可用线程数,用来同步服务器上的配置信息
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
//启动线程,每10ms启动一次
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
//检查配置信息
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
下面看看checkConfigInfo方法:
public void checkConfigInfo() {
// 分任务同步,cacheMap存着所有的配置信息,且是线程安全的AtomicReference。
int listenerSize = cacheMap.get().size();
// 每3000个监听为一批,不满3000向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
//执行同步
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
对缓存中的配置信息cacheData分批进行更新,启动LongPollingRunnable线程。源代码中有这样一条注释 “要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题”,就是因为这块线程监听是有必要的,不能停止的,但是目前还无法对这些线程进行监听。
我们来看看LongPollingRunnable这个线程是怎么做到监听服务器的配置的。
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
//检查本地的配置,nacos会在本地创建一个配置的副本,增强系统的容灾性
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
//检查服务端的配置信息,返回有变动的配置的groupId,dataId
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
//对有变动的配置重新从服务器端获取最新的配置并更新
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
//同步中出现新增的配置,需要同时更新监听器
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
//同步完继续同步,该线程就用来保持3000个配置与服务器一致
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
nacos会在本地搞一个配置的副本,这样可以极大地增强系统的容灾性。当nacos服务宕机的时候,还可以使用本地的配置运行。
在LongPollingRunnable 线程中,同时还维持着客户端与服务器的心跳信息,标识服务器当前的健康状况。具体在获取服务器变化的groupkey:
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// cacheData 首次出现在cacheMap中&首次check更新
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
进入checkUpdateConfigStr方法:
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
//调用服务器的listener接口,根据返回的http状态码,标识服务器的健康状况
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
}
} catch (IOException e) {
setHealthServer(false);
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
说一下前面说的,问题1:难道直接new 一个对象不好吗?而要用反射
1、new属于静态编译,在编译的时候就已经确定要实例化的对象,反射则不确定
2、反射属于动态编译,意思就说只有到运行时才会去获得该对象的实例,大多框架都是使用反射获取对象实例,如Spring |