/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.grpc.FlowController;
import com.google.api.stats.Distribution;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.AckReply;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.MessageWaiter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Interval;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class MessageDispatcher {
    /** Class-wide logger; every log statement in this class goes through it. */
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
    // NOTE(review): not referenced by name in this decompiled source; presumably the compiler inlined it
    // as the literal 2 handed to ExtensionJob in processReceivedMessages -- confirm against the original.
    private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
    /** Delay between the first queued ack/nack and the alarm that flushes them (see setupPendingAcksAlarm). */
    @VisibleForTesting
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis((long)100L);
    // NOTE(review): not referenced by name in this decompiled source; presumably the compiler inlined it
    // as the literal 600 used as the cap in ExtensionJob.extendExpiration -- confirm against the original.
    private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 600;
    /** Runs receiver callbacks and schedules both alarms. */
    private final ScheduledExecutorService executor;
    /** Source of the current time in milliseconds. */
    private final Clock clock;
    /** Subtracted from an expiration to pick the alarm time, and added to "now" when computing the cut-over time. */
    private final Duration ackExpirationPadding;
    /** User callback that receives each message together with an AckReplyConsumer. */
    private final MessageReceiver receiver;
    /** Sink for the batched acks and deadline modifications built by processOutstandingAckOperations. */
    private final AckProcessor ackProcessor;
    /** Reserved in processReceivedMessages and released by AckHandler once a message is resolved. */
    private final FlowController flowController;
    /** Pending-message counter; stop() blocks on it via waitNoMessages(). */
    private final MessageWaiter messagesWaiter;
    /** Jobs ordered by expiration (ExtensionJob.compareTo); every access synchronizes on the queue itself. */
    private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
    /** Ack IDs waiting to be sent as acks; every access synchronizes on the set itself. */
    private final Set<String> pendingAcks;
    /** Ack IDs waiting to be sent as nacks (deadline 0); every access synchronizes on the set itself. */
    private final Set<String> pendingNacks;
    /** Guards ackDeadlineExtensionAlarm, nextAckDeadlineExtensionAlarmTime and pendingAcksAlarm. */
    private final Lock alarmsLock;
    /** Seconds added to the receive time to compute the initial expiration of a batch. */
    private int messageDeadlineSeconds;
    /** Currently scheduled deadline-extension alarm, or null when none is scheduled. */
    private ScheduledFuture<?> ackDeadlineExtensionAlarm;
    /** Time the extension alarm is set for; an Instant of Long.MAX_VALUE is used when no alarm is set. */
    private Instant nextAckDeadlineExtensionAlarmTime;
    /** Currently scheduled pending-acks flush, or null when none is scheduled. */
    private ScheduledFuture<?> pendingAcksAlarm;
    /** Receives the receive-to-ack latency of each acked message, in seconds rounded up. */
    private final Distribution ackLatencyDistribution;

    /**
     * Creates a dispatcher with empty ack/nack queues and no alarm scheduled.
     *
     * @param receiver callback that is handed every received message
     * @param ackProcessor sink for batched acks and ack-deadline modifications
     * @param ackExpirationPadding safety margin applied before a deadline expires
     * @param ackLatencyDistribution collector for receive-to-ack latencies
     * @param flowController controller reserved/released as messages arrive and are resolved
     * @param executor executor used for receiver callbacks and for both alarms
     * @param clock time source
     */
    MessageDispatcher(MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, Clock clock) {
        // Collaborators supplied by the caller.
        this.receiver = receiver;
        this.ackProcessor = ackProcessor;
        this.ackExpirationPadding = ackExpirationPadding;
        this.ackLatencyDistribution = ackLatencyDistribution;
        this.flowController = flowController;
        this.executor = executor;
        this.clock = clock;

        // Internal bookkeeping, all initially empty.
        outstandingAckHandlers = new PriorityQueue<ExtensionJob>();
        pendingAcks = new HashSet<String>();
        pendingNacks = new HashSet<String>();
        alarmsLock = new ReentrantLock();
        // Long.MAX_VALUE stands for "no alarm set": any real alarm time is earlier.
        nextAckDeadlineExtensionAlarmTime = new Instant(Long.MAX_VALUE);
        messagesWaiter = new MessageWaiter();
    }

    /**
     * Stops the dispatcher.
     *
     * <p>Blocks until the message waiter reports no pending messages, cancels both alarms, and then
     * flushes whatever acks and nacks are still queued.
     */
    public void stop() {
        messagesWaiter.waitNoMessages();
        alarmsLock.lock();
        try {
            if (ackDeadlineExtensionAlarm != null) {
                ackDeadlineExtensionAlarm.cancel(true);
                ackDeadlineExtensionAlarm = null;
            }
            // Keep the recorded alarm time in step with the cancelled alarm. With a stale time left
            // behind, setupNextAckDeadlineExtensionAlarm would treat the cancelled alarm as still
            // pending and skip scheduling for any later expiration.
            nextAckDeadlineExtensionAlarmTime = new Instant(Long.MAX_VALUE);
            // The flush below sends everything the pending-acks alarm would have sent, so the
            // alarm is redundant from here on.
            if (pendingAcksAlarm != null) {
                pendingAcksAlarm.cancel(false);
                pendingAcksAlarm = null;
            }
        } finally {
            alarmsLock.unlock();
        }
        processOutstandingAckOperations();
    }

    /**
     * Sets the deadline that processReceivedMessages adds to the receive time to compute the
     * initial expiration of a batch.
     *
     * @param messageDeadlineSeconds deadline in seconds
     */
    // NOTE(review): the backing field is a plain int written without synchronization; confirm that
    // callers set it on the same thread that calls processReceivedMessages, or make it volatile.
    public void setMessageDeadlineSeconds(int messageDeadlineSeconds) {
        this.messageDeadlineSeconds = messageDeadlineSeconds;
    }

    /**
     * Returns the deadline most recently stored by setMessageDeadlineSeconds.
     *
     * @return deadline in seconds
     */
    public int getMessageDeadlineSeconds() {
        return this.messageDeadlineSeconds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hands a batch of received messages to the receiver and starts tracking their ack deadlines.
     *
     * <p>For every message an {@code AckHandler} is created and wired to the reply future, and the
     * receiver is invoked on the executor. The whole batch is then registered as one
     * {@code ExtensionJob} expiring {@code messageDeadlineSeconds} after the receive time, the
     * extension alarm is (re)scheduled, and flow control is reserved for the batch.
     *
     * @param responseMessages messages to dispatch; an empty list is a no-op
     * @throws IllegalStateException if the flow controller rejects the reservation
     */
    public void processReceivedMessages(List<ReceivedMessage> responseMessages) {
        int receivedMessagesCount = responseMessages.size();
        if (receivedMessagesCount == 0) {
            return;
        }
        Instant now = new Instant(clock.millis());
        int totalByteCount = 0;
        ArrayList<AckHandler> ackHandlers = new ArrayList<AckHandler>(receivedMessagesCount);
        for (ReceivedMessage received : responseMessages) {
            int messageSize = received.getMessage().getSerializedSize();
            totalByteCount += messageSize;
            ackHandlers.add(new AckHandler(received.getAckId(), messageSize));
        }
        // Multiply as long: an int multiplication overflows for large deadlines.
        Instant expiration = now.plus(messageDeadlineSeconds * 1000L);
        logger.log(Level.INFO, "Received " + receivedMessagesCount + " messages at " + now);
        messagesWaiter.incrementPendingMessages(receivedMessagesCount);

        // Handlers were created in message order, so the two sequences are walked in lock step.
        Iterator<AckHandler> acksIterator = ackHandlers.iterator();
        for (ReceivedMessage received : responseMessages) {
            dispatchToReceiver(received.getMessage(), acksIterator.next());
        }

        synchronized (outstandingAckHandlers) {
            // 2 is the initial extension in seconds (INITIAL_ACK_DEADLINE_EXTENSION_SECONDS).
            outstandingAckHandlers.add(new ExtensionJob(expiration, 2, ackHandlers));
        }
        setupNextAckDeadlineExtensionAlarm(expiration);
        // NOTE(review): the reservation happens after dispatch, so a fast receiver can release
        // before the matching reserve; ordering kept as in the original -- confirm it is intended.
        try {
            flowController.reserve(receivedMessagesCount, totalByteCount);
        } catch (FlowController.FlowControlException unexpectedException) {
            throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
        }
    }

    /** Wires one message's reply future to its ack handler and invokes the receiver on the executor. */
    private void dispatchToReceiver(final PubsubMessage message, AckHandler ackHandler) {
        final SettableFuture<AckReply> response = SettableFuture.create();
        final AckReplyConsumer consumer = new AckReplyConsumer() {

            @Override
            public void accept(AckReply reply, Throwable t) {
                if (reply != null) {
                    response.set(reply);
                } else if (t != null) {
                    response.setException(t);
                } else {
                    // setException(null) would throw inside the receiver's own call and leave the
                    // message unresolved forever; fail the future instead so it is nacked.
                    response.setException(new IllegalArgumentException("AckReplyConsumer.accept called with neither a reply nor a cause"));
                }
            }
        };
        Futures.addCallback(response, ackHandler);
        executor.submit(new Runnable() {

            @Override
            public void run() {
                MessageDispatcher.this.receiver.receiveMessage(message, consumer);
            }
        });
    }

    /**
     * Schedules a one-shot flush of the queued acks/nacks {@code PENDING_ACKS_SEND_DELAY} from now,
     * unless a flush is already scheduled.
     */
    private void setupPendingAcksAlarm() {
        alarmsLock.lock();
        try {
            if (pendingAcksAlarm != null) {
                // A flush is already on its way; it will pick up the newly queued IDs too.
                return;
            }
            Runnable flushQueuedAcks = new Runnable() {

                @Override
                public void run() {
                    // Clear the marker first so replies queued during the flush schedule a new alarm.
                    MessageDispatcher.this.alarmsLock.lock();
                    try {
                        MessageDispatcher.this.pendingAcksAlarm = null;
                    } finally {
                        MessageDispatcher.this.alarmsLock.unlock();
                    }
                    MessageDispatcher.this.processOutstandingAckOperations();
                }
            };
            long delayMillis = PENDING_ACKS_SEND_DELAY.getMillis();
            pendingAcksAlarm = executor.schedule(flushQueuedAcks, delayMillis, TimeUnit.MILLISECONDS);
        } finally {
            alarmsLock.unlock();
        }
    }

    /**
     * Makes sure the extension alarm fires no later than {@code expiration} minus the configured
     * padding. An already scheduled alarm is replaced only when the new time is earlier.
     *
     * @param expiration expiration the alarm has to pre-empt
     */
    private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
        Instant possibleNextAlarmTime = expiration.minus(ackExpirationPadding);
        alarmsLock.lock();
        try {
            if (!nextAckDeadlineExtensionAlarmTime.isAfter(possibleNextAlarmTime)) {
                // The alarm already set fires early enough.
                return;
            }
            logger.log(Level.INFO, "Scheduling next alarm time: " + possibleNextAlarmTime + ", last alarm set to time: " + nextAckDeadlineExtensionAlarmTime);
            if (ackDeadlineExtensionAlarm != null) {
                logger.log(Level.INFO, "Canceling previous alarm");
                ackDeadlineExtensionAlarm.cancel(false);
            }
            nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;
            long delayMillis = possibleNextAlarmTime.getMillis() - clock.millis();
            ackDeadlineExtensionAlarm = executor.schedule(new AckDeadlineAlarm(), delayMillis, TimeUnit.MILLISECONDS);
        } finally {
            alarmsLock.unlock();
        }
    }

    /** Flushes the queued acks and nacks without any additional deadline extensions. */
    private void processOutstandingAckOperations() {
        // The callee copies the list it is given, so an immutable empty list is sufficient.
        processOutstandingAckOperations(Collections.<PendingModifyAckDeadline>emptyList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the queued acks and nacks and hands them, together with the supplied deadline
     * extensions, to the ack processor.
     *
     * <p>Nacks are sent as one extra {@code PendingModifyAckDeadline} with a deadline of 0 seconds.
     * The processor is always invoked, even when there is nothing to send.
     *
     * @param ackDeadlineExtensions deadline modifications to send along; copied, never mutated
     */
    private void processOutstandingAckOperations(List<PendingModifyAckDeadline> ackDeadlineExtensions) {
        List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = Lists.newArrayList(ackDeadlineExtensions);

        // Snapshot and clear under the set's monitor so no ack ID is lost or sent twice.
        List<String> acksToSend;
        synchronized (pendingAcks) {
            acksToSend = new ArrayList<String>(pendingAcks);
            pendingAcks.clear();
        }
        if (!acksToSend.isEmpty()) {
            // java.util.logging substitutes MessageFormat-style "{0}"; a bare "{}" is printed verbatim.
            logger.log(Level.INFO, "Sending {0} acks", acksToSend.size());
        }

        PendingModifyAckDeadline nacksToSend = null;
        synchronized (pendingNacks) {
            if (!pendingNacks.isEmpty()) {
                nacksToSend = new PendingModifyAckDeadline(0);
                for (String ackId : pendingNacks) {
                    nacksToSend.addAckId(ackId);
                }
                pendingNacks.clear();
            }
        }
        if (nacksToSend != null) {
            logger.log(Level.INFO, "Sending {0} nacks", nacksToSend.ackIds.size());
            modifyAckDeadlinesToSend.add(nacksToSend);
        }

        ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
    }

    /**
     * Alarm task that extends the ack deadline of every job about to expire, flushes the queued
     * acks/nacks in the same call, and re-arms itself for the next expiration.
     */
    private class AckDeadlineAlarm implements Runnable {
        private AckDeadlineAlarm() {
        }

        @Override
        public void run() {
            final MessageDispatcher dispatcher = MessageDispatcher.this;

            // This run replaces both alarms: mark the extension alarm as unset and drop any
            // scheduled ack flush, since the flush happens at the end of this method.
            dispatcher.alarmsLock.lock();
            try {
                dispatcher.nextAckDeadlineExtensionAlarmTime = new Instant(Long.MAX_VALUE);
                dispatcher.ackDeadlineExtensionAlarm = null;
                if (dispatcher.pendingAcksAlarm != null) {
                    dispatcher.pendingAcksAlarm.cancel(false);
                    dispatcher.pendingAcksAlarm = null;
                }
            } finally {
                dispatcher.alarmsLock.unlock();
            }

            Instant now = new Instant(dispatcher.clock.millis());
            // now + padding + 500 ms, rounded up to a whole second.
            long paddedMillis = now.plus(dispatcher.ackExpirationPadding).plus(500L).getMillis();
            long wholeSeconds = (long) Math.ceil((double) paddedMillis / 1000.0);
            Instant cutOverTime = new Instant(wholeSeconds * 1000L);
            logger.log(Level.INFO, "Running alarm sent outstanding acks, at now time: " + now + ", with cutover time: " + cutOverTime + ", padding: " + dispatcher.ackExpirationPadding);

            Instant nextScheduleExpiration = null;
            ArrayList<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<PendingModifyAckDeadline>();
            ArrayList<ExtensionJob> renewJobs = new ArrayList<ExtensionJob>();
            synchronized (dispatcher.outstandingAckHandlers) {
                PriorityQueue<ExtensionJob> jobs = dispatcher.outstandingAckHandlers;
                ExtensionJob job;
                while ((job = jobs.peek()) != null && job.expiration.compareTo(cutOverTime) <= 0) {
                    jobs.poll();
                    dropAckedHandlers(job.ackHandlers);
                    if (job.ackHandlers.isEmpty()) {
                        // Every message of this job has been resolved; the job is simply dropped.
                        continue;
                    }
                    job.extendExpiration(now);
                    long secondsUntilExpiration = new Interval(now, job.expiration).toDuration().getStandardSeconds();
                    PendingModifyAckDeadline pendingModAckDeadline = new PendingModifyAckDeadline(Ints.saturatedCast(secondsUntilExpiration));
                    for (AckHandler handler : job.ackHandlers) {
                        pendingModAckDeadline.addAckId(handler.ackId);
                    }
                    modifyAckDeadlinesToSend.add(pendingModAckDeadline);
                    renewJobs.add(job);
                }
                // Re-queue only after the scan so an extended job is not examined twice in one run.
                jobs.addAll(renewJobs);
                ExtensionJob earliest = jobs.peek();
                if (earliest != null) {
                    nextScheduleExpiration = earliest.expiration;
                }
            }

            dispatcher.processOutstandingAckOperations(modifyAckDeadlinesToSend);
            if (nextScheduleExpiration != null) {
                logger.log(Level.INFO, "Scheduling based on outstanding, now time: " + now + ", next schedule time: " + nextScheduleExpiration);
                dispatcher.setupNextAckDeadlineExtensionAlarm(nextScheduleExpiration);
            }
        }

        /** Removes handlers whose message is already resolved by swapping each with the last element. */
        private void dropAckedHandlers(List<AckHandler> handlers) {
            for (int index = 0; index < handlers.size(); ) {
                if (handlers.get(index).acked.get()) {
                    int lastIndex = handlers.size() - 1;
                    Collections.swap(handlers, index, lastIndex);
                    handlers.remove(lastIndex);
                    // Do not advance: the element swapped into this slot has not been checked yet.
                } else {
                    index++;
                }
            }
        }
    }

    public static interface AckProcessor {
        public void sendAckOperations(List<String> var1, List<PendingModifyAckDeadline> var2);
    }

    private class AckHandler
    implements FutureCallback<AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        private final AtomicBoolean acked;
        private final Instant receivedTime;

        AckHandler(String ackId, int outstandingBytes) {
            this.ackId = ackId;
            this.outstandingBytes = outstandingBytes;
            this.acked = new AtomicBoolean(false);
            this.receivedTime = new Instant(MessageDispatcher.this.clock.millis());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onFailure(Throwable t) {
            logger.log(Level.WARNING, "MessageReceiver failed to processes ack ID: " + this.ackId + ", the message will be nacked.", t);
            Set set = MessageDispatcher.this.pendingNacks;
            synchronized (set) {
                MessageDispatcher.this.pendingNacks.add(this.ackId);
            }
            MessageDispatcher.this.setupPendingAcksAlarm();
            MessageDispatcher.this.flowController.release(1, this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSuccess(AckReply reply) {
            this.acked.getAndSet(true);
            switch (reply) {
                case ACK: {
                    Set set = MessageDispatcher.this.pendingAcks;
                    synchronized (set) {
                        MessageDispatcher.this.pendingAcks.add(this.ackId);
                    }
                    MessageDispatcher.this.setupPendingAcksAlarm();
                    MessageDispatcher.this.flowController.release(1, this.outstandingBytes);
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long)((long)Math.ceil((double)(MessageDispatcher.this.clock.millis() - this.receivedTime.getMillis()) / 1000.0))));
                    MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
                    return;
                }
                case NACK: {
                    Set set = MessageDispatcher.this.pendingNacks;
                    synchronized (set) {
                        MessageDispatcher.this.pendingNacks.add(this.ackId);
                    }
                    MessageDispatcher.this.setupPendingAcksAlarm();
                    MessageDispatcher.this.flowController.release(1, this.outstandingBytes);
                    MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
                    return;
                }
            }
            throw new IllegalArgumentException(String.format("AckReply: %s not supported", new Object[]{reply}));
        }
    }

    /** A group of ack IDs whose ack deadline is to be set to the same number of seconds. */
    static class PendingModifyAckDeadline {
        /** Ack IDs in the order they were added. */
        final List<String> ackIds;
        /** Deadline, in seconds, to apply to every ack ID in this group. */
        final int deadlineExtensionSeconds;

        /**
         * Creates an empty group.
         *
         * @param deadlineExtensionSeconds deadline in seconds for the group
         */
        PendingModifyAckDeadline(int deadlineExtensionSeconds) {
            this.ackIds = new ArrayList<String>();
            this.deadlineExtensionSeconds = deadlineExtensionSeconds;
        }

        /**
         * Creates a group that already holds one ack ID.
         *
         * @param ackId first ack ID of the group
         * @param deadlineExtensionSeconds deadline in seconds for the group
         */
        PendingModifyAckDeadline(String ackId, int deadlineExtensionSeconds) {
            this(deadlineExtensionSeconds);
            addAckId(ackId);
        }

        /**
         * Appends an ack ID to the group.
         *
         * @param ackId ack ID to append
         */
        public void addAckId(String ackId) {
            ackIds.add(ackId);
        }

        @Override
        public String toString() {
            String template = "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}";
            return String.format(template, deadlineExtensionSeconds, ackIds);
        }
    }

    /**
     * A batch of messages sharing one expiration time, ordered by that expiration so the job
     * expiring soonest sits at the head of a priority queue.
     */
    private static class ExtensionJob implements Comparable<ExtensionJob> {
        /** Time at which the current ack deadline of the batch runs out. */
        Instant expiration;
        /** Length, in seconds, of the next extension to apply. */
        int nextExtensionSeconds;
        /** Handlers of the messages in this batch. */
        ArrayList<AckHandler> ackHandlers;

        /**
         * @param expiration initial expiration of the batch
         * @param initialAckDeadlineExtension length in seconds of the first extension
         * @param ackHandlers handlers of the messages in the batch
         */
        ExtensionJob(Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
            this.expiration = expiration;
            this.nextExtensionSeconds = initialAckDeadlineExtension;
            this.ackHandlers = ackHandlers;
        }

        /**
         * Moves the expiration to {@code now} plus the current extension, then doubles the
         * extension for next time, capped at 600 seconds.
         *
         * @param now reference time for the new expiration
         */
        void extendExpiration(Instant now) {
            Duration extension = Duration.standardSeconds(nextExtensionSeconds);
            expiration = now.plus(extension);
            int doubled = 2 * nextExtensionSeconds;
            nextExtensionSeconds = doubled < 600 ? doubled : 600;
        }

        /** Orders jobs by expiration, earliest first. */
        @Override
        public int compareTo(ExtensionJob other) {
            return expiration.compareTo(other.expiration);
        }

        @Override
        public String toString() {
            List<String> ids = new ArrayList<String>(ackHandlers.size());
            for (AckHandler handler : ackHandlers) {
                ids.add(handler.ackId);
            }
            return String.format("ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", expiration, nextExtensionSeconds, ids);
        }
    }
}

