/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builder that collects the settings needed to construct a {@link KafkaSink}.
 *
 * <p>Defaults applied by this builder:
 *
 * <ul>
 *   <li>delivery guarantee: {@link DeliveryGuarantee#NONE};
 *   <li>transactional id prefix: {@code "kafka-sink"};
 *   <li>{@code key.serializer} / {@code value.serializer}: {@link ByteArraySerializer};
 *   <li>{@code transaction.timeout.ms}: one hour.
 * </ul>
 *
 * <p>{@code bootstrap.servers} and a record serializer have no default and must be supplied
 * before {@link #build()} is called.
 *
 * @param <IN> type of the records handed to the sink
 */
@PublicEvolving
public class KafkaSinkBuilder<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkBuilder.class);
    private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1L);
    /** Producer keys this builder presets; overriding them only triggers a warning. */
    private static final String[] warnKeys = new String[]{"key.serializer", "value.serializer"};
    /** Upper bound, in UTF-8 encoded bytes, accepted for the transactional id prefix. */
    private static final int MAXIMUM_PREFIX_BYTES = 64000;

    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
    private String transactionalIdPrefix = "kafka-sink";
    private final Properties kafkaProducerConfig = new Properties();
    private KafkaRecordSerializationSchema<IN> recordSerializer;

    /** Creates a builder pre-populated with the default producer configuration. */
    KafkaSinkBuilder() {
        this.kafkaProducerConfig.put("key.serializer", ByteArraySerializer.class.getName());
        this.kafkaProducerConfig.put("value.serializer", ByteArraySerializer.class.getName());
        // Stored as an Integer (not a String), exactly as before; one hour in millis fits an int.
        this.kafkaProducerConfig.put(
                "transaction.timeout.ms", (int) DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis());
    }

    /**
     * Sets the delivery guarantee of the sink.
     *
     * @param deliveryGuarantee the guarantee to use, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code deliveryGuarantee} is {@code null}
     */
    public KafkaSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
        this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee, "deliveryGuarantee");
        return this;
    }

    /**
     * Sets the delivery guarantee of the sink.
     *
     * @param deliveryGuarantee the guarantee to use, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code deliveryGuarantee} is {@code null}
     * @deprecated misspelled name; use {@link #setDeliveryGuarantee(DeliveryGuarantee)} instead.
     */
    @Deprecated
    public KafkaSinkBuilder<IN> setDeliverGuarantee(DeliveryGuarantee deliveryGuarantee) {
        // Delegate so both spellings can never drift apart.
        return setDeliveryGuarantee(deliveryGuarantee);
    }

    /**
     * Copies all entries of {@code props} into the producer configuration, overriding entries
     * with the same key. A warning is logged for every preset serializer key that {@code props}
     * contains.
     *
     * @param props producer properties to merge in, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code props} is {@code null}
     */
    public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties props) {
        Preconditions.checkNotNull(props, "props");
        Arrays.stream(warnKeys)
                .filter(props::containsKey)
                .forEach(k -> LOG.warn("Overwriting the '{}' is not recommended", k));
        this.kafkaProducerConfig.putAll(props);
        return this;
    }

    /**
     * Sets a single producer property, overriding any previous value for {@code key}. A warning
     * is logged when {@code key} is one of the preset serializer keys.
     *
     * @param key property name, must not be {@code null}
     * @param value property value, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code key} or {@code value} is {@code null}
     */
    public KafkaSinkBuilder<IN> setProperty(String key, String value) {
        Preconditions.checkNotNull(key, "key");
        // Properties rejects null values with a message-less NPE; fail early with a named one.
        Preconditions.checkNotNull(value, "value");
        Arrays.stream(warnKeys)
                .filter(key::equals)
                .forEach(k -> LOG.warn("Overwriting the '{}' is not recommended", k));
        this.kafkaProducerConfig.setProperty(key, value);
        return this;
    }

    /**
     * Sets the schema used to turn incoming records into Kafka producer records. The schema is
     * passed through the {@link ClosureCleaner} (recursive level, serializability check enabled)
     * before it is stored.
     *
     * @param recordSerializer the serialization schema, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code recordSerializer} is {@code null}
     */
    public KafkaSinkBuilder<IN> setRecordSerializer(KafkaRecordSerializationSchema<IN> recordSerializer) {
        Preconditions.checkNotNull(recordSerializer, "recordSerializer");
        // Clean/validate first and assign last, so a failing clean() does not leave the
        // rejected serializer stored in the builder.
        ClosureCleaner.clean(recordSerializer, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        this.recordSerializer = recordSerializer;
        return this;
    }

    /**
     * Sets the prefix used for the transactional ids of the sink.
     *
     * @param transactionalIdPrefix the prefix, must not be {@code null} and must not exceed
     *     {@value #MAXIMUM_PREFIX_BYTES} bytes when encoded as UTF-8
     * @return this builder
     * @throws NullPointerException if {@code transactionalIdPrefix} is {@code null}
     * @throws IllegalStateException if the encoded prefix is too long
     */
    public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix) {
        Preconditions.checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
        Preconditions.checkState(
                transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length <= MAXIMUM_PREFIX_BYTES,
                "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
        // Assign only after validation so a rejected prefix never replaces the current one.
        this.transactionalIdPrefix = transactionalIdPrefix;
        return this;
    }

    /**
     * Sets the {@code bootstrap.servers} producer property.
     *
     * @param bootstrapServers the value for {@code bootstrap.servers}, must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code bootstrapServers} is {@code null}
     */
    public KafkaSinkBuilder<IN> setBootstrapServers(String bootstrapServers) {
        Preconditions.checkNotNull(bootstrapServers, "bootstrapServers");
        return setProperty("bootstrap.servers", bootstrapServers);
    }

    /** Verifies that every mandatory setting is present before a sink is created. */
    private void sanityCheck() {
        Preconditions.checkNotNull(
                this.kafkaProducerConfig.getProperty("bootstrap.servers"), "bootstrapServers");
        if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            Preconditions.checkState(
                    this.transactionalIdPrefix != null,
                    "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
        }
        Preconditions.checkNotNull(this.recordSerializer, "recordSerializer");
    }

    /**
     * Validates the collected settings and creates the sink.
     *
     * @return a new {@link KafkaSink} configured from this builder
     * @throws NullPointerException if {@code bootstrap.servers} or the record serializer is
     *     missing
     * @throws IllegalStateException if {@link DeliveryGuarantee#EXACTLY_ONCE} is requested without
     *     a transactional id prefix
     */
    public KafkaSink<IN> build() {
        sanityCheck();
        // Hand the sink its own snapshot so that reusing or further mutating this builder
        // cannot alter the configuration of a sink that was already built.
        Properties producerConfigSnapshot = new Properties();
        producerConfigSnapshot.putAll(this.kafkaProducerConfig);
        return new KafkaSink<IN>(
                this.deliveryGuarantee,
                producerConfigSnapshot,
                this.transactionalIdPrefix,
                this.recordSerializer);
    }
}

