/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.ChangeEventFormat;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.ChangeEvent;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.io.IOException;
import java.time.Clock;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;

public class ConvertingEngineBuilder<R>
implements DebeziumEngine.Builder<R> {
    private static final String CONVERTER_PREFIX = "converter";
    private static final String FIELD_CLASS = "class";
    private static final String TOPIC_NAME = "debezium";
    private final DebeziumEngine.Builder<SourceRecord> delegate;
    private Class<? extends ChangeEventFormat<R>> format;
    private Configuration config;
    private Function<SourceRecord, R> toFormat = record -> record;
    private Function<R, SourceRecord> fromFormat = record -> (SourceRecord)record;

    public ConvertingEngineBuilder() {
        this.delegate = EmbeddedEngine.create();
    }

    public DebeziumEngine.Builder<R> notifying(Consumer<R> consumer) {
        if (this.isFormat(Connect.class)) {
            this.delegate.notifying(record -> consumer.accept(record));
        } else {
            this.delegate.notifying(record -> consumer.accept(this.toFormat.apply((SourceRecord)record)));
        }
        return this;
    }

    private boolean isFormat(Class<? extends ChangeEventFormat<?>> format) {
        return this.format == format;
    }

    public DebeziumEngine.Builder<R> notifying(DebeziumEngine.ChangeConsumer<R> handler) {
        if (this.isFormat(Connect.class)) {
            this.delegate.notifying((records, committer) -> handler.handleBatch(records, committer));
        } else {
            this.delegate.notifying((records, committer) -> handler.handleBatch(records.stream().map(x -> this.toFormat.apply((SourceRecord)x)).collect(Collectors.toList()), new DebeziumEngine.RecordCommitter<R>(){

                public void markProcessed(R record) throws InterruptedException {
                    committer.markProcessed(ConvertingEngineBuilder.this.fromFormat.apply(record));
                }

                public void markBatchFinished() {
                    committer.markBatchFinished();
                }
            }));
        }
        return this;
    }

    public DebeziumEngine.Builder<R> using(Properties config) {
        this.config = Configuration.from((Properties)config);
        this.delegate.using(config);
        return this;
    }

    public DebeziumEngine.Builder<R> using(ClassLoader classLoader) {
        this.delegate.using(classLoader);
        return this;
    }

    public DebeziumEngine.Builder<R> using(Clock clock) {
        this.delegate.using(clock);
        return this;
    }

    public DebeziumEngine.Builder<R> using(DebeziumEngine.CompletionCallback completionCallback) {
        this.delegate.using(completionCallback);
        return this;
    }

    public DebeziumEngine.Builder<R> using(DebeziumEngine.ConnectorCallback connectorCallback) {
        this.delegate.using(connectorCallback);
        return this;
    }

    public DebeziumEngine.Builder<R> using(OffsetCommitPolicy policy) {
        this.delegate.using(policy);
        return this;
    }

    public DebeziumEngine.Builder<R> asType(Class<? extends ChangeEventFormat<R>> format) {
        this.format = format;
        return this;
    }

    public DebeziumEngine<R> build() {
        final DebeziumEngine engine = this.delegate.build();
        Configuration converterConfig = this.config.subset(CONVERTER_PREFIX, true);
        if (!this.isFormat(Connect.class)) {
            if (this.isFormat(Json.class)) {
                converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
            } else if (this.isFormat(CloudEvents.class)) {
                converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.debezium.converters.CloudEventsConverter").build();
            } else if (this.isFormat(Avro.class)) {
                converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.avro.AvroConverter").build();
            } else {
                throw new DebeziumException("Converter '" + this.format.getSimpleName() + "' is not supported");
            }
            Converter keyConverter = (Converter)converterConfig.getInstance(FIELD_CLASS, Converter.class);
            Converter valueConverter = (Converter)converterConfig.getInstance(FIELD_CLASS, Converter.class);
            keyConverter.configure(converterConfig.asMap(), true);
            valueConverter.configure(converterConfig.asMap(), false);
            this.toFormat = record -> {
                byte[] key = keyConverter.fromConnectData(TOPIC_NAME, record.keySchema(), record.key());
                byte[] value = valueConverter.fromConnectData(TOPIC_NAME, record.valueSchema(), record.value());
                return new ChangeEvent((Object)(key != null ? new String(key) : null), (Object)(value != null ? new String(value) : null), record);
            };
            this.fromFormat = record -> (SourceRecord)((ChangeEvent)record).reference();
        }
        return new DebeziumEngine<R>(){

            public void run() {
                engine.run();
            }

            public void close() throws IOException {
                engine.close();
            }
        };
    }
}

