/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
import org.junit.Test;

/**
 * Integration tests for interval joins executed from a JSON plan.
 *
 * <p>Each test registers two values-backed source tables ({@code T1}, {@code T2}) and a sink
 * ({@code MySink}), compiles an {@code INSERT INTO ... SELECT} interval-join statement to a JSON
 * plan, executes that plan, and compares the rows collected by the sink with the expected output.
 */
public class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {

    /** Returns the six {@code (a, b, c)} rows registered as table {@code T1} by both tests. */
    private static List<Row> leftInput() {
        return Arrays.asList(
                Row.of(1, 1L, "Hi1"),
                Row.of(1, 2L, "Hi2"),
                Row.of(1, 5L, "Hi3"),
                Row.of(2, 7L, "Hi5"),
                Row.of(1, 9L, "Hi6"),
                Row.of(1, 8L, "Hi8"));
    }

    /** Returns the two {@code (a, b, c)} rows registered as table {@code T2} by both tests. */
    private static List<Row> rightInput() {
        return Arrays.asList(Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe"));
    }

    /**
     * Joins {@code T1} and {@code T2} on {@code a} with a processing-time window of +/- 5 seconds
     * around {@code t2.proctime}; every key-matching pair is expected in the sink.
     *
     * @throws Exception if plan generation or execution fails
     */
    @Test
    public void testProcessTimeInnerJoin() throws Exception {
        List<Row> leftRows = leftInput();
        List<Row> rightRows = rightInput();

        // Both sources expose a computed processing-time attribute used by the join condition.
        createTestValuesSourceTable(
                "T1", leftRows, "a int", "b bigint", "c varchar", "proctime as PROCTIME()");
        createTestValuesSourceTable(
                "T2", rightRows, "a int", "b bigint", "c varchar", "proctime as PROCTIME()");
        createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");

        String plan =
                tableEnv.getJsonPlan(
                        "insert into MySink SELECT t2.a, t2.c, t1.c\nFROM T1 as t1 join T2 as t2 ON\n  t1.a = t2.a AND\n  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND\n    t2.proctime + INTERVAL '5' SECOND");
        tableEnv.executeJsonPlan(plan).await();

        List<String> expectedRows =
                Arrays.asList(
                        "+I[1, HiHi, Hi1]",
                        "+I[1, HiHi, Hi2]",
                        "+I[1, HiHi, Hi3]",
                        "+I[1, HiHi, Hi6]",
                        "+I[1, HiHi, Hi8]",
                        "+I[2, HeHe, Hi5]");
        assertResult(expectedRows, TestValuesTableFactory.getResults("MySink"));
    }

    /**
     * Joins {@code T1} and {@code T2} on {@code a} with an event-time window of
     * {@code [t2.rowtime - 5s, t2.rowtime + 6s]}, where {@code rowtime} is derived from column
     * {@code b}. The {@code T1} rows with {@code b = 8} and {@code b = 9} lie outside the window
     * of the matching {@code T2} row ({@code b = 1}) and are therefore not expected in the sink.
     *
     * @throws Exception if plan generation or execution fails
     */
    @Test
    public void testRowTimeInnerJoin() throws Exception {
        List<Row> leftRows = leftInput();
        List<Row> rightRows = rightInput();

        // Both sources derive an event-time attribute from b and declare a watermark on it.
        createTestValuesSourceTable(
                "T1",
                leftRows,
                "a int",
                "b bigint",
                "c varchar",
                "rowtime as TO_TIMESTAMP (FROM_UNIXTIME(b))",
                "watermark for rowtime as rowtime - INTERVAL '5' second");
        createTestValuesSourceTable(
                "T2",
                rightRows,
                "a int",
                "b bigint",
                "c varchar",
                "rowtime as TO_TIMESTAMP (FROM_UNIXTIME(b))",
                "watermark for rowtime as rowtime - INTERVAL '5' second");
        createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");

        String plan =
                tableEnv.getJsonPlan(
                        "insert into MySink \nSELECT t2.a, t2.c, t1.c\nFROM T1 as t1 join T2 as t2 ON\n  t1.a = t2.a AND\n  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n    t2.rowtime + INTERVAL '6' SECOND");
        tableEnv.executeJsonPlan(plan).await();

        List<String> expectedRows =
                Arrays.asList(
                        "+I[1, HiHi, Hi1]",
                        "+I[1, HiHi, Hi2]",
                        "+I[1, HiHi, Hi3]",
                        "+I[2, HeHe, Hi5]");
        assertResult(expectedRows, TestValuesTableFactory.getResults("MySink"));
    }
}

