/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.jdbc;

import com.hazelcast.jet.sql.impl.connector.jdbc.JdbcSqlTestSupport;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.test.jdbc.H2DatabaseProvider;
import com.hazelcast.test.jdbc.TestDatabaseProvider;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests SQL joins whose left side is the streaming table function
 * {@code GENERATE_STREAM} and whose right side is a JDBC-backed mapping.
 *
 * <p>Populated fixtures contain two rows with ids 1 and 2, names
 * {@code myworker1}/{@code myworker2} and SSNs 208/209 (see {@link #getSSN(int)}).
 */
public class JdbcStreamJoinTest extends JdbcSqlTestSupport {

    /** Number of rows inserted into a populated fixture table (ids 1..ROW_COUNT). */
    private static final int ROW_COUNT = 2;

    /** Initializes the shared test infrastructure with an H2 database provider. */
    @BeforeClass
    public static void beforeClass() {
        TestDatabaseProvider provider = new H2DatabaseProvider();
        initialize(provider);
    }

    /** Returns the worker name stored in the {@code name} column for the given row index. */
    private static String getWorkerName(int index) {
        return "myworker" + index;
    }

    /** Returns the value stored in the {@code ssn} column for the given row index (207 + index). */
    private static int getSSN(int index) {
        return 207 + index;
    }

    /**
     * Builds the INSERT statement for one fixture row.
     *
     * @param tableName target table
     * @param index     row index, used as the id and to derive name and SSN
     * @return the INSERT statement text
     */
    private static String getInsertSQL(String tableName, int index) {
        return String.format(
                "INSERT INTO %s VALUES(%d, '%s', %d)",
                tableName, index, getWorkerName(index), getSSN(index));
    }

    /**
     * Creates a randomly named table, inserts {@code rowCount} fixture rows
     * (ids 1..rowCount) and then creates the SQL mapping for the table.
     *
     * @param rowCount number of rows to insert; 0 leaves the table empty
     * @return the name of the created table/mapping
     */
    private static String createMappedTable(int rowCount) throws Exception {
        String tableName = randomTableName();
        createTable(tableName, "id INT PRIMARY KEY", "name VARCHAR(100)", "ssn INT DEFAULT 1");
        for (int index = 1; index <= rowCount; index++) {
            executeJdbc(getInsertSQL(tableName, index));
        }
        // NOTE(review): the signature of the inherited execute(...) is not visible in this
        // file, so the explicit empty argument array is kept as-is.
        execute("CREATE MAPPING " + tableName
                + " ( id INT,  name VARCHAR,  ssn INT ) DATA CONNECTION testDatabaseRef", new Object[0]);
        return tableName;
    }

    /** Extracts the {@code ssn} column value of every row, preserving row order. */
    private static List<Object> ssnValues(List<SqlRow> rows) {
        return rows.stream()
                .map(row -> row.<Object>getObject("ssn"))
                .collect(Collectors.toList());
    }

    /** Inner join on the primary key: both fixture rows must be matched. */
    @Test
    public void test_stream2BatchJoin() throws Exception {
        String tableName = createMappedTable(ROW_COUNT);

        List<SqlRow> actualList = getRows("SELECT n.id, n.name, n.ssn , t.v FROM TABLE(GENERATE_STREAM(2)) t JOIN "
                + tableName + " n ON t.v = n.id LIMIT 2");

        Assertions.assertThat(ssnValues(actualList)).contains(208, 209);
    }

    /** Inner join on the non-key {@code ssn} column: both fixture rows must be matched. */
    @Test
    public void test_stream2BatchJoinJoinOnNonPrimaryKey() throws Exception {
        String tableName = createMappedTable(ROW_COUNT);

        List<SqlRow> actualList = getRows("SELECT n.id, n.name, n.ssn , t.v FROM TABLE(generate_stream(300)) t JOIN "
                + tableName + " n ON t.v = n.ssn LIMIT 2");

        Assertions.assertThat(ssnValues(actualList)).contains(208, 209);
    }

    /** Left outer join: matched rows carry their SSN, unmatched stream rows carry null. */
    @Test
    public void test_stream2BatchLeftOuterJoin() throws Exception {
        String tableName = createMappedTable(ROW_COUNT);

        List<SqlRow> actualList = getRows("SELECT n.id, n.name, n.ssn , t.v FROM TABLE(GENERATE_STREAM(2)) t LEFT OUTER JOIN "
                + tableName + " n ON t.v = n.id LIMIT 6");

        // contains(...) checks membership only (no order, no multiplicity): the result
        // must include null, 208 and 209.
        Assertions.assertThat(ssnValues(actualList)).contains(null, null, 208, null, 209, null);
    }

    /** Left outer join against an empty table: the right-side column is null. */
    @Test
    public void test_stream2BatchLeftOuterJoinWithEmptyTable() throws Exception {
        String tableName = createMappedTable(0);

        List<SqlRow> actualList = getRows("SELECT n.id, n.name, n.ssn , t.v FROM TABLE(GENERATE_STREAM(2)) t LEFT OUTER JOIN "
                + tableName + " n ON t.v = n.id LIMIT 3");

        // The query is limited to 3 rows, so three nulls are listed. contains(...) ignores
        // multiplicity, so this asserts that the result includes a null SSN.
        Assertions.assertThat(ssnValues(actualList)).contains(null, null, null);
    }

    /**
     * Executes the query and drains the whole result into a list; the result is
     * closed before returning.
     *
     * @param sql query to execute
     * @return all rows produced by the query, in iteration order
     */
    private List<SqlRow> getRows(String sql) {
        List<SqlRow> actualList = new ArrayList<>();
        try (SqlResult sqlResult = sqlService.execute(sql)) {
            Iterator<SqlRow> iterator = sqlResult.iterator();
            iterator.forEachRemaining(actualList::add);
        }
        return actualList;
    }
}

