/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.sync.data.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.config.ShenyuConfig;
import org.apache.shenyu.common.constant.ConsulConstants;
import org.apache.shenyu.common.constant.DefaultPathConstants;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
import org.apache.shenyu.sync.data.core.AbstractPathDataSyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sync data service that watches a fixed set of Consul key/value prefixes and forwards
 * changes to the inherited {@code event(...)} method.
 *
 * <p>One self-rescheduling task per watched prefix runs on a dedicated scheduled pool. Each
 * run issues a {@code getKVValues} query carrying the last seen index for that prefix, then
 * compares the returned entries with the cached key/MD5 snapshot of the previous run and
 * emits {@code PUT} / {@code DELETE} events for the differences.
 *
 * <p>The index map and the snapshot cache are touched from the constructor thread and from
 * every pool thread, so both are concurrent maps.
 */
public class ConsulSyncDataService extends AbstractPathDataSyncService {
    private static final Logger LOG = LoggerFactory.getLogger(ConsulSyncDataService.class);

    /** Path suffixes appended to the namespace; one watcher task is started per entry, in this order. */
    private static final String[] WATCH_PATHS = {
        "/shenyu/plugin",
        "/shenyu/selector",
        "/shenyu/rule",
        "/shenyu/proxySelectorData",
        "/shenyu/discoveryUpstream",
        "/shenyu/auth",
        "/shenyu/metaData",
    };

    /** Delay used for "run again right away"; the JDK scheduler treats negative delays as immediate. */
    private static final long NO_DELAY = -1L;

    /** Last index stored per watched prefix. */
    private final Map<String, Long> consulIndexes = new ConcurrentHashMap<String, Long>();

    /** Key/MD5 snapshot of the entries seen in the last published run, per watched prefix. */
    private final Map<String, List<ConsulData>> cacheConsulDataKeyMap = new ConcurrentHashMap<String, List<ConsulData>>();

    private final ScheduledThreadPoolExecutor executor;
    private final ConsulConfig consulConfig;
    private final ConsulClient consulClient;
    private final ShenyuConfig shenyuConfig;

    /**
     * Creates the service and immediately starts one watcher per entry of {@link #WATCH_PATHS}.
     *
     * @param shenyuConfig                     supplies the namespace used as the path prefix
     * @param consulClient                     client used for the key/value queries
     * @param consulConfig                     supplies the query wait time and the retry delay
     * @param pluginDataSubscriber             passed through to the superclass
     * @param metaDataSubscribers              passed through to the superclass
     * @param authDataSubscribers              passed through to the superclass
     * @param proxySelectorDataSubscribers     passed through to the superclass
     * @param discoveryUpstreamDataSubscribers passed through to the superclass
     */
    public ConsulSyncDataService(ShenyuConfig shenyuConfig, ConsulClient consulClient, ConsulConfig consulConfig, PluginDataSubscriber pluginDataSubscriber, List<MetaDataSubscriber> metaDataSubscribers, List<AuthDataSubscriber> authDataSubscribers, List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers, List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
        super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
        this.shenyuConfig = shenyuConfig;
        this.consulClient = consulClient;
        this.consulConfig = consulConfig;
        // One pool thread per watched prefix: every watcher spends its time inside its own query.
        this.executor = new ScheduledThreadPoolExecutor(WATCH_PATHS.length, ShenyuThreadFactory.create("consul-config-watch", true));
        this.watcherData();
    }

    /** Starts a watcher for every configured path under the current namespace. */
    private void watcherData() {
        String configNamespace = "/" + this.shenyuConfig.getNamespace();
        for (String path : WATCH_PATHS) {
            this.watcherData0(DefaultPathConstants.handlePathData(String.join("/", configNamespace, path)));
        }
    }

    /**
     * Registers {@code registerPath} with index {@code 0} and schedules its first watch run.
     *
     * @param registerPath the Consul key prefix to watch
     */
    private void watcherData0(String registerPath) {
        String configNamespace = "/" + this.shenyuConfig.getNamespace();
        this.consulIndexes.put(registerPath, 0L);
        BiConsumer<String, String> updateHandler = (changeData, decodedValue) -> this.event(configNamespace, changeData, decodedValue, registerPath, AbstractPathDataSyncService.EventType.PUT);
        Consumer<String> deleteHandler = removeKey -> this.event(configNamespace, removeKey, null, registerPath, AbstractPathDataSyncService.EventType.DELETE);
        this.scheduleWatch(registerPath, updateHandler, deleteHandler, NO_DELAY);
    }

