/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;

public class CachingKafkaProducerFactory<K, V>
implements KafkaProducerFactory<K, V> {
    private final Map<String, KafkaProducer<K, V>> activeProducers = new HashMap<String, KafkaProducer<K, V>>();
    private final BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier;

    public CachingKafkaProducerFactory(BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier) {
        this.producerInstanceSupplier = producerInstanceSupplier;
    }

    @Override
    public KafkaProducer<K, V> getOrCreateProducer(String producerName, KafkaProducerConfigProperties config) {
        this.activeProducers.computeIfAbsent(producerName, name -> {
            KafkaProducer<K, V> producer = this.producerInstanceSupplier.apply(producerName, config.getProducerConfig(producerName));
            return producer.exceptionHandler(this.getExceptionHandler((String)name, producer));
        });
        return this.activeProducers.get(producerName);
    }

    private Handler<Throwable> getExceptionHandler(String producerName, KafkaProducer<K, V> producer) {
        return t -> {
            if (CachingKafkaProducerFactory.isFatalError(t)) {
                this.activeProducers.remove(producerName);
                producer.close();
            }
        };
    }

    public Optional<KafkaProducer<K, V>> getProducer(String producerName) {
        return Optional.ofNullable(this.activeProducers.get(producerName));
    }

    @Override
    public Future<Void> closeProducer(String producerName) {
        KafkaProducer<K, V> producer = this.activeProducers.remove(producerName);
        if (producer == null) {
            return Future.succeededFuture();
        }
        Promise promise = Promise.promise();
        producer.close((Handler)promise);
        return promise.future();
    }

    public static boolean isFatalError(Throwable error) {
        return error instanceof ProducerFencedException || error instanceof OutOfOrderSequenceException || error instanceof AuthorizationException || error instanceof UnsupportedVersionException || error instanceof UnsupportedForMessageFormatException;
    }
}

