/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.AbstractApiService;
import com.google.cloud.pubsublite.ErrorCodes;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.SingleConnectionFactory;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

class RetryingConnectionImpl<StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT extends AutoCloseable>
extends AbstractApiService
implements RetryingConnection<ConnectionT>,
StreamObserver<ClientResponseT> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private static final Duration INITIAL_RECONNECT_BACKOFF_TIME = Duration.ofMillis((long)10L);
    private static final Duration MAX_RECONNECT_BACKOFF_TIME = Duration.ofSeconds((long)10L);
    private final StreamFactory<StreamRequestT, StreamResponseT> streamFactory;
    private final SingleConnectionFactory<StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT> connectionFactory;
    private final StreamRequestT initialRequest;
    private final RetryingConnectionObserver<ClientResponseT> observer;
    private final ScheduledExecutorService systemExecutor;
    private final CloseableMonitor connectionMonitor = new CloseableMonitor();
    @GuardedBy(value="connectionMonitor.monitor")
    private long nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
    @GuardedBy(value="connectionMonitor.monitor")
    private ConnectionT currentConnection;
    @GuardedBy(value="connectionMonitor.monitor")
    private boolean completed = false;

    RetryingConnectionImpl(StreamFactory<StreamRequestT, StreamResponseT> streamFactory, SingleConnectionFactory<StreamRequestT, StreamResponseT, ClientResponseT, ConnectionT> connectionFactory, StreamRequestT initialRequest, RetryingConnectionObserver<ClientResponseT> observer) {
        this.streamFactory = streamFactory;
        this.connectionFactory = connectionFactory;
        this.initialRequest = initialRequest;
        this.observer = observer;
        this.systemExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    protected void doStart() {
        this.reinitialize();
        this.notifyStarted();
    }

    @Override
    public void reinitialize() {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.currentConnection = this.connectionFactory.New(this.streamFactory, this, this.initialRequest);
        }
    }

    protected void doStop() {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            this.completed = true;
            this.currentConnection.close();
        }
        catch (Exception e) {
            this.notifyFailed(e);
            return;
        }
        this.systemExecutor.shutdownNow();
        this.notifyStopped();
    }

    @Override
    public void modifyConnection(RetryingConnection.Modifier<ConnectionT> modifier) throws StatusException {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                modifier.modify(Optional.empty());
            } else {
                modifier.modify(Optional.of(this.currentConnection));
            }
        }
    }

    void setPermanentError(Throwable error) {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.completed = true;
        }
        this.notifyFailed(error);
    }

    public final void onNext(ClientResponseT value) {
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            if (this.completed) {
                return;
            }
            this.nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
        }
        Status status = this.observer.onClientResponse(value);
        if (!status.isOk()) {
            this.setPermanentError((Throwable)status.asRuntimeException());
        }
    }

    public final void onError(Throwable t) {
        Optional<Status> statusOr = ExtractStatus.extract(t);
        if (!statusOr.isPresent()) {
            this.setPermanentError(t);
            return;
        }
        if (!ErrorCodes.IsRetryable(statusOr.get().getCode())) {
            this.setPermanentError((Throwable)statusOr.get().asRuntimeException());
            return;
        }
        Optional<Object> throwable = Optional.empty();
        long backoffTime = 0L;
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            this.currentConnection.close();
            backoffTime = this.nextRetryBackoffDuration;
            this.nextRetryBackoffDuration = Math.min(backoffTime * 2L, MAX_RECONNECT_BACKOFF_TIME.toMillis());
        }
        catch (Exception e) {
            throwable = Optional.of(e);
        }
        if (throwable.isPresent()) {
            this.setPermanentError((Throwable)Status.FAILED_PRECONDITION.withCause((Throwable)throwable.get()).withDescription("Failed to close preexisting stream after error.").asRuntimeException());
            return;
        }
        ((GoogleLogger.Api)((GoogleLogger.Api)logger.atFine()).withCause(t)).log("Stream disconnected attempting retry, after %s milliseconds", backoffTime);
        ScheduledFuture<?> retry = this.systemExecutor.schedule(this.observer::triggerReinitialize, backoffTime, TimeUnit.MILLISECONDS);
    }

    public final void onCompleted() {
        boolean expectedCompletion;
        ((GoogleLogger.Api)logger.atFine()).log("Stream completed for %s.", this.initialRequest);
        try (CloseableMonitor.Hold h = this.connectionMonitor.enter();){
            expectedCompletion = this.completed;
        }
        if (!expectedCompletion) {
            this.setPermanentError((Throwable)Status.FAILED_PRECONDITION.withDescription("Server unexpectedly closed stream.").asRuntimeException());
        }
    }
}

