/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.common.metrics.ProducerMetricsInterceptor;
import org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.apache.pulsar.shade.com.google.common.io.Closer;

/**
 * Holds the Pulsar producers and, for exactly-once delivery, the open transactions used by a sink
 * writer, together with the {@link PulsarClient} they were created from.
 *
 * <p>Producers are cached per topic and per schema hash; transactions are cached per topic. The
 * caches are plain {@link HashMap}s and no synchronization is performed in this class.
 */
@Internal
public class ProducerRegister implements Closeable {
    private static final String FAIL_TO_CREATE_TOPIC = "Fail to create the non-exist topic, make sure you have enable the topic auto creation in Pulsar.";

    private final PulsarClient pulsarClient;
    /** Only set when the delivery guarantee is {@link DeliveryGuarantee#EXACTLY_ONCE}. */
    @Nullable
    private final TransactionCoordinatorClient coordinatorClient;
    private final SinkConfiguration sinkConfiguration;
    private final PulsarCrypto pulsarCrypto;
    private final SinkWriterMetricGroup metricGroup;
    /** Per-topic auto-produce bytes schemas, used when byte message validation is enabled. */
    private final Map<String, Schema<byte[]>> schemas;
    /** Topic name to (schema hash to producer). */
    private final Map<String, Map<SchemaHash, Producer<?>>> producers;
    /** Topic name to the transaction currently open for that topic. */
    private final Map<String, Transaction> transactions;

    /**
     * Creates the Pulsar client and the empty producer/transaction caches.
     *
     * @param sinkConfiguration sink settings (delivery guarantee, metrics switch, timeouts, ...)
     * @param pulsarCrypto encryption settings applied to every producer created here
     * @param metricGroup metric group that receives the producer metrics
     * @throws PulsarClientException declared by the original signature; presumably raised when the
     *     client cannot be created (verify against {@code PulsarClientFactory})
     */
    public ProducerRegister(SinkConfiguration sinkConfiguration, PulsarCrypto pulsarCrypto, SinkWriterMetricGroup metricGroup) throws PulsarClientException {
        this.pulsarClient = PulsarClientFactory.createClient(sinkConfiguration);
        this.sinkConfiguration = sinkConfiguration;
        this.pulsarCrypto = pulsarCrypto;
        this.metricGroup = metricGroup;
        this.schemas = new HashMap<>();
        this.producers = new HashMap<>();
        this.transactions = new HashMap<>();
        if (sinkConfiguration.isEnableMetrics()) {
            metricGroup.setCurrentSendTimeGauge(this::currentSendTimeGauge);
        }
        // The transaction coordinator is only needed (and only requested) for exactly-once.
        this.coordinatorClient = sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE
                ? PulsarTransactionUtils.getTcClient(this.pulsarClient)
                : null;
    }

    /**
     * Creates a message builder bound to the producer for the given topic and schema.
     *
     * <p>A {@code null} schema or a {@link SchemaType#BYTES} schema is replaced by the schema
     * returned from {@link #getBytesSchema(String)}. For exactly-once delivery the builder is
     * attached to the topic's current transaction, which is created on demand.
     *
     * @param topic target topic name
     * @param schema message schema, or {@code null} for raw bytes
     * @param <T> message type expected by the caller
     * @return a new builder for a single message
     * @throws PulsarClientException if the producer or the transaction cannot be created
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, @Nullable Schema<?> schema) throws PulsarClientException {
        if (schema == null || schema.getSchemaInfo().getType() == SchemaType.BYTES) {
            schema = this.getBytesSchema(topic);
        }
        ProducerBase producer = (ProducerBase) this.getOrCreateProducer(topic, schema);
        TransactionImpl transaction = null;
        if (this.sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE) {
            transaction = (TransactionImpl) this.getOrCreateTransaction(topic);
        }
        // Raw construction: the schema type is only known as a wildcard at this point.
        return new TypedMessageBuilderImpl(producer, schema, transaction);
    }

    /**
     * Converts every open transaction into a committable and forgets the transactions, so the next
     * message for a topic opens a new transaction.
     *
     * @return one committable (transaction id plus topic) per open transaction; possibly empty
     */
    public List<PulsarCommittable> prepareCommit() {
        List<PulsarCommittable> committables = new ArrayList<>(this.transactions.size());
        for (Map.Entry<String, Transaction> entry : this.transactions.entrySet()) {
            TxnID txnID = entry.getValue().getTxnID();
            committables.add(new PulsarCommittable(txnID, entry.getKey()));
        }
        this.transactions.clear();
        return committables;
    }

