/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.core;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.ProducerUtils;
import org.springframework.util.Assert;

/**
 * A {@link DefaultPulsarProducerFactory} that caches the producers it creates.
 *
 * <p>Producers are cached per {@link ProducerCacheKey} (schema hash, topic, encryption keys and
 * customizers). Every producer handed out is wrapped in a {@link ProducerWithCloseCallback} so
 * that a {@code close()} issued by client code does not close the cached producer. The actual
 * producer is closed only when its cache entry is evicted or when this factory is
 * {@link #destroy() destroyed}.
 *
 * @param <T> the payload type of the produced messages
 */
public class CachingPulsarProducerFactory<T>
extends DefaultPulsarProducerFactory<T>
implements DisposableBean {
    private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    private final Cache<ProducerCacheKey<T>, Producer<T>> producerCache;

    /**
     * Creates a caching producer factory.
     *
     * @param pulsarClient the client passed to the parent factory
     * @param producerConfig the producer configuration passed to the parent factory
     * @param cacheExpireAfterAccess how long an entry may stay unused before it expires
     * @param cacheMaximumSize the maximum number of cached producers
     * @param cacheInitialCapacity the initial capacity of the cache
     * @throws NullPointerException if any of the cache settings is {@code null}
     */
    public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Object> producerConfig, Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity) {
        super(pulsarClient, producerConfig);
        // Fail with a descriptive message instead of an anonymous unboxing NPE.
        Objects.requireNonNull(cacheExpireAfterAccess, "'cacheExpireAfterAccess' must be non-null");
        Objects.requireNonNull(cacheMaximumSize, "'cacheMaximumSize' must be non-null");
        Objects.requireNonNull(cacheInitialCapacity, "'cacheInitialCapacity' must be non-null");
        // The explicit type witness on evictionListener gives the lambda parameters their real
        // types; without it they are inferred as Object and need unchecked casts.
        this.producerCache = Caffeine.newBuilder()
            .expireAfterAccess(cacheExpireAfterAccess)
            .maximumSize(cacheMaximumSize)
            .initialCapacity(cacheInitialCapacity)
            .scheduler(Scheduler.systemScheduler())
            .<ProducerCacheKey<T>, Producer<T>>evictionListener((producerCacheKey, producer, cause) -> {
                this.logger.debug(() -> String.format("Producer %s evicted from cache due to %s", ProducerUtils.formatProducer(producer), cause));
                this.closeProducer(producer);
            })
            .build();
    }

    /**
     * Returns the cached producer matching the given settings, creating and caching a new one
     * when the cache holds no entry for them.
     *
     * @param schema the message schema (must not be {@code null})
     * @param topic the target topic, resolved through {@link ProducerUtils#resolveTopicName}
     * @param encryptionKeys optional encryption keys; copied into a set for the cache key
     * @param customizers optional producer builder customizers
     * @return a producer whose {@code close()} does not close the underlying cached producer
     */
    @Override
    protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic, @Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
        String topicName = ProducerUtils.resolveTopicName(topic, this);
        // Copy into a Set so key equality ignores the order and type of the caller's collection.
        Set<String> encryptionKeySet = (encryptionKeys == null) ? null : new HashSet<>(encryptionKeys);
        // NOTE(review): the customizers list is stored by reference; mutating it after this call
        // would change the key's hash code - confirm callers never do that.
        ProducerCacheKey<T> producerCacheKey = new ProducerCacheKey<>(schema, topicName, encryptionKeySet, customizers);
        return this.producerCache.get(producerCacheKey,
                key -> this.createCacheableProducer(key.schema, key.topic, key.encryptionKeys, key.customizers));
    }

    /**
     * Creates a producer through the parent factory and wraps it so that closing the wrapper
     * only logs a trace message.
     *
     * @throws RuntimeException wrapping the {@link PulsarClientException} if creation fails
     */
    private Producer<T> createCacheableProducer(Schema<T> schema, String topic, @Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
        try {
            Producer<T> producer = super.doCreateProducer(schema, topic, encryptionKeys, customizers);
            return new ProducerWithCloseCallback<>(producer, closedProducer -> this.logger.trace(() -> String.format("Client closed producer %s but will skip actual closing", ProducerUtils.formatProducer(closedProducer))));
        }
        catch (PulsarClientException ex) {
            // The cache mapping function cannot throw checked exceptions.
            throw new RuntimeException(ex);
        }
    }

    /**
     * Invalidates every cache entry and closes the actual producer behind each one.
     */
    @Override
    public void destroy() {
        this.producerCache.asMap().forEach((producerCacheKey, producer) -> {
            this.producerCache.invalidate(producerCacheKey);
            // NOTE(review): closed by hand here, presumably because the eviction listener is not
            // notified for explicit invalidation - confirm against the Caffeine documentation.
            this.closeProducer(producer);
        });
    }

    /**
     * Asynchronously closes the actual producer behind a cached wrapper. Logs a warning and does
     * nothing else when the actual producer cannot be determined.
     */
    private void closeProducer(Producer<T> producer) {
        Producer<T> actualProducer = null;
        if (producer instanceof ProducerWithCloseCallback) {
            actualProducer = ((ProducerWithCloseCallback<T>) producer).getActualProducer();
        }
        if (actualProducer == null) {
            this.logger.warn(() -> String.format("Unable to get actual producer for %s - will skip closing it", ProducerUtils.formatProducer(producer)));
            return;
        }
        ProducerUtils.closeProducerAsync(actualProducer, this.logger);
    }

    /**
     * Cache key for a producer. Equality is based on the topic, the hash of the schema, the
     * encryption keys and the customizers; the schema instance itself is not compared.
     *
     * @param <T> the payload type of the producer
     */
    static class ProducerCacheKey<T> {
        private final Schema<T> schema;
        private final SchemaHash schemaHash;
        private final String topic;
        @Nullable
        private final Set<String> encryptionKeys;
        @Nullable
        private final List<ProducerBuilderCustomizer<T>> customizers;

        /**
         * @param schema the schema (must not be {@code null}); its {@link SchemaHash} is computed once here
         * @param topic the topic (must not be {@code null})
         * @param encryptionKeys optional encryption keys
         * @param customizers optional producer builder customizers
         */
        ProducerCacheKey(Schema<T> schema, String topic, @Nullable Set<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
            Assert.notNull(schema, () -> "'schema' must be non-null");
            Assert.notNull(topic, () -> "'topic' must be non-null");
            this.schema = schema;
            this.schemaHash = SchemaHash.of(this.schema);
            this.topic = topic;
            this.encryptionKeys = encryptionKeys;
            this.customizers = customizers;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ProducerCacheKey<?> that = (ProducerCacheKey<?>) o;
            return this.topic.equals(that.topic)
                    && this.schemaHash.equals(that.schemaHash)
                    && Objects.equals(this.encryptionKeys, that.encryptionKeys)
                    && Objects.equals(this.customizers, that.customizers);
        }

        @Override
        public int hashCode() {
            // Combines exactly the fields compared in equals(); order-sensitive mixing
            // distributes better than a plain sum of the component hashes.
            return Objects.hash(this.topic, this.schemaHash, this.encryptionKeys, this.customizers);
        }
    }

    /**
     * A {@link Producer} decorator that delegates every operation to the wrapped producer except
     * {@code close()} and {@code closeAsync()}, which only invoke the supplied callback and leave
     * the wrapped producer open.
     *
     * @param <T> the payload type of the producer
     */
    static class ProducerWithCloseCallback<T>
    implements Producer<T> {
        private final Producer<T> producer;
        private final Consumer<Producer<T>> closeCallback;

        /**
         * @param producer the producer to delegate to
         * @param closeCallback invoked with the wrapped producer whenever a close is requested
         */
        ProducerWithCloseCallback(Producer<T> producer, Consumer<Producer<T>> closeCallback) {
            this.producer = producer;
            this.closeCallback = closeCallback;
        }

        /**
         * @return the wrapped producer that this decorator delegates to
         */
        public Producer<T> getActualProducer() {
            return this.producer;
        }

        @Override
        public String getTopic() {
            return this.producer.getTopic();
        }

        @Override
        public String getProducerName() {
            return this.producer.getProducerName();
        }

        @Override
        public MessageId send(T message) throws PulsarClientException {
            return this.producer.send(message);
        }

        @Override
        public CompletableFuture<MessageId> sendAsync(T message) {
            return this.producer.sendAsync(message);
        }

        @Override
        public void flush() throws PulsarClientException {
            this.producer.flush();
        }

        @Override
        public CompletableFuture<Void> flushAsync() {
            return this.producer.flushAsync();
        }

        @Override
        public TypedMessageBuilder<T> newMessage() {
            return this.producer.newMessage();
        }

        @Override
        public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
            return this.producer.newMessage(schema);
        }

        @Override
        public TypedMessageBuilder<T> newMessage(Transaction txn) {
            return this.producer.newMessage(txn);
        }

        @Override
        public long getLastSequenceId() {
            return this.producer.getLastSequenceId();
        }

        @Override
        public ProducerStats getStats() {
            return this.producer.getStats();
        }

        /**
         * Invokes the close callback; the wrapped producer is not closed.
         */
        @Override
        public void close() throws PulsarClientException {
            this.closeCallback.accept(this.producer);
        }

        /**
         * Invokes the close callback and returns an already completed future; the wrapped
         * producer is not closed.
         */
        @Override
        public CompletableFuture<Void> closeAsync() {
            this.closeCallback.accept(this.producer);
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public boolean isConnected() {
            return this.producer.isConnected();
        }

        @Override
        public long getLastDisconnectedTimestamp() {
            return this.producer.getLastDisconnectedTimestamp();
        }

        @Override
        public int getNumOfPartitions() {
            return this.producer.getNumOfPartitions();
        }
    }
}

