/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.core.HazelcastJsonValue;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlConnector;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.test.TestAllTypesSqlConnector;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * SQL tests for Kafka mappings whose key and value use the {@code json-flat}
 * format (and, in {@link #test_jsonType()}, the {@code json} format).
 */
@Category({NightlyTest.class, ParallelJVMTest.class})
public class SqlJsonTest extends KafkaSqlTestSupport {
    /** Partition count passed to {@code createRandomTopic(int)} for every topic created by these tests. */
    private static final int INITIAL_PARTITION_COUNT = 4;

    /**
     * Builds (without creating) a Kafka mapping with {@code json-flat} key and value formats that
     * points at the test broker and reads from the earliest offset.
     *
     * @param name mapping name
     * @return the configured, not yet created, mapping
     */
    private static SqlTestSupport.SqlMapping kafkaMapping(String name) {
        return mappingWithOptions(
                new SqlTestSupport.SqlMapping(name, KafkaSqlConnector.class),
                "keyFormat", "json-flat",
                "valueFormat", "json-flat",
                "bootstrap.servers", kafkaTestSupport.getBrokerConnectionString(),
                "auto.offset.reset", "earliest");
    }

    /**
     * Applies {@code options} to {@code mapping} and returns the result as a {@code SqlMapping}.
     *
     * @param mapping mapping to configure
     * @param options alternating option names and values, forwarded as a single array
     * @return the mapping returned by {@code options(...)}
     */
    private static SqlTestSupport.SqlMapping mappingWithOptions(SqlTestSupport.SqlMapping mapping, Object... options) {
        // NOTE(review): the cast is kept from the original code; it may be redundant if
        // options(...) is declared with a self-typed return - confirm against SqlTestSupport.
        return (SqlTestSupport.SqlMapping) mapping.options(options);
    }

    /**
     * Builds (without creating) a {@link #kafkaMapping(String)} with the given column definitions.
     *
     * @param name   mapping name
     * @param fields column definitions, forwarded as a single array
     * @return the mapping returned by {@code fields(...)}
     */
    private static SqlTestSupport.SqlMapping kafkaMappingWithFields(String name, String... fields) {
        // NOTE(review): same cast as in the original code; see mappingWithOptions.
        return (SqlTestSupport.SqlMapping) kafkaMapping(name).fields(fields);
    }

    /** NULL key and value fields are written as JSON nulls and read back as SQL NULLs. */
    @Test
    public void test_nulls() {
        String name = createRandomTopic();
        kafkaMappingWithFields(name,
                "id INT EXTERNAL NAME \"__key.id\"",
                "name VARCHAR EXTERNAL NAME \"this.name\"")
                .create();

        assertTopicEventually(
                name,
                "INSERT INTO " + name + " VALUES (null, null)",
                Map.of("{\"id\":null}", "{\"name\":null}"));
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + name,
                List.of(new SqlTestSupport.Row(null, null)));
    }

    /** Columns are routed to the key or the value JSON according to their external names. */
    @Test
    public void test_fieldsMapping() {
        String name = createRandomTopic();
        kafkaMappingWithFields(name,
                "key_name VARCHAR EXTERNAL NAME \"__key.name\"",
                "value_name VARCHAR EXTERNAL NAME \"this.name\"")
                .create();

        assertTopicEventually(
                name,
                "INSERT INTO " + name + " (value_name, key_name) VALUES ('Bob', 'Alice')",
                Map.of("{\"name\":\"Alice\"}", "{\"name\":\"Bob\"}"));
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + name,
                List.of(new SqlTestSupport.Row("Alice", "Bob")));
    }

    /**
     * After the mapping is replaced with an extra {@code ssn} column, the row written through the
     * old mapping is expected to show {@code null} for that column.
     */
    @Test
    public void test_schemaEvolution() {
        String name = createRandomTopic();
        kafkaMappingWithFields(name,
                "id INT EXTERNAL NAME \"__key.id\"",
                "name VARCHAR")
                .create();
        runSql("INSERT INTO " + name + " VALUES (13, 'Alice')");

        // Same topic, one more column.
        kafkaMappingWithFields(name,
                "id INT EXTERNAL NAME \"__key.id\"",
                "name VARCHAR",
                "ssn BIGINT")
                .createOrReplace();
        runSql("INSERT INTO " + name + " VALUES (69, 'Bob', 123456789)");

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + name,
                List.of(
                        new SqlTestSupport.Row(13, "Alice", null),
                        new SqlTestSupport.Row(69, "Bob", 123456789L)));
    }

    /**
     * Copies the row exposed by a {@link TestAllTypesSqlConnector} mapping into a Kafka topic and
     * checks the values read back for every declared column type.
     */
    @Test
    public void test_allTypes() {
        String from = randomName();
        TestAllTypesSqlConnector.create(sqlService, from);

        String to = createRandomTopic();
        kafkaMappingWithFields(to,
                "id VARCHAR EXTERNAL NAME \"__key.id\"",
                "string VARCHAR",
                "\"boolean\" BOOLEAN",
                "byte TINYINT",
                "short SMALLINT",
                "\"int\" INT",
                "long BIGINT",
                "\"float\" REAL",
                "\"double\" DOUBLE",
                "\"decimal\" DECIMAL",
                "\"time\" TIME",
                "\"date\" DATE",
                "\"timestamp\" TIMESTAMP",
                "timestampTz TIMESTAMP WITH TIME ZONE",
                "map OBJECT",
                "object OBJECT")
                .create();

        runSql("INSERT INTO " + to + " SELECT '1', f.* FROM " + from + " f");

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + to,
                List.of(new SqlTestSupport.Row(
                        "1",
                        "string",
                        true,
                        (byte) 127,
                        Short.MAX_VALUE,
                        Integer.MAX_VALUE,
                        Long.MAX_VALUE,
                        1.234568E9f,
                        1.234512345678901E14,
                        new BigDecimal("9223372036854775.123"),
                        LocalTime.of(12, 23, 34),
                        LocalDate.of(2020, 4, 15),
                        LocalDateTime.of(2020, 4, 15, 12, 23, 34, 1000000),
                        OffsetDateTime.of(2020, 4, 15, 12, 23, 34, 200000000, ZoneOffset.UTC),
                        Map.of("42", 43),
                        // The "object" column is expected to read back as null.
                        null)));
    }

    /** Creating a {@code json-flat} mapping without any columns must fail. */
    @Test
    public void when_createMappingNoColumns_then_fail() {
        Assertions.assertThatThrownBy(() -> kafkaMapping("kafka").create())
                .hasMessage("Column list is required for JSON format");
    }

    /** With the {@code json} key/value formats, whole JSON documents are written and read back. */
    @Test
    public void test_jsonType() {
        String name = createRandomTopic();
        // Applied on top of the json-flat options already set by kafkaMapping.
        mappingWithOptions(kafkaMapping(name),
                "keyFormat", "json",
                "valueFormat", "json")
                .create();

        assertTopicEventually(
                name,
                "INSERT INTO " + name + " VALUES ('[1,2,3]', '[4,5,6]')",
                Map.of("[1,2,3]", "[4,5,6]"));
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + name,
                List.of(new SqlTestSupport.Row(
                        new HazelcastJsonValue("[1,2,3]"),
                        new HazelcastJsonValue("[4,5,6]"))));
    }

    /** Declaring {@code __key} as an explicit column must fail. */
    @Test
    public void when_explicitTopLevelField_then_fail_key() {
        when_explicitTopLevelField_then_fail("__key", "this");
    }

    /** Declaring {@code this} as an explicit column must fail. */
    @Test
    public void when_explicitTopLevelField_then_fail_this() {
        when_explicitTopLevelField_then_fail("this", "__key");
    }

    /**
     * Asserts that a mapping declaring {@code field} as a plain column is rejected.
     *
     * @param field      the top-level field declared as a VARCHAR column
     * @param otherField the other top-level field, used as the external-name prefix of a second column
     */
    private void when_explicitTopLevelField_then_fail(String field, String otherField) {
        Assertions.assertThatThrownBy(() ->
                        kafkaMappingWithFields("kafka",
                                field + " VARCHAR",
                                "f VARCHAR EXTERNAL NAME \"" + otherField + ".f\"")
                                .create())
                .hasMessage("Cannot use '" + field + "' field with JSON serialization");
    }

    /**
     * Inserting directly into {@code __key} or {@code this} must be rejected. The mapping name
     * comes from {@code randomName()}; no topic is created up front.
     */
    @Test
    public void test_writingToTopLevel() {
        String mapName = randomName();
        kafkaMappingWithFields(mapName,
                "id INT EXTERNAL NAME \"__key.id\"",
                "name VARCHAR")
                .create();

        Assertions.assertThatThrownBy(() ->
                        runSql("INSERT INTO " + mapName + "(__key, name) VALUES ('{\"id\":1}', null)"))
                .hasMessageContaining("Writing to top-level fields of type OBJECT not supported");
        Assertions.assertThatThrownBy(() ->
                        runSql("INSERT INTO " + mapName + "(id, this) VALUES (1, '{\"name\":\"foo\"}')"))
                .hasMessageContaining("Writing to top-level fields of type OBJECT not supported");
    }

    /** Selecting {@code __key} and {@code this} is expected to yield the key and value as maps. */
    @Test
    public void test_topLevelFieldExtraction() {
        String name = createRandomTopic();
        kafkaMappingWithFields(name,
                "id INT EXTERNAL NAME \"__key.id\"",
                "name VARCHAR")
                .create();
        runSql("INSERT INTO " + name + " VALUES (1, 'Alice')");

        assertRowsEventuallyInAnyOrder(
                "SELECT __key, this FROM " + name,
                List.of(new SqlTestSupport.Row(Map.of("id", 1), Map.of("name", "Alice"))));
    }

    /** Same as {@link #test_fieldsMapping()}, with byte-array (de)serializers set explicitly. */
    @Test
    public void test_explicitKeyAndValueSerializers() {
        String name = createRandomTopic();
        SqlTestSupport.SqlMapping mapping = kafkaMappingWithFields(name,
                "key_name VARCHAR EXTERNAL NAME \"__key.name\"",
                "value_name VARCHAR EXTERNAL NAME \"this.name\"");
        mappingWithOptions(mapping,
                "key.serializer", ByteArraySerializer.class.getCanonicalName(),
                "key.deserializer", ByteArrayDeserializer.class.getCanonicalName(),
                "value.serializer", ByteArraySerializer.class.getCanonicalName(),
                "value.deserializer", ByteArrayDeserializer.class.getCanonicalName())
                .create();

        assertTopicEventually(
                name,
                "INSERT INTO " + name + " (value_name, key_name) VALUES ('Bob', 'Alice')",
                Map.of("{\"name\":\"Alice\"}", "{\"name\":\"Bob\"}"));
        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM " + name,
                List.of(new SqlTestSupport.Row("Alice", "Bob")));
    }

    /**
     * Creates a topic through the inherited {@code createRandomTopic(int)} using
     * {@link #INITIAL_PARTITION_COUNT}.
     *
     * @return the value returned by {@code createRandomTopic(int)}, used by the tests as topic and mapping name
     */
    private static String createRandomTopic() {
        return createRandomTopic(INITIAL_PARTITION_COUNT);
    }

    /**
     * Executes {@code sql} with an empty argument array and discards the result.
     *
     * @param sql statement to execute
     */
    private static void runSql(String sql) {
        sqlService.execute(sql, new Object[0]);
    }

    /**
     * Executes {@code sql}, then asserts that the topic eventually holds exactly the expected
     * entries when keys and values are read with {@link StringDeserializer}.
     *
     * @param name     topic name
     * @param sql      statement to execute first
     * @param expected expected key-to-value contents of the topic
     */
    private static void assertTopicEventually(String name, String sql, Map<String, String> expected) {
        runSql(sql);
        kafkaTestSupport.assertTopicContentsEventually(name, expected, StringDeserializer.class, StringDeserializer.class);
    }
}

