Nacos客户端源码解读

论坛 期权论坛 编程之家     
选择匿名的用户   2021-5-17 05:36   11   0

背景:

如今的网络服务大多都是分布式的集群部署,采用的架构也是面向服务的架构(SOA),大家都在服务治理领域奋斗,这不,今天要说的Nacos也是一个服务治理领域的产物。Nacos是一个优秀的注册和配置中心。Nacos的介绍推荐,借个图:https://www.jianshu.com/p/39ade28c150d

介绍:

本篇主要以介绍Nacos的客户端为主。Nacos如何获取到服务器的配置,并做到实时更新。

例子:

 @Test
 public void testNacos() throws NacosException {
  String serverAddr = "10.199.150.216:8848";
  String namespace = "8ba3b4f6-1c6a-4c7d-99b0-f444002d526d";
  String dataId = "cxtj.properties";
  String group = "cxtj";
  Properties properties = new Properties();
  properties.put("serverAddr",serverAddr);
  properties.put("namespace",namespace);
                //1、创建Nacos配置服务
  ConfigService configService = NacosFactory.createConfigService(properties);
                //2、从服务中获取配置信息
  String content = configService.getConfig(dataId,group,3000);
  System.out.println("receive" + content);
 }

打印出配置文件的信息:

从这里开始:

//1、创建Nacos配置服务
ConfigService configService = NacosFactory.createConfigService(properties);
//2、从服务中获取配置信息
String content = configService.getConfig(dataId,group,3000);

首先通过前面的配置属性进行创建配置服务:

    public static ConfigService createConfigService(Properties properties) throws NacosException {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
    }

可见这里是通过反射,获取到NacosConfigService类的构造方法,来实例化对象。(问题1:难道直接new 一个对象不好吗?最后说答案)

看一下构造方法:

    public NacosConfigService(Properties properties) throws NacosException {
        //初始化编码格式        
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        //初始化命名空间
        initNamespace(properties);
        //创建服务端的代理
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        //启动代理
        agent.start();
        //创建客户端干活的
        worker = new ClientWorker(agent, configFilterChainManager, properties);
    }

前面的编码和namespace的初始化略过,MetricsHttpAgent是在ServerHttpAgent的基础上,新增了监控功能,记录请求的时间等信息。主要还是ServerHttpAgent,看下ServerHttpAgent的构造方法。

    public ServerHttpAgent(Properties properties) throws NacosException {
        //创建服务列表管理器
        serverListMgr = new ServerListManager(properties);
        //初始化
        init(properties);
    }

ServerListManager中的服务列表一般会维护这两个概念,一个是isStarted(服务是否启动),一个是isFixed(服务地址是否固定,例子中的String serverAddr = "10.199.150.216:8848";就是固定地址),服务列表ServerListManager保存着服务器端的一些基本信息,如服务地址、服务器名称、endpoint等。

    public ServerListManager(Properties properties) throws NacosException {
        isStarted = false;
        serverAddrsStr = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
        String namespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
        //初始化endpoint、contextPath、clusterName
        initParam(properties);
        //如果serverAddrsStr是有值的,说明服务端地址是固定的
        if (StringUtils.isNotEmpty(serverAddrsStr)) {
            isFixed = true;
            List<String> serverAddrs = new ArrayList<String>();
            //获取服务地址
            String[] serverAddrsArr = serverAddrsStr.split(",");
            for (String serverAddr: serverAddrsArr) {
                if (serverAddr.startsWith(HTTPS) || serverAddr.startsWith(HTTP)) {
                    serverAddrs.add(serverAddr);
                } else {
                    String[] serverAddrArr = serverAddr.split(":");
                    if (serverAddrArr.length == 1) {
                        serverAddrs.add(HTTP + serverAddrArr[0] + ":" + ParamUtil.getDefaultServerPort());
                    } else {
                        serverAddrs.add(HTTP + serverAddr);
                    }
                }
            }
            serverUrls = serverAddrs;
            //命名空间选择
            if (StringUtils.isBlank(namespace)) {
                name = FIXED_NAME + "-" + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()]));
            } else {
                this.namespace = namespace;
                this.tenant = namespace;
                name = FIXED_NAME + "-" + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()])) + "-"
                    + namespace;
            }
        //如果是endpoint方式连接
        } else {
            if (StringUtils.isBlank(endpoint)) {
                throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
            }
            isFixed = false;
            if (StringUtils.isBlank(namespace)) {
                name = endpoint;
                addressServerUrl = String.format("http://%s:%d/%s/%s", endpoint, endpointPort, contentPath,
                    serverListName);
            } else {
                this.namespace = namespace;
                this.tenant = namespace;
                name = endpoint + "-" + namespace;
                addressServerUrl = String.format("http://%s:%d/%s/%s?namespace=%s", endpoint, endpointPort,
                    contentPath, serverListName, namespace);
            }
        }

    }

