/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlConnector;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Checks how a Kafka SQL mapping with Avro key/value formats behaves when the
 * value schema gains an {@code ssn} field after the mapping was created.
 * <p>
 * Every test runs once per combination of a subject name strategy
 * ({@code TopicNameStrategy}, {@code TopicRecordNameStrategy},
 * {@code RecordNameStrategy}) and the {@link #updateMapping} flag, which decides
 * whether the mapping is replaced with a three-column version once the schema
 * has been altered.
 */
@RunWith(HazelcastParametrizedRunner.class)
@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@Category({NightlyTest.class, ParallelJVMTest.class})
public class SqlAvroSchemaEvolutionTest extends KafkaSqlTestSupport {
    /** Prefix prepended to {@link #subjectNameStrategy} to form the strategy class name. */
    private static final String STRATEGY_PACKAGE_PREFIX = "io.confluent.kafka.serializers.subject.";

    /** Column definitions used when declaring the mapping. */
    private static final String ID_COLUMN = "id INT EXTERNAL NAME \"__key.id\"";
    private static final String NAME_COLUMN = "name VARCHAR";
    private static final String SSN_COLUMN = "ssn BIGINT";

    /** Key schema: a record named {@code id} with one optional int field. */
    private static final Schema ID_SCHEMA = SchemaBuilder.record("id")
            .fields()
            .optionalInt("id")
            .endRecord();

    /** Initial value schema: a record named {@code record} with an optional {@code name}. */
    private static final Schema NAME_SCHEMA = SchemaBuilder.record("record")
            .fields()
            .optionalString("name")
            .endRecord();

    /** Evolved value schema that keeps the record name {@code record} and adds {@code ssn}. */
    private static final Schema NAME_SSN_SCHEMA = SchemaBuilder.record("record")
            .fields()
            .optionalString("name")
            .optionalLong("ssn")
            .endRecord();

    /** Same fields as {@link #NAME_SSN_SCHEMA}, but under the record name {@code record2}. */
    private static final Schema NAME_SSN_SCHEMA2 = SchemaBuilder.record("record2")
            .fields()
            .optionalString("name")
            .optionalLong("ssn")
            .endRecord();

    /** Simple name of the subject name strategy under test (first test parameter). */
    @Parameterized.Parameter(0)
    public String subjectNameStrategy;

    /** Whether the mapping is replaced after the schema is altered (second test parameter). */
    @Parameterized.Parameter(1)
    public boolean updateMapping;

    /** {@code true} when {@link #subjectNameStrategy} is {@code TopicNameStrategy}. */
    private boolean topicNameStrategy;

    /** Subject under which the initial value schema is expected, derived in {@link #before()}. */
    private String valueSubjectName;

    /** Topic created for the current test; also used as the mapping name. */
    private String topicName;

    /**
     * Builds the parameter matrix: every subject name strategy paired with both
     * values of the {@code updateMapping} flag.
     *
     * @return the combinations produced by {@code cartesianProduct}
     */
    @Parameterized.Parameters(name = "{0}, updateMapping=[{1}]")
    public static Iterable<Object[]> parameters() {
        List<String> strategies = List.of("TopicNameStrategy", "TopicRecordNameStrategy", "RecordNameStrategy");
        List<Boolean> updateFlags = List.of(false, true);
        return cartesianProduct(new List[]{strategies, updateFlags});
    }

    /** Creates the schema registry once for the whole class. */
    @BeforeClass
    public static void initialize() throws Exception {
        createSchemaRegistry();
    }

    /**
     * Creates a fresh topic, configures the test producer for it and derives the
     * value subject name that corresponds to the strategy under test.
     */
    @Before
    public void before() throws Exception {
        topicName = createRandomTopic(1);

        String registryUrl = kafkaTestSupport.getSchemaRegistryURI().toString();
        String strategyClass = STRATEGY_PACKAGE_PREFIX + subjectNameStrategy;
        kafkaTestSupport.setProducerProperties(topicName, Map.of(
                "schema.registry.url", registryUrl,
                "value.subject.name.strategy", strategyClass));

        topicNameStrategy = subjectNameStrategy.equals("TopicNameStrategy");
        switch (subjectNameStrategy) {
            case "TopicNameStrategy" -> valueSubjectName = topicName + "-value";
            case "TopicRecordNameStrategy" -> valueSubjectName = topicName + "-record";
            case "RecordNameStrategy" -> valueSubjectName = "record";
            default -> {
                // Any other strategy name leaves valueSubjectName untouched.
            }
        }
    }

    /**
     * Starts a mapping definition named after the test topic, with the options
     * every test shares: Avro key/value formats, broker and schema registry
     * addresses, earliest offset reset and the value subject name strategy.
     */
    private SqlTestSupport.SqlMapping kafkaMapping() {
        Object[] baseOptions = {
                "keyFormat", "avro",
                "valueFormat", "avro",
                "bootstrap.servers", kafkaTestSupport.getBrokerConnectionString(),
                "schema.registry.url", kafkaTestSupport.getSchemaRegistryURI(),
                "auto.offset.reset", "earliest",
                "value.subject.name.strategy", STRATEGY_PACKAGE_PREFIX + subjectNameStrategy
        };
        SqlTestSupport.SqlMapping definition = new SqlTestSupport.SqlMapping(topicName, KafkaSqlConnector.class);
        return (SqlTestSupport.SqlMapping) definition.options(baseOptions);
    }

    /**
     * Extends {@link #kafkaMapping()} with the given columns and then with
     * additional options; the caller decides whether to create or replace it.
     */
    private SqlTestSupport.SqlMapping mappingWith(String[] columns, Object... extraOptions) {
        SqlTestSupport.SqlMapping withColumns = (SqlTestSupport.SqlMapping) kafkaMapping().fields(columns);
        return (SqlTestSupport.SqlMapping) withColumns.options(extraOptions);
    }

    /** Columns of the mapping before the schema is altered. */
    private static String[] initialColumns() {
        return new String[]{ID_COLUMN, NAME_COLUMN};
    }

    /** Columns of the mapping once it also exposes {@code ssn}. */
    private static String[] evolvedColumns() {
        return new String[]{ID_COLUMN, NAME_COLUMN, SSN_COLUMN};
    }

    /** Value record name used by the replaced mapping: {@code record} for the topic name strategy, {@code record2} otherwise. */
    private String evolvedValueRecordName() {
        if (topicNameStrategy) {
            return "record";
        }
        return "record2";
    }

    /**
     * Creates the mapping without pre-registering any schema, alters the schema,
     * optionally replaces the mapping, then inserts and verifies all records.
     */
    @Test
    public void test_autoRegisterSchema() throws SchemaRegistryException {
        mappingWith(initialColumns(),
                "keyAvroRecordName", "id",
                "valueAvroRecordName", "record").create();

        insertInitialRecordAndAlterSchema();

        if (updateMapping) {
            mappingWith(evolvedColumns(),
                    "keyAvroRecordName", "id",
                    "valueAvroRecordName", evolvedValueRecordName()).createOrReplace();
        }
        insertAndAssertRecords();
    }

    /**
     * Registers the key and value schemas up front and creates the mapping with
     * {@code auto.register.schemas=false} and {@code use.latest.version=true}.
     * With the topic name strategy and no mapping update, a further insert is
     * expected to fail with "Error serializing Avro message"; in every other
     * combination the full insert-and-verify sequence runs.
     */
    @Test
    public void test_useLatestSchema() throws SchemaRegistryException {
        kafkaTestSupport.registerSchema(topicName + "-key", ID_SCHEMA);
        kafkaTestSupport.registerSchema(valueSubjectName, NAME_SCHEMA);

        mappingWith(initialColumns(),
                "auto.register.schemas", false,
                "use.latest.version", true,
                "keyAvroRecordName", "id",
                "valueAvroRecordName", "record").create();

        insertInitialRecordAndAlterSchema();

        if (updateMapping) {
            mappingWith(evolvedColumns(),
                    "auto.register.schemas", false,
                    "use.latest.version", true,
                    "keyAvroRecordName", "id",
                    "valueAvroRecordName", evolvedValueRecordName()).createOrReplace();
        }

        boolean insertExpectedToFail = topicNameStrategy && !updateMapping;
        if (insertExpectedToFail) {
            Assertions.assertThatThrownBy(() -> insertRecord(29, "Bob"))
                    .hasMessageContaining("Error serializing Avro message");
        } else {
            insertAndAssertRecords();
        }
    }

    /**
     * Same scenario as the other tests, but pins the mapping to explicit schema
     * ids through {@code key.schema.id} / {@code value.schema.id}. Currently ignored.
     */
    @Ignore("(key|value).schema.id configs are not supported currently. Key/value-specific serializer configs will be implemented by HZG-53.")
    @Test
    public void test_useSpecificSchema() throws SchemaRegistryException {
        int keySchemaId = kafkaTestSupport.registerSchema(topicName + "-key", ID_SCHEMA);
        int valueSchemaId = kafkaTestSupport.registerSchema(valueSubjectName, NAME_SCHEMA);

        mappingWith(initialColumns(),
                "auto.register.schemas", false,
                "key.schema.id", keySchemaId,
                "value.schema.id", valueSchemaId,
                "keyAvroRecordName", "id",
                "valueAvroRecordName", "record").create();

        int valueSchemaId2 = insertInitialRecordAndAlterSchema();

        if (updateMapping) {
            mappingWith(evolvedColumns(),
                    "auto.register.schemas", false,
                    "key.schema.id", keySchemaId,
                    "value.schema.id", valueSchemaId2,
                    "keyAvroRecordName", "id",
                    "valueAvroRecordName", evolvedValueRecordName()).createOrReplace();
        }
        insertAndAssertRecords();
    }

    /**
     * Inserts the first row through SQL, then registers the schema that adds
     * {@code ssn}: as a second version of the value subject for the topic name
     * strategy, or under a separate subject suffixed with {@code "2"} otherwise.
     *
     * @return the id returned when registering the evolved value schema
     */
    private int insertInitialRecordAndAlterSchema() throws SchemaRegistryException {
        insertRecord(13, "Alice");
        assertLatestVersion(valueSubjectName, 1L);

        if (topicNameStrategy) {
            int evolvedId = kafkaTestSupport.registerSchema(valueSubjectName, NAME_SSN_SCHEMA);
            assertLatestVersion(valueSubjectName, 2L);
            return evolvedId;
        }

        String secondSubject = valueSubjectName + "2";
        int evolvedId = kafkaTestSupport.registerSchema(secondSubject, NAME_SSN_SCHEMA2);
        assertLatestVersion(valueSubjectName, 1L);
        assertLatestVersion(valueSubjectName + "2", 1L);
        return evolvedId;
    }

    /**
     * Writes four more rows (two through SQL, two through the test producer with
     * the old and the evolved value schema), checks that no further schema
     * version appeared, and verifies the query result. Rows are truncated to two
     * columns when the mapping was not updated.
     */
    private void insertAndAssertRecords() throws SchemaRegistryException {
        int columnCount = updateMapping ? 3 : 2;
        Schema evolvedSchema = topicNameStrategy ? NAME_SSN_SCHEMA : NAME_SSN_SCHEMA2;

        insertRecord(Arrays.copyOf(row(29, "Bob", 123456789L), columnCount));
        kafkaTestSupport.produce(topicName,
                (Object) createRecord(ID_SCHEMA, 31),
                (Object) createRecord(NAME_SCHEMA, "Carol"));
        kafkaTestSupport.produce(topicName,
                (Object) createRecord(ID_SCHEMA, 47),
                (Object) createRecord(evolvedSchema, "Dave", 123456789L));
        insertRecord(Arrays.copyOf(row(53, "Erin", 987654321L), columnCount));

        if (topicNameStrategy) {
            assertLatestVersion(valueSubjectName, 2L);
        } else {
            assertLatestVersion(valueSubjectName, 1L);
            assertLatestVersion(valueSubjectName + "2", 1L);
        }

        Object[][] expectedRows = {
                {13, "Alice", null},
                {29, "Bob", 123456789L},
                {31, "Carol", null},
                {47, "Dave", 123456789L},
                {53, "Erin", 987654321L}
        };
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + topicName,
                Arrays.stream(expectedRows)
                        .map(expected -> new SqlTestSupport.Row(Arrays.copyOf(expected, columnCount)))
                        .toList());
    }

    /** Asserts that the latest registered version of {@code subject} equals {@code expectedVersion}. */
    private void assertLatestVersion(String subject, long expectedVersion) throws SchemaRegistryException {
        Assert.assertEquals(expectedVersion, (long) kafkaTestSupport.getLatestSchemaVersion(subject));
    }

    /** Inserts one row of literal values into the mapping through {@code insertLiterals}. */
    private void insertRecord(Object... values) {
        insertLiterals(instance(), topicName, values);
    }
}

