/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.nio.Bits;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple4;
import com.hazelcast.jet.kafka.HazelcastKafkaAvroDeserializer;
import com.hazelcast.jet.kafka.HazelcastKafkaAvroSerializer;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlConnector;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataAvroResolver;
import com.hazelcast.jet.sql.impl.connector.test.TestAllTypesSqlConnector;
import com.hazelcast.shaded.com.google.common.collect.ImmutableMap;
import com.hazelcast.shaded.com.google.common.collect.Lists;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.sql.impl.type.QueryDataType;
import com.hazelcast.sql.impl.type.QueryDataTypeFamily;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.NightlyTest;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=HazelcastParametrizedRunner.class)
@Parameterized.UseParametersRunnerFactory(value=HazelcastSerialParametersRunnerFactory.class)
public class SqlAvroTest
extends KafkaSqlTestSupport {
    /** Partition count constant; presumably for the topics these tests create (TODO confirm against its usages). */
    private static final int INITIAL_PARTITION_COUNT = 4;

    /** Avro record {@code jet.sql} with a single optional int field {@code id}. */
    private static final Schema ID_SCHEMA = SchemaBuilder.record("jet.sql")
            .fields()
            .optionalInt("id")
            .endRecord();

    /** Avro record {@code jet.sql} with a single optional string field {@code name}. */
    private static final Schema NAME_SCHEMA = SchemaBuilder.record("jet.sql")
            .fields()
            .optionalString("name")
            .endRecord();

    /** Avro record {@code jet.sql} with an optional string {@code name} and an optional long {@code ssn}. */
    private static final Schema NAME_SSN_SCHEMA = SchemaBuilder.record("jet.sql")
            .fields()
            .optionalString("name")
            .optionalLong("ssn")
            .endRecord();

    /**
     * Avro record {@code jet.sql} with one optional field per column used by {@code test_allTypes};
     * decimal and temporal values are declared as strings, {@code map} and {@code object} use
     * {@code KvMetadataAvroResolver.Schemas.OBJECT_SCHEMA} with a null default.
     */
    private static final Schema ALL_TYPES_SCHEMA = SchemaBuilder.record("jet.sql")
            .fields()
            .optionalString("string")
            .optionalBoolean("boolean")
            .optionalInt("byte")
            .optionalInt("short")
            .optionalInt("int")
            .optionalLong("long")
            .optionalFloat("float")
            .optionalDouble("double")
            .optionalString("decimal")
            .optionalString("time")
            .optionalString("date")
            .optionalString("timestamp")
            .optionalString("timestampTz")
            .name("map").type(KvMetadataAvroResolver.Schemas.OBJECT_SCHEMA).withDefault(null)
            .name("object").type(KvMetadataAvroResolver.Schemas.OBJECT_SCHEMA).withDefault(null)
            .endRecord();

    /** Test parameter: selects between schema-registry options and inline key/value schema options. */
    @Parameterized.Parameter
    public boolean useSchemaRegistry;

    /** Read by {@code insertRecord} for the mapping name; assigned outside the visible part of the file. */
    private SqlTestSupport.Type mapping;

    /** Key schema passed to the most recent {@code kafkaMapping} call. */
    private Schema keySchema;

    /** Value schema passed to the most recent {@code kafkaMapping} call. */
    private Schema valueSchema;

    /** Producer properties built by the most recent {@code kafkaMapping} call. */
    private Map<String, String> clientProperties;

    /**
     * Supplies the values for {@code useSchemaRegistry}: {@code false} first, then {@code true}.
     *
     * @return an immutable two-element list of parameter values
     */
    @Parameterized.Parameters(name="useSchemaRegistry=[{0}]")
    public static Iterable<Object> parameters() {
        List<Object> registryModes = List.of(Boolean.FALSE, Boolean.TRUE);
        return registryModes;
    }

    /**
     * One-time class initialisation: delegates to the two-argument {@code setup} with
     * {@code (1, null)} and then creates the schema registry.
     *
     * @throws Exception propagated from the delegated setup calls
     */
    @BeforeClass
    public static void setup() throws Exception {
        setup(1, null);
        createSchemaRegistry();
    }

    /**
     * Convenience overload that passes {@code null} for both the key and the value schema.
     * Note that the three-argument overload dereferences both schemas when
     * {@code useSchemaRegistry} is {@code false}.
     *
     * @param name mapping name
     * @return the mapping produced by the three-argument overload
     */
    private SqlTestSupport.SqlMapping kafkaMapping(String name) {
        Schema noKeySchema = null;
        Schema noValueSchema = null;
        return kafkaMapping(name, noKeySchema, noValueSchema);
    }

    /**
     * Records the given schemas, registers producer properties for {@code name} and returns a
     * Kafka mapping with Avro key/value formats.
     * <p>
     * With {@code useSchemaRegistry} the producer properties and mapping options carry the
     * schema registry URL; otherwise they carry the key and value schemas inline (both schemas
     * must then be non-null, since {@code toString()} is called on them).
     *
     * @param name        mapping name, also used as the key for the producer properties
     * @param keySchema   Avro schema of the key
     * @param valueSchema Avro schema of the value
     * @return the configured mapping; the caller still has to add fields and create it
     */
    private SqlTestSupport.SqlMapping kafkaMapping(String name, Schema keySchema, Schema valueSchema) {
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;

        if (useSchemaRegistry) {
            clientProperties = ImmutableMap.of(
                    "schema.registry.url", kafkaTestSupport.getSchemaRegistryURI().toString());
        } else {
            clientProperties = ImmutableMap.of(
                    "keyAvroSchema", keySchema.toString(),
                    "valueAvroSchema", valueSchema.toString());
        }
        kafkaTestSupport.setProducerProperties(name, clientProperties);

        // Options shared by both modes.
        SqlTestSupport.SqlMapping common = (SqlTestSupport.SqlMapping) new KafkaMapping(name).options(new Object[]{
                "keyFormat", "avro",
                "valueFormat", "avro",
                "bootstrap.servers", kafkaTestSupport.getBrokerConnectionString(),
                "auto.offset.reset", "earliest"});

        // Exactly one of the two conditional option sets is meant to apply.
        SqlTestSupport.SqlMapping withRegistry = (SqlTestSupport.SqlMapping) common.optionsIf(
                useSchemaRegistry,
                new Object[]{"schema.registry.url", kafkaTestSupport.getSchemaRegistryURI()});
        return (SqlTestSupport.SqlMapping) withRegistry.optionsIf(
                !useSchemaRegistry,
                new Object[]{"keyAvroSchema", keySchema, "valueAvroSchema", valueSchema});
    }

    /**
     * Maps a value whose {@code organization} field is a nested record backed by the SQL type
     * {@code Organization}, inserts one row and reads a nested attribute back.
     */
    @Test
    public void test_nestedField() {
        String name = createRandomTopic();

        SqlTestSupport.SqlType organizationType = (SqlTestSupport.SqlType) new SqlTestSupport.SqlType("Organization")
                .fields(new String[]{"id INT", "name VARCHAR"});
        organizationType.create();

        Schema orgSchema = SchemaBuilder.record("Organization")
                .fields()
                .optionalInt("id")
                .optionalString("name")
                .endRecord();
        Schema personSchema = SchemaBuilder.record("Person")
                .fields()
                .optionalString("name")
                .name("organization").type().optional().type(orgSchema)
                .endRecord();

        SqlTestSupport.SqlMapping withColumns = (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, personSchema)
                .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR", "organization Organization"});
        // The record name option is only added in schema-registry mode.
        SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) withColumns
                .optionsIf(useSchemaRegistry, new Object[]{"valueAvroRecordName", "Person"});
        topicMapping.create();

        insertAndAssertRecord(
                row(1, "Alice", row(1, "Umbrella Corporation")),
                row(1, "Alice", row(1, "Umbrella Corporation")),
                row(1, "Alice", createRecord(orgSchema, 1, "Umbrella Corporation")));

        assertRowsEventuallyInAnyOrder(
                "SELECT name, (organization).name FROM " + name,
                List.of(new SqlTestSupport.Row("Alice", "Umbrella Corporation")));
    }

    /**
     * Inline-schema mode only: creating a mapping fails when a mapping column ({@code address})
     * or a column of the nested SQL type ({@code phone}) is absent from the Avro schema.
     */
    @Test
    public void when_mappingOrTypeSchemaHasMissingField_then_fail() {
        Assume.assumeFalse(useSchemaRegistry);

        SqlTestSupport.SqlType parentType = (SqlTestSupport.SqlType) new SqlTestSupport.SqlType("Parent")
                .fields(new String[]{"name VARCHAR", "phone VARCHAR"});
        parentType.create();

        Schema parentSchema = SchemaBuilder.record("Parent")
                .fields()
                .requiredString("name")
                .endRecord();
        Schema studentSchema = SchemaBuilder.record("Student")
                .fields()
                .requiredString("name")
                .name("parent").type(parentSchema).noDefault()
                .endRecord();

        // 'address' is declared in the mapping but not in the Student schema.
        Assertions.assertThatThrownBy(() -> {
            SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) kafkaMapping("kafka", ID_SCHEMA, studentSchema)
                    .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR", "address VARCHAR", "parent Parent"});
            topicMapping.create();
        }).hasMessage("Field 'address' does not exist in schema");

        // 'phone' is declared in the Parent SQL type but not in the Parent schema.
        Assertions.assertThatThrownBy(() -> {
            SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) kafkaMapping("kafka", ID_SCHEMA, studentSchema)
                    .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR", "parent Parent"});
            topicMapping.create();
        }).hasMessage("Field 'phone' does not exist in schema");
    }

    /**
     * Inline-schema mode only: schema fields that have a default ({@code ssn}, {@code address})
     * may be left out of the mapping and of the nested SQL type. Rows written through SQL and
     * a record produced directly to the topic are both readable.
     */
    @Test
    public void test_mappingAndTypeHasMissingOptionalField() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();

        SqlTestSupport.SqlType parentType = (SqlTestSupport.SqlType) new SqlTestSupport.SqlType("Parent")
                .fields(new String[]{"name VARCHAR", "phone VARCHAR"});
        parentType.create();

        Schema parentSchema = SchemaBuilder.record("Parent")
                .fields()
                .requiredString("name")
                .name("address").type().stringType().stringDefault("<unknown>")
                .requiredString("phone")
                .endRecord();
        Schema studentSchema = SchemaBuilder.record("Student")
                .fields()
                .requiredString("name")
                .optionalLong("ssn")
                .name("parent").type(parentSchema).noDefault()
                .endRecord();

        SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, studentSchema)
                .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR", "parent Parent"});
        topicMapping.create();

        // The expected stored record carries the schema default for the unmapped 'address'.
        insertAndAssertRecord(
                row(1, "Alice", row("Bob", "(111) 111-1111")),
                row(1, "Alice", row("Bob", "(111) 111-1111")),
                row(1, "Alice", createRecord(parentSchema, "Bob", "<unknown>", "(111) 111-1111")));

        // A record written outside SQL, with all schema fields populated.
        kafkaTestSupport.produce(
                name,
                (Object) createRecord(ID_SCHEMA, 3),
                (Object) createRecord(studentSchema, "Dave", 123456789L, row("Erin", "Insignificant St. 34", "(999) 999-9999")));

        assertRowsEventuallyInAnyOrder(
                "SELECT name, (parent).name, (parent).phone FROM " + name,
                List.of(
                        new SqlTestSupport.Row("Alice", "Bob", "(111) 111-1111"),
                        new SqlTestSupport.Row("Dave", "Erin", "(999) 999-9999")));
    }

    /**
     * Inline-schema mode only: creating a mapping fails when a schema field without a default
     * ({@code parent}, or {@code phone} inside the nested record) is not mapped to a column.
     */
    @Test
    public void when_mappingOrTypeHasMissingMandatoryField_then_fail() {
        Assume.assumeFalse(useSchemaRegistry);

        SqlTestSupport.SqlType parentType = (SqlTestSupport.SqlType) new SqlTestSupport.SqlType("Parent")
                .fields(new String[]{"name VARCHAR"});
        parentType.create();

        Schema parentSchema = SchemaBuilder.record("Parent")
                .fields()
                .requiredString("name")
                .requiredString("phone")
                .endRecord();
        Schema studentSchema = SchemaBuilder.record("Student")
                .fields()
                .requiredString("name")
                .name("parent").type(parentSchema).noDefault()
                .endRecord();

        // The mapping omits the 'parent' column altogether.
        Assertions.assertThatThrownBy(() -> {
            SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) kafkaMapping("kafka", ID_SCHEMA, studentSchema)
                    .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR"});
            topicMapping.create();
        }).hasMessage("Mandatory field 'parent' is not mapped to any column");

        // The mapping has 'parent', but the Parent SQL type lacks 'phone'.
        Assertions.assertThatThrownBy(() -> {
            SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) kafkaMapping("kafka", ID_SCHEMA, studentSchema)
                    .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR", "parent Parent"});
            topicMapping.create();
        }).hasMessage("Mandatory field 'phone' is not mapped to any column");
    }

    /**
     * Inline-schema mode only: the mapping lists {@code ssn} before {@code name}, the reverse of
     * their order in {@code NAME_SSN_SCHEMA}, and a row is still inserted and read back.
     */
    @Test
    public void test_mappingHasDifferentFieldOrder() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "ssn BIGINT", "name VARCHAR"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SSN_SCHEMA).fields(columns);
        topicMapping.create();

        insertAndAssertRecord(1, 123456789L, "Alice");
    }

    /**
     * Inline-schema mode only: with a required (non-nullable) string field a normal insert
     * succeeds, while inserting {@code null} into that field is rejected.
     */
    @Test
    public void test_nonNullField() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();

        Schema requiredNameSchema = SchemaBuilder.record("jet.sql")
                .fields()
                .requiredString("name")
                .endRecord();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, requiredNameSchema).fields(columns);
        topicMapping.create();

        insertAndAssertRecord(1, "Alice");

        Assertions.assertThatThrownBy(() -> insertRecord(2, null))
                .hasMessageContaining("Field name type:STRING pos:0 does not accept null values");
    }

    /**
     * Inline-schema mode only: the key column is declared BIGINT over an Avro int field, so
     * inserting {@code Long.MAX_VALUE} must fail with a conversion error.
     */
    @Test
    public void when_valueCannotBeConverted_then_fail() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();

        String[] columns = {"id BIGINT EXTERNAL NAME \"__key.id\"", "name VARCHAR"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SCHEMA).fields(columns);
        topicMapping.create();

        Assertions.assertThatThrownBy(() -> insertRecord(Long.MAX_VALUE, "Alice"))
                .hasMessageContaining("Cannot convert 9223372036854775807 to INT (field=id)");
    }

    /**
     * Inline-schema mode only: {@code ssn} is an INT column over an optional Avro string and
     * {@code info} is an OBJECT column over a {@code null | boolean | string} union; the
     * inserted, stored and read-back rows are checked through {@code insertAndAssertRecord}.
     */
    @Test
    public void test_unionWithString() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();

        Schema unionSchema = SchemaBuilder.record("jet.sql")
                .fields()
                .optionalString("ssn")
                .name("info").type().unionOf().nullType().and().booleanType().and().stringType().endUnion().nullDefault()
                .endRecord();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "ssn INT", "info OBJECT"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, unionSchema).fields(columns);
        topicMapping.create();

        insertAndAssertRecord(
                row(1, 123456789, 42),
                row(1, "123456789", "42"),
                row(1, 123456789, "42"));
    }

    /**
     * Inline-schema mode only: {@code info} is a {@code null | boolean | int} union. Values
     * inserted as null, boolean, int and as the strings "true" and "42" are read back as
     * null/boolean/int, and a long that does not fit the union is rejected.
     */
    @Test
    public void test_nonInclusiveUnion() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();

        Schema unionSchema = SchemaBuilder.record("jet.sql")
                .fields()
                .name("info").type().unionOf().nullType().and().booleanType().and().intType().endUnion().nullDefault()
                .endRecord();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "info OBJECT"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, unionSchema).fields(columns);
        topicMapping.create();

        insertRecord(1, null);
        insertRecord(2, true);
        insertRecord(3, "true");
        insertRecord(4, 42);
        insertRecord(5, "42");

        List<SqlTestSupport.Row> expected = List.of(
                new SqlTestSupport.Row(1, null),
                new SqlTestSupport.Row(2, true),
                new SqlTestSupport.Row(3, true),
                new SqlTestSupport.Row(4, 42),
                new SqlTestSupport.Row(5, 42));
        assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, expected);

        Assertions.assertThatThrownBy(() -> insertRecord(6, Long.MAX_VALUE))
                .hasMessageContaining("Not in union [\"null\",\"boolean\",\"int\"]: 9223372036854775807 (Long) (field=info)");
    }

    /**
     * Inline-schema mode only. Checks a mapping created without an explicit column list, first
     * over a flat schema and then with that schema nested inside a parent record.
     * <p>
     * The child schema declares a required, a nullable (with non-null default) and an optional
     * variant of boolean, int, long, float, double, string and an int/string union.
     */
    @Test
    public void test_fieldResolution() {
        Assume.assumeFalse((boolean)this.useSchemaRegistry);
        // 21 value fields: {required, nullable, optional} x {boolean, int, long, float, double, string, union}.
        Schema childSchema = (Schema)((SchemaBuilder.NullDefault)((SchemaBuilder.UnionAccumulator)((SchemaBuilder.UnionAccumulator)((SchemaBuilder.IntDefault)((SchemaBuilder.UnionAccumulator)((SchemaBuilder.UnionAccumulator)((SchemaBuilder.IntDefault)((SchemaBuilder.UnionAccumulator)SchemaBuilder.record((String)"child").fields().requiredBoolean("requiredBoolean").nullableBoolean("nullableBoolean", true).optionalBoolean("optionalBoolean").requiredInt("requiredInt").nullableInt("nullableInt", 1).optionalInt("optionalInt").requiredLong("requiredLong").nullableLong("nullableLong", 1L).optionalLong("optionalLong").requiredFloat("requiredFloat").nullableFloat("nullableFloat", 1.0f).optionalFloat("optionalFloat").requiredDouble("requiredDouble").nullableDouble("nullableDouble", 1.0).optionalDouble("optionalDouble").requiredString("requiredString").nullableString("nullableString", "A").optionalString("optionalString").name("requiredUnion").type().unionOf().intType().and().stringType()).endUnion()).noDefault().name("nullableUnion").type().unionOf().intType().and().stringType()).and().nullType()).endUnion()).intDefault(1).name("optionalUnion").type().unionOf().nullType().and().intType()).and().stringType()).endUnion()).nullDefault().endRecord();
        String name = SqlAvroTest.createRandomTopic();
        // No .fields(...) call here: the mapping is created without declaring any columns.
        this.kafkaMapping(name, ID_SCHEMA, childSchema).create();
        // Only the required and nullable columns are supplied; the optional ones are left out.
        new SqlTestSupport.SqlInsert(name).literals("id", 1, "requiredBoolean", false, "requiredInt", 0, "requiredLong", 0L, "requiredFloat", Float.valueOf(0.0f), "requiredDouble", 0.0, "requiredString", "", "requiredUnion", 0).literals("nullableBoolean", true, "nullableInt", 1, "nullableLong", 1L, "nullableFloat", Float.valueOf(1.0f), "nullableDouble", 1.0, "nullableString", "A", "nullableUnion", 1).execute();
        // Expected row: id followed by (required, nullable, optional) per type; every optional value is null.
        Object[] allRow = new Object[]{1, false, true, null, 0, 1, null, 0L, 1L, null, Float.valueOf(0.0f), Float.valueOf(1.0f), null, 0.0, 1.0, null, "", "A", null, 0, 1, null};
        SqlAvroTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, List.of(new SqlTestSupport.Row(allRow)));
        // Second part: the child record nested as a required, a nullable (default: a child record with only the required fields set) and an optional field.
        Schema parentSchema = (Schema)((SchemaBuilder.FieldAssembler)SchemaBuilder.record((String)"parent").fields().name("requiredChild").type(childSchema).noDefault().name("nullableChild").type((Schema)SchemaBuilder.nullable().type(childSchema)).withDefault((Object)new GenericRecordBuilder(childSchema).set("requiredBoolean", (Object)false).set("requiredInt", (Object)0).set("requiredLong", (Object)0L).set("requiredFloat", (Object)Float.valueOf(0.0f)).set("requiredDouble", (Object)0.0).set("requiredString", (Object)"").set("requiredUnion", (Object)0).build()).name("optionalChild").type().optional().type(childSchema)).endRecord();
        // The SQL type is created without declaring any fields.
        new SqlTestSupport.SqlType("Child").create();
        name = SqlAvroTest.createRandomTopic();
        ((SqlTestSupport.SqlMapping)this.kafkaMapping(name, ID_SCHEMA, parentSchema).fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "requiredChild Child", "nullableChild Child", "optionalChild Child"})).create();
        // Drop the leading id to obtain the values of one child record.
        Object[] valueRow = Arrays.copyOfRange(allRow, 1, allRow.length);
        new SqlTestSupport.SqlInsert(name).literals("id", 1, "requiredChild", valueRow).literals("nullableChild", valueRow).execute();
        GenericRecord record = SqlAvroTest.createRecord(childSchema, valueRow);
        // optionalChild was not inserted and is expected to read back as null.
        SqlAvroTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, List.of(new SqlTestSupport.Row(1, record, record, null)));
    }

    /**
     * Inline-schema mode only (nightly). Walks every pairing of a SQL mapping type with an Avro
     * field schema and checks one of three outcomes: a working round trip, an "incompatible"
     * error, or an "unsupported type" error when creating the mapping.
     * <p>
     * Each entry in {@code conversions} is read below as (SQL type family, SQL-side value,
     * Avro schema type, Avro-side value).
     */
    @Test
    @Category(value={NightlyTest.class})
    public void test_allConversions() {
        Assume.assumeFalse((boolean)this.useSchemaRegistry);
        // cartesian(...) is defined outside the visible source; presumably it pairs every left tuple with every right tuple.
        ArrayList<Object> conversions = new ArrayList<Object>();
        // BOOLEAN <-> boolean / string
        conversions.addAll(SqlAvroTest.cartesian(List.of(Tuple2.tuple2((Object)QueryDataTypeFamily.BOOLEAN, (Object)true)), List.of(Tuple2.tuple2((Object)Schema.Type.BOOLEAN, (Object)true), Tuple2.tuple2((Object)Schema.Type.STRING, (Object)"true"))));
        // numeric SQL types <-> Avro int / long / float / double
        conversions.addAll(SqlAvroTest.cartesian(List.of(Tuple2.tuple2((Object)QueryDataTypeFamily.TINYINT, (Object)1), Tuple2.tuple2((Object)QueryDataTypeFamily.SMALLINT, (Object)1), Tuple2.tuple2((Object)QueryDataTypeFamily.INTEGER, (Object)1), Tuple2.tuple2((Object)QueryDataTypeFamily.BIGINT, (Object)1L), Tuple2.tuple2((Object)QueryDataTypeFamily.REAL, (Object)Float.valueOf(1.0f)), Tuple2.tuple2((Object)QueryDataTypeFamily.DOUBLE, (Object)1.0), Tuple2.tuple2((Object)QueryDataTypeFamily.DECIMAL, (Object)BigDecimal.ONE)), List.of(Tuple2.tuple2((Object)Schema.Type.INT, (Object)1), Tuple2.tuple2((Object)Schema.Type.LONG, (Object)1L), Tuple2.tuple2((Object)Schema.Type.FLOAT, (Object)Float.valueOf(1.0f)), Tuple2.tuple2((Object)Schema.Type.DOUBLE, (Object)1.0))));
        // integral and decimal SQL types <-> Avro string "1"
        conversions.addAll(SqlAvroTest.cartesian(List.of(Tuple2.tuple2((Object)QueryDataTypeFamily.TINYINT, (Object)1), Tuple2.tuple2((Object)QueryDataTypeFamily.SMALLINT, (Object)1), Tuple2.tuple2((Object)QueryDataTypeFamily.INTEGER, (Object)1), Tuple2.tuple2((Object)QueryDataTypeFamily.BIGINT, (Object)1L), Tuple2.tuple2((Object)QueryDataTypeFamily.DECIMAL, (Object)BigDecimal.ONE)), List.of(Tuple2.tuple2((Object)Schema.Type.STRING, (Object)"1"))));
        // floating-point SQL types <-> Avro string "1.0"
        conversions.addAll(SqlAvroTest.cartesian(List.of(Tuple2.tuple2((Object)QueryDataTypeFamily.REAL, (Object)Float.valueOf(1.0f)), Tuple2.tuple2((Object)QueryDataTypeFamily.DOUBLE, (Object)1.0)), List.of(Tuple2.tuple2((Object)Schema.Type.STRING, (Object)"1.0"))));
        // temporal SQL types <-> Avro string
        conversions.addAll(List.of(Tuple4.tuple4((Object)QueryDataTypeFamily.TIME, (Object)LocalTime.of(12, 23, 34), (Object)Schema.Type.STRING, (Object)"12:23:34"), Tuple4.tuple4((Object)QueryDataTypeFamily.DATE, (Object)LocalDate.of(2020, 4, 15), (Object)Schema.Type.STRING, (Object)"2020-04-15"), Tuple4.tuple4((Object)QueryDataTypeFamily.TIMESTAMP, (Object)LocalDateTime.of(2020, 4, 15, 12, 23, 34, 1000000), (Object)Schema.Type.STRING, (Object)"2020-04-15T12:23:34.001"), Tuple4.tuple4((Object)QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE, (Object)OffsetDateTime.of(2020, 4, 15, 12, 23, 34, 200000000, ZoneOffset.UTC), (Object)Schema.Type.STRING, (Object)"2020-04-15T12:23:34.200Z")));
        // VARCHAR <-> Avro int / long / string, float / double, boolean
        conversions.addAll(SqlAvroTest.cartesian(List.of(Tuple2.tuple2((Object)QueryDataTypeFamily.VARCHAR, (Object)"1")), List.of(Tuple2.tuple2((Object)Schema.Type.INT, (Object)1), Tuple2.tuple2((Object)Schema.Type.LONG, (Object)1L), Tuple2.tuple2((Object)Schema.Type.STRING, (Object)"1"))));
        conversions.addAll(SqlAvroTest.cartesian(List.of(Tuple2.tuple2((Object)QueryDataTypeFamily.VARCHAR, (Object)"1.0")), List.of(Tuple2.tuple2((Object)Schema.Type.FLOAT, (Object)Float.valueOf(1.0f)), Tuple2.tuple2((Object)Schema.Type.DOUBLE, (Object)1.0))));
        conversions.add(Tuple4.tuple4((Object)QueryDataTypeFamily.VARCHAR, (Object)"true", (Object)Schema.Type.BOOLEAN, (Object)true));
        // OBJECT <-> Avro union / null
        conversions.add(Tuple4.tuple4((Object)QueryDataTypeFamily.OBJECT, (Object)"string", (Object)Schema.Type.UNION, (Object)"string"));
        conversions.add(Tuple4.tuple4((Object)QueryDataTypeFamily.OBJECT, null, (Object)Schema.Type.NULL, null));
        // SQL mapping types under test, keyed by QueryDataType, with the type name used in the column declaration.
        ImmutableMap mappingFieldTypes = ImmutableMap.builder().put((Object)QueryDataType.BOOLEAN, (Object)"BOOLEAN").put((Object)QueryDataType.TINYINT, (Object)"TINYINT").put((Object)QueryDataType.SMALLINT, (Object)"SMALLINT").put((Object)QueryDataType.INT, (Object)"INT").put((Object)QueryDataType.BIGINT, (Object)"BIGINT").put((Object)QueryDataType.REAL, (Object)"REAL").put((Object)QueryDataType.DOUBLE, (Object)"DOUBLE").put((Object)QueryDataType.DECIMAL, (Object)"DECIMAL").put((Object)QueryDataType.VARCHAR, (Object)"VARCHAR").put((Object)QueryDataType.TIME, (Object)"TIME").put((Object)QueryDataType.DATE, (Object)"DATE").put((Object)QueryDataType.TIMESTAMP, (Object)"TIMESTAMP").put((Object)QueryDataType.TIMESTAMP_WITH_TZ_OFFSET_DATE_TIME, (Object)"TIMESTAMP WITH TIME ZONE").put((Object)QueryDataType.OBJECT, (Object)"OBJECT").put((Object)QueryDataType.JSON, (Object)"JSON").build();
        // Avro field schemas under test: the primitive types first, then the object union, array, map, enum, fixed and record.
        ArrayList<Schema> schemaFieldTypes = new ArrayList<Schema>();
        Stream.of(Schema.Type.BOOLEAN, Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.NULL, Schema.Type.BYTES).forEach(type -> schemaFieldTypes.add(Schema.create((Schema.Type)type)));
        schemaFieldTypes.addAll(List.of(KvMetadataAvroResolver.Schemas.OBJECT_SCHEMA, (Schema)SchemaBuilder.array().items(Schema.create((Schema.Type)Schema.Type.INT)), (Schema)SchemaBuilder.map().values(Schema.create((Schema.Type)Schema.Type.INT)), (Schema)SchemaBuilder.enumeration((String)"enum").symbols(new String[]{"symbol"}), (Schema)SchemaBuilder.fixed((String)"fixed").size(0), (Schema)SchemaBuilder.record((String)"record").fields().endRecord()));
        for (Map.Entry entry : mappingFieldTypes.entrySet()) {
            QueryDataType mappingFieldType = (QueryDataType)entry.getKey();
            QueryDataTypeFamily mappingFieldTypeFamily = mappingFieldType.getTypeFamily();
            String sqlFieldType = (String)entry.getValue();
            // A mapping type counts as supported when at least one conversion is listed for its family.
            boolean mappingFieldTypeSupported = conversions.stream().anyMatch(c -> mappingFieldTypeFamily == c.f0());
            for (Schema fieldSchema : schemaFieldTypes) {
                Schema.Type schemaFieldType = fieldSchema.getType();
                // Value schema with a single field "info" whose type is the schema under test wrapped by KvMetadataAvroResolver.optional(...).
                Schema valueSchema = (Schema)SchemaBuilder.record((String)"jet.sql").fields().name("info").type(KvMetadataAvroResolver.optional((Schema)fieldSchema)).withDefault(null).endRecord();
                Consumer<String> createKafkaMapping = name -> ((SqlTestSupport.SqlMapping)this.kafkaMapping((String)name, ID_SCHEMA, valueSchema).fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "info " + sqlFieldType})).create();
                if (mappingFieldTypeSupported) {
                    // Look up the (SQL value, Avro value) pair for this exact type combination, if one is listed.
                    Tuple2 conversion = conversions.stream().filter(c -> mappingFieldTypeFamily == c.f0() && schemaFieldType == c.f2()).map(c -> Tuple2.tuple2((Object)c.f1(), (Object)c.f3())).findFirst().orElse(null);
                    if (conversion != null) {
                        // Listed conversion: create a fresh topic and mapping, then insert and verify a row.
                        String name2 = SqlAvroTest.createRandomTopic();
                        createKafkaMapping.accept(name2);
                        System.out.println(">> " + String.valueOf(mappingFieldType) + " <- " + String.valueOf(conversion.f1()) + ":" + String.valueOf(schemaFieldType));
                        this.insertAndAssertRecord(SqlAvroTest.row(1, conversion.f0()), SqlAvroTest.row(1, conversion.f1()));
                        continue;
                    }
                    // Supported mapping type, but no conversion listed for this schema type: creation must be rejected.
                    Assertions.assertThatThrownBy(() -> createKafkaMapping.accept("kafka")).hasMessage(String.valueOf(schemaFieldType) + " schema type is incompatible with " + String.valueOf(mappingFieldType) + " mapping type");
                    continue;
                }
                // No conversion listed for this mapping type at all (JSON has no entry in the list above).
                Assertions.assertThatThrownBy(() -> createKafkaMapping.accept("kafka")).hasMessage("Unsupported type: " + String.valueOf(mappingFieldType));
            }
        }
    }

    /** Inserts a row whose key field and value field are both {@code null} and verifies it. */
    @Test
    public void test_nulls() {
        String name = createRandomTopic();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SCHEMA).fields(columns);
        topicMapping.create();

        // Passed as an explicit array so the two nulls are the record's values.
        Object[] allNulls = {null, null};
        insertAndAssertRecord(allNulls);
    }

    /**
     * Maps the {@code name} field of the key and the {@code name} field of the value to two
     * differently named columns via {@code EXTERNAL NAME} and round-trips one row.
     */
    @Test
    public void test_fieldsMapping() {
        String name = createRandomTopic();

        String[] columns = {
                "key_name VARCHAR EXTERNAL NAME \"__key.name\"",
                "value_name VARCHAR EXTERNAL NAME \"this.name\""};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, NAME_SCHEMA, NAME_SCHEMA).fields(columns);
        topicMapping.create();

        insertAndAssertRecord("Alice", "Bob");
    }

    /**
     * Writes one record with the name-only value schema, replaces the mapping with one that adds
     * {@code ssn}, and writes a second record. Reading both records succeeds in schema-registry
     * mode and is expected to fail with a value deserialization error in inline-schema mode.
     */
    @Test
    public void test_schemaEvolution() {
        String name = createRandomTopic();

        SqlTestSupport.SqlMapping initialMapping = (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SCHEMA)
                .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR"});
        initialMapping.create();
        insertRecord(13, "Alice");

        SqlTestSupport.SqlMapping evolvedMapping = (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SSN_SCHEMA)
                .fields(new String[]{"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR", "ssn BIGINT"});
        evolvedMapping.createOrReplace();
        insertRecord(69, "Bob", 123456789);

        Runnable assertRecords = () -> assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + name,
                List.of(
                        new SqlTestSupport.Row(13, "Alice", null),
                        new SqlTestSupport.Row(69, "Bob", 123456789L)));

        if (!useSchemaRegistry) {
            Assertions.assertThatThrownBy(assertRecords::run)
                    .hasMessageContaining("Error deserializing VALUE");
        } else {
            assertRecords.run();
        }
    }

    /**
     * Copies the single row of a {@code TestAllTypesSqlConnector} mapping into a Kafka/Avro
     * mapping that declares one column per supported SQL type, then verifies the stored row.
     */
    @Test
    public void test_allTypes() {
        String from = randomName();
        TestAllTypesSqlConnector.create(sqlService, from);

        String to = createRandomTopic();
        String[] columns = {
                "id INT EXTERNAL NAME \"__key.id\"",
                "string VARCHAR",
                "\"boolean\" BOOLEAN",
                "byte TINYINT",
                "short SMALLINT",
                "\"int\" INT",
                "long BIGINT",
                "\"float\" REAL",
                "\"double\" DOUBLE",
                "\"decimal\" DECIMAL",
                "\"time\" TIME",
                "\"date\" DATE",
                "\"timestamp\" TIMESTAMP",
                "timestampTz TIMESTAMP WITH TIME ZONE",
                "map OBJECT",
                "object OBJECT"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(to, ID_SCHEMA, ALL_TYPES_SCHEMA).fields(columns);
        topicMapping.create();

        sqlService.execute("INSERT INTO " + to + " SELECT 1, f.* FROM " + from + " f", new Object[0]);

        SqlTestSupport.Row expected = new SqlTestSupport.Row(
                1,
                "string",
                true,
                (byte) 127,
                (short) Short.MAX_VALUE,
                Integer.MAX_VALUE,
                Long.MAX_VALUE,
                Float.valueOf(1.234568E9f),
                1.234512345678901E14,
                new BigDecimal("9223372036854775.123"),
                LocalTime.of(12, 23, 34),
                LocalDate.of(2020, 4, 15),
                LocalDateTime.of(2020, 4, 15, 12, 23, 34, 1000000),
                OffsetDateTime.of(2020, 4, 15, 12, 23, 34, 200000000, ZoneOffset.UTC),
                "{42=43}",
                null);
        assertRowsEventuallyInAnyOrder("SELECT * FROM " + to, List.of(expected));
    }

    /** Declaring {@code __key} as an explicit column must be rejected. */
    @Test
    public void when_explicitTopLevelField_then_fail_key() {
        String rejectedField = "__key";
        String otherSide = "this";
        when_explicitTopLevelField_then_fail(rejectedField, otherSide);
    }

    /** Declaring {@code this} as an explicit column must be rejected. */
    @Test
    public void when_explicitTopLevelField_then_fail_this() {
        String rejectedField = "this";
        String otherSide = "__key";
        when_explicitTopLevelField_then_fail(rejectedField, otherSide);
    }

    /**
     * Asserts that creating a mapping fails when {@code field} is declared as a plain VARCHAR
     * column next to a column mapped to {@code otherField.name}.
     *
     * @param field      the top-level field declared as a column ({@code __key} or {@code this})
     * @param otherField the opposite top-level field, used as the external-name prefix of column {@code f}
     */
    private void when_explicitTopLevelField_then_fail(String field, String otherField) {
        String expectedMessage = "Cannot use the '" + field + "' field with Avro serialization";
        Assertions.assertThatThrownBy(() -> {
            SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) kafkaMapping("kafka", NAME_SCHEMA, NAME_SCHEMA)
                    .fields(new String[]{field + " VARCHAR", "f VARCHAR EXTERNAL NAME \"" + otherField + ".name\""});
            topicMapping.create();
        }).hasMessage(expectedMessage);
    }

    /**
     * Inserting directly into the top-level {@code __key} or {@code this} column must fail
     * with a "not supported" error.
     */
    @Test
    public void test_writingToTopLevel() {
        String name = randomName();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SCHEMA).fields(columns);
        topicMapping.create();

        String insertIntoKey = "INSERT INTO " + name + "(__key, name) VALUES ('{\"id\":1}', null)";
        Assertions.assertThatThrownBy(() -> sqlService.execute(insertIntoKey, new Object[0]))
                .hasMessageContaining("Writing to top-level fields of type OBJECT not supported");

        String insertIntoThis = "INSERT INTO " + name + "(id, this) VALUES (1, '{\"name\":\"foo\"}')";
        Assertions.assertThatThrownBy(() -> sqlService.execute(insertIntoThis, new Object[0]))
                .hasMessageContaining("Writing to top-level fields of type OBJECT not supported");
    }

    /**
     * Selecting {@code __key} and {@code this} returns the whole key and value as Avro
     * records built from {@code ID_SCHEMA} and {@code NAME_SCHEMA}.
     */
    @Test
    public void test_topLevelFieldExtraction() {
        String name = createRandomTopic();

        String[] columns = {"id INT EXTERNAL NAME \"__key.id\"", "name VARCHAR"};
        SqlTestSupport.SqlMapping topicMapping =
                (SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SCHEMA).fields(columns);
        topicMapping.create();

        insertRecord(1, "Alice");

        assertRowsEventuallyInAnyOrder(
                "SELECT __key, this FROM " + name,
                List.of(new SqlTestSupport.Row(
                        new GenericRecordBuilder(ID_SCHEMA).set("id", 1).build(),
                        new GenericRecordBuilder(NAME_SCHEMA).set("name", "Alice").build())));
    }

    /**
     * Sets the key/value serializer and deserializer classes explicitly in the mapping options
     * (Confluent classes in schema-registry mode, Hazelcast classes otherwise) and round-trips
     * one row.
     */
    @Test
    public void test_explicitKeyAndValueSerializers() {
        String name = createRandomTopic();

        Class<?> serializerClass = useSchemaRegistry
                ? KafkaAvroSerializer.class
                : HazelcastKafkaAvroSerializer.class;
        Class<?> deserializerClass = useSchemaRegistry
                ? KafkaAvroDeserializer.class
                : HazelcastKafkaAvroDeserializer.class;
        String serializerName = serializerClass.getCanonicalName();
        String deserializerName = deserializerClass.getCanonicalName();

        SqlTestSupport.SqlMapping withColumns = (SqlTestSupport.SqlMapping) kafkaMapping(name, NAME_SCHEMA, NAME_SCHEMA)
                .fields(new String[]{
                        "key_name VARCHAR EXTERNAL NAME \"__key.name\"",
                        "value_name VARCHAR EXTERNAL NAME \"this.name\""});
        SqlTestSupport.SqlMapping topicMapping = (SqlTestSupport.SqlMapping) withColumns.options(new Object[]{
                "key.serializer", serializerName,
                "key.deserializer", deserializerName,
                "value.serializer", serializerName,
                "value.deserializer", deserializerName});
        topicMapping.create();

        insertAndAssertRecord("Alice", "Bob");
    }

    /**
     * Inline-schema mode only: after creating a mapping, the {@code mapping_options} column of
     * {@code information_schema.mappings} must contain the key and value Avro schemas as
     * JSON-escaped strings.
     */
    @Test
    public void test_keyAndValueSchemaAvailableInInformationSchema() {
        Assume.assumeFalse(useSchemaRegistry);
        String name = createRandomTopic();
        ((SqlTestSupport.SqlMapping) kafkaMapping(name, ID_SCHEMA, NAME_SCHEMA)
                .fields(new String[]{"id BIGINT EXTERNAL NAME \"__key.id\"", "name VARCHAR"})).create();

        // Only the first row is read, so close the result explicitly instead of leaving the
        // query open. NOTE(review): this takes the first mapping returned and relies on no
        // other mapping being present in the instance - confirm the test support cleans up.
        String options;
        try (com.hazelcast.sql.SqlResult result =
                     sqlService.execute("SELECT mapping_options FROM information_schema.mappings", new Object[0])) {
            options = (String) ((SqlRow) result.iterator().next()).getObject(0);
        }

        // The schema JSON is embedded as a string value, hence the escaped double quotes.
        BiConsumer<String, Schema> assertContainsSchema = (option, schema) -> assertContains(
                options,
                String.format("\"%s\":\"%s\"", option, schema.toString().replace("\"", "\\\"")));
        assertContainsSchema.accept("keyAvroSchema", this.keySchema);
        assertContainsSchema.accept("valueAvroSchema", this.valueSchema);
    }

    /**
     * Inserts two records with two separate SQL statements and verifies that the
     * int read at offset 1 of each raw record value is the same for both records.
     * Runs only when {@code useSchemaRegistry} is set.
     * <p>
     * NOTE(review): offset 1 presumably skips the magic byte of the schema-registry
     * wire format so that the following 4 bytes are the schema id - confirm.
     */
    @Test
    public void test_schemaIdForTwoQueriesIsEqual() {
        Assume.assumeTrue((boolean)this.useSchemaRegistry);
        String name = SqlAvroTest.createRandomTopic();
        ((SqlTestSupport.SqlMapping)((SqlTestSupport.SqlMapping)this.kafkaMapping(name).fields(new String[]{"__key INT", "field1 VARCHAR"})).options(new Object[]{"keyFormat", "java", "keyJavaClass", Integer.class.getCanonicalName()})).create();
        this.insertRecord(42, "foo");
        this.insertRecord(43, "bar");
        // Values are consumed as raw bytes so the serialized header can be inspected.
        try (KafkaConsumer consumer = kafkaTestSupport.createConsumer(IntegerDeserializer.class, ByteArrayDeserializer.class, Collections.emptyMap(), new String[]{name});){
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
            List<Integer> schemaIds = new ArrayList<>();
            while (schemaIds.size() < 2) {
                // nanoTime() values must be compared by difference, never directly,
                // because the counter may overflow (see the System.nanoTime Javadoc).
                if (System.nanoTime() - deadline > 0) {
                    Assert.fail((String)"Timeout waiting for the records from Kafka");
                }
                ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord record : records) {
                    int id = Bits.readInt((byte[])record.value(), (int)1, (boolean)true);
                    schemaIds.add(id);
                }
            }
            Assert.assertEquals((String)"The schemaIds of the two records don't match", schemaIds.get(0), schemaIds.get(1));
        }
    }

    /**
     * Creates a randomly named topic by delegating to {@code createRandomTopic(int)}
     * with the argument {@code 4}.
     * NOTE(review): the argument is presumably the partition count - confirm against
     * the overload's declaration.
     *
     * @return the value returned by the delegate (used by callers as the topic name)
     */
    private static String createRandomTopic() {
        return createRandomTopic(4);
    }

    /**
     * Inserts the given values as literals into the current mapping
     * ({@code mapping.name}) through {@code insertLiterals}.
     *
     * @param values the column values to insert
     */
    private void insertRecord(Object ... values) {
        insertLiterals(instance(), this.mapping.name, values);
    }

    /**
     * Inserts a record and verifies it, using the same values for the SQL insert,
     * the expected Avro records and the expected SELECT result.
     *
     * @param values the values used in all three roles
     */
    private void insertAndAssertRecord(Object ... values) {
        insertAndAssertRecord(values, values, values);
    }

    /**
     * Inserts a record and verifies it, where the SQL values serve both as the
     * inserted values and as the expected SELECT result.
     *
     * @param sqlValues  values inserted through SQL and expected back from SELECT
     * @param avroValues values expected in the Avro records written to the topic
     */
    private void insertAndAssertRecord(@Nonnull Object[] sqlValues, @Nonnull Object[] avroValues) {
        insertAndAssertRecord(sqlValues, avroValues, sqlValues);
    }

    /**
     * Inserts a record through SQL, then verifies both the topic contents and the
     * result of {@code SELECT *} on the current mapping.
     * <p>
     * The expected key record is built from the first mapping field and the first
     * Avro value; the expected value record from the remaining fields and values.
     *
     * @param insertValues values passed to the SQL insert
     * @param avroValues   values expected in the Avro key/value records on the topic
     * @param selectValues values expected in the single row returned by SELECT
     */
    private void insertAndAssertRecord(@Nonnull Object[] insertValues, @Nonnull Object[] avroValues, @Nonnull Object[] selectValues) {
        this.insertRecord(insertValues);

        String topic = this.mapping.name;
        int fieldCount = this.mapping.fields.length;
        kafkaTestSupport.assertTopicContentsEventually(
                topic,
                Map.of(
                        SqlAvroTest.createRecord(this.keySchema, Arrays.copyOfRange(this.mapping.fields, 0, 1), Arrays.copyOfRange(avroValues, 0, 1)),
                        SqlAvroTest.createRecord(this.valueSchema, Arrays.copyOfRange(this.mapping.fields, 1, fieldCount), Arrays.copyOfRange(avroValues, 1, fieldCount))),
                this.useSchemaRegistry ? KafkaAvroDeserializer.class : HazelcastKafkaAvroDeserializer.class,
                this.useSchemaRegistry ? KafkaAvroDeserializer.class : HazelcastKafkaAvroDeserializer.class,
                this.clientProperties);

        SqlAvroTest.assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + topic,
                List.of(new SqlTestSupport.Row(selectValues)));
    }

    /**
     * Builds the cartesian product of two lists of pairs, flattening every
     * combination into a single 4-tuple: the two components of the element from
     * {@code list1} followed by the two components of the element from {@code list2}.
     *
     * @param list1 pairs supplying the first two tuple components
     * @param list2 pairs supplying the last two tuple components
     * @return one 4-tuple per combination, in the order produced by
     *         {@code Lists.cartesianProduct}
     */
    // The casts are safe: position 0 of every combination comes from list1 and
    // position 1 from list2.
    @SuppressWarnings("unchecked")
    private static <T1, T2, T3, T4> List<Tuple4<T1, T2, T3, T4>> cartesian(List<Tuple2<T1, T2>> list1, List<Tuple2<T3, T4>> list2) {
        List<Tuple4<T1, T2, T3, T4>> combined = new ArrayList<>();
        for (List<Tuple2<?, ?>> combination : Lists.<Tuple2<?, ?>>cartesianProduct(list1, list2)) {
            Tuple2<T1, T2> left = (Tuple2<T1, T2>) combination.get(0);
            Tuple2<T3, T4> right = (Tuple2<T3, T4>) combination.get(1);
            combined.add(Tuple4.tuple4(left.f0(), left.f1(), right.f0(), right.f1()));
        }
        return combined;
    }

    /**
     * SQL mapping bound to {@link KafkaSqlConnector}. Every time the mapping is
     * created, its type tree is stored in the enclosing test's {@code mapping} field.
     */
    private class KafkaMapping extends SqlTestSupport.SqlMapping {

        /**
         * @param name the mapping name passed to the superclass
         */
        KafkaMapping(String name) {
            super(name, KafkaSqlConnector.class);
        }

        /**
         * Creates the mapping via the superclass and then records its type tree
         * in the enclosing test instance.
         */
        @Override
        protected void create(HazelcastInstance instance, boolean replace, boolean ifNotExists) {
            super.create(instance, replace, ifNotExists);
            // Publish the created mapping so the test helpers can read its name and fields.
            SqlAvroTest.this.mapping = toTypeTree();
        }
    }
}

