/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.map.model.Person;
import com.hazelcast.jet.sql.impl.connector.test.TestBatchSqlConnector;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.impl.client.SqlClientService;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

public class SqlClientTest
extends SqlTestSupport {
    /**
     * One-time class setup: delegates to {@code initializeWithClient} with {@code 2} as the
     * first argument (presumably the member count — confirm against {@code SqlTestSupport})
     * and {@code null} for the two remaining arguments.
     */
    @BeforeClass
    public static void setUpClass() {
        initializeWithClient(2, null, null);
    }

    /**
     * Verifies the partition argument index recorded for various {@code INSERT} shapes:
     * single-row inserts with a dynamic {@code __key} parameter, inserts with literal keys,
     * multi-row inserts, and inserts into a mapping keyed by {@code Person}.
     */
    @Test
    public void test_partitionBasedRouting_insert() {
        createMapping("test_simpleKey", Integer.class, String.class);
        createMapping("test_complexKey", Person.class, String.class);

        // Single row, key supplied as a dynamic parameter: the expected index is the position of that parameter.
        checkPartitionArgumentIndex("INSERT INTO test_simpleKey (__key, this) VALUES (?, ?)", 0, 1, "value");
        checkPartitionArgumentIndex("INSERT INTO test_simpleKey (this, __key) VALUES (?, ?)", 1, "value", 2);

        // Literal keys and multi-row inserts: the expected cache lookup result is null.
        checkPartitionArgumentIndex("INSERT INTO test_simpleKey (this, __key) VALUES ('value', 3)", null);
        checkPartitionArgumentIndex("INSERT INTO test_simpleKey (this, __key) VALUES ('value', 4), ('value', 5)", null);
        checkPartitionArgumentIndex("INSERT INTO test_simpleKey (this, __key) VALUES (?, ?), (?, ?)", null, "value", 6, "value", 7);

        // Key assembled from its individual fields: the expected cache lookup result is null.
        checkPartitionArgumentIndex("INSERT INTO test_complexKey (this, id, name) VALUES (?, ?, ?)", null, "value-1", 1, "name-1");

        // Passing the whole Person as __key is expected to be rejected.
        Assertions
                .assertThatThrownBy(() -> checkPartitionArgumentIndex(
                        "INSERT INTO test_complexKey (this, __key) VALUES (?, ?)",
                        null,
                        "value-1", new Person(2, "name-2")))
                .hasMessageEndingWith("Writing to top-level fields of type OBJECT not supported");
    }

    /**
     * Verifies the partition argument index recorded for key-filtered {@code SELECT},
     * {@code UPDATE} and {@code DELETE} statements on an {@code Integer}-keyed mapping, and
     * that aggregations and joins yield a {@code null} lookup result.
     */
    @Test
    public void test_partitionBasedRouting() {
        createMapping("test", Integer.class, String.class);
        createMapping("test2", Integer.class, String.class);

        // Plain key lookups, including per-row JSON functions: expected index is the position of the key parameter.
        checkPartitionArgumentIndex("SELECT * FROM test WHERE __key = ?", 0, 1);
        checkPartitionArgumentIndex("UPDATE test SET this = ? WHERE __key = ?", 1, "testVal", 1);
        checkPartitionArgumentIndex("DELETE FROM test WHERE __key = ?", 0, 1);
        checkPartitionArgumentIndex("SELECT JSON_OBJECT(this : __key) FROM test WHERE __key = ?", 0, 1);
        checkPartitionArgumentIndex("SELECT JSON_ARRAY(__key, this) FROM test WHERE __key = ?", 0, 1);

        // Aggregations: the expected cache lookup result is null.
        checkPartitionArgumentIndex("SELECT JSON_OBJECTAGG(this : __key) FROM test WHERE __key = ?", null, 1);
        checkPartitionArgumentIndex("SELECT SUM(__key) FROM test WHERE __key = ?", null, 1);
        checkPartitionArgumentIndex("SELECT COUNT(*) FROM test WHERE __key = ?", null, 1);

        // Joins: the expected cache lookup result is null.
        checkPartitionArgumentIndex("SELECT * FROM test t1 JOIN test2 t2 ON t1.__key = t2.__key WHERE t1.__key = ?", null, 1);
        checkPartitionArgumentIndex("SELECT t1.*, t2.* FROM test t1 JOIN test2 t2 USING(__key) WHERE t1.__key = ?", null, 1);
    }

    /**
     * Same checks as the simple-key routing test, but against a mapping whose key class is
     * {@code Person}; the whole key object is passed as the dynamic parameter.
     */
    @Test
    public void test_partitionBasedRoutingComplexKey() {
        createMapping("test_complex", Person.class, String.class);

        // In each statement the expected index is the position of the __key parameter.
        checkPartitionArgumentIndex("SELECT * FROM test_complex WHERE __key = ?", 0, new Person(1, "name-1"));
        checkPartitionArgumentIndex("UPDATE test_complex SET this = ? WHERE __key = ?", 1, "testVal", new Person(1, "name-1"));
        checkPartitionArgumentIndex("DELETE FROM test_complex WHERE __key = ?", 0, new Person(1, "name-1"));
    }

    /**
     * Covers key predicates combined with further {@code AND} conditions. Disabled via
     * {@code @Ignore}; see the linked discussion in the annotation for the reason.
     */
    @Test
    @Ignore("Currently unsupported: https://github.com/hazelcast/hazelcast/pull/22659#issuecomment-1382086013")
    public void test_partitionBasedRouting_multipleConditions() {
        createMapping("test", Integer.class, String.class);
        createMapping("complex_value", Integer.class, Person.class);

        // Key condition combined with a value condition, in both orders.
        checkPartitionArgumentIndex("SELECT * FROM test WHERE __key = ? AND this = ?", 0, 1, "aaa");
        checkPartitionArgumentIndex("SELECT * FROM test WHERE this = ? AND __key = ?", 1, "aaa", 1);

        // Key condition combined with conditions on fields of the Person value.
        checkPartitionArgumentIndex("SELECT * FROM complex_value WHERE __key = ? AND id = ?", 0, 1, 1);
        checkPartitionArgumentIndex("SELECT * FROM complex_value WHERE __key = ? AND id = ? AND name = ?", 0, 1, 1, "name-1");
    }

    /**
     * Executes {@code sql} through the shared client's SQL service and checks what
     * {@code partitionArgumentIndexCache} returns for that statement text before and after.
     *
     * @param sql           statement text; the cache lookup for it must return {@code null}
     *                      before execution, otherwise the first assertion fails
     * @param expectedIndex value the cache lookup must return after execution; may be {@code null}
     * @param arguments     dynamic parameter values forwarded to {@code execute}
     */
    private void checkPartitionArgumentIndex(String sql, Integer expectedIndex, Object ... arguments) {
        SqlClientService clientSql = (SqlClientService) client().getSql();

        // Precondition: this exact statement text has not been cached yet.
        Assert.assertNull(clientSql.partitionArgumentIndexCache.get(sql));

        clientSql.execute(sql, arguments);

        Assert.assertEquals(expectedIndex, clientSql.partitionArgumentIndexCache.get(sql));
    }

    /**
     * Queries a test batch table created with 10000 items from a fresh client and checks
     * that every returned value is distinct and that exactly 10000 distinct values arrive.
     */
    @Test
    public void test_jetJobReturnRowsToClientFrom() {
        final int expectedRows = 10000;
        SqlService sql = factory().newHazelcastClient().getSql();
        TestBatchSqlConnector.create(sql, "t", expectedRows);

        SqlResult rows = sql.execute("SELECT v FROM t");

        // One bit per value: a bit that is already set means the value was delivered twice.
        BitSet seen = new BitSet(expectedRows);
        rows.forEach(row -> {
            Integer value = (Integer) row.getObject(0);
            Assert.assertFalse("value already seen: " + value, seen.get(value));
            seen.set(value);
        });

        Assert.assertEquals(expectedRows, seen.cardinality());
    }

    /**
     * Starts a streaming query from a dedicated client, shuts that client down, and expects
     * the backing job to reach {@code FAILED} with a "Client cannot be reached" error.
     */
    @Test
    public void when_clientDisconnects_then_jobCancelled() {
        HazelcastInstance client = factory().newHazelcastClient();
        client.getSql().execute("SELECT * FROM TABLE(GENERATE_STREAM(100))");

        // The job is looked up on the member side so it stays observable after the client is gone.
        Job job = awaitSingleRunningJob(instance());

        client.shutdown();

        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);
        Assertions
                .assertThatThrownBy(job::join)
                .hasMessageContaining("QueryException: Client cannot be reached");
    }

    /**
     * Creates a mapping of type {@code FailingSource} and expects that executing and
     * iterating a query over it throws an error whose message contains "mock failure".
     */
    @Test
    public void when_jobFails_then_clientFindsOut() {
        SqlService sql = factory().newHazelcastClient().getSql();
        sql.execute("CREATE MAPPING t TYPE FailingSource");

        // Both execute() and the row iteration run inside the callable, so a failure from either is caught.
        Assertions
                .assertThatThrownBy(() -> sql.execute("SELECT * FROM t").forEach(System.out::println))
                .hasMessageContaining("mock failure");
    }

    /**
     * Runs {@code GENERATE_STREAM(0)}, closes the result while the job is running and
     * expects the job to end up in {@code FAILED} status. Progress is logged at each step.
     */
    @Test
    public void when_resultClosed_then_jobCancelled_withNoResults() {
        HazelcastInstance client = factory().newHazelcastClient();

        logger.info("before select");
        SqlResult streamResult = client.getSql().execute("SELECT * FROM TABLE(GENERATE_STREAM(0))");
        logger.info("after execute returned");

        Job job = awaitSingleRunningJob(client);
        logger.info("Job is running.");

        streamResult.close();
        logger.info("after res.close() returned");

        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);
    }

    /**
     * Fills an IMap with 100000 entries, then 100 times executes {@code SELECT * FROM map}
     * and immediately closes the result. Afterwards it expects that the member's job
     * execution service eventually holds no execution contexts and no failed jobs.
     */
    @Test
    public void when_resultClosed_then_executionContextCleanedUp() {
        final int entryCount = 100000;
        final int batchSize = 10000;
        final int queryCount = 100;

        HazelcastInstance client = SqlClientTest.factory().newHazelcastClient();
        SqlService sql = client.getSql();
        IMap<Integer, Integer> map = SqlClientTest.instance().getMap("map");

        // Load the map in batches. Flushing on (i + 1) % batchSize sends full batches only;
        // the previous i % batchSize check flushed a single entry first and then never
        // wrote the trailing 9999 entries to the map.
        HashMap<Integer, Integer> tmpMap = new HashMap<Integer, Integer>();
        for (int i = 0; i < entryCount; ++i) {
            tmpMap.put(i, i);
            if ((i + 1) % batchSize == 0) {
                map.putAll(tmpMap);
                tmpMap.clear();
            }
        }
        // Write whatever is left when entryCount is not a multiple of batchSize.
        if (!tmpMap.isEmpty()) {
            map.putAll(tmpMap);
            tmpMap.clear();
        }

        SqlClientTest.createMapping("map", Integer.class, Integer.class);

        // Close each result right away, without reading any rows.
        for (int i = 0; i < queryCount; ++i) {
            SqlResult result = sql.execute("SELECT * FROM map", new Object[0]);
            result.close();
        }

        JetServiceBackend jetService = SqlClientTest.getJetServiceBackend((HazelcastInstance)SqlClientTest.instance());
        // NOTE(review): the raw Collection looks like a decompiler artifact; with a raw type the
        // lambda parameter below is typed as Object. The element type is presumably
        // ExecutionContext — confirm and restore the type argument.
        Collection contexts = jetService.getJobExecutionService().getExecutionContexts();
        SqlClientTest.assertTrueEventually(() -> {
            // List the surviving execution ids in the failure message.
            String remainingContexts = contexts.stream().map(c -> Util.idToString((long)c.executionId())).collect(Collectors.joining(", "));
            Assert.assertEquals((String)("remaining execIds: " + remainingContexts), (long)0L, (long)contexts.size());
        }, (long)5L);

        ConcurrentMap<?, ?> failedJobs = jetService.getJobExecutionService().getFailedJobs();
        SqlClientTest.assertTrueEventually(() -> Assert.assertEquals((long)0L, (long)failedJobs.size()));
    }
}

