/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

public class IntervalJoinJsonPlanTest
extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    @Before
    public void setup() {
        this.util = this.streamTestUtil(TableConfig.getDefault());
        this.tEnv = this.util.getTableEnv();
        String srcTableA = "CREATE TABLE A (\n  a int,\n  b varchar,\n  c bigint,\n  proctime as PROCTIME(),\n  rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n  watermark for rowtime as rowtime - INTERVAL '1' second \n) with (\n  'connector' = 'values',\n  'bounded' = 'false')";
        String srcTableB = "CREATE TABLE B (\n  a int,\n  b varchar,\n  c bigint,\n  proctime as PROCTIME(),\n  rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n  watermark for rowtime as rowtime - INTERVAL '1' second \n) with (\n  'connector' = 'values',\n  'bounded' = 'false')";
        this.tEnv.executeSql(srcTableA);
        this.tEnv.executeSql(srcTableB);
    }

    @Test
    public void testProcessingTimeInnerJoinWithOnClause() {
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int,\n  b varchar\n) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink  SELECT t1.a, t2.b FROM A t1 JOIN B t2 ON\n    t1.a = t2.a AND \n    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR");
    }

    @Test
    public void testRowTimeInnerJoinWithOnClause() {
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int,\n  b varchar\n) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink SELECT t1.a, t2.b FROM A t1 JOIN B t2 ON\n  t1.a = t2.a AND\n  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR");
    }
}

