/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.s2sjoin;

import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.test.TestStreamSqlConnector;
import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.impl.type.QueryDataTypeFamily;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.Arrays;
import java.util.Collections;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * SQL tests for joins where both sides are streams with an order imposed through
 * {@code IMPOSE_ORDER}.
 *
 * <p>Each test registers its input streams through {@link TestStreamSqlConnector}, usually wraps
 * them in views that impose an order on a column, and then either checks the rows produced by the
 * join or checks that the query is rejected with an expected error message.
 *
 * <p>NOTE(review): the tests reuse the same mapping and view names ({@code stream1}, {@code s1},
 * ...); presumably the test-support superclass resets the SQL catalog between tests - confirm.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class SqlStreamToStreamJoinTest
extends SqlTestSupport {
    /** Error text expected whenever the join condition does not bound the time difference both ways. */
    private static final String MISSING_BOUND_MESSAGE = "A stream-to-stream join must have a join condition constraining the maximum difference between time values of the joined tables in both directions";

    private static SqlService sqlService;

    /** Initializes the shared test support and caches the SQL service used by all tests. */
    @BeforeClass
    public static void setUpClass() {
        // presumably starts a 3-member cluster with the default config - verify against SqlTestSupport
        initialize(3, null);
        sqlService = instance().getSql();
    }

    /** Join conditions that leave the time difference unbounded in some direction must be rejected. */
    @Test
    public void test_missingBound() {
        TestStreamSqlConnector.create(
                sqlService,
                "s",
                Arrays.asList("a", "b"),
                Arrays.asList(QueryDataTypeFamily.INTEGER, QueryDataTypeFamily.INTEGER),
                new Object[0][]);
        sqlService.execute("CREATE VIEW v AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 0))");

        // Only this first case also checks the part of the message listing the time columns.
        Assertions.assertThatThrownBy(() -> sqlService.execute("SELECT * FROM v v1 JOIN v v2 ON 1=2"))
                .hasMessageContaining(MISSING_BOUND_MESSAGE)
                .hasMessageContaining("Time columns on the left side: [a], time columns on the right side: [a]");

        String[] rejectedQueries = {
            "SELECT * FROM v v1 JOIN v v2 ON v1.a >= v2.a",
            "SELECT * FROM v v1 JOIN v v2 ON v2.a >= v1.a",
            "SELECT * FROM v v1 JOIN v v2 ON v1.a BETWEEN v1.b AND v2.a",
            "SELECT * FROM v v1 JOIN v v2 ON v2.a BETWEEN v2.b AND v1.a",
        };
        for (String query : rejectedQueries) {
            Assertions.assertThatThrownBy(() -> sqlService.execute(query))
                    .hasMessageContaining(MISSING_BOUND_MESSAGE);
        }
    }

    /** Inner join with a two-sided time bound over TIMESTAMP WITH TIME ZONE columns. */
    @Test
    public void test_joinIsContinuous() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                row(timestampTz(0L)),
                row(timestampTz(1L)),
                row(timestampTz(2L)));
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                row(timestampTz(1L)),
                row(timestampTz(2L)));
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.002' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.002' SECOND))");

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM s1 JOIN s2 ON s2.b BETWEEN s1.a AND s1.a + INTERVAL '0.002' SECOND ",
                Arrays.asList(
                        new Row(timestampTz(0L), timestampTz(1L)),
                        new Row(timestampTz(0L), timestampTz(2L)),
                        new Row(timestampTz(1L), timestampTz(1L)),
                        new Row(timestampTz(1L), timestampTz(2L)),
                        new Row(timestampTz(2L), timestampTz(2L))));
    }

    /** LEFT JOIN: unmatched left rows are expected with a null right side. */
    @Test
    public void test_leftStreamToStreamJoinWithTimeBounds() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                row(timestampTz(0L)),
                row(timestampTz(3L)),
                row(timestampTz(4L)),
                row(timestampTz(5L)),
                row(timestampTz(51L)));
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                row(timestampTz(2L)),
                row(timestampTz(5L)),
                row(timestampTz(7L)),
                row(timestampTz(50L)));
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.002' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.002' SECOND))");

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM s1 LEFT JOIN s2 ON s2.b BETWEEN s1.a AND s1.a + INTERVAL '0.001' SECOND ",
                Arrays.asList(
                        new Row(timestampTz(0L), null),
                        new Row(timestampTz(3L), null),
                        new Row(timestampTz(4L), timestampTz(5L)),
                        new Row(timestampTz(5L), timestampTz(5L))));
    }

    /** RIGHT JOIN: mirror image of the LEFT JOIN test, with the inputs and the condition swapped. */
    @Test
    public void test_rightStreamToStreamJoinWithTimeBounds() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                row(timestampTz(2L)),
                row(timestampTz(5L)),
                row(timestampTz(7L)),
                row(timestampTz(50L)));
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                row(timestampTz(0L)),
                row(timestampTz(3L)),
                row(timestampTz(4L)),
                row(timestampTz(5L)),
                row(timestampTz(51L)));
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.002' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.002' SECOND))");

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM s1 RIGHT JOIN s2 ON s1.a BETWEEN s2.b AND s2.b + INTERVAL '0.001' SECOND ",
                Arrays.asList(
                        new Row(null, timestampTz(0L)),
                        new Row(null, timestampTz(3L)),
                        new Row(timestampTz(5L), timestampTz(4L)),
                        new Row(timestampTz(5L), timestampTz(5L))));
    }

    /** Inner join over TIMESTAMP columns with an asymmetric BETWEEN bound. */
    @Test
    public void test_joinHasTimestampBounds() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(0L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(0L))});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");

        assertTipOfStream(
                "SELECT * FROM s1 JOIN s2 ON s2.b BETWEEN s1.a - INTERVAL '0.001' SECOND                              AND     s1.a + INTERVAL '0.004' SECOND ",
                Collections.singletonList(new Row(timestamp(0L), timestamp(0L))));
    }

    /** Plain equality between the two time columns is accepted as a join condition. */
    @Test
    public void test_joinIsEquiJoin() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(0L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(0L))});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");

        assertTipOfStream(
                "SELECT * FROM s1 JOIN s2 ON s2.b = s1.a",
                Collections.singletonList(new Row(timestamp(0L), timestamp(0L))));
    }

    /** An equality between function calls in the join condition is expected to make the query fail. */
    @Test
    public void when_joinIsEquiJoinBetweenFunctions_then_fail() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(2L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(2L))});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");

        // NOTE(review): the String below goes to the description overload of assertThatThrownBy,
        // so it only labels the assertion; the exception message itself is not verified here.
        // TODO confirm the intended message and switch to hasMessageContaining if appropriate.
        Assertions.assertThatThrownBy(
                () -> sqlService.execute("SELECT * FROM s1 JOIN s2 ON TO_TIMESTAMP_TZ(s2.b) = TO_TIMESTAMP_TZ(2)"),
                "Only time bound / equality condition are supported for stream-to-stream JOIN");
    }

    /** A WHERE clause on a non-time column filters the joined rows. */
    @Test
    public void test_additionalJoinConditionApplies() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Arrays.asList("a", "b"),
                Arrays.asList(QueryDataTypeFamily.TIMESTAMP, QueryDataTypeFamily.INTEGER),
                row(timestamp(9L), 9),
                row(timestamp(11L), 11),
                row(timestamp(12L), 12),
                row(timestamp(13L), 13),
                row(timestamp(14L), 14),
                row(timestamp(15L), 15));
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Arrays.asList("x", "y"),
                Arrays.asList(QueryDataTypeFamily.TIMESTAMP, QueryDataTypeFamily.INTEGER),
                row(timestamp(9L), 9),
                row(timestamp(10L), 10),
                row(timestamp(11L), 11),
                row(timestamp(12L), 12),
                row(timestamp(13L), 13),
                row(timestamp(14L), 14),
                row(timestamp(15L), 15));
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.002' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(x), INTERVAL '0.002' SECOND))");

        assertRowsEventuallyInAnyOrder(
                "SELECT b, y FROM s1 JOIN s2 ON s2.x BETWEEN s1.a AND s1.a + INTERVAL '0.002' SECOND WHERE b % 2 = 0 ",
                Arrays.asList(
                        new Row(12, 12),
                        new Row(12, 13),
                        new Row(12, 14),
                        new Row(14, 14),
                        new Row(14, 15)));
    }

    /** The ordered column may be an INTEGER rather than a temporal type. */
    @Test
    public void test_nonTemporalWatermarkedType() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.INTEGER),
                new Object[][]{row(42)});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.INTEGER),
                new Object[][]{row(42)});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), 1))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(a), 1))");

        assertTipOfStream(
                "SELECT * FROM s1 JOIN s2 ON s1.a=s2.a",
                Collections.singletonList(new Row(42, 42)));
    }

    /** Two chained stream-to-stream joins over three streams. */
    @Test
    public void test_joinHasDoubledTimestampBounds() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(100L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(100L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream3",
                Collections.singletonList("c"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(101L))});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s3 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream3, DESCRIPTOR(c), INTERVAL '0.001' SECOND))");

        assertTipOfStream(
                "SELECT * FROM s1 JOIN s2 ON s2.b BETWEEN s1.a - INTERVAL '0.1' SECONDS AND s1.a JOIN s3 ON s3.c BETWEEN s2.b - INTERVAL '0.1' SECONDS AND s2.b + INTERVAL '0.5' SECONDS",
                Collections.singletonList(new Row(timestamp(100L), timestamp(100L), timestamp(101L))));
    }

    /** Three chained stream-to-stream joins over four streams. */
    @Test
    public void test_joinHasTripledTimestampBounds() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(100L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(100L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream3",
                Collections.singletonList("c"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(101L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream4",
                Collections.singletonList("d"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(102L))});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s3 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream3, DESCRIPTOR(c), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s4 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream4, DESCRIPTOR(d), INTERVAL '0.001' SECOND))");

        assertTipOfStream(
                "SELECT * FROM s1 JOIN s2 ON s2.b BETWEEN s1.a - INTERVAL '0.1' SECONDS AND s1.a JOIN s3 ON s3.c BETWEEN s2.b AND s2.b + INTERVAL '0.1' SECONDS JOIN s4 ON s4.d BETWEEN s3.c AND s3.c + INTERVAL '0.1' SECONDS ",
                Collections.singletonList(new Row(timestamp(100L), timestamp(100L), timestamp(101L), timestamp(102L))));
    }

    /** The projection lists the joined columns in a different order than the inputs provide them. */
    @Test
    public void test_calcReordersFields() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(0L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Arrays.asList("b", "c", "d"),
                Arrays.asList(QueryDataTypeFamily.TIMESTAMP, QueryDataTypeFamily.INTEGER, QueryDataTypeFamily.VARCHAR),
                new Object[][]{row(timestamp(0L), 0, "zero")});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");

        assertTipOfStream(
                "SELECT d, c, b, a FROM s1 JOIN s2 ON s2.b BETWEEN s1.a - INTERVAL '0.001' SECOND                                       AND     s1.a + INTERVAL '0.004' SECOND ",
                Collections.singletonList(new Row("zero", 0, timestamp(0L), timestamp(0L))));
    }

    /** The join result is used as the input of a TUMBLE window function. */
    @Test
    public void test_calcHasParent() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                new Object[][]{row(timestamp(0L))});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Arrays.asList("b", "c", "d"),
                Arrays.asList(QueryDataTypeFamily.TIMESTAMP, QueryDataTypeFamily.INTEGER, QueryDataTypeFamily.VARCHAR),
                new Object[][]{row(timestamp(0L), 0, "zero")});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT b FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND))");

        String joinSubquery = "(SELECT b, a FROM s1 JOIN s2 ON s2.b BETWEEN s1.a - INTERVAL '0.001' SECOND                                            AND     s1.a + INTERVAL '0.004' SECOND)";
        assertRowsEventuallyInAnyOrder(
                "SELECT window_start, window_end, a FROM TABLE(TUMBLE(" + joinSubquery + "  , DESCRIPTOR(a)  , INTERVAL '0.002' SECOND))",
                Collections.singletonList(new Row(timestamp(0L), timestamp(2L), timestamp(0L))));
    }

    /** The right side of the join is a view defined as a UNION ALL of two ordered streams. */
    @Test
    public void test_relTreeHasUnion() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP),
                row(timestamp(0L)),
                row(timestamp(1L)));
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Arrays.asList("b", "c", "d"),
                Arrays.asList(QueryDataTypeFamily.TIMESTAMP, QueryDataTypeFamily.INTEGER, QueryDataTypeFamily.VARCHAR),
                row(timestamp(0L), 0, "zero"),
                row(timestamp(1L), 1, "one"));
        TestStreamSqlConnector.create(
                sqlService,
                "stream3",
                Arrays.asList("b", "c", "d"),
                Arrays.asList(QueryDataTypeFamily.TIMESTAMP, QueryDataTypeFamily.INTEGER, QueryDataTypeFamily.VARCHAR),
                row(timestamp(1L), 1, "one"),
                row(timestamp(2L), 2, "two"));
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0.001' SECOND))");
        sqlService.execute("CREATE VIEW s2 AS SELECT b, d FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), INTERVAL '0.001' SECOND)) UNION ALL SELECT b, d FROM TABLE(IMPOSE_ORDER(TABLE stream3, DESCRIPTOR(b), INTERVAL '0.001' SECOND)) ");

        // "one" is listed twice per left row because both stream2 and stream3 contain it.
        assertRowsEventuallyInAnyOrder(
                "SELECT a, d FROM s1 JOIN s2 ON s2.b BETWEEN s1.a - INTERVAL '0.001' SECOND         AND     s1.a + INTERVAL '0.004' SECOND ",
                Arrays.asList(
                        new Row(timestamp(0L), "zero"),
                        new Row(timestamp(0L), "one"),
                        new Row(timestamp(0L), "one"),
                        new Row(timestamp(0L), "two"),
                        new Row(timestamp(1L), "zero"),
                        new Row(timestamp(1L), "one"),
                        new Row(timestamp(1L), "one"),
                        new Row(timestamp(1L), "two")));
    }

    /** The same ordered view is joined with itself under two aliases. */
    @Test
    public void test_selfJoin() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                new Object[][]{row(timestampTz(42L))});
        sqlService.execute("CREATE VIEW s AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '0' SECONDS))");

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM s s1 JOIN s s2 ON s1.a=s2.a",
                Collections.singletonList(new Row(timestampTz(42L), timestampTz(42L))));
    }

    /** The order is imposed in inline subqueries instead of views. */
    @Test
    public void test_joinWithoutViews() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                new Object[][]{row(timestampTz(42L))});

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '1' SECONDS))) s1 INNER JOIN (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '1' SECONDS))) s2 ON s1.a=s2.a",
                Collections.singletonList(new Row(timestampTz(42L), timestampTz(42L))));
    }

    /** The IMPOSE_ORDER table functions are joined directly, without views or subqueries. */
    @Test
    public void test_joinWithoutViewsAndSubqueries() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                new Object[][]{row(timestampTz(42L))});

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '1' SECONDS)) s1 INNER JOIN TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '1' SECONDS)) s2 ON s1.a=s2.a",
                Collections.singletonList(new Row(timestampTz(42L), timestampTz(42L))));
    }

    /** Join expressed with USING(a); the expected row has a single column. */
    @Test
    public void test_joinWithUsingClause() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.TIMESTAMP_WITH_TIME_ZONE),
                new Object[][]{row(timestampTz(42L))});

        assertRowsEventuallyInAnyOrder(
                "SELECT * FROM (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '1' SECONDS))) s1 INNER JOIN (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), INTERVAL '1' SECONDS))) s2  USING(a)",
                Collections.singletonList(new Row(timestampTz(42L))));
    }

    /** Joining two GENERATE_STREAM sources without an imposed order must be rejected. */
    @Test
    public void test_joinGenerators() {
        Assertions.assertThatThrownBy(
                        () -> sqlService.execute("SELECT 1 from TABLE(GENERATE_STREAM(1)) JOIN TABLE(GENERATE_STREAM(3)) on 1=1;"))
                .hasMessageContaining("For stream-to-stream join, both joined sides must have an order imposed");
    }

    /** Integer time columns with an integer offset in the BETWEEN bound. */
    @Test
    public void test_joinWithImplicitCastsInJoinCondition() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.INTEGER),
                new Object[][]{row(0)});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.INTEGER),
                new Object[][]{row(0)});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), 1))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), 1))");

        assertTipOfStream(
                "SELECT * FROM s1 JOIN s2 ON s2.b BETWEEN s1.a AND s1.a + 1",
                Collections.singletonList(new Row(0, 0)));
    }

    /** A bound that wraps the time column in explicit casts is rejected as a missing bound. */
    @Test
    public void test_joinWithUnsupportedCastsInJoinCondition() {
        TestStreamSqlConnector.create(
                sqlService,
                "stream1",
                Collections.singletonList("a"),
                Collections.singletonList(QueryDataTypeFamily.INTEGER),
                new Object[][]{row(0)});
        TestStreamSqlConnector.create(
                sqlService,
                "stream2",
                Collections.singletonList("b"),
                Collections.singletonList(QueryDataTypeFamily.INTEGER),
                new Object[][]{row(0)});
        sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream1, DESCRIPTOR(a), 1))");
        sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE stream2, DESCRIPTOR(b), 1))");

        Assertions.assertThatThrownBy(
                        () -> sqlService.execute("SELECT * FROM s1 JOIN s2 ON s2.b BETWEEN s1.a AND CAST(CAST(s1.a as VARCHAR) AS TIMESTAMP) + INTERVAL '10' SECONDS"))
                .hasMessageContaining("A stream-to-stream join must have a join condition constraining");
    }
}

