/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

final class StreamingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis((long)100L);
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds((long)10L);
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    private static final Duration UNARY_TIMEOUT = Duration.ofSeconds((long)60L);
    private final SubscriberGrpc.SubscriberStub stub;
    private final String subscription;
    private final ScheduledExecutorService systemExecutor;
    private final MessageDispatcher messageDispatcher;
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    private final Lock lock = new ReentrantLock();
    private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

    public StreamingSubscriberConnection(String subscription, MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, SubscriberGrpc.SubscriberStub stub, FlowController flowController, Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        this.subscription = subscription;
        this.systemExecutor = systemExecutor;
        this.stub = stub;
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, maxAckExtensionPeriod, ackLatencyDistribution, flowController, outstandingMessageBatches, executor, systemExecutor, clock);
    }

    protected void doStart() {
        logger.config("Starting subscriber.");
        this.messageDispatcher.start();
        this.initialize();
        this.notifyStarted();
    }

    protected void doStop() {
        this.messageDispatcher.stop();
        this.lock.lock();
        try {
            this.requestObserver.onError((Throwable)Status.CANCELLED.asException());
        }
        finally {
            this.lock.unlock();
            this.notifyStopped();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initialize() {
        SettableApiFuture errorFuture = SettableApiFuture.create();
        StreamingPullResponseObserver responseObserver = new StreamingPullResponseObserver((SettableApiFuture<Void>)errorFuture);
        ClientCallStreamObserver requestObserver = (ClientCallStreamObserver)this.stub.streamingPull((StreamObserver)responseObserver);
        logger.log(Level.FINER, "Initializing stream to subscription {0}", this.subscription);
        requestObserver.onNext((Object)StreamingPullRequest.newBuilder().setSubscription(this.subscription).setStreamAckDeadlineSeconds(60).build());
        requestObserver.request(1);
        this.lock.lock();
        try {
            this.requestObserver = requestObserver;
        }
        finally {
            this.lock.unlock();
        }
        ApiFutures.addCallback((ApiFuture)errorFuture, (ApiFutureCallback)new ApiFutureCallback<Void>(){

            public void onSuccess(@Nullable Void result) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    return;
                }
                StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.initialize();
            }

            public void onFailure(Throwable cause) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    logger.log(Level.FINE, "pull failure after service no longer running", cause);
                    return;
                }
                if (!StatusUtil.isRetryable(cause)) {
                    ApiException gaxException = ApiExceptionFactory.createException((Throwable)cause, (StatusCode)GrpcStatusCode.of((Status.Code)Status.fromThrowable((Throwable)cause).getCode()), (boolean)false);
                    logger.log(Level.SEVERE, "terminated streaming with exception", (Throwable)gaxException);
                    StreamingSubscriberConnection.this.notifyFailed((Throwable)gaxException);
                    return;
                }
                logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);
                long backoffMillis = StreamingSubscriberConnection.this.channelReconnectBackoffMillis.get();
                long newBackoffMillis = Math.min(backoffMillis * 2L, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(newBackoffMillis);
                StreamingSubscriberConnection.this.systemExecutor.schedule(new Runnable(){

                    @Override
                    public void run() {
                        StreamingSubscriberConnection.this.initialize();
                    }
                }, backoffMillis, TimeUnit.MILLISECONDS);
            }
        }, (Executor)MoreExecutors.directExecutor());
    }

    private boolean isAlive() {
        ApiService.State state = this.state();
        return state == ApiService.State.RUNNING || state == ApiService.State.STARTING;
    }

    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        SubscriberGrpc.SubscriberStub timeoutStub = (SubscriberGrpc.SubscriberStub)this.stub.withDeadlineAfter(UNARY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        StreamObserver<Empty> loggingObserver = new StreamObserver<Empty>(){

            public void onCompleted() {
            }

            public void onNext(Empty e) {
            }

            public void onError(Throwable t) {
                Level level = StreamingSubscriberConnection.this.isAlive() ? Level.WARNING : Level.FINER;
                logger.log(level, "failed to send operations", t);
            }
        };
        for (MessageDispatcher.PendingModifyAckDeadline modack : ackDeadlineExtensions) {
            for (List idChunk : Lists.partition(modack.ackIds, (int)1000)) {
                timeoutStub.modifyAckDeadline(ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscription).addAllAckIds((Iterable)idChunk).setAckDeadlineSeconds(modack.deadlineExtensionSeconds).build(), (StreamObserver)loggingObserver);
            }
        }
        for (List idChunk : Lists.partition(acksToSend, (int)1000)) {
            timeoutStub.acknowledge(AcknowledgeRequest.newBuilder().setSubscription(this.subscription).addAllAckIds((Iterable)idChunk).build(), (StreamObserver)loggingObserver);
        }
    }

    @InternalApi
    static List<StreamingPullRequest> partitionAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
        int numExtensions = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modify : ackDeadlineExtensions) {
            numExtensions += modify.ackIds.size();
        }
        int numChanges = Math.max(numExtensions, acksToSend.size());
        int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1);
        ArrayList<StreamingPullRequest.Builder> requests = new ArrayList<StreamingPullRequest.Builder>(numRequests);
        for (int i = 0; i < numRequests; ++i) {
            requests.add(StreamingPullRequest.newBuilder());
        }
        int reqCount = 0;
        for (Object acksChunk : Lists.partition(acksToSend, (int)size)) {
            ((StreamingPullRequest.Builder)requests.get(reqCount)).addAllAckIds((Iterable)acksChunk);
            ++reqCount;
        }
        reqCount = 0;
        int ackCount = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modify : ackDeadlineExtensions) {
            for (String ackId : modify.ackIds) {
                ((StreamingPullRequest.Builder)requests.get(reqCount)).addModifyDeadlineSeconds(modify.deadlineExtensionSeconds).addModifyDeadlineAckIds(ackId);
                if (++ackCount != size) continue;
                ++reqCount;
                ackCount = 0;
            }
        }
        ArrayList<StreamingPullRequest> ret = new ArrayList<StreamingPullRequest>(requests.size());
        for (StreamingPullRequest.Builder builder : requests) {
            ret.add(builder.build());
        }
        return ret;
    }

    private class StreamingPullResponseObserver
    implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
        final SettableApiFuture<Void> errorFuture;
        ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;

        StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
            this.thisRequestObserver = requestObserver;
            requestObserver.disableAutoInboundFlowControl();
        }

        public void onNext(StreamingPullResponse response) {
            StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
            StreamingSubscriberConnection.this.messageDispatcher.processReceivedMessages(response.getReceivedMessagesList(), new Runnable(){

                @Override
                public void run() {
                    if (StreamingSubscriberConnection.this.isAlive() && !StreamingPullResponseObserver.this.errorFuture.isDone()) {
                        StreamingSubscriberConnection.this.lock.lock();
                        try {
                            StreamingPullResponseObserver.this.thisRequestObserver.request(1);
                        }
                        catch (Exception e) {
                            logger.log(Level.WARNING, "cannot request more messages", e);
                        }
                        finally {
                            StreamingSubscriberConnection.this.lock.unlock();
                        }
                    }
                }
            });
        }

        public void onError(Throwable t) {
            this.errorFuture.setException(t);
        }

        public void onCompleted() {
            logger.fine("Streaming pull terminated successfully!");
            this.errorFuture.set(null);
        }
    }
}

