哥妞 海淀山后有哥妞

Nacos源码之一-配置自动更新

2021-05-28

Nacos 是阿里巴巴开源的集分布式配置中心、分布式注册中心为一体的分布式解决方案。

它的优点:

  1. 提供命令空间,方便管理不同环境的配置;
  2. 提供web界面,方便管理配置和服务;
  3. 支持配置版本管理,回滚;
  4. 支持服务管理,手动上线、下线服务。

等等其他优点。

1 如何使用 Nacos 自动更新配置

1.1 配置自动更新的两种方式

第一种方式

  1. 属性使用@Value注解

  2. 类使用@RefreshScope 注解

@RefreshScope
@RequestMapping("config")
public class ConfigController {

    @Value("${useLocalCache:false}")
    private boolean useLocalCache;
}

第二种方式

  1. 使用@NacosValue注解,自动更新配置成true
@Controller
@RequestMapping("config")
public class ConfigController {

    @NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)
    private boolean useLocalCache;

    @RequestMapping(value = "/get", method = GET)
    @ResponseBody
    public boolean get() {
        return useLocalCache;
    }
}

2 Nacos 配置更新源码分析

先上图

Nacos配置更新流程图

具体步骤

1、第一步(更新数据库)比较简单,就是判断是新增配置还是修改配置,然后修改或者新增数据库响应信息(根据使用的数据库是内嵌derby还是mysql使用不同的实现)。

2、比较复杂的是通知(更新文件系统中的文件)

2.1 通过发布订阅模式,发布配置变更事件

2.2 订阅者接收到消息后,调用controller请求(communication/dataChange)

2.3 这个controller请求,启动一个异步任务,这个异步任务更新配置数据到指定的配置文件(nacos目录下)

之后通过发布-订阅模式通知其他服务,其他服务接收到通知之后,更新配置。

PS:

1、controller 请求http://ip:port/nacos/v1/cs/communication/dataChange?dataId=example&group=DEFAULT_GROUP

2、配置存储有两个地方,一个是在文件系统中,另一个是配置的数据库中(可能是内嵌的derby数据库和MySQL数据库)

3、一些配置基本信息,比如配置文件的 MD5 值、dataId、groupName等,会保存在 ConcurrentHashMap 存储的缓存的

源码1: 添加异步任务更新配置文件代码

/**
 * Add DumpTask to TaskManager, it will execute asynchronously.
 */
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

然后通过persistService读取数据库中的相应记录

源码2: 读取数据库中的最新配置数据

ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

将读取到的内容,写入到本地文件nacos/distribution/target/nacos-server-1.4.2/nacos/data/config-data/${GROUP_NAME}/${dataId}中,并更新配置文件的md5值。

源码3: 保存配置文件到文件目录中,并更新文件的 MD5 值

/**
 * Save config file and update md5 value in cache.
 *
 * @param dataId         dataId string value.
 * @param group          group string value.
 * @param tenant         tenant string value.
 * @param content        content string value.
 * @param lastModifiedTs lastModifiedTs.
 * @param type           file type.
 * @return dumpChange success or not.
 */
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
        String type) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    CacheItem ci = makeSure(groupKey);
    ci.setType(type);
    final int lockResult = tryWriteLock(groupKey);
    assert (lockResult != 0);
    
    if (lockResult < 0) {
        DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
        return false;
    }
    
    try {
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        
        if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
            DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                            + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                    lastModifiedTs);
        } else if (!PropertyUtil.isDirectRead()) {
        	// 保存数据到文件中
            DiskUtil.saveToDisk(dataId, group, tenant, content);
        }
        updateMd5(groupKey, md5, lastModifiedTs);
        return true;
    } catch (IOException ioe) {
        DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
        if (ioe.getMessage() != null) {
            String errMsg = ioe.getMessage();
            if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                    .contains(DISK_QUATA_EN)) {
                // Protect from disk full.
                FATAL_LOG.error("磁盘满自杀退出", ioe);
                System.exit(0);
            }
        }
        return false;
    } finally {
        releaseWriteLock(groupKey);
    }
}

/**
 * Save configuration information to disk.
 */
public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException {
    File targetFile = targetFile(dataId, group, tenant);
    FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
}

3 源码亮点

3.1 发布订阅模式

发布订阅模式使用于一个事件发生需要通知多个订阅者的场景。实际开发中,也能会有不同的名字,大家要能够识别,比如:Subject-Observer(经典例子)、Publisher-Subscriber(Nacos)、Producer-Consumer(Kafka)、Dispatcher-Listener。

发布变更事件

/**
 * Request publisher publish event Publishers load lazily, calling publisher.
 *
 * @param eventType class Instances type of the event type.
 * @param event     event instance.
 */
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    
    final String topic = ClassUtils.getCanonicalName(eventType);
    
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        return publisher.publish(event);
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}

循环订阅者列表,通知订阅者。

/**
 * Receive and notifySubscriber to process the event.
 *
 * @param event {@link Event}.
 */
void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();
    
    // Notification single event listener
    for (Subscriber subscriber : subscribers) {
        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }
        
        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        notifySubscriber(subscriber, event);
    }
}
    

通过线程池处理订阅者任务,因为这里订阅者需要调用HTTP请求来处理更新,所以通过线程池可以解耦

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
    
    final Runnable job = new Runnable() {
        @Override
        public void run() {
            subscriber.onEvent(event);
        }
    };
    
    final Executor executor = subscriber.executor();
    
    if (executor != null) {
        executor.execute(job);
    } else {
        try {
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception : {}", e);
        }
    }
}

3.2 任务管理器

这里存在竞争资源—单个配置文件,也就是确定Group下的dataId文件或者数据库记录。

  1. 失败重试
/**
 * process tasks in execute engine.
 */
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

3.3. 添加任务处理并发

使用重入锁ReentrantLock

protected final ReentrantLock lock = new ReentrantLock();

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    lock.lock();
    try {
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            newTask.merge(existTask);
        }
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
  1. 缓存任务处理器对象,需要的时候直接通过本地缓存获取
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();

3.4 一个接口多个实现,根据条件选择

如果要实现多种不同数据库的操作的时候,Condition是非常有用的,比如这里要支持内嵌的derby数据库,也要支持MySQL,通过Condition可以根据需要使用不同的数据库实现。

传统企业里面有些业务系统需要支持多种不同的数据库,不同客户现场可能会使用不同的数据库,通过这种方式可以减少定制、减少到现场由于客户数据库不同而临时进行定制开发。

@Conditional(value = ConditionOnEmbeddedStorage.class)
@Component
public class EmbeddedStoragePersistServiceImpl implements PersistService {
@Conditional(value = ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {

如果本文对你有帮助,欢迎关注我的公众号 【哥妞】 ,带你深入 JAVA 的世界~


1分也是爱


微信扫描二维码,关注公众号 (转载本站文章请注明作者和出处 哥妞-geniu)

Similar Posts

评论