/*
 * Decompiled with CFR 0.152.
 */
package io.vitess.client.grpc;

import io.vitess.client.grpc.RetryingInterceptorConfig;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.vitess.shaded.com.google.common.base.Preconditions;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.CallOptions;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.Channel;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.ClientCall;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.ClientInterceptor;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.Context;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.Metadata;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.MethodDescriptor;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.Status;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.internal.GrpcUtil;
import org.apache.flink.cdc.connectors.vitess.shaded.io.grpc.internal.SharedResourceHolder;

/**
 * Client interceptor that retries unary calls which close with
 * {@code UNAVAILABLE}, waiting between attempts with an exponential backoff
 * taken from {@link RetryingInterceptorConfig}.
 *
 * <p>Non-unary calls, and every call when the config reports
 * {@code isDisabled()}, are passed straight to the next channel.
 */
public class RetryingInterceptor implements ClientInterceptor {
    private final RetryingInterceptorConfig config;

    /**
     * @param config supplies the enabled/disabled switch and the backoff parameters
     */
    RetryingInterceptor(RetryingInterceptorConfig config) {
        this.config = config;
    }

    /**
     * Wraps unary calls in a {@link RetryingCall}; everything else is delegated unchanged.
     *
     * @param method      descriptor of the method being invoked
     * @param callOptions options (including any deadline) for the call
     * @param next        channel used to create each underlying attempt
     * @return a retrying call for enabled unary methods, otherwise {@code next.newCall(...)}
     */
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        boolean retryable = method.getType() == MethodDescriptor.MethodType.UNARY && !this.config.isDisabled();
        if (!retryable) {
            return next.newCall(method, callOptions);
        }
        return new RetryingCall<ReqT, RespT>(method, callOptions, next, Context.current(), this.config);
    }

    /**
     * A call that records the request (headers, message, compression flag) so it
     * can be replayed on a fresh underlying call, and buffers each attempt's
     * response until that attempt is chosen as the final answer.
     *
     * <p>Static because it needs nothing from the enclosing interceptor; the
     * config is passed in explicitly.
     */
    private static class RetryingCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
        private final MethodDescriptor<ReqT, RespT> method;
        private final CallOptions callOptions;
        private final Channel channel;
        private final Context context;
        // NOTE(review): obtained via SharedResourceHolder.get and never released in this class.
        private final ScheduledExecutorService scheduledExecutor;
        private ClientCall.Listener<RespT> responseListener;
        private Metadata requestHeaders;
        private ReqT requestMessage;
        private boolean compressionEnabled;
        /** Every attempt started so far, in start order (first attempt at the head). */
        private final Queue<AttemptListener> attemptListeners = new ConcurrentLinkedQueue<AttemptListener>();
        /** The most recent attempt for which a retry was scheduled. */
        private volatile AttemptListener latestResponse;
        private volatile ScheduledFuture<?> retryTask;
        /** Delay to use for the next retry; grows on every call to {@link #computeSleepTime()}. */
        private volatile long nextBackoffMillis;
        private final long maxBackoffMillis;
        private final double backoffMultiplier;

        RetryingCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel channel, Context context, RetryingInterceptorConfig config) {
            this.method = method;
            this.callOptions = callOptions;
            this.channel = channel;
            this.context = context;
            this.nextBackoffMillis = config.getInitialBackoffMillis();
            this.maxBackoffMillis = config.getMaxBackoffMillis();
            this.backoffMultiplier = config.getBackoffMultiplier();
            this.scheduledExecutor = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
        }

        /**
         * Starts the first attempt and remembers the listener and headers for later attempts.
         *
         * @throws IllegalStateException if the call was already started
         */
        @Override
        public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
            Preconditions.checkState(this.attemptListeners.isEmpty());
            Preconditions.checkState(this.responseListener == null);
            Preconditions.checkState(this.requestHeaders == null);
            this.responseListener = listener;
            this.requestHeaders = headers;
            ClientCall<ReqT, RespT> firstCall = this.channel.newCall(this.method, this.callOptions);
            AttemptListener firstAttempt = new AttemptListener(firstCall);
            this.attemptListeners.add(firstAttempt);
            firstCall.start(firstAttempt, headers);
        }

        @Override
        public void request(int numMessages) {
            this.lastCall().request(numMessages);
        }

        /**
         * Cancels every attempt started so far and any retry that is still waiting to run.
         *
         * <p>NOTE(review): if the pending retry is cancelled while no attempt is in
         * flight, nothing in this class delivers {@code onClose} to the response
         * listener — confirm callers tolerate that.
         */
        @Override
        public void cancel(@Nullable String message, @Nullable Throwable cause) {
            for (AttemptListener attempt : this.attemptListeners) {
                attempt.call.cancel(message, cause);
            }
            // Read the volatile once so the null check and the cancel see the same task.
            ScheduledFuture<?> pendingRetry = this.retryTask;
            if (pendingRetry != null) {
                pendingRetry.cancel(true);
            }
        }

        @Override
        public void halfClose() {
            this.lastCall().halfClose();
        }

        /**
         * Sends the single request message and keeps it for replay on retries.
         *
         * @throws IllegalStateException if a message was already sent
         */
        @Override
        public void sendMessage(ReqT message) {
            Preconditions.checkState(this.requestMessage == null);
            this.requestMessage = message;
            this.lastCall().sendMessage(message);
        }

        @Override
        public boolean isReady() {
            return this.lastCall().isReady();
        }

        @Override
        public void setMessageCompression(boolean enabled) {
            this.compressionEnabled = enabled;
            this.lastCall().setMessageCompression(enabled);
        }

        /**
         * Returns the delay for the upcoming retry and advances the stored backoff
         * by the multiplier, capped at the configured maximum.
         */
        private long computeSleepTime() {
            long currentBackoff = this.nextBackoffMillis;
            long grown = (long) ((double) currentBackoff * this.backoffMultiplier);
            this.nextBackoffMillis = Math.min(grown, this.maxBackoffMillis);
            return currentBackoff;
        }

        /**
         * Decides what to do with a closed attempt: deliver it, or schedule another attempt.
         *
         * <p>Only {@code UNAVAILABLE} triggers a retry. If the call has a deadline
         * with less time remaining than the backoff delay, no retry is scheduled
         * and a buffered response is delivered instead.
         */
        private void maybeRetry(AttemptListener attempt) {
            Status status = attempt.responseStatus;
            if (status.isOk() || status.getCode() != Status.Code.UNAVAILABLE) {
                this.useResponse(attempt);
                return;
            }
            long backoffMillis = this.computeSleepTime();
            boolean deadlineTooClose = this.callOptions.getDeadline() != null
                    && this.callOptions.getDeadline().timeRemaining(TimeUnit.MILLISECONDS) < backoffMillis;
            if (deadlineTooClose) {
                // Prefer the attempt recorded when the previous retry was scheduled, if any.
                AttemptListener previous = this.latestResponse;
                this.useResponse(previous != null ? previous : attempt);
                return;
            }
            this.latestResponse = attempt;
            Runnable startNextAttempt = new Runnable() {
                @Override
                public void run() {
                    // Replay the recorded request on a brand-new underlying call.
                    ClientCall<ReqT, RespT> nextCall = RetryingCall.this.channel.newCall(RetryingCall.this.method, RetryingCall.this.callOptions);
                    AttemptListener nextAttemptListener = new AttemptListener(nextCall);
                    RetryingCall.this.attemptListeners.add(nextAttemptListener);
                    nextCall.start(nextAttemptListener, RetryingCall.this.requestHeaders);
                    nextCall.setMessageCompression(RetryingCall.this.compressionEnabled);
                    nextCall.sendMessage(RetryingCall.this.requestMessage);
                    nextCall.request(1);
                    nextCall.halfClose();
                }
            };
            // Run the retry under the Context captured when the call was intercepted.
            this.retryTask = this.scheduledExecutor.schedule(this.context.wrap(startNextAttempt), backoffMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Forwards a buffered attempt to the application listener: headers (if
         * any were received), then the message (if any), then the close status.
         */
        private void useResponse(AttemptListener attempt) {
            // An attempt can close without ever receiving headers; do not hand null to the listener.
            if (attempt.responseHeaders != null) {
                this.responseListener.onHeaders(attempt.responseHeaders);
            }
            if (attempt.responseMessage != null) {
                this.responseListener.onMessage(attempt.responseMessage);
            }
            this.responseListener.onClose(attempt.responseStatus, attempt.responseTrailers);
        }

        /**
         * Returns the call that client-side operations are forwarded to.
         *
         * <p>NOTE(review): {@code peek()} yields the head of the queue, i.e. the
         * first attempt, not the newest one despite the method name — confirm intended.
         *
         * @throws IllegalStateException if the call has not been started
         */
        private ClientCall<ReqT, RespT> lastCall() {
            Preconditions.checkState(!this.attemptListeners.isEmpty());
            return this.attemptListeners.peek().call;
        }

        /**
         * Listener for a single attempt. Buffers headers, message, status and
         * trailers, and hands itself to {@link #maybeRetry} once the attempt closes.
         */
        private class AttemptListener extends ClientCall.Listener<RespT> {
            final ClientCall<ReqT, RespT> call;
            Metadata responseHeaders;
            RespT responseMessage;
            Status responseStatus;
            Metadata responseTrailers;

            AttemptListener(ClientCall<ReqT, RespT> call) {
                this.call = call;
            }

            @Override
            public void onHeaders(Metadata headers) {
                this.responseHeaders = headers;
            }

            @Override
            public void onMessage(RespT message) {
                this.responseMessage = message;
            }

            @Override
            public void onClose(Status status, Metadata trailers) {
                this.responseStatus = status;
                this.responseTrailers = trailers;
                RetryingCall.this.maybeRetry(this);
            }

            /** Readiness is not buffered; it is passed through immediately. */
            @Override
            public void onReady() {
                RetryingCall.this.responseListener.onReady();
            }
        }
    }
}

