/*
 * Decompiled with CFR 0.152.
 */
package io.etcd.jetcd.impl;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.api.VertxWatchGrpc;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchProgressRequest;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.common.exception.EtcdExceptionFactory;
import io.etcd.jetcd.impl.ClientConnectionManager;
import io.etcd.jetcd.impl.Impl;
import io.etcd.jetcd.options.OptionsUtil;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Util;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.grpc.stub.GrpcWriteStream;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class WatchImpl
extends Impl
implements Watch {
    private static final Logger LOG = LoggerFactory.getLogger(WatchImpl.class);
    private final Object lock = new Object();
    private final VertxWatchGrpc.WatchVertxStub stub;
    private final ListeningScheduledExecutorService executor;
    private final AtomicBoolean closed;
    private final List<WatcherImpl> watchers;
    private final ByteSequence namespace;

    WatchImpl(ClientConnectionManager connectionManager) {
        super(connectionManager);
        this.stub = connectionManager.newStub(VertxWatchGrpc::newVertxStub);
        this.executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, Util.createThreadFactory("jetcd-watch-", true)));
        this.closed = new AtomicBoolean();
        this.watchers = new CopyOnWriteArrayList<WatcherImpl>();
        this.namespace = connectionManager.getNamespace();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Watch.Watcher watch(ByteSequence key, WatchOption option, Watch.Listener listener) {
        WatcherImpl impl;
        if (this.closed.get()) {
            throw EtcdExceptionFactory.newClosedWatchClientException();
        }
        Object object = this.lock;
        synchronized (object) {
            impl = new WatcherImpl(key, option, listener);
            impl.resume();
            this.watchers.add(impl);
        }
        return impl;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            Object object = this.lock;
            synchronized (object) {
                this.executor.shutdownNow();
                this.watchers.forEach(Watch.Watcher::close);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestProgress() {
        if (!this.closed.get()) {
            Object object = this.lock;
            synchronized (object) {
                this.watchers.forEach(Watch.Watcher::requestProgress);
            }
        }
    }

    final class WatcherImpl
    implements Watch.Watcher {
        private final ByteSequence key;
        private final WatchOption option;
        private final Watch.Listener listener;
        private final AtomicBoolean closed;
        private final AtomicReference<WriteStream<WatchRequest>> wstream;
        private final AtomicBoolean started;
        private long revision;
        private long id;

        WatcherImpl(ByteSequence key, WatchOption option, Watch.Listener listener) {
            this.key = key;
            this.option = option;
            this.listener = listener;
            this.closed = new AtomicBoolean();
            this.started = new AtomicBoolean();
            this.wstream = new AtomicReference();
            this.id = -1L;
            this.revision = this.option.getRevision();
        }

        @Override
        public boolean isClosed() {
            return this.closed.get() || WatchImpl.this.closed.get();
        }

        void resume() {
            if (this.isClosed()) {
                return;
            }
            if (this.started.compareAndSet(false, true)) {
                this.id = -1L;
                WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder().setKey(Util.prefixNamespace(this.key, WatchImpl.this.namespace)).setPrevKv(this.option.isPrevKV()).setProgressNotify(this.option.isProgressNotify()).setStartRevision(this.revision);
                this.option.getEndKey().map(endKey -> Util.prefixNamespaceToRangeEnd(endKey, WatchImpl.this.namespace)).ifPresent(builder::setRangeEnd);
                if (this.option.getEndKey().isEmpty() && this.option.isPrefix()) {
                    ByteSequence endKey2 = OptionsUtil.prefixEndOf(this.key);
                    builder.setRangeEnd(Util.prefixNamespaceToRangeEnd(endKey2, WatchImpl.this.namespace));
                }
                if (this.option.isNoDelete()) {
                    builder.addFilters(WatchCreateRequest.FilterType.NODELETE);
                }
                if (this.option.isNoPut()) {
                    builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
                }
                ReadStream<WatchResponse> readStream = Util.applyRequireLeader(this.option.withRequireLeader(), WatchImpl.this.stub).watchWithHandler(stream -> {
                    this.wstream.set((WriteStream<WatchRequest>)stream);
                    stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build());
                }, this::onNext, event -> this.onCompleted(), this::onError);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            Object object = WatchImpl.this.lock;
            synchronized (object) {
                if (this.closed.compareAndSet(false, true)) {
                    GrpcWriteStream gws;
                    StreamObserver observer;
                    WriteStream<WatchRequest> ws;
                    if (this.wstream.get() != null && (ws = this.wstream.get()) instanceof GrpcWriteStream && (observer = (gws = (GrpcWriteStream)ws).streamObserver()) instanceof ClientCallStreamObserver) {
                        ClientCallStreamObserver callObs = (ClientCallStreamObserver)observer;
                        callObs.cancel("Watcher cancelled", null);
                    }
                    this.id = -1L;
                    this.listener.onCompleted();
                    WatchImpl.this.watchers.remove(this);
                }
            }
        }

        @Override
        public void requestProgress() {
            if (!this.closed.get() && this.wstream.get() != null) {
                WatchProgressRequest watchProgressRequest = WatchProgressRequest.newBuilder().build();
                this.wstream.get().write(WatchRequest.newBuilder().setProgressRequest(watchProgressRequest).build());
            }
        }

        private void onNext(WatchResponse response) {
            if (this.closed.get()) {
                return;
            }
            if (response.getCreated() && response.getCanceled() && response.getCancelReason() != null && (response.getCancelReason().contains("etcdserver: permission denied") || response.getCancelReason().contains("etcdserver: invalid auth token"))) {
                WatchImpl.this.connectionManager().authCredential().refresh();
                Status error = Status.Code.CANCELLED.toStatus().withDescription(response.getCancelReason());
                this.handleError(EtcdExceptionFactory.toEtcdException(error), true);
            } else if (response.getCreated()) {
                if (response.getWatchId() == -1L) {
                    this.listener.onError(EtcdExceptionFactory.newEtcdException(ErrorCode.INTERNAL, "etcd server failed to create watch id"));
                    return;
                }
                this.revision = Math.max(this.revision, response.getHeader().getRevision());
                this.id = response.getWatchId();
                if (this.option.isCreatedNotify()) {
                    this.listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response));
                }
            } else if (response.getCanceled()) {
                String reason = response.getCancelReason();
                EtcdException error = response.getCompactRevision() != 0L ? EtcdExceptionFactory.newCompactedException(response.getCompactRevision()) : (Strings.isNullOrEmpty(reason) ? EtcdExceptionFactory.newEtcdException(ErrorCode.OUT_OF_RANGE, "etcdserver: mvcc: required revision is a future revision") : EtcdExceptionFactory.newEtcdException(ErrorCode.FAILED_PRECONDITION, reason));
                this.handleError(EtcdExceptionFactory.toEtcdException(error), false);
            } else if (io.etcd.jetcd.watch.WatchResponse.isProgressNotify(response)) {
                this.listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response));
                this.revision = Math.max(this.revision, response.getHeader().getRevision());
            } else if (response.getEventsCount() == 0 && this.option.isProgressNotify()) {
                this.listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response, WatchImpl.this.namespace));
                this.revision = response.getHeader().getRevision();
            } else if (response.getEventsCount() > 0) {
                this.listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response, WatchImpl.this.namespace));
                this.revision = response.getEvents(response.getEventsCount() - 1).getKv().getModRevision() + 1L;
            }
        }

        private void onCompleted() {
            this.listener.onCompleted();
        }

        private void onError(Throwable t) {
            this.handleError(EtcdExceptionFactory.toEtcdException(t), this.shouldReschedule(Status.fromThrowable(t)));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleError(EtcdException etcdException, boolean shouldReschedule) {
            Object object = WatchImpl.this.lock;
            synchronized (object) {
                if (this.isClosed()) {
                    return;
                }
                this.listener.onError(etcdException);
                if (this.wstream.get() != null) {
                    this.wstream.get().end();
                }
                this.wstream.set(null);
                this.started.set(false);
            }
            if (shouldReschedule) {
                if (etcdException.getMessage().contains("etcdserver: permission denied")) {
                    WatchImpl.this.connectionManager().authCredential().refresh();
                }
                this.reschedule();
                return;
            }
            this.close();
        }

        private boolean shouldReschedule(Status status) {
            return !Errors.isHaltError(status) && !Errors.isNoLeaderError(status);
        }

        private void reschedule() {
            Futures.addCallback(WatchImpl.this.executor.schedule(this::resume, 500L, TimeUnit.MILLISECONDS), new FutureCallback<Object>(){

                @Override
                public void onFailure(Throwable t) {
                    LOG.warn("scheduled resume failed", t);
                }

                @Override
                public void onSuccess(Object result) {
                }
            }, WatchImpl.this.executor);
        }
    }
}

