/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.Resource;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.MessageAccumulator;
import com.rabbitmq.stream.impl.ProducerUtils;
import com.rabbitmq.stream.impl.ResourceBase;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class StreamProducer
extends ResourceBase
implements Producer {
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProducer.class);
    private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {};
    private final long id;
    private final MessageAccumulator accumulator;
    private final ConcurrentMap<Long, ProducerUtils.AccumulatedEntity> unconfirmedMessages;
    private final int batchSize;
    private final String name;
    private final String stream;
    private final Client.OutboundEntityWriteCallback writeCallback;
    private final Semaphore unconfirmedMessagesSemaphore;
    private final Runnable closingCallback;
    private final StreamEnvironment environment;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int maxUnconfirmedMessages;
    private final Codec codec;
    private final ToLongFunction<Object> publishSequenceFunction = entity -> ((ProducerUtils.AccumulatedEntity)entity).publishingId();
    private final long enqueueTimeoutMs;
    private final boolean blockOnMaxUnconfirmed;
    private final boolean retryOnRecovery;
    private volatile Client client;
    private volatile byte publisherId;
    private volatile ScheduledFuture<?> confirmTimeoutFuture;
    private final short publishVersion;
    private final Lock lock = new ReentrantLock();
    private static final Client.OutboundEntityWriteCallback OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK = new OutboundMessageFilterValueWriterCallback();

    /**
     * Creates a producer for {@code stream}, registers it with the environment, builds the
     * message accumulator and schedules the optional background tasks.
     *
     * <p>NOTE(review): {@code this.client} is read in this constructor (through
     * {@code computeFirstValueOfPublishingSequence()} and {@code this.client.maxFrameSize()})
     * but is never assigned here. Presumably {@code environment.registerProducer(...)} calls
     * {@link #setClient(Client)} before returning; confirm against StreamEnvironment.
     *
     * @param name producer name; when {@code null} or empty the publishing sequence starts at 0
     * @param stream stream the producer publishes to
     * @param subEntrySize when {@code <= 1} a per-message write callback is used, otherwise
     *     {@code Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK}
     * @param batchSize passed to the accumulator; also the chunk size used by {@code running()}
     *     when re-publishing unconfirmed messages
     * @param dynamicBatch passed to the accumulator; when {@code true} no background batch
     *     publishing task is scheduled
     * @param compression compression to use, or {@code null} for no compression codec
     * @param batchPublishingDelay delay between two background flushes of the accumulator; the
     *     task is only scheduled when this is at least one millisecond
     * @param maxUnconfirmedMessages number of semaphore permits and initial capacity of the
     *     unconfirmed-message map
     * @param confirmTimeout period of the confirm-timeout check and age after which a message is
     *     failed; a zero duration disables the check
     * @param enqueueTimeout maximum wait for a permit in {@code send}; zero means block
     *     indefinitely
     * @param retryOnRecovery whether {@code running()} re-publishes unconfirmed messages
     * @param filterValueExtractor extracts the filter value of a message, or {@code null} when
     *     filtering is not used; non-null selects publish version 2
     * @param listeners state listeners handed to the super constructor
     * @param environment owning environment (executor, codec, clock, allocator, ...)
     * @throws IllegalArgumentException if a filter value extractor is given but the environment
     *     reports that filtering is not supported
     */
    StreamProducer(String name, String stream, int subEntrySize, int batchSize, boolean dynamicBatch, Compression compression, Duration batchPublishingDelay, int maxUnconfirmedMessages, Duration confirmTimeout, Duration enqueueTimeout, boolean retryOnRecovery, Function<Message, String> filterValueExtractor, List<Resource.StateListener> listeners, StreamEnvironment environment) {
        super(listeners);
        // Self-reference holder shared by the two self-rescheduling tasks below (each branch assigns its own instance).
        AtomicReference<Runnable> taskReference;
        if (filterValueExtractor != null && !environment.filteringSupported()) {
            throw new IllegalArgumentException("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated");
        }
        this.id = ID_SEQUENCE.getAndIncrement();
        this.environment = environment;
        this.name = name;
        this.stream = stream;
        this.enqueueTimeoutMs = enqueueTimeout.toMillis();
        this.retryOnRecovery = retryOnRecovery;
        // A zero enqueue timeout means send() waits for a permit without a deadline.
        this.blockOnMaxUnconfirmed = enqueueTimeout.isZero();
        // Must happen before anything below that dereferences this.client (see the NOTE(review) in the Javadoc).
        this.closingCallback = environment.registerProducer(this, name, this.stream);
        AtomicLong publishingSequence = new AtomicLong(this.computeFirstValueOfPublishingSequence());
        // A publishing ID set explicitly on the message wins over the internal sequence.
        ToLongFunction<Message> accumulatorPublishSequenceFunction = msg -> {
            if (msg.hasPublishingId()) {
                return msg.getPublishingId();
            }
            return publishingSequence.getAndIncrement();
        };
        // Choose the low-level writer: plain message, message with filter value, or sub-entry batch.
        final Client.OutboundEntityWriteCallback delegateWriteCallback = subEntrySize <= 1 ? (filterValueExtractor == null ? Client.OUTBOUND_MESSAGE_WRITE_CALLBACK : OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK) : Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK;
        this.maxUnconfirmedMessages = maxUnconfirmedMessages;
        // Fair semaphore: one permit per message that may be in flight at the same time.
        this.unconfirmedMessagesSemaphore = new Semaphore(maxUnconfirmedMessages, true);
        this.unconfirmedMessages = new ConcurrentHashMap<Long, ProducerUtils.AccumulatedEntity>(this.maxUnconfirmedMessages, 0.75f, 2);
        if (filterValueExtractor == null) {
            this.publishVersion = 1;
            // Without filtering, the delegate receives the encoded entity only.
            this.writeCallback = new Client.OutboundEntityWriteCallback(){

                @Override
                public int write(ByteBuf bb, Object entity, long publishingId) {
                    ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity)entity;
                    // Track the entity as unconfirmed at write time.
                    StreamProducer.this.unconfirmedMessages.put(publishingId, accumulatedEntity);
                    return delegateWriteCallback.write(bb, accumulatedEntity.encodedEntity(), publishingId);
                }

                @Override
                public int fragmentLength(Object entity) {
                    return delegateWriteCallback.fragmentLength(((ProducerUtils.AccumulatedEntity)entity).encodedEntity());
                }
            };
        } else {
            this.publishVersion = (short)2;
            // With filtering, the delegate receives the whole accumulated entity so it can read the filter value.
            this.writeCallback = new Client.OutboundEntityWriteCallback(){

                @Override
                public int write(ByteBuf bb, Object entity, long publishingId) {
                    ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity)entity;
                    // Track the entity as unconfirmed at write time.
                    StreamProducer.this.unconfirmedMessages.put(publishingId, accumulatedEntity);
                    return delegateWriteCallback.write(bb, accumulatedEntity, publishingId);
                }

                @Override
                public int fragmentLength(Object entity) {
                    return delegateWriteCallback.fragmentLength(entity);
                }
            };
        }
        CompressionCodec compressionCodec = null;
        if (compression != null) {
            compressionCodec = environment.compressionCodecFactory().get(compression);
        }
        this.accumulator = ProducerUtils.createMessageAccumulator(dynamicBatch, subEntrySize, batchSize, maxUnconfirmedMessages, compressionCodec, environment.codec(), environment.byteBufAllocator(), this.client.maxFrameSize(), accumulatorPublishSequenceFunction, filterValueExtractor, environment.clock(), stream, environment.observationCollector(), this, this.id);
        // The periodic flush is only needed for static batching with a delay of at least 1 ms.
        boolean backgroundBatchPublishingTaskRequired = !dynamicBatch && batchPublishingDelay.toMillis() > 0L;
        LOGGER.debug("Background batch publishing task required? {}", (Object)backgroundBatchPublishingTaskRequired);
        if (backgroundBatchPublishingTaskRequired) {
            taskReference = new AtomicReference<Runnable>();
            // One-shot task that re-schedules itself after each run until the producer is CLOSED.
            Runnable task = () -> {
                if (this.canSend()) {
                    this.accumulator.flush(false);
                }
                if (this.state() != Resource.State.CLOSED) {
                    environment.scheduledExecutorService().schedule(Utils.namedRunnable((Runnable)taskReference.get(), "Background batch publishing task for publisher %d on stream '%s'", this.id, this.stream), batchPublishingDelay.toMillis(), TimeUnit.MILLISECONDS);
                }
            };
            taskReference.set(task);
            environment.scheduledExecutorService().schedule(Utils.namedRunnable(task, "Background batch publishing task for publisher %d on stream '%s'", this.id, this.stream), batchPublishingDelay.toMillis(), TimeUnit.MILLISECONDS);
        }
        this.batchSize = batchSize;
        this.codec = environment.codec();
        if (!confirmTimeout.isZero()) {
            taskReference = new AtomicReference();
            Runnable confirmTimeoutTask = this.confirmTimeoutTask(confirmTimeout);
            // Wrapper that shields the re-scheduling from failures of the check itself.
            Runnable wrapperTask = () -> {
                try {
                    confirmTimeoutTask.run();
                }
                catch (Exception e) {
                    LOGGER.info("Error while executing confirm timeout check task: {}", e.getCause());
                }
                if (this.state() != Resource.State.CLOSED) {
                    // Keep the latest future so cancelConfirmTimeoutTask() can cancel the pending run.
                    this.confirmTimeoutFuture = this.environment.scheduledExecutorService().schedule(Utils.namedRunnable((Runnable)taskReference.get(), "Background confirm timeout task for producer %d on stream %s", this.id, this.stream), confirmTimeout.toMillis(), TimeUnit.MILLISECONDS);
                }
            };
            taskReference.set(wrapperTask);
            this.confirmTimeoutFuture = this.environment.scheduledExecutorService().schedule(Utils.namedRunnable((Runnable)taskReference.get(), "Background confirm timeout task for producer %d on stream %s", this.id, this.stream), confirmTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        // Only now is the producer allowed to send (canSend() checks for OPEN).
        this.state(Resource.State.OPEN);
    }

    /**
     * Builds the check that fails messages which stayed unconfirmed for longer than
     * {@code confirmTimeout}.
     *
     * <p>Each run takes a snapshot of the unconfirmed messages sorted by publishing ID and walks
     * it in ascending order, reporting error code 10004 for every entry whose timestamp is older
     * than "now minus timeout". It stops at the first entry that is recent enough, and returns
     * immediately (without logging) if the current thread has been interrupted.
     *
     * @param confirmTimeout maximum age of an unconfirmed message
     * @return the check, to be run periodically
     */
    private Runnable confirmTimeoutTask(Duration confirmTimeout) {
        return () -> {
            // Entity timestamps are compared against clock time minus the timeout in nanoseconds.
            long threshold = this.environment.clock().time() - confirmTimeout.toNanos();
            TreeMap<Long, ProducerUtils.AccumulatedEntity> snapshot = new TreeMap<Long, ProducerUtils.AccumulatedEntity>(this.unconfirmedMessages);
            int expired = 0;
            for (Map.Entry<Long, ProducerUtils.AccumulatedEntity> candidate : snapshot.entrySet()) {
                boolean timedOut = candidate.getValue().time() < threshold;
                if (!timedOut) {
                    // Presumably later publishing IDs carry later timestamps, so nothing after this one can be expired.
                    break;
                }
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                this.error(candidate.getKey(), (short)10004);
                expired++;
            }
            if (expired > 0) {
                LOGGER.debug("{} outbound message(s) had reached the confirm timeout (limit {}) for producer {} on stream '{}', application notified with callback", new Object[]{expired, threshold, this.id, this.stream});
            }
        };
    }

    /**
     * Computes the first publishing ID the internal sequence should hand out.
     *
     * @return 0 for an unnamed producer or when the client reports 0 as the last publishing ID,
     *     otherwise the last publishing ID reported by the client plus one
     */
    private long computeFirstValueOfPublishingSequence() {
        boolean unnamed = this.name == null || this.name.isEmpty();
        if (unnamed) {
            return 0L;
        }
        long lastKnown = this.client.queryPublisherSequence(this.name, this.stream);
        return lastKnown == 0L ? 0L : lastKnown + 1L;
    }

    /**
     * Handles a positive confirmation for {@code publishingId}.
     *
     * <p>If the entity is still tracked, its confirmation callback is invoked with
     * {@code (true, 1)} and as many permits as the callback returns are released; if it is no
     * longer tracked, a single permit is released.
     *
     * @param publishingId publishing ID being confirmed
     */
    void confirm(long publishingId) {
        ProducerUtils.AccumulatedEntity tracked = this.unconfirmedMessages.remove(publishingId);
        int permits = 1;
        if (tracked != null) {
            permits = tracked.confirmationCallback().handle(true, (short)1);
        }
        this.unconfirmedMessagesSemaphore.release(permits);
    }

    /**
     * @return the number of entries currently held in the unconfirmed-message map
     */
    int unconfirmedCount() {
        int pending = this.unconfirmedMessages.size();
        return pending;
    }

    /**
     * Handles a negative outcome for {@code publishingId}.
     *
     * <p>If the entity is still tracked, its confirmation callback is invoked with
     * {@code (false, errorCode)} and as many permits as the callback returns are released; if it
     * is no longer tracked, a single permit is released.
     *
     * @param publishingId publishing ID that failed
     * @param errorCode code passed on to the confirmation callback
     */
    void error(long publishingId, short errorCode) {
        ProducerUtils.AccumulatedEntity tracked = this.unconfirmedMessages.remove(publishingId);
        if (tracked == null) {
            this.unconfirmedMessagesSemaphore.release();
            return;
        }
        int permits = tracked.confirmationCallback().handle(false, errorCode);
        this.unconfirmedMessagesSemaphore.release(permits);
    }

    /**
     * @return a new message builder obtained from this producer's codec
     */
    @Override
    public MessageBuilder messageBuilder() {
        Codec messageCodec = this.codec;
        return messageCodec.messageBuilder();
    }

    /**
     * Queries the last publishing ID recorded for this named producer on its stream.
     *
     * @return the value returned by the client's publisher-sequence query
     * @throws IllegalStateException if the producer is closed, has no name, is not currently
     *     able to send (no connection), or if the query itself fails; in that last case the
     *     original failure is attached as the cause
     */
    @Override
    public long getLastPublishingId() {
        this.checkNotClosed();
        if (this.name == null || this.name.isEmpty()) {
            throw new IllegalStateException("The producer has no name");
        }
        if (!this.canSend()) {
            throw new IllegalStateException("The producer has no connection");
        }
        try {
            return this.client.queryPublisherSequence(this.name, this.stream);
        }
        catch (Exception e) {
            // Chain the original failure so callers can see why the query failed.
            throw new IllegalStateException("Error while trying to query last publishing ID for producer " + this.name + " on stream " + this.stream, e);
        }
    }

    /**
     * Publishes a message asynchronously.
     *
     * <p>When the producer cannot send, the handler is notified of the failure right away.
     * Otherwise a permit is acquired first: without deadline when blocking mode is on, or within
     * the enqueue timeout otherwise. If no permit is obtained in time the handler receives a
     * negative status with code 10001.
     *
     * @param message message to publish
     * @param confirmationHandler callback for the outcome; {@code null} is replaced by a no-op
     * @throws StreamException if the thread is interrupted while waiting for a permit (the
     *     interrupt flag is restored)
     */
    @Override
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        ConfirmationHandler handler = confirmationHandler == null ? NO_OP_CONFIRMATION_HANDLER : confirmationHandler;
        if (!this.canSend()) {
            this.failPublishing(message, handler);
            return;
        }
        boolean permitAcquired;
        try {
            if (this.blockOnMaxUnconfirmed) {
                this.unconfirmedMessagesSemaphore.acquire();
                permitAcquired = true;
            } else {
                permitAcquired = this.unconfirmedMessagesSemaphore.tryAcquire(this.enqueueTimeoutMs, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamException("Interrupted while waiting to accumulate outbound message", e);
        }
        if (permitAcquired) {
            this.doSend(message, handler);
        } else {
            handler.handle(new ConfirmationStatus(message, false, (short)10001));
        }
    }

    /**
     * Hands the message to the accumulator, re-checking first that the producer can still send;
     * if it cannot, the handler is notified of the failure instead.
     *
     * @param message message to publish
     * @param confirmationHandler non-null outcome callback
     */
    private void doSend(Message message, ConfirmationHandler confirmationHandler) {
        if (!this.canSend()) {
            this.failPublishing(message, confirmationHandler);
            return;
        }
        this.accumulator.add(message, confirmationHandler);
    }

    /**
     * Notifies the handler that the message could not be published.
     *
     * <p>The status code is 10003 when the producer is CLOSED and 10002 in every other state
     * (RECOVERING included).
     *
     * @param message message that was not published
     * @param confirmationHandler callback to notify
     */
    private void failPublishing(Message message, ConfirmationHandler confirmationHandler) {
        short code = this.state() == Resource.State.CLOSED ? (short)10003 : (short)10002;
        confirmationHandler.handle(new ConfirmationStatus(message, false, code));
    }

    /**
     * @return {@code true} only while the producer state is OPEN
     */
    boolean canSend() {
        return Resource.State.OPEN == this.state();
    }

    /**
     * Closes the producer. Only the first call has an effect.
     *
     * <p>If the producer was OPEN and has a client, the publisher is deleted on that client; a
     * non-OK response is only logged. The producer is then removed from the environment and its
     * local resources are released, even if deleting the publisher throws.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        // Capture the state BEFORE moving to CLOSING: testing state() == OPEN after the
        // transition would never be true and the publisher would never be deleted.
        // (Assumes state(State) updates the value returned by state().)
        boolean wasOpen = this.state() == Resource.State.OPEN;
        this.state(Resource.State.CLOSING);
        try {
            Client currentClient = this.client;
            if (wasOpen && currentClient != null) {
                LOGGER.debug("Deleting producer {}", (Object)this.publisherId);
                Client.Response response = currentClient.deletePublisher(this.publisherId);
                if (!response.isOk()) {
                    LOGGER.info("Could not delete publisher {} on producer closing: {}", (Object)this.publisherId, (Object)Utils.formatConstant(response.getResponseCode()));
                }
            } else {
                LOGGER.debug("No need to delete producer {}, it is currently unavailable", (Object)this.publisherId);
            }
        }
        finally {
            // The closed flag is already set, so a second close() would be a no-op:
            // cleanup has to happen now, whatever the outcome of the deletion.
            this.environment.removeProducer(this);
            this.closeFromEnvironment();
        }
    }

    /**
     * Releases the producer's local resources and marks it CLOSED.
     *
     * <p>Called at the end of {@link #close()}; the name suggests the environment also calls it
     * directly (TODO confirm). It does not check the {@code closed} flag, it sets it.
     */
    void closeFromEnvironment() {
        this.accumulator.close();
        // Callback obtained from environment.registerProducer(...) in the constructor.
        this.closingCallback.run();
        this.cancelConfirmTimeoutTask();
        // Set here as well because this method does not go through close()'s compare-and-set.
        this.closed.set(true);
        // CLOSED also stops the background tasks from re-scheduling themselves.
        this.state(Resource.State.CLOSED);
        LOGGER.debug("Closed publisher {} successfully", (Object)this.publisherId);
    }

    /**
     * Closes the producer because its stream is gone. Only effective if the producer is not
     * already closed.
     *
     * <p>Every unconfirmed message is failed with {@code code}: its confirmation callback is
     * invoked with {@code (false, code)}, the permits it returns are released and the entry is
     * removed. The confirm-timeout task is then cancelled, the producer is removed from the
     * environment and its state becomes CLOSED.
     *
     * @param code error code reported to the confirmation callbacks
     */
    void closeAfterStreamDeletion(short code) {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        // Typed iterator: a raw Iterator yields Object from next(), so getValue() cannot be called on it.
        Iterator<Map.Entry<Long, ProducerUtils.AccumulatedEntity>> iterator = this.unconfirmedMessages.entrySet().iterator();
        while (iterator.hasNext()) {
            ProducerUtils.AccumulatedEntity entity = iterator.next().getValue();
            int nackedCount = entity.confirmationCallback().handle(false, code);
            this.unconfirmedMessagesSemaphore.release(nackedCount);
            iterator.remove();
        }
        this.cancelConfirmTimeoutTask();
        this.environment.removeProducer(this);
        this.state(Resource.State.CLOSED);
    }

    /**
     * Cancels the pending confirm-timeout run, interrupting it if it is executing. Does nothing
     * when no such task was ever scheduled.
     */
    private void cancelConfirmTimeoutTask() {
        if (this.confirmTimeoutFuture == null) {
            return;
        }
        this.confirmTimeoutFuture.cancel(true);
    }

    /**
     * Publishes the given accumulated entities through the current client, using this
     * producer's publish version, publisher ID, write callback and publishing-ID function.
     *
     * @param messages entities to publish
     */
    void publishInternal(List<Object> messages) {
        Client publishingClient = this.client;
        publishingClient.publishInternal(this.publishVersion, this.publisherId, messages, this.writeCallback, this.publishSequenceFunction);
    }

    /**
     * @return {@code true} as long as the closed flag has not been set
     */
    boolean isOpen() {
        boolean alreadyClosed = this.closed.get();
        return !alreadyClosed;
    }

    /**
     * Moves the producer to the RECOVERING state, which makes {@code canSend()} return
     * {@code false}.
     */
    void unavailable() {
        state(Resource.State.RECOVERING);
    }

    /**
     * Brings the producer back to the OPEN state, typically after {@link #unavailable()}.
     *
     * <p>Under the producer lock:
     * <ul>
     *   <li>with retry-on-recovery, the unconfirmed messages are re-published in ascending
     *       publishing-ID order, in chunks of at most {@code batchSize};
     *   <li>otherwise each of them is failed with code 10004 and its permits are released (one
     *       permit if the callback throws);
     *   <li>the accumulator is flushed;
     *   <li>the semaphore is reset to full capacity and then charged with one permit per message
     *       that is unconfirmed at that point.
     * </ul>
     * The state is switched to OPEN after the lock has been released.
     */
    void running() {
        this.executeInLock(() -> {
            LOGGER.debug("Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", (Object)this.unconfirmedMessages.size(), (Object)this.accumulator.size());
            if (this.retryOnRecovery) {
                LOGGER.debug("Re-publishing {} unconfirmed message(s)", (Object)this.unconfirmedMessages.size());
                if (!this.unconfirmedMessages.isEmpty()) {
                    // Sorted copy so messages go out in publishing-ID order; the write callback
                    // puts them back into unconfirmedMessages as they are written.
                    Map<Long, ProducerUtils.AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
                    this.unconfirmedMessages.clear();
                    // Guard against a non-positive batch size, which would otherwise never
                    // consume the iterator and spin forever while holding the lock.
                    int resendBatchSize = Math.max(1, this.batchSize);
                    Iterator<Map.Entry<Long, ProducerUtils.AccumulatedEntity>> resendIterator = messagesToResend.entrySet().iterator();
                    while (resendIterator.hasNext()) {
                        List<Object> batch = new ArrayList<>(resendBatchSize);
                        while (batch.size() < resendBatchSize && resendIterator.hasNext()) {
                            batch.add(resendIterator.next().getValue());
                        }
                        this.client.publishInternal(this.publishVersion, this.publisherId, batch, this.writeCallback, this.publishSequenceFunction);
                    }
                }
            } else {
                LOGGER.debug("Skipping republishing of {} unconfirmed messages", (Object)this.unconfirmedMessages.size());
                Map<Long, ProducerUtils.AccumulatedEntity> messagesToFail = new TreeMap<>(this.unconfirmedMessages);
                this.unconfirmedMessages.clear();
                for (ProducerUtils.AccumulatedEntity accumulatedEntity : messagesToFail.values()) {
                    try {
                        int permits = accumulatedEntity.confirmationCallback().handle(false, (short)10004);
                        this.unconfirmedMessagesSemaphore.release(permits);
                    }
                    catch (Exception e) {
                        // A failing application callback must not abort the recovery.
                        LOGGER.debug("Error while nack-ing outbound message: {}", (Object)e.getMessage());
                        this.unconfirmedMessagesSemaphore.release(1);
                    }
                }
            }
            this.accumulator.flush(true);
            // Re-align the semaphore with reality: back to full capacity, minus what is in flight now.
            int toRelease = this.maxUnconfirmedMessages - this.unconfirmedMessagesSemaphore.availablePermits();
            if (toRelease > 0) {
                this.unconfirmedMessagesSemaphore.release(toRelease);
                if (!this.unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
                    LOGGER.debug("Could not acquire {} permit(s) for message republishing", (Object)this.unconfirmedMessages.size());
                }
            }
        });
        this.state(Resource.State.OPEN);
    }

    /**
     * Replaces the client used for publishing, while holding the producer lock.
     *
     * @param client new client
     */
    void setClient(Client client) {
        this.lock();
        try {
            this.client = client;
        }
        finally {
            this.unlock();
        }
    }

    /**
     * Replaces the publisher ID used when publishing, while holding the producer lock.
     *
     * @param publisherId new publisher ID
     */
    void setPublisherId(byte publisherId) {
        this.lock();
        try {
            this.publisherId = publisherId;
        }
        finally {
            this.unlock();
        }
    }

    /**
     * Two producers are equal when they are of the same class and share the same internal ID
     * and stream name.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        boolean sameType = o != null && o.getClass() == this.getClass();
        if (!sameType) {
            return false;
        }
        StreamProducer other = (StreamProducer)o;
        if (this.id != other.id) {
            return false;
        }
        return this.stream.equals(other.stream);
    }

    /**
     * Hash over the internal ID and the stream name, consistent with {@code equals}.
     */
    @Override
    public int hashCode() {
        // Same value as Objects.hash(id, stream): 31-based combination starting from 1.
        int result = 31 + Long.hashCode(this.id);
        return 31 * result + Objects.hashCode(this.stream);
    }

    /**
     * JSON-like description with the producer ID, the stream and the connection name of the
     * current client ({@code null} when there is no client).
     */
    @Override
    public String toString() {
        // Read the volatile field once so the null check and the call see the same client.
        Client current = this.client;
        String clientDescription = current == null ? "null" : "\"" + current.connectionName() + "\"";
        return "{ \"producer_id\" : " + this.id + ",\"stream\" : \"" + this.stream + "\",\"publishing_client\" : " + clientDescription + "}";
    }

    /**
     * @throws IllegalStateException if the closed flag is set
     */
    private void checkNotClosed() {
        boolean alreadyClosed = this.closed.get();
        if (!alreadyClosed) {
            return;
        }
        throw new IllegalStateException("This producer instance has been closed");
    }

    /**
     * Acquires the producer lock; must be paired with {@link #unlock()}.
     */
    void lock() {
        lock.lock();
    }

    /**
     * Releases the producer lock acquired with {@link #lock()}.
     */
    void unlock() {
        lock.unlock();
    }

    /**
     * Runs {@code action} while holding the producer lock; the lock is released even if the
     * action throws.
     *
     * @param action code to run under the lock
     */
    private void executeInLock(Runnable action) {
        this.lock.lock();
        try {
            action.run();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * @return the internal ID assigned to this producer at construction
     */
    long id() {
        return id;
    }

    /**
     * Write callback for single messages published with a filter value.
     *
     * <p>Per entity it writes: the filter value size as a short ({@code -1} when there is no
     * filter value), the filter value as UTF-8 bytes, the message size as an int and the message
     * bytes. Sizes are expressed in bytes of the UTF-8 encoding, not in {@code String}
     * characters, so non-ASCII filter values get a correct length prefix.
     */
    private static final class OutboundMessageFilterValueWriterCallback
    implements Client.OutboundEntityWriteCallback {
        /** Size written in place of the filter value when the entity has none. */
        private static final int NO_FILTER_VALUE_SIZE = -1;
        /**
         * Bytes besides the filter value and the message: filter size (2) + message size (4),
         * plus 8 more that are presumably the publishing ID written by the caller (TODO confirm).
         */
        private static final int FIXED_LENGTH = 14;

        private OutboundMessageFilterValueWriterCallback() {
        }

        /**
         * Writes the filter value and the encoded message of {@code entity} into {@code bb}.
         *
         * @param bb destination buffer
         * @param entity a {@code ProducerUtils.AccumulatedEntity} wrapping a
         *     {@code Codec.EncodedMessage}
         * @param publishingId not used by this callback
         * @return always 1
         */
        @Override
        public int write(ByteBuf bb, Object entity, long publishingId) {
            ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity)entity;
            String filterValue = accumulatedEntity.filterValue();
            if (filterValue == null) {
                bb.writeShort(NO_FILTER_VALUE_SIZE);
            } else {
                // Encode first: the prefix must be the number of bytes actually written.
                byte[] filterValueBytes = filterValue.getBytes(StandardCharsets.UTF_8);
                bb.writeShort(filterValueBytes.length);
                bb.writeBytes(filterValueBytes);
            }
            Codec.EncodedMessage messageToPublish = (Codec.EncodedMessage)accumulatedEntity.encodedEntity();
            bb.writeInt(messageToPublish.getSize());
            bb.writeBytes(messageToPublish.getData(), 0, messageToPublish.getSize());
            return 1;
        }

        /**
         * Computes the length reported for {@code entity}, consistent with what
         * {@link #write(ByteBuf, Object, long)} puts in the buffer.
         *
         * @param entity a {@code ProducerUtils.AccumulatedEntity} wrapping a
         *     {@code Codec.EncodedMessage}
         * @return fixed overhead + UTF-8 size of the filter value (0 if none) + message size
         */
        @Override
        public int fragmentLength(Object entity) {
            ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity)entity;
            Codec.EncodedMessage message = (Codec.EncodedMessage)accumulatedEntity.encodedEntity();
            String filterValue = accumulatedEntity.filterValue();
            int filterValueSize = filterValue == null ? 0 : filterValue.getBytes(StandardCharsets.UTF_8).length;
            return FIXED_LENGTH + filterValueSize + message.getSize();
        }
    }
}

