/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms.openlineage;

import io.debezium.Module;
import io.debezium.config.Configuration;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.openlineage.ConnectorContext;
import io.debezium.openlineage.DebeziumOpenLineageEmitter;
import io.debezium.openlineage.dataset.DatasetMetadata;
import io.debezium.transforms.SmtManager;
import io.debezium.util.BoundedConcurrentHashMap;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenLineage<R extends ConnectRecord<R>>
implements Transformation<R>,
Versioned {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpenLineage.class);
    private static final int CACHE_SIZE = 64;
    private ZonedDateTime lastEmissionTime;
    private final BoundedConcurrentHashMap<String, Boolean> recentlySeenTopics = new BoundedConcurrentHashMap(64);
    private final BoundedConcurrentHashMap<Schema, Boolean> recentlySeenSchemas = new BoundedConcurrentHashMap(64);
    private SmtManager<R> smtManager;

    public ConfigDef config() {
        return new ConfigDef();
    }

    public void configure(Map<String, ?> props) {
        Configuration config = Configuration.from(props);
        this.smtManager = new SmtManager(config);
    }

    public R apply(R record) {
        if (this.isInvalidLineageRecord(record)) {
            return record;
        }
        if (this.recentlySeenTopics.put(record.topic(), true) == null || this.recentlySeenSchemas.put(record.valueSchema(), true) == null) {
            if (this.recentlySeenSchemas.put(record.valueSchema(), true) == null) {
                List<DatasetMetadata.FieldDefinition> fieldDefinitions = record.valueSchema().fields().stream().map(this::buildFieldDefinition).toList();
                ConnectorContext connectorContext = ConnectorContext.from(record.headers());
                DebeziumOpenLineageEmitter.emit(connectorContext, BaseSourceTask.State.RUNNING, List.of(new DatasetMetadata(record.topic(), DatasetMetadata.DatasetType.OUTPUT, fieldDefinitions)));
                this.lastEmissionTime = ZonedDateTime.now();
            }
            LOGGER.debug("Emitting running event for output dataset {}", (Object)record.topic());
            return record;
        }
        return record;
    }

    private boolean isInvalidLineageRecord(R record) {
        return record.value() == null || this.smtManager.isValidSchemaChange(record) || this.smtManager.isValidNotification(record) || this.smtManager.isValidHeartBeat(record);
    }

    private DatasetMetadata.FieldDefinition buildFieldDefinition(Field field) {
        Schema schema = field.schema();
        String name = field.name();
        String typeName = schema.type().name();
        String description = schema.doc();
        if (schema.type() == Schema.Type.STRUCT && schema.fields() != null && !schema.fields().isEmpty()) {
            List<DatasetMetadata.FieldDefinition> nestedFields = schema.fields().stream().map(this::buildFieldDefinition).toList();
            return new DatasetMetadata.FieldDefinition(name, typeName, description, nestedFields);
        }
        return new DatasetMetadata.FieldDefinition(name, typeName, description);
    }

    public void close() {
    }

    public String version() {
        return Module.version();
    }
}

