/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlConnector;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.test.TestBatchSqlConnector;
import com.hazelcast.sql.SqlStatement;
import com.hazelcast.sql.impl.ResultIterator;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class SqlPrimitiveTest
extends KafkaSqlTestSupport {
    private static final int INITIAL_PARTITION_COUNT = 4;

    private static SqlTestSupport.SqlMapping kafkaMapping(String name, boolean simple) {
        return (SqlTestSupport.SqlMapping)((SqlTestSupport.SqlMapping)((SqlTestSupport.SqlMapping)new SqlTestSupport.SqlMapping(name, KafkaSqlConnector.class).options(new Object[]{"bootstrap.servers", kafkaTestSupport.getBrokerConnectionString(), "auto.offset.reset", "earliest"})).optionsIf(simple, new Object[]{"keyFormat", "int", "valueFormat", "varchar"})).optionsIf(!simple, new Object[]{"keyFormat", "java", "keyJavaClass", Integer.class.getName(), "valueFormat", "java", "valueJavaClass", String.class.getName()});
    }

    private static void createMapping(String name, boolean simple) {
        SqlPrimitiveTest.kafkaMapping(name, simple).create();
    }

    @Test
    public void test_insertSelect() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        String from = SqlPrimitiveTest.randomName();
        TestBatchSqlConnector.create(sqlService, from, 4);
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " SELECT v, 'value-' || v FROM " + from, TestUtil.createMap((Object[])new Object[]{0, "value-0", 1, "value-1", 2, "value-2", 3, "value-3"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name + " WHERE __key > 0 AND __key < 3", Arrays.asList(new SqlTestSupport.Row(1, "value-1"), new SqlTestSupport.Row(2, "value-2")));
    }

    @Test
    public void createKafkaMappingWithDataConnection() {
        String dlName = SqlPrimitiveTest.randomName();
        sqlService.execute("CREATE DATA CONNECTION " + dlName + " TYPE Kafka NOT SHARED " + String.format("OPTIONS ( 'bootstrap.servers' = '%s', 'key.deserializer' = '%s', 'key.serializer' = '%s', 'value.serializer' = '%s', 'value.deserializer' = '%s', 'auto.offset.reset' = 'earliest') ", kafkaTestSupport.getBrokerConnectionString(), IntegerDeserializer.class.getCanonicalName(), IntegerSerializer.class.getCanonicalName(), StringSerializer.class.getCanonicalName(), StringDeserializer.class.getCanonicalName()), new Object[0]);
        String name = SqlPrimitiveTest.randomName();
        ((SqlTestSupport.SqlMapping)new SqlTestSupport.SqlMapping(name, dlName).options(new Object[]{"keyFormat", "java", "keyJavaClass", Integer.class.getName(), "valueFormat", "java", "valueJavaClass", String.class.getName()})).create();
        String from = SqlPrimitiveTest.randomName();
        TestBatchSqlConnector.create(sqlService, from, 4);
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " SELECT v, 'value-' || v FROM " + from, TestUtil.createMap((Object[])new Object[]{0, "value-0", 1, "value-1", 2, "value-2", 3, "value-3"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name + " WHERE __key > 0 AND __key < 3", Arrays.asList(new SqlTestSupport.Row(1, "value-1"), new SqlTestSupport.Row(2, "value-2")));
    }

    @Test
    public void test_insertValues() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " (__key, this) VALUES (1, '2')", TestUtil.createMap((Object[])new Object[]{1, "2"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(1, "2")));
    }

    @Test
    public void test_insertSink() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        SqlPrimitiveTest.assertTopicEventually(name, "SINK INTO " + name + " (this, __key) VALUES ('2', 1)", TestUtil.createMap((Object[])new Object[]{1, "2"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(1, "2")));
    }

    @Test
    public void test_insertSink_simpleKeyFormat() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, true);
        SqlPrimitiveTest.assertTopicEventually(name, "SINK INTO " + name + " (this, __key) VALUES ('2', 1)", TestUtil.createMap((Object[])new Object[]{1, "2"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(1, "2")));
    }

    @Test
    public void test_insertNulls() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " VALUES (null, null)", TestUtil.createMap((Object[])new Object[]{null, null}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(null, null)));
    }

    @Test
    public void test_insertWithProject() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " (this, __key) VALUES ('2', CAST(0 + 1 AS INT))", TestUtil.createMap((Object[])new Object[]{1, "2"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(1, "2")));
    }

    @Test
    public void test_insertWithDynamicParameters() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " (this, __key) VALUES (?, CAST(0 + ? AS INT))", Arrays.asList("2", 1), TestUtil.createMap((Object[])new Object[]{1, "2"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(1, "2")));
    }

    @Test
    public void test_selectWithDynamicParameters() {
        String name = SqlPrimitiveTest.createRandomTopic();
        SqlPrimitiveTest.createMapping(name, false);
        sqlService.execute("INSERT INTO " + name + " VALUES (1, '1'),  (2, '2')", new Object[0]);
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT __key + ?, ABS(__key + ?), this FROM " + name + " WHERE __key + ? >= ?", Arrays.asList(2, -10, 2, 4), Collections.singletonList(new SqlTestSupport.Row(4L, 8L, "2")));
    }

    @Test
    public void test_renameKey() {
        Assertions.assertThatThrownBy(() -> ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping("map", true).fields(new String[]{"id INT EXTERNAL NAME __key", "this VARCHAR"})).create()).hasMessageContaining("Cannot rename field: '__key'");
        Assertions.assertThatThrownBy(() -> ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping("map", true).fields(new String[]{"__key INT EXTERNAL NAME renamed", "this VARCHAR"})).create()).hasMessageContaining("Cannot rename field: '__key'");
    }

    @Test
    public void test_renameThis() {
        Assertions.assertThatThrownBy(() -> ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping("map", true).fields(new String[]{"__key INT", "name VARCHAR EXTERNAL NAME this"})).create()).hasMessageContaining("Cannot rename field: 'this'");
        Assertions.assertThatThrownBy(() -> ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping("map", true).fields(new String[]{"__key INT", "this VARCHAR EXTERNAL NAME renamed"})).create()).hasMessageContaining("Cannot rename field: 'this'");
    }

    @Test
    public void test_objectAndMappingNameDifferent() {
        String topicName = SqlPrimitiveTest.createRandomTopic();
        String tableName = SqlPrimitiveTest.randomName();
        SqlPrimitiveTest.kafkaMapping(tableName, false).externalName(topicName).create();
        kafkaTestSupport.produce(topicName, (Object)1, (Object)"Alice");
        kafkaTestSupport.produce(topicName, (Object)2, (Object)"Bob");
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + tableName, Arrays.asList(new SqlTestSupport.Row(1, "Alice"), new SqlTestSupport.Row(2, "Bob")));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + tableName, Arrays.asList(new SqlTestSupport.Row(1, "Alice"), new SqlTestSupport.Row(2, "Bob")));
    }

    @Test
    public void test_explicitKeyAndThis() {
        String topicName = SqlPrimitiveTest.createRandomTopic();
        ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping(topicName, false).fields(new String[]{"__key INT", "this VARCHAR"})).create();
        kafkaTestSupport.produce(topicName, (Object)1, (Object)"Alice");
        kafkaTestSupport.produce(topicName, (Object)2, (Object)"Bob");
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + topicName, Arrays.asList(new SqlTestSupport.Row(1, "Alice"), new SqlTestSupport.Row(2, "Bob")));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + topicName, Arrays.asList(new SqlTestSupport.Row(1, "Alice"), new SqlTestSupport.Row(2, "Bob")));
    }

    @Test
    public void test_explicitKeyAndThisWithExternalNames() {
        String topicName = SqlPrimitiveTest.createRandomTopic();
        ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping(topicName, false).fields(new String[]{"__key INT EXTERNAL NAME __key", "this VARCHAR EXTERNAL NAME this"})).create();
        kafkaTestSupport.produce(topicName, (Object)1, (Object)"Alice");
        kafkaTestSupport.produce(topicName, (Object)2, (Object)"Bob");
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + topicName, Arrays.asList(new SqlTestSupport.Row(1, "Alice"), new SqlTestSupport.Row(2, "Bob")));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + topicName, Arrays.asList(new SqlTestSupport.Row(1, "Alice"), new SqlTestSupport.Row(2, "Bob")));
    }

    @Test
    public void test_explicitKeyAndValueSerializers() {
        String name = SqlPrimitiveTest.createRandomTopic();
        ((SqlTestSupport.SqlMapping)SqlPrimitiveTest.kafkaMapping(name, false).options(new Object[]{"key.serializer", IntegerSerializer.class.getCanonicalName(), "key.deserializer", IntegerDeserializer.class.getCanonicalName(), "value.serializer", StringSerializer.class.getCanonicalName(), "value.deserializer", StringDeserializer.class.getCanonicalName()})).create();
        SqlPrimitiveTest.assertTopicEventually(name, "INSERT INTO " + name + " (__key, this) VALUES (1, '2')", TestUtil.createMap((Object[])new Object[]{1, "2"}));
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(1, "2")));
    }

    @Test
    public void test_noKeyFormat() {
        String topicName = SqlPrimitiveTest.createRandomTopic();
        ((SqlTestSupport.SqlMapping)new SqlTestSupport.SqlMapping(topicName, KafkaSqlConnector.class).options(new Object[]{"valueFormat", "java", "valueJavaClass", Integer.class.getName(), "bootstrap.servers", kafkaTestSupport.getBrokerConnectionString(), "auto.offset.reset", "earliest"})).create();
        sqlService.execute("INSERT INTO " + topicName + " VALUES(42)", new Object[0]);
        SqlPrimitiveTest.assertRowsEventuallyInAnyOrder("SELECT * FROM " + topicName, Collections.singletonList(new SqlTestSupport.Row(42)));
    }

    @Test
    public void test_noValueFormat() {
        Assertions.assertThatThrownBy(() -> ((SqlTestSupport.SqlMapping)new SqlTestSupport.SqlMapping("kafka", KafkaSqlConnector.class).options(new Object[]{"keyFormat", "java", "keyJavaClass", String.class.getName()})).create()).hasMessage("Missing 'valueFormat' option");
    }

    @Test
    public void test_multipleFieldsForPrimitive_key() {
        this.test_multipleFieldsForPrimitive("__key");
    }

    @Test
    public void test_multipleFieldsForPrimitive_value() {
        this.test_multipleFieldsForPrimitive("this");
    }

    private void test_multipleFieldsForPrimitive(String fieldName) {
        Assertions.assertThatThrownBy(() -> ((SqlTestSupport.SqlMapping)((SqlTestSupport.SqlMapping)new SqlTestSupport.SqlMapping("kafka", KafkaSqlConnector.class).fields(new String[]{fieldName + " INT", "field INT EXTERNAL NAME \"" + fieldName + ".field\""})).options(new Object[]{"keyFormat", "java", "keyJavaClass", Integer.class.getName(), "valueFormat", "java", "valueJavaClass", Integer.class.getName()})).create()).hasMessage("The field '" + fieldName + "' is of type INTEGER, you can't map '" + fieldName + ".field' too");
    }

    @Test
    public void test_nonExistentTopic() {
        String name = "nonExistentTopic";
        SqlPrimitiveTest.createMapping(name, false);
        ResultIterator result = (ResultIterator)sqlService.execute("select * from " + name, new Object[0]).iterator();
        result.hasNext(500L, TimeUnit.MILLISECONDS);
    }

    private static String createRandomTopic() {
        return SqlPrimitiveTest.createRandomTopic(4);
    }

    private static void assertTopicEventually(String name, String sql, Map<Integer, String> expected) {
        SqlPrimitiveTest.assertTopicEventually(name, sql, Collections.emptyList(), expected);
    }

    private static void assertTopicEventually(String name, String sql, List<Object> arguments, Map<Integer, String> expected) {
        SqlStatement statement = new SqlStatement(sql);
        arguments.forEach(arg_0 -> ((SqlStatement)statement).addParameter(arg_0));
        sqlService.execute(statement);
        kafkaTestSupport.assertTopicContentsEventually(name, expected, false);
    }
}