    /**
     * Calls {@code flush()} on every cached producer.
     *
     * @throws IOException if flushing any producer fails
     */
    public void flush() throws IOException {
        for (Map<SchemaHash, Producer<?>> topicProducers : this.producers.values()) {
            for (Producer<?> producer : topicProducers.values()) {
                producer.flush();
            }
        }
    }

    /**
     * Flushes pending messages, aborts the still-open transactions, drops the cached producers and
     * finally closes the Pulsar client.
     *
     * <p>All four steps are attempted even if an earlier one fails; {@link Closer} rethrows the
     * first failure and attaches later ones as suppressed exceptions.
     *
     * @throws IOException if any of the shutdown steps fails
     */
    @Override
    public void close() throws IOException {
        try (Closer closer = Closer.create()) {
            // Closer runs its registrations last-in, first-out, so the steps are registered in
            // the reverse of the order they must execute in:
            //   flush -> abort transactions -> clear producers -> close client.
            // Registering them the other way round would close the client before the flush and
            // the transaction abort get a chance to use it.
            // NOTE(review): closing the client is expected to also close the producers created
            // from it - confirm against the Pulsar client documentation.
            closer.register(this.pulsarClient);
            closer.register(this.producers::clear);
            closer.register(this::abortTransactions);
            closer.register(this::flush);
        }
    }

