/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.AckReplyConsumerImpl;
import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponseImpl;
import com.google.cloud.pubsub.v1.AckRequestData;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.ModackRequestData;
import com.google.cloud.pubsub.v1.OpenTelemetryPubsubTracer;
import com.google.cloud.pubsub.v1.PubsubMessageWrapper;
import com.google.cloud.pubsub.v1.SequentialExecutorService;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.Waiter;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

class MessageDispatcher {
    // Class-wide logger, named after this class.
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
    // Percentile (99.9) of the ack latency distribution that computeDeadlineSeconds() requests.
    @InternalApi
    static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
    // Initial delay and fixed delay (100 ms) of the periodic background job scheduled by start().
    @InternalApi
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100L);
    // Executor on which message-delivery tasks without an ordering key are run.
    private final Executor executor;
    // Wraps the builder's executor; delivery tasks with an ordering key are submitted here.
    private final SequentialExecutorService.AutoExecutor sequentialExecutor;
    // Scheduler for the periodic background job and the one-shot "extend deadline" trigger.
    private final ScheduledExecutorService systemExecutor;
    // Time source used by now() and for ack latency measurement.
    private final ApiClock clock;
    // Its seconds are subtracted from the computed deadline when scheduling the next extension trigger.
    private final Duration ackExpirationPadding;
    // Added to the receive time to compute each message's total expiration.
    private final Duration maxAckExtensionPeriod;
    // Lower bound (seconds) applied by computeDeadlineSeconds(); may be rewritten by
    // setExactlyOnceDeliveryEnabled() when the default minimum is in use.
    private int minDurationPerAckExtensionSeconds;
    // True when the builder reported that the minimum extension is the default value.
    private final boolean minDurationPerAckExtensionDefaultUsed;
    // Upper bound (seconds); only enforced when maxDurationPerAckExtensionDefaultUsed is false.
    private final int maxDurationPerAckExtensionSeconds;
    // True when the builder reported that the maximum extension is the default value.
    private final boolean maxDurationPerAckExtensionDefaultUsed;
    // User callback used when receiverWithAckResponse is null.
    private MessageReceiver receiver;
    // User callback that also receives a per-message response future; takes precedence when non-null.
    private MessageReceiverWithAckResponse receiverWithAckResponse;
    // Sink to which batched ack and modack operations are handed.
    private final AckProcessor ackProcessor;
    // Reserved once per message in processBatch() and released when a message is forgotten.
    private final FlowController flowController;
    // When true, received messages are parked in outstandingReceipts until notifyAckSuccess().
    private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);
    // When true, messages with a non-empty ordering key go through sequentialExecutor.
    private AtomicBoolean messageOrderingEnabled = new AtomicBoolean(false);
    // Counts messages handed to processBatch() and not yet forgotten; stop() waits on it.
    private final Waiter messagesWaiter;
    // Messages currently tracked for deadline extension, keyed by ack ID.
    private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<String, AckHandler>();
    // Queues drained by processOutstandingOperations().
    private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue();
    private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue();
    private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue();
    // Insertion-ordered map of messages awaiting receipt confirmation; every access in this
    // class synchronizes on the map itself.
    private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts = new LinkedHashMap();
    // Current ack deadline in seconds, used for receipts and deadline extensions.
    private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
    // When true, the next background-job run recomputes the deadline and extends pending messages.
    private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
    // Held while start() schedules and stop() cancels the background futures.
    private final Lock jobLock;
    // Handle of the periodic job created in start(); null after stop().
    private ScheduledFuture<?> backgroundJob;
    // Handle of the one-shot task that re-arms extendDeadline; null after stop().
    private ScheduledFuture<?> setExtendedDeadlineFuture;
    // Receives ack latencies in whole seconds (rounded up) from AckHandler.onSuccess().
    private final Distribution ackLatencyDistribution;
    // Parsed from the builder's subscription name string.
    private final SubscriptionName subscriptionNameObject;
    // Copied from the builder; not read anywhere else in this class.
    private final boolean enableOpenTelemetryTracing;
    // Default tracer built with (null, false); replaced when the builder supplies a non-null tracer.
    private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);

    private MessageDispatcher(Builder builder) {
        this.executor = builder.executor;
        this.systemExecutor = builder.systemExecutor;
        this.ackExpirationPadding = builder.ackExpirationPadding;
        this.maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
        this.minDurationPerAckExtensionSeconds = Math.toIntExact(builder.minDurationPerAckExtension.getSeconds());
        this.minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed;
        this.maxDurationPerAckExtensionSeconds = Math.toIntExact(builder.maxDurationPerAckExtension.getSeconds());
        this.maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed;
        if (this.minDurationPerAckExtensionDefaultUsed) {
            this.messageDeadlineSeconds.set(Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()));
        } else {
            this.messageDeadlineSeconds.set(this.minDurationPerAckExtensionSeconds);
        }
        this.receiver = builder.receiver;
        this.receiverWithAckResponse = builder.receiverWithAckResponse;
        this.ackProcessor = builder.ackProcessor;
        this.flowController = builder.flowController;
        this.ackLatencyDistribution = builder.ackLatencyDistribution;
        this.clock = builder.clock;
        this.jobLock = new ReentrantLock();
        this.messagesWaiter = new Waiter();
        this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);
        this.subscriptionNameObject = SubscriptionName.parse((String)builder.subscriptionName);
        this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
        if (builder.tracer != null) {
            this.tracer = builder.tracer;
        }
    }

    /**
     * Tells whether a per-message response future is needed.
     *
     * @return {@code true} when a {@link MessageReceiverWithAckResponse} is configured
     */
    private boolean shouldSetMessageFuture() {
        return receiverWithAckResponse != null;
    }

    /**
     * Schedules the periodic background job on the system executor.
     *
     * <p>The job first runs after {@link #PENDING_ACKS_SEND_DELAY} and is then repeated with the
     * same fixed delay. The job handle is stored while holding {@code jobLock}.
     */
    void start() {
        Runnable requestDeadlineExtension = () -> extendDeadline.set(true);
        jobLock.lock();
        try {
            backgroundJob = systemExecutor.scheduleWithFixedDelay(
                    () -> runPeriodicJob(requestDeadlineExtension),
                    PENDING_ACKS_SEND_DELAY.toMillis(),
                    PENDING_ACKS_SEND_DELAY.toMillis(),
                    TimeUnit.MILLISECONDS);
        }
        finally {
            jobLock.unlock();
        }
    }

    /**
     * One iteration of the background job.
     *
     * <p>When the {@code extendDeadline} flag was set, recomputes the message deadline, extends
     * all pending messages and schedules {@code requestDeadlineExtension} to run after the new
     * deadline minus the ack expiration padding. It then always flushes queued operations.
     * Any {@link Throwable} is logged rather than propagated.
     *
     * @param requestDeadlineExtension task that sets the {@code extendDeadline} flag again
     */
    private void runPeriodicJob(Runnable requestDeadlineExtension) {
        try {
            if (extendDeadline.getAndSet(false)) {
                int refreshedDeadlineSec = computeDeadlineSeconds();
                messageDeadlineSeconds.set(refreshedDeadlineSec);
                extendDeadlines();
                // Replace a previously scheduled trigger while the periodic job is still live.
                if (setExtendedDeadlineFuture != null && !backgroundJob.isDone()) {
                    setExtendedDeadlineFuture.cancel(true);
                }
                long triggerDelaySeconds = (long) refreshedDeadlineSec - ackExpirationPadding.getSeconds();
                setExtendedDeadlineFuture = systemExecutor.schedule(requestDeadlineExtension, triggerDelaySeconds, TimeUnit.SECONDS);
            }
            processOutstandingOperations();
        }
        catch (Throwable t) {
            logger.log(Level.WARNING, "failed to run periodic job", t);
        }
    }

    /**
     * Stops the dispatcher.
     *
     * <p>Blocks on the messages waiter first, then cancels and clears both scheduled futures
     * under {@code jobLock}, and finally performs one last flush of queued operations.
     */
    void stop() {
        messagesWaiter.waitComplete();
        jobLock.lock();
        try {
            cancelIfScheduled(backgroundJob, false);
            cancelIfScheduled(setExtendedDeadlineFuture, true);
            backgroundJob = null;
            setExtendedDeadlineFuture = null;
        }
        finally {
            jobLock.unlock();
        }
        processOutstandingOperations();
    }

    /** Cancels {@code future} with the given interrupt flag; does nothing when it is null. */
    private static void cancelIfScheduled(ScheduledFuture<?> future, boolean mayInterruptIfRunning) {
        if (future != null) {
            future.cancel(mayInterruptIfRunning);
        }
    }

    /**
     * Overwrites the current message ack deadline.
     *
     * @param sec new deadline in seconds
     */
    @InternalApi
    void setMessageDeadlineSeconds(int sec) {
        messageDeadlineSeconds.set(sec);
    }

    /**
     * Reads the current message ack deadline.
     *
     * @return deadline in seconds
     */
    @InternalApi
    int getMessageDeadlineSeconds() {
        return messageDeadlineSeconds.get();
    }

    /**
     * Switches exactly-once delivery on or off.
     *
     * <p>Does nothing when the flag already has the requested value. When the default minimum
     * ack extension is in use, the minimum is reset to the default matching the new mode,
     * capped at the maximum extension if a non-default maximum was configured.
     *
     * @param exactlyOnceDeliveryEnabled the new mode
     */
    @InternalApi
    void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
        if (this.exactlyOnceDeliveryEnabled.get() == exactlyOnceDeliveryEnabled) {
            return;
        }
        this.exactlyOnceDeliveryEnabled.set(exactlyOnceDeliveryEnabled);
        if (minDurationPerAckExtensionDefaultUsed) {
            int candidateMinSeconds;
            if (exactlyOnceDeliveryEnabled) {
                candidateMinSeconds = Math.toIntExact(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds());
            } else {
                candidateMinSeconds = Math.toIntExact(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION.getSeconds());
            }
            // A user-supplied maximum wins over a larger default minimum.
            boolean cappedByCustomMax = !maxDurationPerAckExtensionDefaultUsed
                    && candidateMinSeconds > maxDurationPerAckExtensionSeconds;
            if (cappedByCustomMax) {
                minDurationPerAckExtensionSeconds = maxDurationPerAckExtensionSeconds;
            } else {
                minDurationPerAckExtensionSeconds = candidateMinSeconds;
            }
        }
    }

    /**
     * Sets the message-ordering flag. When the flag is true, messages that carry a non-empty
     * ordering key are submitted to the sequential executor (see processOutstandingMessage).
     *
     * @param messageOrderingEnabled the new flag value
     */
    @InternalApi
    void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
        this.messageOrderingEnabled.set(messageOrderingEnabled);
    }

    /**
     * Registers a batch of received messages and dispatches those that can be delivered now.
     *
     * <p>For each message an ack request, a message wrapper, a subscriber span and an
     * {@code AckHandler} are created. With exactly-once delivery enabled the message is parked
     * in {@code outstandingReceipts}; otherwise it is added to {@code pendingMessages} and to
     * the batch passed to {@code processBatch}. A message whose ack ID is already pending is
     * skipped and no receipt is queued for it; every other message has a receipt queued.
     *
     * @param messages messages to process
     */
    void processReceivedMessages(List<ReceivedMessage> messages) {
        Instant totalExpiration = now().plus(maxAckExtensionPeriod);
        List<OutstandingMessage> deliverable = new ArrayList<OutstandingMessage>(messages.size());
        for (ReceivedMessage received : messages) {
            String ackId = received.getAckId();
            AckRequestData.Builder requestBuilder = AckRequestData.newBuilder(ackId);
            if (shouldSetMessageFuture()) {
                SettableApiFuture<AckResponse> messageFuture = SettableApiFuture.create();
                requestBuilder.setMessageFuture(messageFuture);
            }
            PubsubMessageWrapper wrapper = PubsubMessageWrapper
                    .newBuilder(received.getMessage(), subscriptionNameObject, ackId, received.getDeliveryAttempt())
                    .build();
            requestBuilder.setMessageWrapper(wrapper);
            tracer.startSubscriberSpan(wrapper, exactlyOnceDeliveryEnabled.get());
            AckRequestData requestData = requestBuilder.build();
            AckHandler handler = new AckHandler(requestData, received.getMessage().getSerializedSize(), totalExpiration);
            OutstandingMessage outstanding = new OutstandingMessage(handler);

            if (exactlyOnceDeliveryEnabled.get()) {
                // Delivery is deferred until notifyAckSuccess() confirms the receipt.
                synchronized (outstandingReceipts) {
                    outstandingReceipts.put(ackId, new ReceiptCompleteData(outstanding));
                }
            } else if (pendingMessages.putIfAbsent(ackId, handler) != null) {
                // Ack ID already tracked: skip delivery and the receipt.
                continue;
            } else {
                deliverable.add(outstanding);
            }
            pendingReceipts.add(requestData);
        }
        processBatch(deliverable);
    }

    /**
     * Marks the receipt for the given ack ID as complete and releases ready messages.
     *
     * <p>While holding the {@code outstandingReceipts} monitor: if the ack ID is present, its
     * entry is flagged complete, then entries are removed from the head of the insertion-ordered
     * map for as long as they are complete. Each removed entry that was not already in
     * {@code pendingMessages} is dispatched via {@code processBatch}.
     *
     * @param ackRequestData identifies the confirmed receipt
     */
    void notifyAckSuccess(AckRequestData ackRequestData) {
        synchronized (outstandingReceipts) {
            ReceiptCompleteData confirmed = outstandingReceipts.get(ackRequestData.getAckId());
            if (confirmed == null) {
                return;
            }
            confirmed.notifyReceiptComplete();

            List<OutstandingMessage> ready = new ArrayList<OutstandingMessage>();
            Iterator<Map.Entry<String, ReceiptCompleteData>> cursor = outstandingReceipts.entrySet().iterator();
            while (cursor.hasNext()) {
                Map.Entry<String, ReceiptCompleteData> head = cursor.next();
                if (!head.getValue().isReceiptComplete()) {
                    // Stop at the first entry still awaiting its receipt.
                    break;
                }
                cursor.remove();
                OutstandingMessage outstanding = head.getValue().getOutstandingMessage();
                if (pendingMessages.putIfAbsent(head.getKey(), outstanding.ackHandler) == null) {
                    ready.add(outstanding);
                }
            }
            processBatch(ready);
        }
    }

    /**
     * Drops all tracking for a message whose ack operation failed.
     *
     * <p>The ack ID is removed from {@code outstandingReceipts} (under its monitor) and from
     * {@code pendingMessages}. Only when it was actually pending is one flow-control slot
     * released and the waiter's pending count decremented.
     *
     * @param ackRequestData identifies the failed message
     */
    void notifyAckFailed(AckRequestData ackRequestData) {
        synchronized (outstandingReceipts) {
            outstandingReceipts.remove(ackRequestData.getAckId());
        }
        boolean wasPending = pendingMessages.remove(ackRequestData.getAckId()) != null;
        if (wasPending) {
            flowController.release(1L, (long)ackRequestData.getMessageWrapper().getSerializedSize());
            messagesWaiter.incrementPendingCount(-1);
        }
    }

    /**
     * Reserves flow control for each message in the batch and hands it off for delivery.
     *
     * <p>The waiter's pending count is raised by the batch size up front. Each reservation is
     * wrapped in a concurrency-control span.
     *
     * @param batch messages to dispatch
     * @throws IllegalStateException if the flow controller rejects a reservation
     */
    private void processBatch(List<OutstandingMessage> batch) {
        messagesWaiter.incrementPendingCount(batch.size());
        for (OutstandingMessage outstanding : batch) {
            PubsubMessageWrapper wrapper = outstanding.messageWrapper();
            tracer.startSubscribeConcurrencyControlSpan(wrapper);
            try {
                long messageBytes = wrapper.getPubsubMessage().getSerializedSize();
                flowController.reserve(1L, messageBytes);
                tracer.endSubscribeConcurrencyControlSpan(wrapper);
            }
            catch (FlowController.FlowControlException unexpectedException) {
                tracer.setSubscribeConcurrencyControlSpanException(wrapper, unexpectedException);
                throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
            }
            addDeliveryInfoCount(wrapper);
            processOutstandingMessage(outstanding.ackHandler);
        }
    }

    /**
     * Copies a positive delivery attempt count into the message attributes.
     *
     * <p>When the wrapper's delivery attempt is greater than zero, its message is replaced by
     * a copy carrying the attribute {@code googclient_deliveryattempt}; otherwise nothing changes.
     *
     * @param messageWrapper wrapper whose message may be replaced
     */
    private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
        PubsubMessage original = messageWrapper.getPubsubMessage();
        int attempt = messageWrapper.getDeliveryAttempt();
        if (attempt <= 0) {
            return;
        }
        PubsubMessage annotated = PubsubMessage.newBuilder(original)
                .putAttributes("googclient_deliveryattempt", Integer.toString(attempt))
                .build();
        messageWrapper.setPubsubMessage(annotated);
    }

    /**
     * Schedules delivery of one message to the configured receiver.
     *
     * <p>A reply future is created and {@code ackHandler} is registered as its callback on a
     * direct executor. The delivery task:
     * <ul>
     *   <li>forgets the message and records an expiration result on the subscriber span when
     *       its total expiration plus the current deadline is already before now;</li>
     *   <li>otherwise starts a process span and invokes the receiver (the response-aware
     *       receiver when one is configured);</li>
     *   <li>fails the reply future with any {@link Exception} thrown along the way, which the
     *       handler's {@code onFailure} turns into a nack.</li>
     * </ul>
     * The task runs on the plain executor unless ordering is enabled and the message has a
     * non-empty ordering key, in which case it is submitted to the sequential executor under
     * that key, bracketed by a scheduler span.
     *
     * @param ackHandler handler that owns the message and receives the receiver's reply
     */
    private void processOutstandingMessage(final AckHandler ackHandler) {
        final PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
        final PubsubMessage message = messageWrapper.getPubsubMessage();
        // Typed future: no raw type, so no unchecked casts are needed below.
        final SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
        ApiFutures.addCallback(ackReplySettableApiFuture, ackHandler, MoreExecutors.directExecutor());
        Runnable deliverMessageTask = () -> {
            try {
                if (ackHandler.totalExpiration.plusSeconds(this.messageDeadlineSeconds.get()).isBefore(this.now())) {
                    // Too late to deliver: release resources without calling the receiver.
                    ackHandler.forget();
                    this.tracer.setSubscriberSpanExpirationResult(messageWrapper);
                    return;
                }
                this.tracer.startSubscribeProcessSpan(messageWrapper);
                if (this.shouldSetMessageFuture()) {
                    SettableApiFuture<AckResponse> messageFuture = ackHandler.getMessageFutureIfExists();
                    AckReplyConsumerWithResponseImpl ackReplyConsumerWithResponse = new AckReplyConsumerWithResponseImpl(ackReplySettableApiFuture, messageFuture);
                    this.receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
                } else {
                    AckReplyConsumerImpl ackReplyConsumer = new AckReplyConsumerImpl(ackReplySettableApiFuture);
                    this.receiver.receiveMessage(message, ackReplyConsumer);
                }
            }
            catch (Exception e) {
                ackReplySettableApiFuture.setException(e);
            }
        };
        if (!this.messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
            this.executor.execute(deliverMessageTask);
        } else {
            this.tracer.startSubscribeSchedulerSpan(messageWrapper);
            this.sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
            this.tracer.endSubscribeSchedulerSpan(messageWrapper);
        }
    }

    /**
     * Computes the ack deadline to use for the next extension round.
     *
     * <p>Starts from the {@link #PERCENTILE_FOR_ACK_DEADLINE_UPDATES} percentile of the ack
     * latency distribution. The value is capped at the maximum extension when a non-default
     * maximum was configured, otherwise raised to the minimum extension if below it. The result
     * is finally clamped to
     * {@code [Subscriber.MIN_STREAM_ACK_DEADLINE, Subscriber.MAX_STREAM_ACK_DEADLINE]}.
     *
     * @return the deadline in seconds
     */
    @InternalApi
    int computeDeadlineSeconds() {
        // Use the named constant rather than repeating the literal 99.9.
        int deadlineSeconds = this.ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
        if (!this.maxDurationPerAckExtensionDefaultUsed && deadlineSeconds > this.maxDurationPerAckExtensionSeconds) {
            deadlineSeconds = this.maxDurationPerAckExtensionSeconds;
        } else if (deadlineSeconds < this.minDurationPerAckExtensionSeconds) {
            deadlineSeconds = this.minDurationPerAckExtensionSeconds;
        }
        // Stream-level bounds always apply, whatever the user configured.
        if ((long)deadlineSeconds < Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()) {
            deadlineSeconds = Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds());
        } else if ((long)deadlineSeconds > Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()) {
            deadlineSeconds = Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds());
        }
        return deadlineSeconds;
    }

    /**
     * Builds and sends deadline-extension modacks for all pending messages.
     *
     * <p>For each pending message:
     * <ul>
     *   <li>if its total expiration is after now plus the current deadline, it is extended by
     *       the current deadline;</li>
     *   <li>otherwise the message is forgotten, and if it has not expired yet it receives one
     *       last extension for the remaining whole seconds (at least 1).</li>
     * </ul>
     * Requests are grouped by extension length; nothing is sent when no ack ID was collected.
     */
    @InternalApi
    void extendDeadlines() {
        int extendSeconds = this.getMessageDeadlineSeconds();
        int numAckIdToSend = 0;
        Map<Integer, ModackRequestData> deadlineExtensionModacks = new HashMap<Integer, ModackRequestData>();
        Instant now = this.now();
        Instant extendTo = now.plusSeconds(extendSeconds);
        for (Map.Entry<String, AckHandler> entry : this.pendingMessages.entrySet()) {
            AckHandler handler = entry.getValue();
            Instant totalExpiration = handler.totalExpiration;
            if (totalExpiration.isAfter(extendTo)) {
                ModackRequestData modackRequestData = deadlineExtensionModacks.computeIfAbsent(extendSeconds, seconds -> new ModackRequestData(seconds));
                modackRequestData.addAckRequestData(handler.getAckRequestData());
                ++numAckIdToSend;
                continue;
            }
            // Cannot be extended by a full deadline any more: stop tracking it.
            handler.forget();
            if (!totalExpiration.isAfter(now)) {
                continue;
            }
            int sec = Math.max(1, (int)now.until(totalExpiration, ChronoUnit.SECONDS));
            ModackRequestData modackRequestData = deadlineExtensionModacks.computeIfAbsent(sec, seconds -> new ModackRequestData(seconds));
            modackRequestData.addAckRequestData(handler.getAckRequestData());
            ++numAckIdToSend;
        }
        if (numAckIdToSend > 0) {
            logger.log(Level.FINER, "Sending {0} modacks", numAckIdToSend);
            this.ackProcessor.sendModackOperations(new ArrayList<ModackRequestData>(deadlineExtensionModacks.values()));
        }
    }

    /**
     * Flushes the queued nacks, receipts and acks to the ack processor.
     *
     * <p>Queued nacks become one modack with a deadline of 0. Queued receipts become one modack
     * with the current message deadline, flagged as a receipt modack. Both are sent in a single
     * {@code sendModackOperations} call, which is made even when the list is empty. Queued acks
     * are then sent with {@code sendAckOperations}.
     */
    @InternalApi
    void processOutstandingOperations() {
        List<ModackRequestData> modacks = new ArrayList<ModackRequestData>();

        List<AckRequestData> nacks = new ArrayList<AckRequestData>();
        pendingNacks.drainTo(nacks);
        if (!nacks.isEmpty()) {
            modacks.add(new ModackRequestData(0, nacks));
        }
        logger.log(Level.FINER, "Sending {0} nacks", nacks.size());

        List<AckRequestData> receipts = new ArrayList<AckRequestData>();
        pendingReceipts.drainTo(receipts);
        if (!receipts.isEmpty()) {
            ModackRequestData receiptModack = new ModackRequestData(getMessageDeadlineSeconds(), receipts);
            receiptModack.setIsReceiptModack(true);
            modacks.add(receiptModack);
        }
        logger.log(Level.FINER, "Sending {0} receipts", receipts.size());
        ackProcessor.sendModackOperations(modacks);

        List<AckRequestData> acks = new ArrayList<AckRequestData>();
        pendingAcks.drainTo(acks);
        logger.log(Level.FINER, "Sending {0} acks", acks.size());
        ackProcessor.sendAckOperations(acks);
    }

    /**
     * Reads the injected clock.
     *
     * @return the clock's current millisecond time as an {@link Instant}
     */
    private Instant now() {
        long epochMillis = clock.millisTime();
        return Instant.ofEpochMilli(epochMillis);
    }

    /**
     * Creates a builder for a dispatcher that delivers to a plain {@link MessageReceiver}.
     *
     * @param receiver callback stored on the builder
     * @return a new builder
     */
    public static Builder newBuilder(MessageReceiver receiver) {
        return new Builder(receiver);
    }

    /**
     * Creates a builder for a dispatcher that delivers to a
     * {@link MessageReceiverWithAckResponse}.
     *
     * @param receiverWithAckResponse callback stored on the builder
     * @return a new builder
     */
    public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) {
        return new Builder(receiverWithAckResponse);
    }

    /**
     * Mutable builder for {@link MessageDispatcher}.
     *
     * <p>Each setter stores its argument and returns this builder; no validation is performed
     * here. The dispatcher constructor dereferences the min and max ack extension durations and
     * parses the subscription name, so those values must be set before {@link #build()}.
     */
    public static final class Builder {
        // Exactly one of the two receivers is assigned, depending on the constructor used.
        private MessageReceiver receiver;
        private MessageReceiverWithAckResponse receiverWithAckResponse;
        private AckProcessor ackProcessor;
        private Duration ackExpirationPadding;
        private Duration maxAckExtensionPeriod;
        private Duration minDurationPerAckExtension;
        private boolean minDurationPerAckExtensionDefaultUsed;
        private Duration maxDurationPerAckExtension;
        private boolean maxDurationPerAckExtensionDefaultUsed;
        private Distribution ackLatencyDistribution;
        private FlowController flowController;
        private Executor executor;
        private ScheduledExecutorService systemExecutor;
        private ApiClock clock;
        private String subscriptionName;
        private boolean enableOpenTelemetryTracing;
        // May stay null, in which case the dispatcher keeps its default tracer.
        private OpenTelemetryPubsubTracer tracer;

        /** Creates a builder that delivers to a plain receiver. */
        protected Builder(MessageReceiver receiver) {
            this.receiver = receiver;
        }

        /** Creates a builder that delivers to a receiver that also gets an ack response future. */
        protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) {
            this.receiverWithAckResponse = receiverWithAckResponse;
        }

        /** Sets the sink that receives batched ack and modack operations. */
        public Builder setAckProcessor(AckProcessor ackProcessor) {
            this.ackProcessor = ackProcessor;
            return this;
        }

        /** Sets the padding whose seconds are subtracted when scheduling the next deadline extension. */
        public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
            this.ackExpirationPadding = ackExpirationPadding;
            return this;
        }

        /** Sets the period added to the receive time to obtain a message's total expiration. */
        public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
            this.maxAckExtensionPeriod = maxAckExtensionPeriod;
            return this;
        }

        /** Sets the minimum per-extension duration; the dispatcher uses its whole seconds. */
        public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
            this.minDurationPerAckExtension = minDurationPerAckExtension;
            return this;
        }

        /** Declares whether the minimum per-extension duration is the default value. */
        public Builder setMinDurationPerAckExtensionDefaultUsed(boolean minDurationPerAckExtensionDefaultUsed) {
            this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed;
            return this;
        }

        /** Sets the maximum per-extension duration; the dispatcher uses its whole seconds. */
        public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
            this.maxDurationPerAckExtension = maxDurationPerAckExtension;
            return this;
        }

        /** Declares whether the maximum per-extension duration is the default value. */
        public Builder setMaxDurationPerAckExtensionDefaultUsed(boolean maxDurationPerAckExtensionDefaultUsed) {
            this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed;
            return this;
        }

        /** Sets the distribution that records ack latencies and supplies the deadline percentile. */
        public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) {
            this.ackLatencyDistribution = ackLatencyDistribution;
            return this;
        }

        /** Sets the flow controller reserved and released per message. */
        public Builder setFlowController(FlowController flowController) {
            this.flowController = flowController;
            return this;
        }

        /** Sets the executor for delivery tasks; it is also wrapped for ordered delivery. */
        public Builder setExecutor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /** Sets the scheduler used for the dispatcher's background jobs. */
        public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
            this.systemExecutor = systemExecutor;
            return this;
        }

        /** Sets the clock the dispatcher reads the current time from. */
        public Builder setApiClock(ApiClock clock) {
            this.clock = clock;
            return this;
        }

        /** Sets the subscription name string, parsed by the dispatcher with SubscriptionName.parse. */
        public Builder setSubscriptionName(String subscriptionName) {
            this.subscriptionName = subscriptionName;
            return this;
        }

        /** Sets the OpenTelemetry tracing flag copied into the dispatcher. */
        public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
            this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
            return this;
        }

        /** Sets the tracer; a null value leaves the dispatcher's default tracer in place. */
        public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
            this.tracer = tracer;
            return this;
        }

        /** Builds a dispatcher from the values currently held by this builder. */
        public MessageDispatcher build() {
            return new MessageDispatcher(this);
        }
    }

    /** Receives the batched operations produced by the dispatcher. */
    interface AckProcessor {
        /**
         * Sends a batch of acknowledgements.
         *
         * @param ackRequestDataList acks to send; may be empty
         */
        void sendAckOperations(List<AckRequestData> ackRequestDataList);

        /**
         * Sends a batch of ack-deadline modifications (extensions, receipts and nacks).
         *
         * @param modackRequestDataList modacks to send; may be empty
         */
        void sendModackOperations(List<ModackRequestData> modackRequestDataList);
    }

    private class AckHandler
    implements ApiFutureCallback<AckReply> {
        private final AckRequestData ackRequestData;
        private final int outstandingBytes;
        private final long receivedTimeMillis;
        private final Instant totalExpiration;

        private AckHandler(AckRequestData ackRequestData, int outstandingBytes, Instant totalExpiration) {
            this.ackRequestData = ackRequestData;
            this.outstandingBytes = outstandingBytes;
            this.receivedTimeMillis = MessageDispatcher.this.clock.millisTime();
            this.totalExpiration = totalExpiration;
        }

        public AckRequestData getAckRequestData() {
            return this.ackRequestData;
        }

        public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
            return this.ackRequestData.getMessageFutureIfExists();
        }

        private void forget() {
            if (MessageDispatcher.this.pendingMessages.remove(this.ackRequestData.getAckId()) == null) {
                return;
            }
            MessageDispatcher.this.flowController.release(1L, (long)this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingCount(-1);
        }

        public void onFailure(Throwable t) {
            logger.log(Level.WARNING, "MessageReceiver failed to process ack ID: " + this.ackRequestData.getAckId() + ", the message will be nacked.", t);
            this.ackRequestData.setResponse(AckResponse.OTHER, false);
            MessageDispatcher.this.pendingNacks.add(this.ackRequestData);
            MessageDispatcher.this.tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
            this.forget();
        }

        public void onSuccess(AckReply reply) {
            switch (reply) {
                case ACK: {
                    MessageDispatcher.this.pendingAcks.add(this.ackRequestData);
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long)((long)Math.ceil((double)(MessageDispatcher.this.clock.millisTime() - this.receivedTimeMillis) / 1000.0))));
                    MessageDispatcher.this.tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
                    break;
                }
                case NACK: {
                    MessageDispatcher.this.pendingNacks.add(this.ackRequestData);
                    MessageDispatcher.this.tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "nack");
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", new Object[]{reply}));
                }
            }
            this.forget();
        }
    }

    /** Pairs a message awaiting delivery with the handler that will process its reply. */
    private static class OutstandingMessage {
        private final AckHandler ackHandler;

        private OutstandingMessage(AckHandler ackHandler) {
            this.ackHandler = ackHandler;
        }

        /** Returns the message wrapper held by the handler's ack request. */
        public PubsubMessageWrapper messageWrapper() {
            AckRequestData requestData = ackHandler.getAckRequestData();
            return requestData.getMessageWrapper();
        }
    }

    /**
     * Holds a message waiting for its receipt to be confirmed, plus the confirmation flag.
     * The flag starts out false and is only ever switched to true.
     */
    private static class ReceiptCompleteData {
        private final OutstandingMessage outstandingMessage;
        // Primitive storage; boxed on read so the accessor's return type is unchanged.
        private boolean receiptComplete;

        private ReceiptCompleteData(OutstandingMessage outstandingMessage) {
            this.outstandingMessage = outstandingMessage;
        }

        /** Returns the parked message. */
        private OutstandingMessage getOutstandingMessage() {
            return outstandingMessage;
        }

        /** Returns whether the receipt has been confirmed. */
        private Boolean isReceiptComplete() {
            return receiptComplete;
        }

        /** Marks the receipt as confirmed. */
        private void notifyReceiptComplete() {
            receiptComplete = true;
        }
    }

    /** Reply a message receiver gives for a delivered message. */
    public enum AckReply {
        /** The message was handled; the dispatcher queues an ack for it. */
        ACK,
        /** The message was not handled; the dispatcher queues a nack for it. */
        NACK
    }
}

