/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.grpc.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.StatusUtil;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.Subscription;
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;

/**
 * Subscriber connection that receives messages by repeatedly issuing unary {@code Pull} RPCs
 * against a single subscription and handing the results to a {@link MessageDispatcher}.
 *
 * <p>When a pull returns no messages, or fails with an error considered retryable, the next pull
 * is delayed by a backoff that doubles on each consecutive attempt, starting at
 * {@link #INITIAL_BACKOFF} and capped at {@link #MAX_BACKOFF}. A pull that returns messages is
 * followed immediately by another pull. Polling ends once the service leaves the
 * STARTING/RUNNING states.
 */
final class PollingSubscriberConnection
extends AbstractService
implements MessageDispatcher.AckProcessor {
    /** Upper bound on the number of ack IDs placed in one acknowledge/modifyAckDeadline request. */
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    /** Deadline applied to every RPC issued by this connection. */
    private static final Duration DEFAULT_TIMEOUT = Duration.standardSeconds(10);
    /** Value sent as the maximum message count of each pull request. */
    private static final int DEFAULT_MAX_MESSAGES = 1000;
    /** Delay used before the first retry; also the backoff restored after a non-empty pull. */
    private static final Duration INITIAL_BACKOFF = Duration.millis(100);
    /** Ceiling for the exponentially growing delay between pulls. */
    private static final Duration MAX_BACKOFF = Duration.standardSeconds(10);
    private static final Logger logger = Logger.getLogger(PollingSubscriberConnection.class.getName());
    private final String subscription;
    private final ScheduledExecutorService executor;
    private final SubscriberGrpc.SubscriberFutureStub stub;
    private final MessageDispatcher messageDispatcher;

    /**
     * Creates a connection for the given subscription.
     *
     * @param subscription name of the subscription to pull from; sent verbatim in every request
     * @param credentials credentials attached to every RPC made through {@code channel}
     * @param receiver passed through to the {@link MessageDispatcher}
     * @param ackExpirationPadding passed through to the {@link MessageDispatcher}
     * @param ackLatencyDistribution passed through to the {@link MessageDispatcher}
     * @param channel gRPC channel used to build the subscriber stub
     * @param flowController passed through to the {@link MessageDispatcher}
     * @param executor used to schedule delayed pulls; also handed to the {@link MessageDispatcher}
     * @param clock passed through to the {@link MessageDispatcher}
     */
    public PollingSubscriberConnection(String subscription, Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, Distribution ackLatencyDistribution, Channel channel, FlowController flowController, ScheduledExecutorService executor, Clock clock) {
        this.subscription = subscription;
        this.executor = executor;
        this.stub = SubscriberGrpc.newFutureStub(channel).withCallCredentials(MoreCallCredentials.from(credentials));
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, ackLatencyDistribution, flowController, executor, clock);
    }

    /**
     * Kicks off the asynchronous subscription lookup and reports the service as started without
     * waiting for that lookup to complete.
     */
    @Override
    protected void doStart() {
        logger.log(Level.INFO, "Starting subscriber.");
        this.initialize();
        this.notifyStarted();
    }

    /**
     * Fetches the subscription, forwards its ack deadline to the dispatcher and starts the pull
     * loop. A failed lookup fails the service, unless the service has already stopped.
     */
    private void initialize() {
        ListenableFuture<Subscription> subscriptionInfo = this.stub
                .withDeadlineAfter(DEFAULT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)
                .getSubscription(GetSubscriptionRequest.newBuilder().setSubscription(this.subscription).build());
        Futures.addCallback(subscriptionInfo, new FutureCallback<Subscription>() {

            @Override
            public void onSuccess(Subscription result) {
                messageDispatcher.setMessageDeadlineSeconds(result.getAckDeadlineSeconds());
                pullMessages(INITIAL_BACKOFF);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (!isAlive()) {
                    // The service was stopped (or already failed) before the lookup finished;
                    // reporting a failure now would be rejected by AbstractService.
                    logger.log(Level.FINE, "Subscription lookup failed after the service stopped running.", cause);
                    return;
                }
                notifyFailed(cause);
            }
        });
    }

    /** Stops the message dispatcher and reports the service as stopped. */
    @Override
    protected void doStop() {
        this.messageDispatcher.stop();
        this.notifyStopped();
    }

    /** Returns whether the service is still starting or running, i.e. whether polling should go on. */
    private boolean isAlive() {
        State current = this.state();
        return current == State.RUNNING || current == State.STARTING;
    }

    /**
     * Issues one pull request and, from its callback, arranges the next one.
     *
     * @param backoff delay to wait before the next pull if this one yields nothing or fails with
     *     a retryable error
     */
    private void pullMessages(final Duration backoff) {
        if (!this.isAlive()) {
            // Without this check the pull loop would keep running after doStop().
            return;
        }
        ListenableFuture<PullResponse> pullResult = this.stub
                .withDeadlineAfter(DEFAULT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)
                .pull(PullRequest.newBuilder()
                        .setSubscription(this.subscription)
                        .setMaxMessages(DEFAULT_MAX_MESSAGES)
                        .setReturnImmediately(true)
                        .build());
        Futures.addCallback(pullResult, new FutureCallback<PullResponse>() {

            @Override
            public void onSuccess(PullResponse pullResponse) {
                messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList());
                if (pullResponse.getReceivedMessagesCount() == 0) {
                    // Nothing was returned: wait before asking again instead of pulling in a
                    // tight loop.
                    schedulePullAfterBackoff(backoff);
                    return;
                }
                pullMessages(INITIAL_BACKOFF);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (!isAlive()) {
                    // Failures of requests that were in flight when the service stopped are
                    // neither retried nor reported as a service failure.
                    logger.log(Level.FINE, "Pull failed after the service stopped running.", cause);
                    return;
                }
                boolean recoverable = !(cause instanceof StatusRuntimeException)
                        || StatusUtil.isRetryable(((StatusRuntimeException) cause).getStatus());
                if (recoverable) {
                    logger.log(Level.SEVERE, "Failed to pull messages (recoverable): ", cause);
                    schedulePullAfterBackoff(backoff);
                    return;
                }
                notifyFailed(cause);
            }
        });
    }

    /**
     * Schedules the next pull after {@code backoff}; that pull is started with twice the delay,
     * capped at {@link #MAX_BACKOFF}. Does nothing when the service is no longer alive.
     */
    private void schedulePullAfterBackoff(final Duration backoff) {
        if (!this.isAlive()) {
            return;
        }
        this.executor.schedule(new Runnable() {

            @Override
            public void run() {
                Duration newBackoff = backoff.multipliedBy(2L);
                if (newBackoff.isLongerThan(MAX_BACKOFF)) {
                    newBackoff = MAX_BACKOFF;
                }
                pullMessages(newBackoff);
            }
        }, backoff.getMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the pending deadline extensions and acknowledgements, splitting the ack IDs so that no
     * single request carries more than {@link #MAX_PER_REQUEST_CHANGES} of them.
     *
     * <p>The futures returned by the RPCs are not observed, so this method does not report RPC
     * failures to its caller.
     *
     * @param acksToSend ack IDs to acknowledge
     * @param ackDeadlineExtensions groups of ack IDs, each with the deadline extension to apply
     */
    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        for (MessageDispatcher.PendingModifyAckDeadline modifyAckDeadline : ackDeadlineExtensions) {
            // Chunk the ack IDs of each extension (not the list of extensions), since the ack IDs
            // are what make a request grow.
            // NOTE(review): assumes PendingModifyAckDeadline.ackIds is a List<String> - confirm.
            for (List<String> ackIdChunk : Lists.partition(modifyAckDeadline.ackIds, MAX_PER_REQUEST_CHANGES)) {
                this.stub
                        .withDeadlineAfter(DEFAULT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)
                        .modifyAckDeadline(ModifyAckDeadlineRequest.newBuilder()
                                .setSubscription(this.subscription)
                                .addAllAckIds(ackIdChunk)
                                .setAckDeadlineSeconds(modifyAckDeadline.deadlineExtensionSeconds)
                                .build());
            }
        }
        for (List<String> ackChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) {
            this.stub
                    .withDeadlineAfter(DEFAULT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)
                    .acknowledge(AcknowledgeRequest.newBuilder()
                            .setSubscription(this.subscription)
                            .addAllAckIds(ackChunk)
                            .build());
        }
    }
}