    /**
     * Returns the cached producer for the topic/schema pair, creating and caching it on first use.
     *
     * @throws PulsarClientException if the producer cannot be created
     * @throws FlinkRuntimeException if the partition metadata lookup for the topic fails or is
     *     interrupted
     */
    @SuppressWarnings("unchecked")
    private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) throws PulsarClientException {
        Map<SchemaHash, Producer<?>> topicProducers = this.producers.computeIfAbsent(topic, t -> new HashMap<>());
        SchemaHash hash = PulsarSchemaUtils.hash(schema);
        Producer<?> cached = topicProducers.get(hash);
        if (cached != null) {
            return (Producer<T>) cached;
        }

        // Look up the partition metadata before building the producer. The result is discarded;
        // only a failure matters and it is reported as a topic creation problem.
        // NOTE(review): presumably this lookup is what triggers topic auto creation - confirm.
        try {
            TopicName topicName = TopicName.get(topic);
            ((PulsarClientImpl) this.pulsarClient).getLookup().getPartitionedTopicMetadata(topicName).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
        }

        ProducerBuilder<T> builder = PulsarSinkConfigUtils.createProducerBuilder(this.pulsarClient, schema, this.sinkConfiguration);
        this.configPulsarCrypto(builder);
        builder.topic(topic);
        builder.intercept(new ProducerMetricsInterceptor(this.metricGroup));
        Producer<T> producer = builder.create();
        this.exposeProducerMetrics(producer);
        topicProducers.put(hash, producer);
        return producer;
    }

    /**
     * Applies the encryption settings to the producer builder. Does nothing when no crypto key
     * reader is configured.
     *
     * @throws IllegalArgumentException if a key reader is configured without any encryption key
     */
    private void configPulsarCrypto(ProducerBuilder<?> builder) {
        CryptoKeyReader cryptoKeyReader = this.pulsarCrypto.cryptoKeyReader();
        if (cryptoKeyReader == null) {
            return;
        }
        builder.cryptoKeyReader(cryptoKeyReader);

        Set<String> encryptKeys = this.pulsarCrypto.encryptKeys();
        if (encryptKeys == null || encryptKeys.isEmpty()) {
            throw new IllegalArgumentException("You should provide encryptKeys in PulsarCrypto");
        }
        encryptKeys.forEach(builder::addEncryptionKey);

        MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto = this.pulsarCrypto.messageCrypto();
        if (messageCrypto != null) {
            // The custom message crypto is set on the builder's configuration object directly,
            // which requires the implementation class.
            ProducerConfigurationData producerConfig = ((ProducerBuilderImpl<?>) builder).getConf();
            producerConfig.setMessageCrypto(messageCrypto);
        }
    }

    /**
     * Returns the open transaction for the topic, creating one with the configured timeout when
     * there is none.
     *
     * @throws PulsarClientException declared by the original signature; presumably raised when the
     *     transaction cannot be created (verify against {@code PulsarTransactionUtils})
     */
    private Transaction getOrCreateTransaction(String topic) throws PulsarClientException {
        Transaction existing = this.transactions.get(topic);
        if (existing != null) {
            return existing;
        }
        long timeoutMillis = this.sinkConfiguration.getTransactionTimeoutMillis();
        Transaction transaction = PulsarTransactionUtils.createTransaction(this.pulsarClient, timeoutMillis);
        this.transactions.put(topic, transaction);
        return transaction;
    }

    /**
     * Picks the schema used for raw byte messages: a per-topic cached auto-produce schema when byte
     * validation is enabled, the plain bytes schema otherwise.
     */
    private Schema<byte[]> getBytesSchema(String topic) {
        if (this.sinkConfiguration.isValidateSinkMessageBytes()) {
            return this.schemas.computeIfAbsent(topic, t -> Schema.AUTO_PRODUCE_BYTES());
        }
        return Schema.BYTES;
    }

    /**
     * Aborts every open transaction through the coordinator client and forgets them. Does nothing
     * when there is no coordinator client or no open transaction.
     *
     * @throws FlinkRuntimeException if aborting any transaction fails
     */
    private void abortTransactions() {
        if (this.coordinatorClient == null || this.transactions.isEmpty()) {
            return;
        }
        // Each abort is registered on a Closer so that one failing abort does not prevent the
        // remaining ones from being attempted; the aborts run when the try block exits.
        try (Closer closer = Closer.create()) {
            for (Transaction transaction : this.transactions.values()) {
                TxnID txnID = transaction.getTxnID();
                closer.register(() -> this.coordinatorClient.abort(txnID));
            }
            // Safe to clear before the aborts run: the lambdas captured the transaction ids.
            this.transactions.clear();
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Gauge value: the rounded average of the 50th percentile send latency over all cached
     * producers, or {@link Long#MAX_VALUE} when there is no producer.
     */
    private Long currentSendTimeGauge() {
        double sendTime = this.producers.values().stream()
                .flatMap(topicProducers -> topicProducers.values().stream())
                .map(Producer::getStats)
                .mapToDouble(ProducerStats::getSendLatencyMillis50pct)
                .average()
                .orElse(Long.MAX_VALUE);
        return Math.round(sendTime);
    }

    /**
     * Registers one gauge per producer statistic under
     * {@code PulsarProducer.<topic>.<producer name>} when metrics are enabled. A random UUID
     * replaces a missing producer name.
     */
    private void exposeProducerMetrics(Producer<?> producer) {
        if (!this.sinkConfiguration.isEnableMetrics()) {
            return;
        }
        String producerIdentity = producer.getProducerName();
        if (Strings.isNullOrEmpty(producerIdentity)) {
            producerIdentity = UUID.randomUUID().toString();
        }
        MetricGroup group = this.metricGroup.addGroup("PulsarProducer").addGroup(producer.getTopic()).addGroup(producerIdentity);
        ProducerStats stats = producer.getStats();
        group.gauge("numMsgsSent", stats::getNumMsgsSent);
        group.gauge("numBytesSent", stats::getNumBytesSent);
        group.gauge("numSendFailed", stats::getNumSendFailed);
        group.gauge("numAcksReceived", stats::getNumAcksReceived);
        group.gauge("sendMsgsRate", stats::getSendMsgsRate);
        group.gauge("sendBytesRate", stats::getSendBytesRate);
        group.gauge("sendLatencyMillis50pct", stats::getSendLatencyMillis50pct);
        group.gauge("sendLatencyMillis75pct", stats::getSendLatencyMillis75pct);
        group.gauge("sendLatencyMillis95pct", stats::getSendLatencyMillis95pct);
        group.gauge("sendLatencyMillis99pct", stats::getSendLatencyMillis99pct);
        group.gauge("sendLatencyMillis999pct", stats::getSendLatencyMillis999pct);
        group.gauge("sendLatencyMillisMax", stats::getSendLatencyMillisMax);
        group.gauge("totalMsgsSent", stats::getTotalMsgsSent);
        group.gauge("totalBytesSent", stats::getTotalBytesSent);
        group.gauge("totalSendFailed", stats::getTotalSendFailed);
        group.gauge("totalAcksReceived", stats::getTotalAcksReceived);
        group.gauge("pendingQueueSize", stats::getPendingQueueSize);
    }
}

