/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
 * Service that keeps one StreamingPull gRPC stream open to a single subscription.
 *
 * <p>Responses received on the stream are handed to a {@link MessageDispatcher}; acks and
 * ack-deadline modifications produced by the dispatcher are written back on the same stream
 * (this class is the dispatcher's {@link MessageDispatcher.AckProcessor}).
 *
 * <p>When the stream terminates while the service is still starting or running, a new stream is
 * opened: immediately after a clean completion, or after an exponentially growing delay (starting
 * at {@link #INITIAL_CHANNEL_RECONNECT_BACKOFF}, doubling up to
 * {@link #MAX_CHANNEL_RECONNECT_BACKOFF}) after a retryable error. A non-retryable error fails the
 * service.
 */
final class StreamingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    /** Delay before the first reconnect attempt after a retryable stream failure. */
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100L);
    /** Upper bound for the reconnect delay. */
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10L);
    /** Maximum number of acks, and of deadline modifications, placed in one request. */
    private static final int MAX_PER_REQUEST_CHANGES = 10000;
    /** Delay, in milliseconds, to use for the next reconnect after a retryable failure. */
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    private final SubscriberGrpc.SubscriberStub asyncStub;
    private final String subscription;
    private final ScheduledExecutorService executor;
    private final MessageDispatcher messageDispatcher;
    /**
     * Request side of the most recently opened stream. It is replaced every time a stream is
     * (re)opened, including from reconnect tasks run on {@link #executor}, and is read by
     * {@link #doStop()}, {@link #sendAckOperations} and {@link #updateStreamAckDeadline}, which
     * may run on other threads; {@code volatile} so those readers observe the latest stream.
     */
    private volatile ClientCallStreamObserver<StreamingPullRequest> requestObserver;

    /**
     * Creates the connection and its message dispatcher; no stream is opened until the service is
     * started.
     *
     * @param subscription subscription name sent in the initial request of every stream
     * @param receiver passed through to the {@link MessageDispatcher}
     * @param ackExpirationPadding passed through to the {@link MessageDispatcher}
     * @param maxAckExtensionPeriod passed through to the {@link MessageDispatcher}
     * @param streamAckDeadlineSeconds initial message deadline set on the dispatcher and sent as
     *     the stream ack deadline when a stream is opened
     * @param ackLatencyDistribution passed through to the {@link MessageDispatcher}
     * @param asyncStub stub used to open StreamingPull calls
     * @param flowController passed through to the {@link MessageDispatcher}
     * @param outstandingMessageBatches passed through to the {@link MessageDispatcher}
     * @param executor used by the dispatcher, for reconnect callbacks and for delayed reconnects
     * @param alarmsExecutor passed through to the {@link MessageDispatcher}; may be null
     * @param clock passed through to the {@link MessageDispatcher}
     */
    public StreamingSubscriberConnection(String subscription, MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, int streamAckDeadlineSeconds, Distribution ackLatencyDistribution, SubscriberGrpc.SubscriberStub asyncStub, FlowController flowController, Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches, ScheduledExecutorService executor, @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) {
        this.subscription = subscription;
        this.executor = executor;
        this.asyncStub = asyncStub;
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, maxAckExtensionPeriod, ackLatencyDistribution, flowController, outstandingMessageBatches, executor, alarmsExecutor, clock);
        this.messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
    }

    /** Opens the first stream and then reports the service as started. */
    @Override
    protected void doStart() {
        logger.config("Starting subscriber.");
        initialize();
        notifyStarted();
    }

    /**
     * Stops the dispatcher, terminates the current stream by signalling {@code CANCELLED} on its
     * request side, and reports the service as stopped.
     */
    @Override
    protected void doStop() {
        messageDispatcher.stop();
        requestObserver.onError(Status.CANCELLED.asException());
        notifyStopped();
    }

    /**
     * Opens a new StreamingPull stream, sends the initial request (subscription plus stream ack
     * deadline), asks for the first response, and registers the reconnect logic that runs when
     * this stream terminates.
     */
    private void initialize() {
        final SettableFuture<Void> errorFuture = SettableFuture.create();
        StreamingPullResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture);
        // The observer's beforeStart() callback stores the request side of the call in
        // this.requestObserver; the local variable below refers to the same new stream.
        ClientCallStreamObserver<StreamingPullRequest> newRequestObserver =
                (ClientCallStreamObserver<StreamingPullRequest>) asyncStub.streamingPull(responseObserver);
        logger.log(Level.FINER, "Initializing stream to subscription {0} with deadline {1}", new Object[]{subscription, messageDispatcher.getMessageDeadlineSeconds()});
        newRequestObserver.onNext(StreamingPullRequest.newBuilder()
                .setSubscription(subscription)
                .setStreamAckDeadlineSeconds(messageDispatcher.getMessageDeadlineSeconds())
                .build());
        // Automatic inbound flow control is disabled in beforeStart(), so every response must be
        // requested explicitly.
        newRequestObserver.request(1);
        Futures.addCallback(errorFuture, new FutureCallback<Void>() {

            /** The stream completed without error: reset the backoff and reconnect right away. */
            @Override
            public void onSuccess(@Nullable Void result) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    return;
                }
                channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.initialize();
            }

            /**
             * The stream failed: reconnect after the current backoff if the error is retryable,
             * otherwise fail the service.
             */
            @Override
            public void onFailure(Throwable cause) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    logger.log(Level.FINE, "pull failure after service no longer running", cause);
                    return;
                }
                logger.log(Level.WARNING, "Terminated streaming with exception", cause);
                if (StatusUtil.isRetryable(cause)) {
                    // Wait for the current backoff now; store the doubled (capped) value for the
                    // next failure.
                    long backoffMillis = channelReconnectBackoffMillis.get();
                    long newBackoffMillis = Math.min(backoffMillis * 2L, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
                    channelReconnectBackoffMillis.set(newBackoffMillis);
                    executor.schedule(new Runnable() {

                        @Override
                        public void run() {
                            StreamingSubscriberConnection.this.initialize();
                        }
                    }, backoffMillis, TimeUnit.MILLISECONDS);
                } else {
                    StreamingSubscriberConnection.this.notifyFailed(cause);
                }
            }
        }, executor);
    }

    /** Returns true while the service is in the {@code STARTING} or {@code RUNNING} state. */
    private boolean isAlive() {
        // Read the state once so both comparisons look at the same value.
        ApiService.State current = state();
        return current == ApiService.State.RUNNING || current == ApiService.State.STARTING;
    }

    /**
     * Sends the given acks and deadline modifications on the current stream, split into as many
     * requests as needed to keep each one within {@link #MAX_PER_REQUEST_CHANGES}.
     *
     * @param acksToSend ack ids to acknowledge
     * @param ackDeadlineExtensions deadline modifications to apply
     */
    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        List<StreamingPullRequest> requests = partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
        for (StreamingPullRequest request : requests) {
            requestObserver.onNext(request);
        }
    }

    /**
     * Splits acks and deadline modifications into requests that each carry at most {@code size}
     * ack ids and at most {@code size} deadline modifications.
     *
     * <p>The i-th request receives the i-th chunk of acks and the i-th chunk of modifications, so
     * the number of requests is {@code ceil(max(#acks, #modifications) / size)}; an empty list is
     * returned when there is nothing to send.
     *
     * @param acksToSend ack ids to acknowledge
     * @param ackDeadlineExtensions deadline modifications; each entry applies one deadline to all
     *     of its ack ids
     * @param size maximum number of entries of each kind per request; must be positive
     * @return the built requests, in order
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    @VisibleForTesting
    static List<StreamingPullRequest> partitionAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive, got " + size);
        }
        int numExtensions = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modify : ackDeadlineExtensions) {
            numExtensions += modify.ackIds.size();
        }
        int numChanges = Math.max(numExtensions, acksToSend.size());
        // Ceiling division: one extra request for a trailing partial chunk.
        int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1);
        List<StreamingPullRequest.Builder> requests = new ArrayList<StreamingPullRequest.Builder>(numRequests);
        for (int i = 0; i < numRequests; ++i) {
            requests.add(StreamingPullRequest.newBuilder());
        }
        int reqCount = 0;
        for (List<String> acksChunk : Lists.partition(acksToSend, size)) {
            requests.get(reqCount).addAllAckIds(acksChunk);
            ++reqCount;
        }
        reqCount = 0;
        int ackCount = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modify : ackDeadlineExtensions) {
            for (String ackId : modify.ackIds) {
                requests.get(reqCount)
                        .addModifyDeadlineSeconds(modify.deadlineExtensionSeconds)
                        .addModifyDeadlineAckIds(ackId);
                // Move on to the next request once this one holds `size` modifications.
                if (++ackCount == size) {
                    ++reqCount;
                    ackCount = 0;
                }
            }
        }
        List<StreamingPullRequest> ret = new ArrayList<StreamingPullRequest>(requests.size());
        for (StreamingPullRequest.Builder builder : requests) {
            ret.add(builder.build());
        }
        return ret;
    }

    /**
     * Updates the dispatcher's message deadline and sends the new stream ack deadline on the
     * current stream.
     *
     * @param newAckDeadlineSeconds the new deadline, in seconds
     */
    public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
        messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
        requestObserver.onNext(StreamingPullRequest.newBuilder().setStreamAckDeadlineSeconds(newAckDeadlineSeconds).build());
    }

    /**
     * Response side of one StreamingPull stream. Forwards responses to the dispatcher and reports
     * stream termination through {@link #errorFuture}.
     */
    private class StreamingPullResponseObserver
    implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
        /** Completed with null on clean completion, or with the error when the stream fails. */
        final SettableFuture<Void> errorFuture;
        /**
         * Request side of the stream this observer belongs to. Follow-up responses are requested
         * here rather than on the enclosing connection's {@code requestObserver}: by the time a
         * batch finishes processing, the connection may already have reconnected, and asking the
         * new stream for an extra response would pull more than that stream has been credited for.
         */
        private ClientCallStreamObserver<StreamingPullRequest> pairedRequestObserver;

        StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        /**
         * Records the request side of the new call, both for this observer and as the
         * connection's current stream, and turns off automatic inbound flow control.
         */
        @Override
        public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
            this.pairedRequestObserver = requestObserver;
            StreamingSubscriberConnection.this.requestObserver = requestObserver;
            requestObserver.disableAutoInboundFlowControl();
        }

        /**
         * Resets the reconnect backoff and hands the received messages to the dispatcher; once
         * the dispatcher runs the completion callback, one more response is requested from this
         * observer's own stream if the service is still alive.
         */
        @Override
        public void onNext(StreamingPullResponse response) {
            channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
            final ClientCallStreamObserver<StreamingPullRequest> streamOfThisResponse = pairedRequestObserver;
            messageDispatcher.processReceivedMessages(response.getReceivedMessagesList(), new Runnable() {

                @Override
                public void run() {
                    if (StreamingSubscriberConnection.this.isAlive()) {
                        // NOTE(review): if this stream has already terminated, this relies on
                        // request() being harmless on a finished call -- confirm for the gRPC
                        // version in use.
                        streamOfThisResponse.request(1);
                    }
                }
            });
        }

        /** Reports the stream failure to the reconnect logic. */
        @Override
        public void onError(Throwable t) {
            errorFuture.setException(t);
        }

        /** Reports clean stream completion to the reconnect logic. */
        @Override
        public void onCompleted() {
            logger.fine("Streaming pull terminated successfully!");
            errorFuture.set(null);
        }
    }
}

