/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.grpc.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.StatusUtil;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.joda.time.Duration;

final class StreamingSubscriberConnection
extends AbstractService
implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = new Duration(100L);
    private static final int MAX_PER_REQUEST_CHANGES = 10000;
    private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
    private final Channel channel;
    private final Credentials credentials;
    private final String subscription;
    private final ScheduledExecutorService executor;
    private final MessageDispatcher messageDispatcher;
    private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

    public StreamingSubscriberConnection(String subscription, Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, int streamAckDeadlineSeconds, Distribution ackLatencyDistribution, Channel channel, FlowController flowController, ScheduledExecutorService executor, Clock clock) {
        this.subscription = subscription;
        this.executor = executor;
        this.credentials = credentials;
        this.channel = channel;
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, ackLatencyDistribution, flowController, executor, clock);
        this.messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
    }

    protected void doStart() {
        logger.log(Level.INFO, "Starting subscriber.");
        this.initialize();
        this.notifyStarted();
    }

    protected void doStop() {
        this.messageDispatcher.stop();
        this.notifyStopped();
        this.requestObserver.onError((Throwable)Status.CANCELLED.asException());
    }

    private void initialize() {
        SettableFuture errorFuture = SettableFuture.create();
        StreamingPullResponseObserver responseObserver = new StreamingPullResponseObserver((SettableFuture<Void>)errorFuture);
        ClientCallStreamObserver requestObserver = (ClientCallStreamObserver)ClientCalls.asyncBidiStreamingCall((ClientCall)this.channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from((Credentials)this.credentials))), (StreamObserver)responseObserver);
        logger.log(Level.INFO, "Initializing stream to subscription " + this.subscription + " with deadline " + this.messageDispatcher.getMessageDeadlineSeconds());
        requestObserver.onNext((Object)StreamingPullRequest.newBuilder().setSubscription(this.subscription).setStreamAckDeadlineSeconds(this.messageDispatcher.getMessageDeadlineSeconds()).build());
        requestObserver.request(1);
        Futures.addCallback((ListenableFuture)errorFuture, (FutureCallback)new FutureCallback<Void>(){

            public void onSuccess(@Nullable Void result) {
                StreamingSubscriberConnection.this.channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
                StreamingSubscriberConnection.this.initialize();
            }

            public void onFailure(Throwable t) {
                Status errorStatus = Status.fromThrowable((Throwable)t);
                if (StatusUtil.isRetryable(errorStatus) && StreamingSubscriberConnection.this.isAlive()) {
                    long backoffMillis = StreamingSubscriberConnection.this.channelReconnectBackoff.getMillis();
                    StreamingSubscriberConnection.this.channelReconnectBackoff = StreamingSubscriberConnection.this.channelReconnectBackoff.plus(backoffMillis);
                    StreamingSubscriberConnection.this.executor.schedule(new Runnable(){

                        @Override
                        public void run() {
                            StreamingSubscriberConnection.this.initialize();
                        }
                    }, backoffMillis, TimeUnit.MILLISECONDS);
                } else if (StreamingSubscriberConnection.this.isAlive()) {
                    StreamingSubscriberConnection.this.notifyFailed(t);
                }
            }
        }, (Executor)this.executor);
    }

    private boolean isAlive() {
        return this.state() == Service.State.RUNNING || this.state() == Service.State.STARTING;
    }

    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        List ackChunks = Lists.partition(acksToSend, (int)10000);
        List modifyAckDeadlineChunks = Lists.partition(ackDeadlineExtensions, (int)10000);
        Iterator ackChunksIt = ackChunks.iterator();
        Iterator modifyAckDeadlineChunksIt = modifyAckDeadlineChunks.iterator();
        while (ackChunksIt.hasNext() || modifyAckDeadlineChunksIt.hasNext()) {
            StreamingPullRequest.Builder requestBuilder = StreamingPullRequest.newBuilder();
            if (modifyAckDeadlineChunksIt.hasNext()) {
                List modAckChunk = (List)modifyAckDeadlineChunksIt.next();
                for (MessageDispatcher.PendingModifyAckDeadline modifyAckDeadline : modAckChunk) {
                    for (String ackId : modifyAckDeadline.ackIds) {
                        requestBuilder.addModifyDeadlineSeconds(modifyAckDeadline.deadlineExtensionSeconds).addModifyDeadlineAckIds(ackId);
                    }
                }
            }
            if (ackChunksIt.hasNext()) {
                List ackChunk = (List)ackChunksIt.next();
                requestBuilder.addAllAckIds((Iterable)ackChunk);
            }
            this.requestObserver.onNext((Object)requestBuilder.build());
        }
    }

    public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
        this.messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
        this.requestObserver.onNext((Object)StreamingPullRequest.newBuilder().setStreamAckDeadlineSeconds(newAckDeadlineSeconds).build());
    }

    private class StreamingPullResponseObserver
    implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
        final SettableFuture<Void> errorFuture;

        StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
            StreamingSubscriberConnection.this.requestObserver = requestObserver;
            requestObserver.disableAutoInboundFlowControl();
        }

        public void onNext(StreamingPullResponse response) {
            StreamingSubscriberConnection.this.messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
            if (StreamingSubscriberConnection.this.isAlive()) {
                StreamingSubscriberConnection.this.requestObserver.request(1);
            }
        }

        public void onError(Throwable t) {
            logger.log(Level.INFO, "Terminated streaming with exception", t);
            this.errorFuture.setException(t);
        }

        public void onCompleted() {
            logger.log(Level.INFO, "Streaming pull terminated successfully!");
            this.errorFuture.set(null);
        }
    }
}

