/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.SerialExecutor;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberImpl;
import com.google.cloud.pubsublite.internal.wire.FlowControlBatcher;
import com.google.cloud.pubsublite.internal.wire.NextOffsetTracker;
import com.google.cloud.pubsublite.internal.wire.ResetSignal;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;

public class SubscriberImpl
extends ProxyService
implements Subscriber,
RetryingConnectionObserver<List<SequencedMessage>> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private static final Duration FLOW_REQUESTS_FLUSH_INTERVAL = Duration.ofMillis(100L);
    private final AlarmFactory alarmFactory;
    private final Consumer<List<SequencedMessage>> messageConsumer;
    private final SubscriberResetHandler resetHandler;
    private final InitialSubscribeRequest baseInitialRequest;
    private final boolean retryStreamRaces;
    private final SerialExecutor messageDeliveryExecutor;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private Optional<Future<?>> alarmFuture = Optional.empty();
    @GuardedBy(value="monitor.monitor")
    private final RetryingConnection<SubscribeRequest, ConnectedSubscriber> connection;
    @GuardedBy(value="monitor.monitor")
    private final NextOffsetTracker nextOffsetTracker = new NextOffsetTracker();
    @GuardedBy(value="monitor.monitor")
    private final FlowControlBatcher flowControlBatcher = new FlowControlBatcher();
    @GuardedBy(value="monitor.monitor")
    private SeekRequest initialLocation;
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown = false;

    @VisibleForTesting
    SubscriberImpl(StreamFactories.SubscribeStreamFactory streamFactory, ConnectedSubscriberFactory factory, AlarmFactory alarmFactory, InitialSubscribeRequest baseInitialRequest, SeekRequest initialLocation, Consumer<List<SequencedMessage>> messageConsumer, SubscriberResetHandler resetHandler, boolean retryStreamRaces) throws ApiException {
        super(new ApiService[0]);
        this.alarmFactory = alarmFactory;
        this.messageConsumer = messageConsumer;
        this.resetHandler = resetHandler;
        this.baseInitialRequest = baseInitialRequest;
        this.retryStreamRaces = retryStreamRaces;
        this.messageDeliveryExecutor = new SerialExecutor(SystemExecutors.getFuturesExecutor());
        this.initialLocation = initialLocation;
        this.connection = new RetryingConnectionImpl<SubscribeRequest, SubscribeResponse, List<SequencedMessage>, ConnectedSubscriber>(streamFactory, factory, this, this.getInitialRequest());
        this.addServices(this.connection);
    }

    public SubscriberImpl(StreamFactories.SubscribeStreamFactory streamFactory, InitialSubscribeRequest baseInitialRequest, SeekRequest initialLocation, Consumer<List<SequencedMessage>> messageConsumer, SubscriberResetHandler resetHandler, boolean retryStreamRaces) throws ApiException {
        this(streamFactory, new ConnectedSubscriberImpl.Factory(), AlarmFactory.create(FLOW_REQUESTS_FLUSH_INTERVAL), baseInitialRequest, initialLocation, messageConsumer, resetHandler, retryStreamRaces);
    }

    @Override
    protected void start() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.alarmFuture = Optional.of(this.alarmFactory.newAlarm(this::processBatchFlowRequest));
        }
    }

    @Override
    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
            this.alarmFuture.ifPresent(future -> future.cancel(false));
            this.alarmFuture = Optional.empty();
            this.messageDeliveryExecutor.close();
        }
    }

    @Override
    protected void handlePermanentError(CheckedApiException error) {
        this.stop();
    }

    @Override
    public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.flowControlBatcher.onClientFlowRequest(clientRequest);
            if (this.flowControlBatcher.shouldExpediteBatchRequest()) {
                this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
            }
        }
    }

    private SubscribeRequest getInitialRequest() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder().setInitial(this.baseInitialRequest.toBuilder().setInitialLocation(this.nextOffsetTracker.requestForRestart().orElse(this.initialLocation))).build();
            return subscribeRequest;
        }
    }

    public void reset() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.nextOffsetTracker.reset();
            this.initialLocation = SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.COMMITTED_CURSOR).build();
        }
    }

    @Override
    public void triggerReinitialize(CheckedApiException streamError) {
        if (!this.retryStreamRaces && ExtractStatus.getErrorInfoReason(streamError).equals("DUPLICATE_SUBSCRIBER_CONNECTIONS")) {
            this.onPermanentError(streamError);
            return;
        }
        if (ResetSignal.isResetSignal(streamError)) {
            try {
                this.messageDeliveryExecutor.waitUntilInactive();
                if (this.resetHandler.handleReset()) {
                    this.reset();
                }
            }
            catch (CheckedApiException e) {
                this.onPermanentError(e);
                return;
            }
        }
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.connection.reinitialize(this.getInitialRequest());
            this.connection.modifyConnection(connectedSubscriber -> {
                CheckedApiPreconditions.checkArgument(this.monitor.monitor.isOccupiedByCurrentThread());
                CheckedApiPreconditions.checkArgument(connectedSubscriber.isPresent());
                this.flowControlBatcher.requestForRestart().ifPresent(request -> ((ConnectedSubscriber)connectedSubscriber.get()).allowFlow((FlowControlRequest)request));
            });
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    @Override
    public void onClientResponse(List<SequencedMessage> messages) throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.nextOffsetTracker.onMessages(messages);
            this.flowControlBatcher.onMessages(messages);
            this.messageDeliveryExecutor.execute(() -> {
                try {
                    this.messageConsumer.accept(messages);
                }
                catch (Throwable t) {
                    ((GoogleLogger.Api)((GoogleLogger.Api)logger.atWarning()).withCause(t)).log("Consumer threw an exception- failing subscriber. %s", (Object)this.baseInitialRequest);
                    this.onPermanentError(ExtractStatus.toCanonical(t));
                }
            });
        }
    }

    private void processBatchFlowRequest() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            this.connection.modifyConnection(connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    private void flushBatchFlowRequest(ConnectedSubscriber subscriber) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow);
        }
    }
}