    /**
     * Schedules the next watch run for {@code watchPathRoot}.
     *
     * <p>Does nothing once the executor has been shut down, so a closed service winds down
     * quietly instead of failing on a rejected submission.
     *
     * @param watchPathRoot the Consul key prefix being watched
     * @param updateHandler receives (key, decoded value) for added or changed entries
     * @param deleteHandler receives the key of entries that disappeared
     * @param delayMillis   delay before the run, in milliseconds; negative means immediately
     */
    private void scheduleWatch(String watchPathRoot, BiConsumer<String, String> updateHandler, Consumer<String> deleteHandler, long delayMillis) {
        if (this.executor.isShutdown()) {
            return;
        }
        this.executor.schedule(() -> this.watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler), delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs one watch run for {@code watchPathRoot} and reschedules itself.
     *
     * <p>Retries after the configured watch delay when the query returns no values, returns no
     * index, or throws; otherwise the next run is scheduled immediately.
     *
     * @param watchPathRoot the Consul key prefix being watched
     * @param updateHandler receives (key, decoded value) for added or changed entries
     * @param deleteHandler receives the key of entries that disappeared
     */
    private void watchConfigKeyValues(String watchPathRoot, BiConsumer<String, String> updateHandler, Consumer<String> deleteHandler) {
        try {
            Long currentIndex = this.consulIndexes.get(watchPathRoot);
            if (Objects.isNull(currentIndex)) {
                currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
            }
            QueryParams queryParams = new QueryParams(TimeUnit.MILLISECONDS.toSeconds(this.consulConfig.getWaitTime()), currentIndex.longValue());
            Response<List<GetValue>> response = this.consulClient.getKVValues(watchPathRoot, null, queryParams);
            List<GetValue> values = response.getValue();
            if (Objects.isNull(values) || values.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("No value for watchPathRoot {}", watchPathRoot);
                }
                this.scheduleWatch(watchPathRoot, updateHandler, deleteHandler, this.consulConfig.getWatchDelay());
                return;
            }
            Long newIndex = response.getConsulIndex();
            if (Objects.isNull(newIndex)) {
                // NOTE(review): this branch means "no index in the response"; the message text is kept as-is.
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Same index for watchPathRoot {}", watchPathRoot);
                }
                this.scheduleWatch(watchPathRoot, updateHandler, deleteHandler, this.consulConfig.getWatchDelay());
                return;
            }
            if (Objects.equals(newIndex, currentIndex)) {
                // Nothing changed since the last run: query again right away.
                this.scheduleWatch(watchPathRoot, updateHandler, deleteHandler, NO_DELAY);
                return;
            }
            // Events are published only if no watched prefix has already stored this index
            // and this prefix is past the initial index value.
            if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("watchPathRoot {} has new index {}", watchPathRoot, newIndex);
                }
                this.publishChanges(watchPathRoot, currentIndex.longValue(), values, updateHandler, deleteHandler);
            } else if (LOG.isTraceEnabled()) {
                // Emitted at trace level to match the guard above.
                LOG.trace("Event for index already published for watchPathRoot {}", watchPathRoot);
            }
            this.consulIndexes.put(watchPathRoot, newIndex);
            this.scheduleWatch(watchPathRoot, updateHandler, deleteHandler, NO_DELAY);
        } catch (Exception e) {
            LOG.warn("Error querying consul Key/Values for watchPathRoot '{}'. Message: ", watchPathRoot, e);
            this.scheduleWatch(watchPathRoot, updateHandler, deleteHandler, this.consulConfig.getWatchDelay());
        }
    }

    /**
     * Diffs {@code values} against the cached snapshot for {@code watchPathRoot}, emits update
     * and delete events, and replaces the snapshot.
     *
     * @param watchPathRoot the Consul key prefix being watched
     * @param lastIndex     the index this run queried with; entries whose modify index equals it are skipped
     * @param values        the entries returned by the current query (non-empty)
     * @param updateHandler receives (key, decoded value) for added or changed entries
     * @param deleteHandler receives the key of entries present in the snapshot but not in {@code values}
     */
    private void publishChanges(String watchPathRoot, long lastIndex, List<GetValue> values, BiConsumer<String, String> updateHandler, Consumer<String> deleteHandler) {
        List<ConsulData> lastDatas = this.cacheConsulDataKeyMap.get(watchPathRoot);
        for (GetValue data : values) {
            if (data.getModifyIndex() == lastIndex || isUnchanged(data, lastDatas)) {
                continue;
            }
            updateHandler.accept(data.getKey(), data.getDecodedValue());
        }
        List<String> currentKeys = values.stream().map(GetValue::getKey).collect(Collectors.toList());
        if (!ObjectUtils.isEmpty(lastDatas)) {
            lastDatas.stream()
                    .map(ConsulData::getConsulKey)
                    .filter(lastKey -> !currentKeys.contains(lastKey))
                    .forEach(deleteHandler);
        }
        this.cacheConsulDataKeyMap.put(watchPathRoot, values.stream().map(ConsulSyncDataService::toConsulData).collect(Collectors.toList()));
    }

    /**
     * Tells whether {@code data} matches the snapshot entry with the same key.
     *
     * @param data      the entry returned by the current query
     * @param lastDatas the previous snapshot, possibly {@code null}
     * @return {@code true} when a snapshot entry with the same key exists, has a non-blank MD5,
     *         and that MD5 equals the MD5 of the entry's current raw value
     */
    private static boolean isUnchanged(GetValue data, List<ConsulData> lastDatas) {
        if (Objects.isNull(lastDatas)) {
            return false;
        }
        ConsulData cached = lastDatas.stream()
                .filter(lastData -> data.getKey().equals(lastData.getConsulKey()))
                .findFirst()
                .orElse(null);
        return Objects.nonNull(cached)
                && !StringUtils.isBlank(cached.getConsulDataMd5())
                && cached.getConsulDataMd5().equals(DigestUtils.md5Hex(data.getValue()));
    }

    /** Builds the snapshot entry (key plus MD5 of the raw value) for one query result. */
    private static ConsulData toConsulData(GetValue data) {
        return new ConsulData(data.getKey(), DigestUtils.md5Hex(data.getValue()));
    }

    /**
     * Shuts down the watcher executor; watchers stop rescheduling themselves afterwards.
     */
    public void close() {
        if (Objects.nonNull(this.executor)) {
            this.executor.shutdown();
        }
    }

    /** Immutable snapshot entry: a Consul key and the MD5 hex digest of its raw value. */
    private static final class ConsulData {
        private final String consulKey;
        private final String consulDataMd5;

        private ConsulData(String consulKey, String consulDataMd5) {
            this.consulKey = consulKey;
            this.consulDataMd5 = consulDataMd5;
        }

        public String getConsulKey() {
            return this.consulKey;
        }

        public String getConsulDataMd5() {
            return this.consulDataMd5;
        }
    }
}

