/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.remoting.etcd.jetcd;

import com.google.protobuf.ByteString;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.api.KeyValue;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ClosedClientException;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.ConcurrentSet;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.etcd.ChildListener;
import org.apache.dubbo.remoting.etcd.Constants;
import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper;
import org.apache.dubbo.remoting.etcd.option.OptionUtil;
import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;

public class JEtcdClient
extends AbstractEtcdClient<EtcdWatcher> {
    // Wrapper that every etcd operation of this class delegates to; created and started in the constructor.
    private JEtcdClientWrapper clientWrapper;
    // Scheduler (pool size 1, threads named "etcd3-watch-auto-reconnect") on which watchers schedule reconnects.
    private ScheduledExecutorService reconnectSchedule;
    // Pool (threads named "etcd3-notify") on which ChildListener callbacks are executed by the watchers.
    private ExecutorService notifyExecutor;
    // Value of the "retry.period" URL parameter (default 5000); watchers use it as the exclusive upper
    // bound of the random reconnect delay, scheduled in milliseconds.
    private int delayPeriod;
    // Per-instance logger; also used by the inner EtcdWatcher through JEtcdClient.this.
    private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);

    public JEtcdClient(URL url) {
        super(url);
        try {
            this.clientWrapper = new JEtcdClientWrapper(url);
            this.clientWrapper.setConnectionStateListener((client, state) -> {
                if (state == 1) {
                    this.stateChanged(1);
                } else if (state == 0) {
                    this.stateChanged(0);
                }
            });
            this.delayPeriod = this.getUrl().getParameter("retry.period", 5000);
            this.reconnectSchedule = Executors.newScheduledThreadPool(1, (ThreadFactory)new NamedThreadFactory("etcd3-watch-auto-reconnect"));
            this.notifyExecutor = new ThreadPoolExecutor(1, url.getParameter("etcd3.notify.maxthreads", Constants.DEFAULT_ETCD3_NOTIFY_THREADS), 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getParameter("etcd3.notify.queues", 9000000)), (ThreadFactory)new NamedThreadFactory("etcd3-notify", true));
            this.clientWrapper.start();
        }
        catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Forwards the request to {@code clientWrapper.createPersistent(path)}.
     *
     * @param path the path handed to the wrapper unchanged
     */
    @Override
    public void doCreatePersistent(String path) {
        final JEtcdClientWrapper wrapper = clientWrapper;
        wrapper.createPersistent(path);
    }

    /**
     * Forwards the request to {@code clientWrapper.createEphemeral(path)}.
     *
     * @param path the path handed to the wrapper unchanged
     * @return the value returned by the wrapper (presumably a lease id; confirm against
     *         {@code JEtcdClientWrapper})
     */
    @Override
    public long doCreateEphemeral(String path) {
        final long result = clientWrapper.createEphemeral(path);
        return result;
    }

    /**
     * Forwards the check to {@code clientWrapper.checkExists(path)}.
     *
     * @param path the path handed to the wrapper unchanged
     * @return the wrapper's answer
     */
    @Override
    public boolean checkExists(String path) {
        final boolean exists = clientWrapper.checkExists(path);
        return exists;
    }

    /**
     * Creates a new, not yet watching {@link EtcdWatcher} bound to {@code listener}.
     *
     * @param path     not used by this implementation
     * @param listener the listener given to the new watcher
     * @return the freshly created watcher
     */
    @Override
    public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) {
        final EtcdWatcher watcher = new EtcdWatcher(listener);
        return watcher;
    }

    /**
     * Starts {@code etcdWatcher} on {@code path} via {@link EtcdWatcher#forPath(String)}.
     *
     * @param path        the path to watch
     * @param etcdWatcher the watcher to start
     * @return the list returned by {@code etcdWatcher.forPath(path)}
     */
    @Override
    public List<String> addChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
        final List<String> children = etcdWatcher.forPath(path);
        return children;
    }

    /**
     * Stops {@code etcdWatcher} by calling {@link EtcdWatcher#unwatch()}.
     *
     * @param path        not used by this implementation
     * @param etcdWatcher the watcher to stop
     */
    @Override
    public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
        final EtcdWatcher watcher = etcdWatcher;
        watcher.unwatch();
    }

    /**
     * Forwards the lookup to {@code clientWrapper.getChildren(path)}.
     *
     * @param path the path handed to the wrapper unchanged
     * @return the list returned by the wrapper
     */
    @Override
    public List<String> getChildren(String path) {
        final List<String> children = clientWrapper.getChildren(path);
        return children;
    }

    /**
     * Reports the connection state as seen by {@code clientWrapper.isConnected()}.
     *
     * @return the wrapper's answer
     */
    @Override
    public boolean isConnected() {
        final boolean connected = clientWrapper.isConnected();
        return connected;
    }

    /**
     * Forwards the request to {@code clientWrapper.createLease(second)}.
     *
     * @param second the value handed to the wrapper unchanged (presumably a TTL in seconds;
     *               confirm against {@code JEtcdClientWrapper})
     * @return the value returned by the wrapper
     */
    @Override
    public long createLease(long second) {
        final long lease = clientWrapper.createLease(second);
        return lease;
    }

    /**
     * Forwards the request to {@code clientWrapper.createLease(ttl, timeout, unit)}.
     *
     * @param ttl     handed to the wrapper unchanged
     * @param timeout handed to the wrapper unchanged
     * @param unit    handed to the wrapper unchanged
     * @return the value returned by the wrapper
     * @throws InterruptedException declared for the wrapper call
     * @throws ExecutionException   declared for the wrapper call
     * @throws TimeoutException     declared for the wrapper call
     */
    @Override
    public long createLease(long ttl, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        final long lease = clientWrapper.createLease(ttl, timeout, unit);
        return lease;
    }

    /**
     * Forwards the request to {@code clientWrapper.delete(path)}.
     *
     * @param path the path handed to the wrapper unchanged
     */
    @Override
    public void delete(String path) {
        final JEtcdClientWrapper wrapper = clientWrapper;
        wrapper.delete(path);
    }

    /**
     * Forwards the request to {@code clientWrapper.revokeLease(lease)}.
     *
     * @param lease the value handed to the wrapper unchanged
     */
    @Override
    public void revokeLease(long lease) {
        final JEtcdClientWrapper wrapper = clientWrapper;
        wrapper.revokeLease(lease);
    }

    /**
     * Shuts down both executors and then closes the client wrapper.
     *
     * <p>Each executor is stopped through {@code ExecutorUtil.shutdownNow(executor, 100)}; an
     * {@link Exception} from either shutdown is logged at warn level and does not stop the
     * sequence. {@code clientWrapper.doClose()} runs in the {@code finally} of the second
     * shutdown, so it is reached even when that shutdown fails.
     */
    @Override
    public void doClose() {
        // Timeout value handed to ExecutorUtil.shutdownNow (unit defined by ExecutorUtil).
        final int shutdownTimeout = 100;

        final ExecutorService notifier = this.notifyExecutor;
        if (notifier != null) {
            try {
                ExecutorUtil.shutdownNow(notifier, shutdownTimeout);
            }
            catch (Exception ex) {
                this.logger.warn(ex.getMessage(), ex);
            }
        }

        final ScheduledExecutorService scheduler = this.reconnectSchedule;
        try {
            if (scheduler != null) {
                ExecutorUtil.shutdownNow(scheduler, shutdownTimeout);
            }
        }
        catch (Exception ex) {
            this.logger.warn(ex.getMessage(), ex);
        }
        finally {
            this.clientWrapper.doClose();
        }
    }

    /**
     * Forwards the lookup to {@code clientWrapper.getKVValue(key)}.
     *
     * @param key the key handed to the wrapper unchanged
     * @return the value returned by the wrapper
     */
    @Override
    public String getKVValue(String key) {
        final String value = clientWrapper.getKVValue(key);
        return value;
    }

    /**
     * Forwards the write to {@code clientWrapper.put(key, value)}.
     *
     * @param key   handed to the wrapper unchanged
     * @param value handed to the wrapper unchanged
     * @return the wrapper's result
     */
    @Override
    public boolean put(String key, String value) {
        final boolean stored = clientWrapper.put(key, value);
        return stored;
    }

    /**
     * Forwards the write to {@code clientWrapper.putEphemeral(key, value)}.
     *
     * @param key   handed to the wrapper unchanged
     * @param value handed to the wrapper unchanged
     * @return the wrapper's result
     */
    @Override
    public boolean putEphemeral(String key, String value) {
        final boolean stored = clientWrapper.putEphemeral(key, value);
        return stored;
    }

    /**
     * Exposes the gRPC channel obtained from {@code clientWrapper.getChannel()}.
     *
     * @return the channel returned by the wrapper
     */
    public ManagedChannel getChannel() {
        final ManagedChannel channel = clientWrapper.getChannel();
        return channel;
    }

    public class EtcdWatcher
    implements StreamObserver<WatchResponse> {
        protected WatchGrpc.WatchStub watchStub;
        protected StreamObserver<WatchRequest> watchRequest;
        protected long watchId;
        protected String path;
        protected Throwable throwable;
        protected volatile Set<String> urls = new ConcurrentSet();
        private ChildListener listener;
        protected ReentrantLock lock = new ReentrantLock(true);

        public EtcdWatcher(ChildListener listener) {
            this.listener = listener;
        }

        public void onNext(WatchResponse response) {
            if (!JEtcdClient.this.isConnected()) {
                return;
            }
            this.watchId = response.getWatchId();
            if (this.listener != null) {
                int modified = 0;
                String service = null;
                for (Event event : response.getEventsList()) {
                    switch (event.getType()) {
                        case PUT: {
                            service = this.find(event);
                            if (service == null || !this.safeUpdate(service, true)) break;
                            ++modified;
                            break;
                        }
                        case DELETE: {
                            service = this.find(event);
                            if (service == null || !this.safeUpdate(service, false)) break;
                            ++modified;
                            break;
                        }
                    }
                }
                if (modified > 0) {
                    JEtcdClient.this.notifyExecutor.execute(() -> this.listener.childChanged(this.path, new ArrayList<String>(this.urls)));
                }
            }
        }

        public void onError(Throwable e) {
            this.tryReconnect(e);
        }

        public void unwatch() {
            if (!JEtcdClient.this.isConnected()) {
                return;
            }
            try {
                if (this.watchRequest != null) {
                    WatchCancelRequest watchCancelRequest = WatchCancelRequest.newBuilder().setWatchId(this.watchId).build();
                    WatchRequest cancelRequest = WatchRequest.newBuilder().setCancelRequest(watchCancelRequest).build();
                    this.watchRequest.onNext((Object)cancelRequest);
                }
            }
            catch (Exception ignored) {
                JEtcdClient.this.logger.warn("Failed to cancel watch for path '" + this.path + "'", (Throwable)ignored);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<String> forPath(String path) {
            if (!JEtcdClient.this.isConnected()) {
                throw new ClosedClientException("watch client has been closed, path '" + path + "'");
            }
            if (this.path != null) {
                this.unwatch();
            }
            this.path = path;
            this.lock.lock();
            try {
                this.watchStub = WatchGrpc.newStub((Channel)JEtcdClient.this.clientWrapper.getChannel());
                this.watchRequest = this.watchStub.watch((StreamObserver)this);
                this.watchRequest.onNext((Object)this.nextRequest());
                List<String> children = JEtcdClient.this.clientWrapper.getChildren(path);
                if (!children.isEmpty()) {
                    this.urls.addAll(this.filterChildren(children));
                }
                ArrayList<String> arrayList = new ArrayList<String>(this.urls);
                return arrayList;
            }
            finally {
                this.lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean safeUpdate(String service, boolean add) {
            this.lock.lock();
            try {
                boolean bl = add ? this.urls.add(service) : this.urls.remove(service);
                return bl;
            }
            finally {
                this.lock.unlock();
            }
        }

        private String find(Event event) {
            int len;
            KeyValue keyValue = event.getKv();
            String key = keyValue.getKey().toStringUtf8();
            int index = len = this.path.length();
            int count = 0;
            if (key.length() >= index) {
                while ((index = key.indexOf("/", index)) != -1 && count++ <= 1) {
                    ++index;
                }
            }
            if (count == 1) {
                return key.substring(len + 1);
            }
            return null;
        }

        private List<String> filterChildren(List<String> children) {
            if (children == null) {
                return Collections.emptyList();
            }
            if (children.size() <= 0) {
                return children;
            }
            int len = this.path.length();
            return ((Stream)children.stream().parallel()).filter(child -> {
                int index = len;
                int count = 0;
                if (child.length() > len) {
                    while ((index = child.indexOf("/", index)) != -1 && count++ <= 1) {
                        ++index;
                    }
                }
                return count == 1;
            }).map(child -> child.substring(len + 1)).collect(Collectors.toList());
        }

        protected WatchRequest nextRequest() {
            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8((String)this.path)).setRangeEnd(ByteString.copyFrom((byte[])OptionUtil.prefixEndOf(ByteSequence.from((String)this.path, (Charset)JEtcdClientWrapper.UTF_8)).getBytes())).setProgressNotify(true);
            return WatchRequest.newBuilder().setCreateRequest(builder).build();
        }

        public void tryReconnect(Throwable e) {
            this.throwable = e;
            JEtcdClient.this.logger.error("watcher client has error occurred, current path '" + this.path + "'", e);
            if (!JEtcdClient.this.isConnected()) {
                return;
            }
            Status status = Status.fromThrowable((Throwable)e);
            if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) {
                JEtcdClient.this.reconnectSchedule.schedule(this::reconnect, (long)new Random().nextInt(JEtcdClient.this.delayPeriod), TimeUnit.MILLISECONDS);
                return;
            }
            JEtcdClient.this.reconnectSchedule.schedule(this::reconnect, (long)new Random().nextInt(JEtcdClient.this.delayPeriod), TimeUnit.MILLISECONDS);
        }

        protected synchronized void reconnect() {
            this.closeWatchRequest();
            this.recreateWatchRequest();
        }

        protected void recreateWatchRequest() {
            if (this.watchRequest == null) {
                this.watchStub = WatchGrpc.newStub((Channel)JEtcdClient.this.clientWrapper.getChannel());
                this.watchRequest = this.watchStub.watch((StreamObserver)this);
            }
            this.watchRequest.onNext((Object)this.nextRequest());
            this.throwable = null;
            JEtcdClient.this.logger.warn("watch client retried connect for path '" + this.path + "', connection status : " + JEtcdClient.this.isConnected());
        }

        protected void closeWatchRequest() {
            if (this.watchRequest == null) {
                return;
            }
            try {
                WatchCancelRequest watchCancelRequest = WatchCancelRequest.newBuilder().setWatchId(this.watchId).build();
                WatchRequest cancelRequest = WatchRequest.newBuilder().setCancelRequest(watchCancelRequest).build();
                this.watchRequest.onNext((Object)cancelRequest);
            }
            finally {
                this.watchRequest.onCompleted();
                this.watchRequest = null;
            }
        }

        public void onCompleted() {
        }
    }
}

