/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SequentialExecutorService;
import com.google.cloud.pubsub.v1.Waiter;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;
import org.threeten.bp.temporal.Temporal;
import org.threeten.bp.temporal.TemporalAmount;
import org.threeten.bp.temporal.TemporalUnit;

class MessageDispatcher {
    /** Logger named after this class; used for periodic-job failures and FINER-level batch sizes. */
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
    /** Percentile of the ack-latency distribution consulted when a new ack deadline is computed. */
    private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
    /** Initial delay and fixed delay (100 ms) of the periodic background job scheduled by start(). */
    @InternalApi
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100L);

    // ---- Executors and time source ----
    /** Runs delivery tasks for messages that carry no ordering key. */
    private final Executor executor;
    /** Wraps {@link #executor}; receives delivery tasks for messages that have an ordering key. */
    private final SequentialExecutorService.AutoExecutor sequentialExecutor;
    /** Schedules the periodic background job and the one-shot "extend deadline" timer. */
    private final ScheduledExecutorService systemExecutor;
    /** Source of the current time in milliseconds. */
    private final ApiClock clock;

    // ---- Deadline configuration ----
    /** Subtracted (in seconds) from a freshly computed deadline to get the delay before the next extension pass. */
    private final Duration ackExpirationPadding;
    /** Added to the receive time of a batch to obtain each message's total expiration. */
    private final Duration maxAckExtensionPeriod;
    /** Upper bound for a computed deadline; only applied when greater than zero. */
    private final int maxSecondsPerAckExtension;

    // ---- Collaborators ----
    /** User callback that receives each delivered message. */
    private final MessageReceiver receiver;
    /** Receives the batched acks and modify-ack-deadline requests. */
    private final AckProcessor ackProcessor;
    /** One element plus the message's serialized size is reserved per message and released when it is forgotten. */
    private final FlowController flowController;
    /** Tracks the number of in-flight messages; stop() waits on it before cancelling the jobs. */
    private final Waiter messagesWaiter;

    // ---- In-flight bookkeeping ----
    /** Handlers of messages that have been accepted and not yet forgotten, keyed by ack ID. */
    private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();
    /** Ack IDs waiting to be sent as acks. */
    private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
    /** Ack IDs waiting to be sent as nacks (modify-ack-deadline of 0 seconds). */
    private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
    /** Ack IDs of newly accepted messages waiting for a modify-ack-deadline at the current deadline. */
    private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();
    /** Current ack deadline in seconds; starts at 60 and is updated by the periodic job. */
    private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(60);
    /** When true, the next run of the periodic job recomputes the deadline and extends pending messages. */
    private final AtomicBoolean extendDeadline = new AtomicBoolean(true);

    // ---- Scheduled work ----
    /** Held by start() and stop() while they install or clear the scheduled futures below. */
    private final Lock jobLock;
    /** Handle of the periodic background job; null before start() and after stop(). */
    private ScheduledFuture<?> backgroundJob;
    /** Handle of the pending one-shot timer that re-arms {@link #extendDeadline}; may be null. */
    private ScheduledFuture<?> setExtendedDeadlineFuture;
    /** Records ack latencies in whole seconds and supplies the percentile used for new deadlines. */
    private final Distribution ackLatencyDistribution;

    MessageDispatcher(MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Duration maxDurationPerAckExtension, Distribution ackLatencyDistribution, FlowController flowController, Executor executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        this.executor = executor;
        this.systemExecutor = systemExecutor;
        this.ackExpirationPadding = ackExpirationPadding;
        this.maxAckExtensionPeriod = maxAckExtensionPeriod;
        this.maxSecondsPerAckExtension = Math.toIntExact(maxDurationPerAckExtension.getSeconds());
        this.receiver = receiver;
        this.ackProcessor = ackProcessor;
        this.flowController = flowController;
        this.ackLatencyDistribution = ackLatencyDistribution;
        this.jobLock = new ReentrantLock();
        this.messagesWaiter = new Waiter();
        this.clock = clock;
        this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
    }

    /**
     * Schedules the periodic background job on the system executor.
     *
     * <p>The job first runs after {@code PENDING_ACKS_SEND_DELAY} and then repeats with the same
     * fixed delay. Each run optionally performs a deadline-extension pass (when the
     * {@code extendDeadline} flag is set) and always flushes the queued acks, nacks and receipts.
     */
    void start() {
        // One-shot task that re-arms the flag so the next periodic run extends deadlines again.
        final Runnable setExtendDeadline = () -> extendDeadline.set(true);

        jobLock.lock();
        try {
            final long delayMillis = PENDING_ACKS_SEND_DELAY.toMillis();
            backgroundJob = systemExecutor.scheduleWithFixedDelay(
                    () -> runPeriodicJob(setExtendDeadline),
                    delayMillis,
                    delayMillis,
                    TimeUnit.MILLISECONDS);
        } finally {
            jobLock.unlock();
        }
    }

    /**
     * Body of one periodic run: extend deadlines if requested, then flush pending ack operations.
     * Every Throwable is logged and swallowed so a failure is never propagated to the scheduler.
     *
     * @param setExtendDeadline task that re-arms the extend-deadline flag when it fires
     */
    private void runPeriodicJob(Runnable setExtendDeadline) {
        try {
            if (extendDeadline.getAndSet(false)) {
                final int newDeadlineSec = computeDeadlineSeconds();
                messageDeadlineSeconds.set(newDeadlineSec);
                extendDeadlines();
                // Fire again shortly before the deadline that was just requested runs out.
                rescheduleExtendDeadline(
                        setExtendDeadline, (long) newDeadlineSec - ackExpirationPadding.getSeconds());
            }
            processOutstandingAckOperations();
        } catch (Throwable t) {
            logger.log(Level.WARNING, "failed to run periodic job", t);
        }
    }

    /**
     * Replaces the pending extend-deadline timer with a new one.
     *
     * <p>The scheduled futures are cleared by {@code stop()} under {@code jobLock}; touching them
     * here without that lock could dereference a job handle that was just set to null, or schedule
     * a fresh timer after shutdown. The whole read-cancel-schedule sequence therefore runs under
     * the same lock and does nothing once the dispatcher has been stopped.
     *
     * @param setExtendDeadline task to schedule
     * @param delaySeconds delay before the task fires, in seconds
     */
    private void rescheduleExtendDeadline(Runnable setExtendDeadline, long delaySeconds) {
        jobLock.lock();
        try {
            final ScheduledFuture<?> job = backgroundJob;
            if (job == null) {
                // stop() has already cleared the job; do not leave a new timer behind.
                return;
            }
            if (setExtendedDeadlineFuture != null && !job.isDone()) {
                setExtendedDeadlineFuture.cancel(true);
            }
            setExtendedDeadlineFuture =
                    systemExecutor.schedule(setExtendDeadline, delaySeconds, TimeUnit.SECONDS);
        } finally {
            jobLock.unlock();
        }
    }

    /**
     * Shuts the dispatcher down.
     *
     * <p>Waits on the message waiter first, then cancels and clears both scheduled futures under
     * {@code jobLock}, and finally flushes whatever acks, nacks and receipts are still queued.
     */
    void stop() {
        messagesWaiter.waitComplete();

        jobLock.lock();
        try {
            final ScheduledFuture<?> periodicJob = backgroundJob;
            if (periodicJob != null) {
                // Let a run that is already in progress finish.
                periodicJob.cancel(false);
            }
            final ScheduledFuture<?> extendTimer = setExtendedDeadlineFuture;
            if (extendTimer != null) {
                extendTimer.cancel(true);
            }
            backgroundJob = null;
            setExtendedDeadlineFuture = null;
        } finally {
            jobLock.unlock();
        }

        // The periodic job no longer runs, so send the remaining operations from this thread.
        processOutstandingAckOperations();
    }

    /**
     * Overrides the current ack deadline.
     *
     * @param sec new deadline in seconds; stored as given, without range checks
     */
    @InternalApi
    void setMessageDeadlineSeconds(int sec) {
        messageDeadlineSeconds.set(sec);
    }

    /**
     * Reads the current ack deadline.
     *
     * @return the deadline, in seconds, most recently stored
     */
    @InternalApi
    int getMessageDeadlineSeconds() {
        final int currentSeconds = messageDeadlineSeconds.get();
        return currentSeconds;
    }

    /**
     * Accepts a batch of received messages and hands the new ones on for delivery.
     *
     * <p>All messages in the batch share one total expiration (now plus the maximum extension
     * period). A message whose ack ID is already pending is skipped; every other message gets a
     * handler registered, a receipt queued, and is passed to {@code processBatch}.
     *
     * @param messages messages as received; may contain ack IDs that are already pending
     */
    void processReceivedMessages(List<ReceivedMessage> messages) {
        final Instant totalExpiration = now().plus(maxAckExtensionPeriod);
        final List<OutstandingMessage> accepted = new ArrayList<>(messages.size());

        for (ReceivedMessage received : messages) {
            final String ackId = received.getAckId();
            final AckHandler handler =
                    new AckHandler(ackId, received.getMessage().getSerializedSize(), totalExpiration);

            final boolean alreadyPending = pendingMessages.putIfAbsent(ackId, handler) != null;
            if (!alreadyPending) {
                accepted.add(new OutstandingMessage(received, handler));
                pendingReceipts.add(ackId);
            }
        }

        processBatch(accepted);
    }

    /**
     * Reserves flow-control capacity for each message of the batch and submits it for delivery.
     *
     * <p>The waiter's pending count is raised by the whole batch size up front, before any
     * reservation is attempted.
     *
     * @param batch newly accepted messages paired with their handlers
     * @throws IllegalStateException if the flow controller rejects a reservation
     */
    private void processBatch(List<OutstandingMessage> batch) {
        messagesWaiter.incrementPendingCount(batch.size());

        for (OutstandingMessage outstanding : batch) {
            final ReceivedMessage received = outstanding.receivedMessage;
            final long sizeBytes = received.getMessage().getSerializedSize();
            try {
                // One element plus the payload size per message.
                flowController.reserve(1L, sizeBytes);
            } catch (FlowController.FlowControlException unexpectedException) {
                throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
            }

            final PubsubMessage toDeliver = addDeliveryInfoCount(received);
            processOutstandingMessage(toDeliver, outstanding.ackHandler);
        }
    }

    /**
     * Returns the payload message, annotated with the delivery attempt when one is reported.
     *
     * @param receivedMessage envelope carrying the message and its delivery-attempt counter
     * @return the original message when the counter is not positive; otherwise a copy with the
     *     {@code googclient_deliveryattempt} attribute set to the counter's decimal value
     */
    private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
        final PubsubMessage payload = receivedMessage.getMessage();
        final int attempt = receivedMessage.getDeliveryAttempt();

        if (attempt <= 0) {
            // Nothing to annotate; hand back the message untouched.
            return payload;
        }

        final String attemptText = Integer.toString(attempt);
        return PubsubMessage.newBuilder(payload)
                .putAttributes("googclient_deliveryattempt", attemptText)
                .build();
    }

    /**
     * Submits one message for delivery to the receiver and routes the reply to its handler.
     *
     * <p>The receiver answers through an {@link AckReplyConsumer} that completes a future with
     * ACK or NACK; the handler is attached to that future as a callback on the direct executor.
     * Messages without an ordering key run on the plain executor, the others on the sequential
     * executor under their key.
     *
     * @param message message to deliver
     * @param ackHandler handler tracking this message
     */
    private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
        final SettableApiFuture<AckReply> response = SettableApiFuture.create();
        ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());

        final AckReplyConsumer consumer = new AckReplyConsumer() {
            @Override
            public void ack() {
                response.set(AckReply.ACK);
            }

            @Override
            public void nack() {
                response.set(AckReply.NACK);
            }
        };

        final Runnable deliverMessageTask = () -> {
            try {
                final Instant giveUpAt =
                        ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get());
                if (giveUpAt.isBefore(now())) {
                    // Expired while queued: drop it without queueing an ack or a nack.
                    ackHandler.forget();
                    return;
                }
                receiver.receiveMessage(message, consumer);
            } catch (Exception e) {
                // Surfaces through the handler's failure callback.
                response.setException(e);
            }
        };

        final String orderingKey = message.getOrderingKey();
        if (orderingKey.isEmpty()) {
            executor.execute(deliverMessageTask);
        } else {
            sequentialExecutor.submit(orderingKey, deliverMessageTask);
        }
    }

    /**
     * Derives the next ack deadline from the recorded ack latencies.
     *
     * <p>Starts from the 99.9th percentile of the latency distribution, applies the configured
     * per-extension cap when that cap is positive, and finally clamps the value to the range
     * 10..600 seconds. Because the clamp comes last, a cap below 10 still yields 10.
     *
     * @return the deadline in seconds, always between 10 and 600 inclusive
     */
    @InternalApi
    int computeDeadlineSeconds() {
        int seconds = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);

        final boolean capConfigured = maxSecondsPerAckExtension > 0;
        if (capConfigured) {
            seconds = Math.min(seconds, maxSecondsPerAckExtension);
        }

        return Math.max(10, Math.min(600, seconds));
    }

    /**
     * Builds and sends modify-ack-deadline requests for every pending message.
     *
     * <p>With {@code extendTo = now + current deadline}:
     * <ul>
     *   <li>a message whose total expiration lies after {@code extendTo} is extended by the full
     *       current deadline (all such ack IDs share one request);</li>
     *   <li>any other message is forgotten, and if its total expiration is still in the future it
     *       gets one last individual extension for the remaining whole seconds (at least 1).</li>
     * </ul>
     * The shared request is always sent, even when it holds no ack IDs; no acks are sent here.
     */
    @InternalApi
    void extendDeadlines() {
        final int extendSeconds = getMessageDeadlineSeconds();
        final PendingModifyAckDeadline fullExtension = new PendingModifyAckDeadline(extendSeconds);
        final List<PendingModifyAckDeadline> modacks = new ArrayList<>();

        final Instant now = now();
        final Instant extendTo = now.plusSeconds(extendSeconds);

        for (Map.Entry<String, AckHandler> pending : pendingMessages.entrySet()) {
            final String ackId = pending.getKey();
            final AckHandler handler = pending.getValue();
            final Instant totalExpiration = handler.totalExpiration;

            if (totalExpiration.isAfter(extendTo)) {
                fullExtension.ackIds.add(ackId);
            } else {
                // Out of budget: stop tracking it, but cover whatever time is left.
                handler.forget();
                if (totalExpiration.isAfter(now)) {
                    final long remainingSeconds = now.until(totalExpiration, ChronoUnit.SECONDS);
                    final int sec = Math.max(1, (int) remainingSeconds);
                    modacks.add(new PendingModifyAckDeadline(sec, ackId));
                }
            }
        }

        logger.log(Level.FINER, "Sending {0} modacks", fullExtension.ackIds.size() + modacks.size());
        modacks.add(fullExtension);

        final List<String> acksToSend = Collections.emptyList();
        ackProcessor.sendAckOperations(acksToSend, modacks);
    }

    /**
     * Drains the three pending queues and sends their contents to the ack processor in one call.
     *
     * <p>Acks are passed as plain ack IDs. Nacks become a modify-ack-deadline request of 0
     * seconds, receipts one at the current deadline; each of those two requests is included only
     * when it holds at least one ack ID. The processor is invoked even if everything is empty.
     */
    @InternalApi
    void processOutstandingAckOperations() {
        final List<String> acks = new ArrayList<>();
        pendingAcks.drainTo(acks);
        logger.log(Level.FINER, "Sending {0} acks", acks.size());

        final List<PendingModifyAckDeadline> modacks = new ArrayList<>();

        // A nack is expressed as a deadline of zero seconds.
        final PendingModifyAckDeadline nacks = new PendingModifyAckDeadline(0);
        pendingNacks.drainTo(nacks.ackIds);
        logger.log(Level.FINER, "Sending {0} nacks", nacks.ackIds.size());
        if (!nacks.ackIds.isEmpty()) {
            modacks.add(nacks);
        }

        final PendingModifyAckDeadline receipts =
                new PendingModifyAckDeadline(getMessageDeadlineSeconds());
        pendingReceipts.drainTo(receipts.ackIds);
        logger.log(Level.FINER, "Sending {0} receipts", receipts.ackIds.size());
        if (!receipts.ackIds.isEmpty()) {
            modacks.add(receipts);
        }

        ackProcessor.sendAckOperations(acks, modacks);
    }

    /**
     * Reads the injected clock.
     *
     * @return the clock's current millisecond reading as an {@link Instant}
     */
    private Instant now() {
        final long epochMillis = clock.millisTime();
        return Instant.ofEpochMilli(epochMillis);
    }

    /** Immutable pairing of a received message with the handler created for it. */
    private static class OutstandingMessage {
        /** Handler that tracks the message until it is acked, nacked or forgotten. */
        private final AckHandler ackHandler;
        /** The message envelope exactly as it was received. */
        private final ReceivedMessage receivedMessage;

        /**
         * @param receivedMessage the received envelope
         * @param ackHandler the handler registered for that envelope's ack ID
         */
        private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
            this.ackHandler = ackHandler;
            this.receivedMessage = receivedMessage;
        }
    }

    static interface AckProcessor {
        public void sendAckOperations(List<String> var1, List<PendingModifyAckDeadline> var2);
    }

    private class AckHandler
    implements ApiFutureCallback<AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        private final long receivedTimeMillis;
        private final Instant totalExpiration;

        private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
            this.ackId = ackId;
            this.outstandingBytes = outstandingBytes;
            this.receivedTimeMillis = MessageDispatcher.this.clock.millisTime();
            this.totalExpiration = totalExpiration;
        }

        private void forget() {
            if (MessageDispatcher.this.pendingMessages.remove(this.ackId) == null) {
                return;
            }
            MessageDispatcher.this.flowController.release(1L, (long)this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingCount(-1);
        }

        public void onFailure(Throwable t) {
            logger.log(Level.WARNING, "MessageReceiver failed to process ack ID: " + this.ackId + ", the message will be nacked.", t);
            MessageDispatcher.this.pendingNacks.add(this.ackId);
            this.forget();
        }

        public void onSuccess(AckReply reply) {
            LinkedBlockingQueue destination;
            switch (reply) {
                case ACK: {
                    destination = MessageDispatcher.this.pendingAcks;
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long)((long)Math.ceil((double)(MessageDispatcher.this.clock.millisTime() - this.receivedTimeMillis) / 1000.0))));
                    break;
                }
                case NACK: {
                    destination = MessageDispatcher.this.pendingNacks;
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", new Object[]{reply}));
                }
            }
            destination.add(this.ackId);
            this.forget();
        }
    }

    /** Reply a receiver can give for a delivered message. */
    public enum AckReply {
        /** The message was handled; its ack ID is queued for an ack. */
        ACK,
        /** The message was rejected; its ack ID is queued for a nack. */
        NACK
    }

    /**
     * A modify-ack-deadline request waiting to be sent: one deadline, in seconds, applied to a
     * growable list of ack IDs.
     */
    static class PendingModifyAckDeadline {
        /** Ack IDs the deadline applies to; a private mutable copy that callers append to. */
        final List<String> ackIds;
        /** Deadline to request, in seconds. */
        final int deadlineExtensionSeconds;

        /**
         * @param deadlineExtensionSeconds deadline to request, in seconds
         * @param ackIds initial ack IDs; may be omitted to start with an empty list
         */
        PendingModifyAckDeadline(int deadlineExtensionSeconds, String... ackIds) {
            this(deadlineExtensionSeconds, Arrays.asList(ackIds));
        }

        /** Copies the given IDs so later appends never touch the caller's collection. */
        private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection<String> ackIds) {
            this.deadlineExtensionSeconds = deadlineExtensionSeconds;
            this.ackIds = new ArrayList<>(ackIds);
        }

        @Override
        public String toString() {
            final String template = "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}";
            return String.format(template, deadlineExtensionSeconds, ackIds);
        }
    }
}

