/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.server.ConnectorLifecycle;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.health.Liveness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Startup
public class DebeziumServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumServer.class);
    private static final String PROP_PREFIX = "debezium.";
    private static final String PROP_SOURCE_PREFIX = "debezium.source.";
    private static final String PROP_SINK_PREFIX = "debezium.sink.";
    private static final String PROP_FORMAT_PREFIX = "debezium.format.";
    private static final String PROP_TRANSFORMS_PREFIX = "debezium.transforms.";
    private static final String PROP_KEY_FORMAT_PREFIX = "debezium.format.key.";
    private static final String PROP_VALUE_FORMAT_PREFIX = "debezium.format.value.";
    private static final String PROP_TRANSFORMS = "debezium.transforms";
    private static final String PROP_SINK_TYPE = "debezium.sink.type";
    private static final String PROP_KEY_FORMAT = "debezium.format.key";
    private static final String PROP_VALUE_FORMAT = "debezium.format.value";
    private static final String PROP_TERMINATION_WAIT = "debezium.termination.wait";
    private static final String FORMAT_JSON = Json.class.getSimpleName().toLowerCase();
    private static final String FORMAT_AVRO = Avro.class.getSimpleName().toLowerCase();
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    @Inject
    BeanManager beanManager;
    @Inject
    @Liveness
    ConnectorLifecycle health;
    private Bean<DebeziumEngine.ChangeConsumer<ChangeEvent<?, ?>>> consumerBean;
    private CreationalContext<DebeziumEngine.ChangeConsumer<ChangeEvent<?, ?>>> consumerBeanCreationalContext;
    private DebeziumEngine.ChangeConsumer<ChangeEvent<?, ?>> consumer;
    private DebeziumEngine<?> engine;

    @PostConstruct
    public void start() {
        Config config = ConfigProvider.getConfig();
        String name = (String)config.getValue(PROP_SINK_TYPE, String.class);
        Set beans = this.beanManager.getBeans(name).stream().filter(x -> DebeziumEngine.ChangeConsumer.class.isAssignableFrom(x.getBeanClass())).collect(Collectors.toSet());
        LOGGER.debug("Found {} candidate consumer(s)", (Object)beans.size());
        if (beans.size() == 0) {
            throw new DebeziumException("No Debezium consumer named '" + name + "' is available");
        }
        if (beans.size() > 1) {
            throw new DebeziumException("Multiple Debezium consumers named '" + name + "' were found");
        }
        this.consumerBean = (Bean)beans.iterator().next();
        this.consumerBeanCreationalContext = this.beanManager.createCreationalContext(this.consumerBean);
        this.consumer = (DebeziumEngine.ChangeConsumer)this.consumerBean.create(this.consumerBeanCreationalContext);
        LOGGER.info("Consumer '{}' instantiated", (Object)this.consumer.getClass().getName());
        Class<? extends SerializationFormat<?>> keyFormat = this.getFormat(config, PROP_KEY_FORMAT);
        Class<? extends SerializationFormat<?>> valueFormat = this.getFormat(config, PROP_VALUE_FORMAT);
        Properties props = new Properties();
        this.configToProperties(config, props, PROP_SOURCE_PREFIX, "");
        this.configToProperties(config, props, PROP_FORMAT_PREFIX, "key.converter.");
        this.configToProperties(config, props, PROP_FORMAT_PREFIX, "value.converter.");
        this.configToProperties(config, props, PROP_KEY_FORMAT_PREFIX, "key.converter.");
        this.configToProperties(config, props, PROP_VALUE_FORMAT_PREFIX, "value.converter.");
        Optional transforms = config.getOptionalValue(PROP_TRANSFORMS, String.class);
        if (transforms.isPresent()) {
            props.setProperty("transforms", (String)transforms.get());
            this.configToProperties(config, props, PROP_TRANSFORMS_PREFIX, "transforms.");
        }
        props.setProperty("name", name);
        LOGGER.debug("Configuration for DebeziumEngine: {}", (Object)props);
        DebeziumEngine.Builder<?> builder = null;
        if (keyFormat == Json.class && valueFormat == Json.class) {
            builder = this.createJsonJson(this.consumer);
        } else if (keyFormat == Json.class && valueFormat == Avro.class) {
            builder = this.createJsonAvro(this.consumer);
        } else if (keyFormat == Avro.class && valueFormat == Avro.class) {
            builder = this.createAvroAvro(this.consumer);
        }
        this.engine = builder.using(props).using((DebeziumEngine.ConnectorCallback)this.health).using((DebeziumEngine.CompletionCallback)this.health).build();
        this.executor.execute(() -> this.engine.run());
        LOGGER.info("Engine executor started");
    }

    private DebeziumEngine.Builder<?> createJsonJson(DebeziumEngine.ChangeConsumer<?> consumer) {
        return DebeziumEngine.create(Json.class, Json.class).notifying(consumer);
    }

    private DebeziumEngine.Builder<?> createAvroAvro(DebeziumEngine.ChangeConsumer<?> consumer) {
        return DebeziumEngine.create(Avro.class, Avro.class).notifying(consumer);
    }

    private DebeziumEngine.Builder<?> createJsonAvro(DebeziumEngine.ChangeConsumer<?> consumer) {
        return DebeziumEngine.create(Json.class, Avro.class).notifying(consumer);
    }

    private void configToProperties(Config config, Properties props, String oldPrefix, String newPrefix) {
        for (String name : config.getPropertyNames()) {
            if (!name.startsWith(oldPrefix)) continue;
            props.setProperty(newPrefix + name.substring(oldPrefix.length()), (String)config.getValue(name, String.class));
        }
    }

    private Class<? extends SerializationFormat<?>> getFormat(Config config, String property) {
        String formatName = config.getOptionalValue(property, String.class).orElse(FORMAT_JSON);
        if (FORMAT_JSON.equals(formatName)) {
            return Json.class;
        }
        if (FORMAT_AVRO.equals(formatName)) {
            return Avro.class;
        }
        throw new DebeziumException("Unknown format '" + formatName + "' for option '" + property + "'");
    }

    public void stop(@Observes ShutdownEvent event) {
        try {
            LOGGER.info("Received request to stop the engine");
            Config config = ConfigProvider.getConfig();
            this.engine.close();
            this.executor.shutdown();
            this.executor.awaitTermination(config.getOptionalValue(PROP_TERMINATION_WAIT, Integer.class).orElse(10).intValue(), TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOGGER.error("Exception while shuttting down Debezium", (Throwable)e);
        }
        this.consumerBean.destroy(this.consumer, this.consumerBeanCreationalContext);
    }

    DebeziumEngine.ChangeConsumer<?> getConsumer() {
        return this.consumer;
    }
}

