/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.sink;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import lombok.Generated;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.AbstractSinkRecord;
import org.apache.pulsar.functions.instance.ProducerBuilderFactory;
import org.apache.pulsar.functions.instance.ProducerCache;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarSink<T>
implements Sink<T> {
    // Class-wide logger; carries Lombok's @Generated marker.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PulsarSink.class);
    // Client handed to TopicSchema (constructor) and to ProducerBuilderFactory (open()).
    private final PulsarClient client;
    // Sink configuration: topic, type/serde/schema settings, producer config, processing guarantees.
    private final PulsarSinkConfig pulsarSinkConfig;
    // Properties attached to every producer built by createProducer().
    private final Map<String, String> properties;
    // Class loader used to load the configured type class and passed on to schema/producer helpers.
    private final ClassLoader functionClassLoader;
    // Receives sink exceptions reported by the publish error handler.
    private ComponentStatsManager stats;
    // Cache through which the processors obtain (or lazily create) producers.
    private final ProducerCache producerCache;
    // Processor selected in open() from the configured processing guarantees;
    // stays null until open() has run, and also when open() returns early because the schema is null.
    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
    // Helper used by initializeSchema() to resolve the schema of the output topic.
    private final TopicSchema topicSchema;
    // Default schema resolved by open(); null when the configured type class is Void.
    private Schema<T> schema;
    // Created in open(); used by createProducer() to build producers.
    private ProducerBuilderFactory producerBuilderFactory;

    /**
     * Creates a sink that has not been opened yet.
     *
     * <p>The constructor stores its arguments and builds a {@link TopicSchema} from the client and
     * the function class loader. The schema, the producer builder factory and the processor are
     * only set up later by {@link #open(Map, SinkContext)}.
     *
     * @param client              Pulsar client used for schema lookup and producer creation
     * @param pulsarSinkConfig    sink configuration
     * @param properties          properties attached to every producer this sink creates
     * @param stats               statistics manager that receives sink exceptions
     * @param functionClassLoader class loader used to load the configured type class
     * @param producerCache       cache through which producers are obtained
     */
    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats, ClassLoader functionClassLoader, ProducerCache producerCache) {
        this.client = client;
        this.functionClassLoader = functionClassLoader;
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.properties = properties;
        this.producerCache = producerCache;
        this.stats = stats;
        this.topicSchema = new TopicSchema(client, functionClassLoader);
    }

    /**
     * Resolves the output schema and installs the processor matching the configured
     * processing guarantees.
     *
     * <p>When {@link #initializeSchema()} yields {@code null} the method only logs that fact and
     * returns; no producer builder factory and no processor are created in that case.
     * Neither {@code config} nor {@code sinkContext} is read by this implementation.
     *
     * @param config      sink configuration map (unused here)
     * @param sinkContext sink context (unused here)
     * @throws Exception if the schema cannot be initialized
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Opening pulsar sink with config: {}", this.pulsarSinkConfig);

        this.schema = this.initializeSchema();
        if (this.schema != null) {
            this.producerBuilderFactory = new ProducerBuilderFactory(
                    this.client,
                    this.pulsarSinkConfig.getProducerConfig(),
                    this.functionClassLoader,
                    null);

            switch (this.pulsarSinkConfig.getProcessingGuarantees()) {
                case ATMOST_ONCE:
                    this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor();
                    break;
                case ATLEAST_ONCE:
                    this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor();
                    break;
                case EFFECTIVELY_ONCE:
                    this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
                    break;
                case MANUAL:
                    this.pulsarSinkProcessor = new PulsarSinkManualProcessor();
                    break;
                default:
                    // Any other value leaves the processor unset, exactly as before.
                    break;
            }
        } else {
            log.info("Since output type is null, not creating any real sink");
        }
    }

    /**
     * Publishes one record through the processor installed by {@code open()}.
     *
     * <p>The message key is set from the record key unless the record schema is a
     * {@link KeyValueSchema} with {@code SEPARATED} encoding. Record properties are copied when
     * they are non-empty and either the record or the sink configuration asks for forwarding.
     * For a {@link PulsarRecord} source the input topic and the Base64-encoded input message id
     * are added as message properties; for any other source the event time is forwarded when
     * present.
     *
     * <p>The processor is dereferenced without a null check, so {@code open()} must have
     * installed one before this method is called.
     *
     * @param record the record to publish; it is cast to {@link AbstractSinkRecord}
     */
    @Override
    public void write(Record<T> record) {
        AbstractSinkRecord<T> sinkRecord = (AbstractSinkRecord<T>) record;
        TypedMessageBuilder<T> msg = this.pulsarSinkProcessor.newMessage(sinkRecord);

        Optional<String> key = record.getKey();
        if (key.isPresent()) {
            Schema<T> recordSchema = record.getSchema();
            boolean separatedKeyValue = recordSchema instanceof KeyValueSchema
                    && ((KeyValueSchema) recordSchema).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED;
            if (!separatedKeyValue) {
                msg.key(key.get());
            }
        }
        msg.value(record.getValue());

        Map<String, String> recordProperties = record.getProperties();
        if (!recordProperties.isEmpty()) {
            boolean forwardProperties = sinkRecord.shouldAlwaysSetMessageProperties()
                    || this.pulsarSinkConfig.isForwardSourceMessageProperty();
            if (forwardProperties) {
                msg.properties(recordProperties);
            }
        }

        Record<?> source = sinkRecord.getSourceRecord();
        if (source instanceof PulsarRecord) {
            PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) source;
            msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                    .property(
                            "__pfn_input_msg_id__",
                            new String(
                                    Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray()),
                                    StandardCharsets.UTF_8));
        } else {
            Optional<Long> eventTime = source.getEventTime();
            eventTime.ifPresent(msg::eventTime);
        }

        this.pulsarSinkProcessor.sendOutputMessage(msg, sinkRecord);
    }

    /**
     * Closes the active processor, if one was installed by {@code open()}.
     *
     * @throws Exception if the processor fails to close
     */
    @Override
    public void close() throws Exception {
        if (this.pulsarSinkProcessor == null) {
            return;
        }
        this.pulsarSinkProcessor.close();
    }

    /**
     * Builds a producer for {@code topicName} and attaches this sink's properties to it.
     *
     * @param topicName    topic the producer publishes to
     * @param schema       schema to use; when {@code null} the sink's default schema is used
     * @param producerName producer name passed to the builder factory (callers in this class
     *                     may pass {@code null})
     * @return the created producer
     * @throws RuntimeException wrapping the {@link PulsarClientException} when creation fails
     */
    Producer<T> createProducer(String topicName, Schema<T> schema, String producerName) {
        Schema<T> schemaToUse = this.schema;
        if (schema != null) {
            schemaToUse = schema;
        }

        log.info("Initializing producer {} on topic {} with schema {}", producerName, topicName, schemaToUse);
        try {
            return this.producerBuilderFactory
                    .createProducerBuilder(topicName, schemaToUse, producerName)
                    .properties(this.properties)
                    .create();
        } catch (PulsarClientException e) {
            String failure = "Failed to create Producer for topic " + topicName
                    + " producerName " + producerName
                    + " schema " + schemaToUse;
            throw new RuntimeException(failure, e);
        }
    }

    /**
     * Resolves the default schema for the configured output topic.
     *
     * <ul>
     *   <li>No type class name configured: {@link Schema#BYTES}.</li>
     *   <li>Type class is {@link Void}: {@code null}, which tells {@code open()} not to create
     *       a real sink.</li>
     *   <li>Schema type configured: the schema is looked up by schema type. If the type class is
     *       a {@link GenericRecord}, the schema type is forced to {@code AUTO_CONSUME}.</li>
     *   <li>Otherwise: the schema is looked up using the configured SerDe class name.</li>
     * </ul>
     *
     * @return the schema to use by default, or {@code null} for a {@code Void} output type
     * @throws ClassNotFoundException if the configured type class cannot be loaded
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    Schema<T> initializeSchema() throws ClassNotFoundException {
        if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
            return (Schema<T>) Schema.BYTES;
        }

        Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), this.functionClassLoader);
        if (Void.class.equals(typeArg)) {
            return null;
        }

        ConsumerConfig consumerConfig = new ConsumerConfig();
        consumerConfig.setSchemaProperties(this.pulsarSinkConfig.getSchemaProperties());

        if (!StringUtils.isEmpty(this.pulsarSinkConfig.getSchemaType())) {
            if (GenericRecord.class.isAssignableFrom(typeArg)) {
                consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
                SchemaType configuredSchemaType = SchemaType.valueOf(this.pulsarSinkConfig.getSchemaType());
                if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
                    log.info("The configured schema type {} is not able to write GenericRecords. So overwrite the schema type to be {}", configuredSchemaType, SchemaType.AUTO_CONSUME);
                }
            } else {
                consumerConfig.setSchemaType(this.pulsarSinkConfig.getSchemaType());
            }
            return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false);
        }

        // No schema type was configured, so the SerDe class name belongs in its own field.
        // Storing it as the schema type would make the lookup treat a class name as a schema type
        // and leave the SerDe class name itself unset.
        consumerConfig.setSerdeClassName(this.pulsarSinkConfig.getSerdeClassName());
        return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false, this.functionClassLoader);
    }

    /**
     * Processor that publishes asynchronously and never acknowledges or fails the source record;
     * publish errors are only reported through the shared error handler.
     */
    @VisibleForTesting
    class PulsarSinkAtMostOnceProcessor
    extends PulsarSinkProcessorBase {
        /**
         * Requests the producer for the configured topic up front, unless the default schema is
         * an {@link AutoConsumeSchema}; in that case the producer is only requested when the
         * first record is written.
         */
        public PulsarSinkAtMostOnceProcessor() {
            if (PulsarSink.this.schema instanceof AutoConsumeSchema) {
                if (log.isDebugEnabled()) {
                    log.debug("The Pulsar producer is not initialized until the first record is published for `AUTO_CONSUME` schema.");
                }
            } else {
                this.getProducer(PulsarSink.this.pulsarSinkConfig.getTopic(), PulsarSink.this.schema);
            }
        }

        /**
         * Starts a message on the record's destination topic, falling back to the configured
         * topic. The record's own schema is used when the record asks for it, otherwise the
         * sink's default schema; with no schema at all a schema-less message is created.
         */
        @Override
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
            Schema recordSchema = record.getSchema();
            Schema schemaToWrite = record.shouldSetSchema() ? recordSchema : PulsarSink.this.schema;

            String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            Producer<T> producer = this.getProducer(topic, schemaToWrite);
            if (schemaToWrite == null) {
                return producer.newMessage();
            }
            return producer.newMessage(schemaToWrite);
        }

        /** Sends asynchronously; success is a no-op, failure goes to the error handler without failing the source. */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            CompletableFuture<Void> sent = msg.sendAsync().thenAccept(messageId -> { });
            sent.exceptionally(this.getPublishErrorHandler(record, false));
        }
    }

    private static interface PulsarSinkProcessor<T> {
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> var1);

        public void sendOutputMessage(TypedMessageBuilder<T> var1, AbstractSinkRecord<T> var2);

        public void close() throws Exception;
    }

    /**
     * Same message construction as the at-most-once processor, but the record is acknowledged
     * once the send completes and the source record is failed when the send fails.
     */
    @VisibleForTesting
    class PulsarSinkAtLeastOnceProcessor
    extends PulsarSinkAtMostOnceProcessor {
        PulsarSinkAtLeastOnceProcessor() {
            super();
        }

        /** Sends asynchronously, acking the record on success and failing its source on error. */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            CompletableFuture<MessageId> pending = msg.sendAsync();
            CompletableFuture<Void> acked = pending.thenAccept(messageId -> record.ack());
            acked.exceptionally(this.getPublishErrorHandler(record, true));
        }
    }

    /**
     * Processor for effectively-once mode: every record must carry a partition id and a record
     * sequence. The partition id is used both as the producer name and as the producer cache
     * key, and the record sequence is used as the message sequence id.
     */
    @VisibleForTesting
    class PulsarSinkEffectivelyOnceProcessor
    extends PulsarSinkProcessorBase {
        PulsarSinkEffectivelyOnceProcessor() {
            super();
        }

        /**
         * Starts a message on a producer named after the record's partition id.
         *
         * @throws RuntimeException if the record has no partition id
         */
        @Override
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
            Optional<String> partition = record.getPartitionId();
            if (!partition.isPresent()) {
                throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
            }

            Schema recordSchema = record.getSchema();
            Schema schemaToWrite = record.shouldSetSchema() ? recordSchema : PulsarSink.this.schema;

            String topicName = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            String partitionId = partition.get();
            // The partition id doubles as the producer name.
            Producer producer = this.getProducer(topicName, schemaToWrite, partitionId, partitionId);
            if (schemaToWrite == null) {
                return producer.newMessage();
            }
            return producer.newMessage(schemaToWrite);
        }

        /**
         * Stamps the record sequence as the sequence id and sends asynchronously, acking the
         * record on success and failing its source on error.
         *
         * @throws RuntimeException if the record has no record sequence
         */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            Optional<Long> sequence = record.getRecordSequence();
            if (!sequence.isPresent()) {
                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
            }
            msg.sequenceId(sequence.get());

            msg.sendAsync()
                    .thenAccept(messageId -> record.ack())
                    .exceptionally(this.getPublishErrorHandler(record, true));
        }
    }

    /**
     * Processor for manual mode. It sends exactly like the at-most-once processor: the record
     * is neither acknowledged nor failed here.
     */
    @VisibleForTesting
    class PulsarSinkManualProcessor
    extends PulsarSinkAtMostOnceProcessor {
        PulsarSinkManualProcessor() {
            super();
        }

        /** Delegates unchanged to the at-most-once send. */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            super.sendOutputMessage(msg, record);
        }
    }

    abstract class PulsarSinkProcessorBase
    implements PulsarSinkProcessor<T> {
        PulsarSinkProcessorBase() {
        }

        protected Producer<T> getProducer(String destinationTopic, Schema schema) {
            return this.getProducer(destinationTopic, schema, null, null);
        }

        protected Producer<T> getProducer(String topicName, Schema schema, String producerName, String partitionId) {
            return PulsarSink.this.producerCache.getOrCreateProducer(ProducerCache.CacheArea.SINK_RECORD_CACHE, topicName, partitionId, () -> {
                Producer producer = PulsarSink.this.createProducer(topicName, schema, producerName);
                log.info("Initialized producer with name '{}' on topic '{}' with schema {} partitionId {} -> {}", new Object[]{producerName, topicName, schema, partitionId, producer});
                return producer;
            });
        }

        @Override
        public void close() throws Exception {
        }

        public Function<Throwable, Void> getPublishErrorHandler(AbstractSinkRecord<T> record, boolean failSource) {
            return throwable -> {
                String errorMsg;
                Record<?> srcRecord = record.getSourceRecord();
                if (failSource) {
                    srcRecord.fail();
                }
                String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
                if (srcRecord instanceof PulsarRecord) {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord)srcRecord).getMessageId());
                } else {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s]", topic, throwable.getMessage());
                    if (record.getRecordSequence().isPresent()) {
                        errorMsg = String.format(errorMsg + " with src sequence id [%s]", record.getRecordSequence().get());
                    }
                }
                log.error(errorMsg);
                PulsarSink.this.stats.incrSinkExceptions(new Exception(errorMsg));
                return null;
            };
        }
    }
}

