/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlConnector;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.model.AllCanonicalTypesValue;
import com.hazelcast.jet.sql.impl.connector.kafka.model.AllCanonicalTypesValueDeserializer;
import com.hazelcast.jet.sql.impl.connector.kafka.model.AllCanonicalTypesValueSerializer;
import com.hazelcast.jet.sql.impl.connector.kafka.model.JavaDeserializer;
import com.hazelcast.jet.sql.impl.connector.kafka.model.JavaSerializer;
import com.hazelcast.jet.sql.impl.connector.kafka.model.Person;
import com.hazelcast.jet.sql.impl.connector.kafka.model.PersonId;
import com.hazelcast.jet.sql.impl.connector.test.TestAllTypesSqlConnector;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * SQL tests for Kafka mappings whose key and value are declared with the {@code java} format
 * ({@code keyJavaClass} / {@code valueJavaClass}).
 *
 * <p>Each test creates a mapping through {@link #kafkaMapping}, writes rows with SQL
 * {@code INSERT} statements and then checks the topic contents and/or the rows returned by a
 * {@code SELECT} over the same mapping.
 */
@Category({NightlyTest.class, ParallelJVMTest.class})
public class SqlPojoTest extends KafkaSqlTestSupport {
    /** Partition count passed to {@code createRandomTopic(int)} for every topic created here. */
    private static final int INITIAL_PARTITION_COUNT = 4;

    /** Error message expected when an INSERT targets the top-level {@code this} column. */
    private static final String TOP_LEVEL_OBJECT_WRITE_ERROR =
            "Writing to top-level fields of type OBJECT not supported";

    /** Delegates to the inherited {@code setup(1, null)} before any test in this class runs. */
    @BeforeClass
    public static void setup() throws Exception {
        setup(1, null);
    }

    /**
     * Builds (but does not create) a Kafka mapping that uses the {@code java} format for both
     * key and value.
     *
     * @param name                   mapping name
     * @param keyClass               class whose name is passed as {@code keyJavaClass}
     * @param valueClass             class whose name is passed as {@code valueJavaClass}
     * @param keySerializerClass     class passed as the {@code key.serializer} option
     * @param keyDeserializerClass   class passed as the {@code key.deserializer} option
     * @param valueSerializerClass   class passed as the {@code value.serializer} option
     * @param valueDeserializerClass class passed as the {@code value.deserializer} option
     * @return the configured mapping; callers may add fields and must call {@code create()}
     */
    private static <K, V> SqlTestSupport.SqlMapping kafkaMapping(
            String name,
            Class<K> keyClass,
            Class<V> valueClass,
            Class<? extends Serializer<? super K>> keySerializerClass,
            Class<? extends Deserializer<? super K>> keyDeserializerClass,
            Class<? extends Serializer<? super V>> valueSerializerClass,
            Class<? extends Deserializer<? super V>> valueDeserializerClass
    ) {
        return new SqlTestSupport.SqlMapping(name, KafkaSqlConnector.class)
                .options(
                        "keyFormat", "java",
                        "keyJavaClass", keyClass.getName(),
                        "valueFormat", "java",
                        "valueJavaClass", valueClass.getName(),
                        "bootstrap.servers", kafkaTestSupport.getBrokerConnectionString(),
                        "key.serializer", keySerializerClass.getCanonicalName(),
                        "key.deserializer", keyDeserializerClass.getCanonicalName(),
                        "value.serializer", valueSerializerClass.getCanonicalName(),
                        "value.deserializer", valueDeserializerClass.getCanonicalName(),
                        "auto.offset.reset", "earliest"
                );
    }

    /**
     * Shorthand for the {@link PersonId} -> {@link Person} mapping shared by several tests,
     * with {@link JavaSerializer} / {@link JavaDeserializer} on both key and value.
     */
    private static SqlTestSupport.SqlMapping personMapping(String name) {
        return kafkaMapping(
                name,
                PersonId.class, Person.class,
                JavaSerializer.class, JavaDeserializer.class,
                JavaSerializer.class, JavaDeserializer.class
        );
    }

    /**
     * Inserting {@code (null, null)} must produce a default-constructed key and value in the
     * topic and a row of two nulls when read back.
     */
    @Test
    public void test_nulls() {
        String topic = createRandomTopic();
        personMapping(topic).create();

        assertTopicEventually(
                topic,
                "INSERT INTO " + topic + " VALUES (null, null)",
                Map.of(new PersonId(), new Person())
        );
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + topic,
                List.of(new SqlTestSupport.Row(null, null))
        );
    }

    /**
     * With no explicit field list, inserting {@code (1, 'Alice')} must yield
     * {@code PersonId(1)} -> {@code Person(null, "Alice")} in the topic and the row
     * {@code (1, "Alice")} from {@code SELECT *}.
     */
    @Test
    public void test_fieldsShadowing() {
        String topic = createRandomTopic();
        personMapping(topic).create();

        assertTopicEventually(
                topic,
                "INSERT INTO " + topic + " VALUES (1, 'Alice')",
                Map.of(new PersonId(1), new Person(null, "Alice"))
        );
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + topic,
                List.of(new SqlTestSupport.Row(1, "Alice"))
        );
    }

    /**
     * Columns declared with {@code EXTERNAL NAME} must be routed to the key ({@code __key.id})
     * or the value ({@code this.id}) respectively, both on write and on read.
     */
    @Test
    public void test_fieldsMapping() {
        String topic = createRandomTopic();
        personMapping(topic)
                .fields(
                        "key_id INT EXTERNAL NAME \"__key.id\"",
                        "value_id INT EXTERNAL NAME \"this.id\"",
                        "name VARCHAR"
                )
                .create();

        assertTopicEventually(
                topic,
                "INSERT INTO " + topic + " (value_id, key_id) VALUES (2, 1)",
                Map.of(new PersonId(1), new Person(2, null))
        );
        assertRowsEventuallyInAnyOrder(
                "SELECT  key_id, value_id FROM " + topic,
                List.of(new SqlTestSupport.Row(1, 2))
        );
    }

    /**
     * Copies one row from a {@link TestAllTypesSqlConnector} mapping into a Kafka mapping
     * backed by {@link AllCanonicalTypesValue} and verifies every column when read back.
     */
    @Test
    public void test_allTypes() {
        String from = randomName();
        TestAllTypesSqlConnector.create(sqlService, from);

        String to = createRandomTopic();
        kafkaMapping(
                to,
                PersonId.class, AllCanonicalTypesValue.class,
                JavaSerializer.class, JavaDeserializer.class,
                AllCanonicalTypesValueSerializer.class, AllCanonicalTypesValueDeserializer.class
        ).create();

        sqlService.execute("INSERT INTO " + to + "(id, string, boolean0, byte0, short0, int0, long0, float0, double0, \"decimal\", \"time\", \"date\", \"timestamp\", timestampTz, object) SELECT CAST(1 AS INT), string, \"boolean\", byte, short, \"int\", long, \"float\", \"double\", \"decimal\", \"time\", \"date\", \"timestamp\", timestampTz, \"object\" FROM " + from + " f");

        assertRowsEventuallyInAnyOrder(
                "SELECT id, string, boolean0, byte0, short0, int0, long0, float0, double0, \"decimal\", \"time\", \"date\", \"timestamp\", timestampTz, object FROM " + to,
                List.of(new SqlTestSupport.Row(
                        1,
                        "string",
                        true,
                        Byte.MAX_VALUE,
                        Short.MAX_VALUE,
                        Integer.MAX_VALUE,
                        Long.MAX_VALUE,
                        1.234568E9f,
                        1.234512345678901E14,
                        new BigDecimal("9223372036854775.123"),
                        LocalTime.of(12, 23, 34),
                        LocalDate.of(2020, 4, 15),
                        LocalDateTime.of(2020, 4, 15, 12, 23, 34, 1_000_000),
                        OffsetDateTime.of(2020, 4, 15, 12, 23, 34, 200_000_000, ZoneOffset.UTC),
                        null
                ))
        );
    }

    /** Runs {@link #test_writingToTopLevel(boolean)} with {@code this OBJECT} declared. */
    @Test
    public void test_writingToTopLevelWhileNestedFieldMapped_explicit() {
        test_writingToTopLevel(true);
    }

    /** Runs {@link #test_writingToTopLevel(boolean)} without declaring {@code this}. */
    @Test
    public void test_writingToTopLevelWhileNestedFieldMapped_implicit() {
        test_writingToTopLevel(false);
    }

    /**
     * Shared body of the two {@code test_writingToTopLevelWhileNestedFieldMapped_*} tests; it
     * carries no {@code @Test} annotation of its own.
     *
     * <p>Asserts that an INSERT naming the top-level {@code this} column fails with
     * {@link #TOP_LEVEL_OBJECT_WRITE_ERROR}, and that writing only {@code __key} and
     * {@code name} succeeds and is readable through {@code this} as {@code Person(null, "foo")}.
     *
     * @param explicit whether the mapping declares the {@code this OBJECT} column explicitly
     */
    public void test_writingToTopLevel(boolean explicit) {
        String topicName = createRandomTopic();
        kafkaMapping(
                topicName,
                Integer.class, Person.class,
                IntegerSerializer.class, IntegerDeserializer.class,
                JavaSerializer.class, JavaDeserializer.class
        )
                .fields("__key INT")
                .fieldsIf(explicit, "this OBJECT")
                .fields("name VARCHAR")
                .create();

        // The three-value positional INSERT is only checked when `this` is a declared column.
        if (explicit) {
            assertThatInsertFails("INSERT INTO " + topicName + " VALUES(1, null, 'foo')");
        }
        assertThatInsertFails("INSERT INTO " + topicName + "(__key, this) VALUES(1, null)");

        // With `this` declared, the column list is spelled out so that only __key and name are written.
        String columns = explicit ? "(__key, name)" : "";
        sqlService.execute("INSERT INTO " + topicName + columns + " VALUES (1, 'foo')");

        assertRowsEventuallyInAnyOrder(
                "SELECT __key, this, name FROM " + topicName,
                List.of(new SqlTestSupport.Row(1, new Person(null, "foo"), "foo"))
        );
    }

    /** Asserts that executing {@code sql} throws with {@link #TOP_LEVEL_OBJECT_WRITE_ERROR}. */
    private static void assertThatInsertFails(String sql) {
        Assertions.assertThatThrownBy(() -> sqlService.execute(sql))
                .hasMessageContaining(TOP_LEVEL_OBJECT_WRITE_ERROR);
    }

    /**
     * Selecting {@code __key} and {@code this} must return the whole key and value objects
     * rather than their individual fields.
     */
    @Test
    public void test_topLevelFieldExtraction() {
        String topic = createRandomTopic();
        personMapping(topic).create();

        sqlService.execute("INSERT INTO " + topic + " VALUES (1, 'Alice')");

        assertRowsEventuallyInAnyOrder(
                "SELECT __key, this FROM " + topic,
                List.of(new SqlTestSupport.Row(new PersonId(1), new Person(null, "Alice")))
        );
    }

    /**
     * Maps a value class that nests a {@link Person} under the SQL type {@code person_type},
     * inserts a row using a nested row expression and reads the nested fields back.
     */
    @Test
    public void test_customType() {
        new SqlTestSupport.SqlType("person_type").create();
        // NOTE(review): unlike the other tests this one uses the fixed mapping name "m"
        // instead of a random topic - confirm that is intentional.
        kafkaMapping(
                "m",
                Integer.class, ClzWithPerson.class,
                IntegerSerializer.class, IntegerDeserializer.class,
                JavaSerializer.class, JavaDeserializer.class
        )
                .fields("__key INT", "outerField INT", "person person_type")
                .create();

        sqlService.execute("insert into m values (0, 1, (2, 'foo'))");

        assertRowsEventuallyInAnyOrder(
                "select outerField, (person).id, (person).name from m",
                rows(3, 1, 2, "foo")
        );
    }

    /** Creates a randomly named topic with {@link #INITIAL_PARTITION_COUNT} partitions. */
    private static String createRandomTopic() {
        return createRandomTopic(INITIAL_PARTITION_COUNT);
    }

    /**
     * Executes {@code sql} and then waits until the topic {@code name} holds exactly the
     * {@code expected} entries, reading both key and value with {@link JavaDeserializer}.
     */
    private static void assertTopicEventually(String name, String sql, Map<PersonId, Person> expected) {
        sqlService.execute(sql);
        kafkaTestSupport.assertTopicContentsEventually(name, expected, JavaDeserializer.class, JavaDeserializer.class);
    }

    /** Value class used by {@link #test_customType()}: a plain field plus a nested {@link Person}. */
    public static class ClzWithPerson implements Serializable {
        public int outerField;
        public Person person;
    }
}

