/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.transforms;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.Module;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.schema.FieldNameSelector;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.transforms.ConnectRecordUtil;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.transforms.outbox.JsonSchemaData;
import io.debezium.util.BoundedConcurrentHashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DecodeLogicalDecodingMessageContent<R extends ConnectRecord<R>>
implements Transformation<R>,
Versioned {
    private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContent.class);
    public static final Field FIELDS_NULL_INCLUDE = Field.create((String)"fields.null.include").withDisplayName("Defines whether to include fields with null values to the decoded structure").withType(ConfigDef.Type.BOOLEAN).withDefault(false).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.MEDIUM).withDescription("Defines whether to include fields with null values to the decoded structure");
    private ObjectMapper objectMapper;
    private JsonSchemaData jsonSchemaData;
    private BoundedConcurrentHashMap<Schema, Schema> logicalDecodingMessageContentSchemaCache;

    public ConfigDef config() {
        ConfigDef config = new ConfigDef();
        Field.group((ConfigDef)config, null, (Field[])new Field[]{FIELDS_NULL_INCLUDE});
        return config;
    }

    public void configure(Map<String, ?> configs) {
        Configuration config = Configuration.from(configs);
        this.objectMapper = new ObjectMapper();
        boolean fieldsNullInclude = config.getBoolean(FIELDS_NULL_INCLUDE);
        EventRouterConfigDefinition.JsonPayloadNullFieldBehavior nullFieldBehavior = fieldsNullInclude ? EventRouterConfigDefinition.JsonPayloadNullFieldBehavior.OPTIONAL_BYTES : EventRouterConfigDefinition.JsonPayloadNullFieldBehavior.IGNORE;
        this.jsonSchemaData = new JsonSchemaData(nullFieldBehavior, FieldNameSelector.defaultNonRelationalSelector((SchemaNameAdjuster)CommonConnectorConfig.FieldNameAdjustmentMode.NONE.createAdjuster()));
        this.logicalDecodingMessageContentSchemaCache = new BoundedConcurrentHashMap(10000, 10, BoundedConcurrentHashMap.Eviction.LRU);
    }

    public R apply(R record) {
        if (!Objects.equals(record.valueSchema().name(), "io.debezium.connector.postgresql.MessageValue")) {
            LOGGER.debug("Ignore not a logical decoding message. Message key: \"{}\"", record.key());
            return record;
        }
        Struct originalValue = Requirements.requireStruct((Object)record.value(), (String)"Retrieve a record value");
        Struct logicalDecodingMessageContent = this.getLogicalDecodingMessageContent(originalValue);
        R recordWithoutMessageField = this.removeLogicalDecodingMessageContentField(record);
        Schema updatedValueSchema = this.getUpdatedValueSchema(logicalDecodingMessageContent.schema(), recordWithoutMessageField.valueSchema());
        Struct updatedValue = this.getUpdatedValue(updatedValueSchema, originalValue, logicalDecodingMessageContent);
        return (R)record.newRecord(record.topic(), record.kafkaPartition(), null, null, updatedValueSchema, (Object)updatedValue, record.timestamp(), (Iterable)record.headers());
    }

    private Struct getLogicalDecodingMessageContent(Struct valueStruct) {
        Struct logicalDecodingMessageStruct = Requirements.requireStruct((Object)valueStruct.get("message"), (String)"Retrieve content of a logical decoding message");
        if (logicalDecodingMessageStruct.schema().field("content").schema().type() != Schema.Type.BYTES) {
            throw new DebeziumException("The content of a logical decoding message is non-binary");
        }
        byte[] logicalDecodingMessageContentBytes = logicalDecodingMessageStruct.getBytes("content");
        return this.convertLogicalDecodingMessageContentBytesToStruct(logicalDecodingMessageContentBytes);
    }

    private Struct convertLogicalDecodingMessageContentBytesToStruct(byte[] logicalDecodingMessageContent) {
        String logicalDecodingMessageContentString = new String(logicalDecodingMessageContent);
        JsonNode logicalDecodingMessageContentJson = this.parseLogicalDecodingMessageContentJsonString(logicalDecodingMessageContentString);
        Schema logicalDecodingMessageContentSchema = this.jsonSchemaData.toConnectSchema(null, logicalDecodingMessageContentJson);
        return (Struct)this.jsonSchemaData.toConnectData(logicalDecodingMessageContentJson, logicalDecodingMessageContentSchema);
    }

    private JsonNode parseLogicalDecodingMessageContentJsonString(String logicalDecodingMessageContentJsonString) {
        if (logicalDecodingMessageContentJsonString.startsWith("{") || logicalDecodingMessageContentJsonString.startsWith("[")) {
            try {
                return this.objectMapper.readTree(logicalDecodingMessageContentJsonString);
            }
            catch (JsonProcessingException e) {
                throw new DebeziumException((Throwable)e);
            }
        }
        throw new DebeziumException("Unable to parse logical decoding message content JSON string '" + logicalDecodingMessageContentJsonString + "'");
    }

    private R removeLogicalDecodingMessageContentField(R record) {
        ReplaceField dropFieldDelegate = ConnectRecordUtil.dropFieldFromValueDelegate((String)"message");
        return (R)dropFieldDelegate.apply(record);
    }

    private Schema getUpdatedValueSchema(Schema logicalDecodingMessageContentSchema, Schema debeziumEventSchema) {
        Schema valueSchema = (Schema)this.logicalDecodingMessageContentSchemaCache.get((Object)logicalDecodingMessageContentSchema);
        if (valueSchema == null) {
            valueSchema = this.getSchemaBuilder(logicalDecodingMessageContentSchema, debeziumEventSchema).build();
            this.logicalDecodingMessageContentSchemaCache.put((Object)logicalDecodingMessageContentSchema, (Object)valueSchema);
        }
        return valueSchema;
    }

    private SchemaBuilder getSchemaBuilder(Schema logicalDecodingMessageContentSchema, Schema debeziumEventSchema) {
        String schemaName = debeziumEventSchema.name() + Envelope.SCHEMA_NAME_SUFFIX;
        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(schemaName);
        for (org.apache.kafka.connect.data.Field originalSchemaField : debeziumEventSchema.fields()) {
            schemaBuilder.field(originalSchemaField.name(), originalSchemaField.schema());
        }
        schemaBuilder.field("after", logicalDecodingMessageContentSchema);
        return schemaBuilder;
    }

    private Struct getUpdatedValue(Schema updatedValueSchema, Struct originalValue, Struct logicalDecodingMessageContent) {
        Struct updatedValue = new Struct(updatedValueSchema);
        for (org.apache.kafka.connect.data.Field field : updatedValueSchema.fields()) {
            updatedValue.put(field, switch (field.name()) {
                case "after" -> logicalDecodingMessageContent;
                case "op" -> Envelope.Operation.CREATE.code();
                default -> originalValue.get(field);
            });
        }
        return updatedValue;
    }

    public void close() {
    }

    public String version() {
        return Module.version();
    }
}