初始化方法:

    private void init(Properties properties) {
        //初始化编码格式,默认utf-8
        initEncode(properties);
        //初始化accessKey、secretKey,非对称加密用的
        initAkSk(properties);
        //初始化最大重试次数,默认是3次
        initMaxRetry(properties);
    }

AK/SK的相关知识,参考https://blog.csdn.net/makenothing/article/details/81158481

代理创建完成之后,启动代理。该代理的作用是 动态获取服务器的地址列表。

        //启动代理
        agent.start();

代理的start执行的是serverListMgr的start方法,也就是我们前面创建出的agent中的serverListManager

    @Override
    public synchronized void start() throws NacosException {
        serverListMgr.start();
    }

以例子中固定的服务端地址,start就直接return了,无需动态获取服务端的地址,暂时不说。

public synchronized void start() throws NacosException {
        //如果像例子中的serverAddr是固定的,所以直接return了
        if (isStarted || isFixed) {
            return;
        }
        //这里做的操作是 动态的获取服务端地址
        GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
        for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
            getServersTask.run();
            try {
                this.wait((i + 1) * 100L);
            } catch (Exception e) {
                LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
            }
        }
        TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
        isStarted = true;
    }

接下来就是ClientWorker了,他主要做的就是实时同步服务器的配置。

//客户端实时获取服务器端的配置        
worker = new ClientWorker(agent, configFilterChainManager, properties);
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        //赋值一下服务器列表的代理、配置的过滤器
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter
        init(properties);

        //创建一个守护线程池,一个线程,用来启动(守护线程是与用户线程同生共死的线程)
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        //创建另一个守护线程池,数量为当前可用线程数,用来同步服务器上的配置信息
        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        //启动线程,每10ms启动一次
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    //检查配置信息
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

下面看看checkConfigInfo方法:

    public void checkConfigInfo() {
        // 分任务同步,cacheMap存着所有的配置信息,且是线程安全的AtomicReference。
        int listenerSize = cacheMap.get().size();
        // 每3000个监听为一批,不满3000向上取整为批数 
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                //执行同步
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

对缓存中的配置信息cacheData分批进行更新,启动LongPollingRunnable线程。源代码中有这样一条注释 “要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题”,就是因为这块线程监听是有必要的,不能停止的,但是目前还无法对这些线程进行监听。

我们来看看LongPollingRunnable这个线程是怎么做到监听服务器的配置的。

class LongPollingRunnable implements Runnable {
        private int taskId;

        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {

            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            //检查本地的配置,nacos会在本地创建一个配置的副本,增强系统的容灾性
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }

                //检查服务端的配置信息,返回有变动的配置的groupId,dataId
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

                //对有变动的配置重新从服务器端获取最新的配置并更新
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        String content = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(content));
                    } catch (NacosException ioe) {
                        String message = String.format(
                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                //同步中出现新增的配置,需要同时更新监听器
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
                //同步完继续同步,该线程就用来保持3000个配置与服务器一致
                executorService.execute(this);

            } catch (Throwable e) {

                // If the rotation training task is abnormal, the next execution time of the task will be punished
                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }

nacos会在本地搞一个配置的副本,这样可以极大地增强系统的容灾性。当nacos服务宕机的时候,还可以使用本地的配置运行。

在LongPollingRunnable 线程中,同时还维持着客户端与服务器的心跳信息,标识服务器当前的健康状况。具体在获取服务器变化的groupkey:

    /**
     * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
     */
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isUseLocalConfigInfo()) {
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
                if (cacheData.isInitializing()) {
                    // cacheData 首次出现在cacheMap中&首次check更新
                    inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
                }
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
    }

进入checkUpdateConfigStr方法:

    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {

        List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

        List<String> headers = new ArrayList<String>(2);
        headers.add("Long-Pulling-Timeout");
        headers.add("" + timeout);

        // told server do not hang me up if new initializing cacheData added in
        if (isInitializingCacheList) {
            headers.add("Long-Pulling-Timeout-No-Hangup");
            headers.add("true");
        }

        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }

        try {
            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                agent.getEncode(), timeout);
            //调用服务器的listener接口,根据返回的http状态码,标识服务器的健康状况
            if (HttpURLConnection.HTTP_OK == result.code) {
                setHealthServer(true);
                return parseUpdateDataIdResponse(result.content);
            } else {
                setHealthServer(false);
                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
            }
        } catch (IOException e) {
            setHealthServer(false);
            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
            throw e;
        }
        return Collections.emptyList();
    }

说一下前面说的,问题1:难道直接new 一个对象不好吗?而要用反射

1、new属于静态编译,在编译的时候就已经确定要实例化的对象,反射则不确定

2、反射属于动态编译,意思就说只有到运行时才会去获得该对象的实例,大多框架都是使用反射获取对象实例,如Spring

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP