/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.MutableByte;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.sql.impl.CalciteSqlOptimizerImpl;
import com.hazelcast.jet.sql.impl.connector.SqlConnectorCache;
import com.hazelcast.jet.sql.impl.connector.test.TestAbstractSqlConnector;
import com.hazelcast.jet.sql.impl.connector.test.TestStreamSqlConnector;
import com.hazelcast.jet.sql.impl.opt.OptUtils;
import com.hazelcast.jet.sql.impl.opt.OptimizerTestSupport;
import com.hazelcast.jet.sql.impl.opt.WatermarkKeysAssigner;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombineByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.CalcPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.DropLateItemsPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.FullScanPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.PhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SlidingWindowAggregatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SlidingWindowPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.StreamToStreamJoinPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UnionPhysicalRel;
import com.hazelcast.jet.sql.impl.schema.HazelcastTable;
import com.hazelcast.jet.sql.impl.schema.RelationsStorage;
import com.hazelcast.jet.sql.impl.schema.TableResolverImpl;
import com.hazelcast.shaded.org.apache.calcite.rel.RelNode;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.sql.impl.extract.QueryPath;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.type.QueryDataType;
import com.hazelcast.sql.impl.type.QueryDataTypeFamily;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastSerialClassRunner.class)
@Category(value={QuickTest.class, ParallelJVMTest.class})
public class WatermarkKeysAssignerTest
extends OptimizerTestSupport {
    @BeforeClass
    public static void beforeClass() {
        WatermarkKeysAssignerTest.initialize((int)1, null);
    }

    @Test
    public void when_scanAndDropArePresent_then_keyWasAssigned() {
        HazelcastTable table = WatermarkKeysAssignerTest.partitionedTable("map", Arrays.asList(WatermarkKeysAssignerTest.field(QueryPath.KEY, QueryDataType.INT), WatermarkKeysAssignerTest.field(QueryPath.VALUE, QueryDataType.INT)), 1L);
        List<QueryDataType> parameterTypes = Arrays.asList(QueryDataType.INT, QueryDataType.INT);
        String sql = "SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(this), 1))";
        PhysicalRel optimizedPhysicalRel = this.optimizePhysical("SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(this), 1))", parameterTypes, table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optimizedPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, DropLateItemsPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, FullScanPhysicalRel.class)));
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(optimizedPhysicalRel);
        keysAssigner.assignWatermarkKeys();
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)optimizedPhysicalRel);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((Map)map).isNotEmpty();
        Assertions.assertThat((Object)((MutableByte)map.get(1))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(1)).getValue()).isEqualTo((byte)0);
        map = keysAssigner.getWatermarkedFieldsKey(optimizedPhysicalRel.getInput(0));
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((Map)map).isNotEmpty();
        Assertions.assertThat((Object)((MutableByte)map.get(1))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(1)).getValue()).isEqualTo((byte)0);
    }

    @Test
    public void when_calcIsPresent_then_keyWasPropagated() {
        HazelcastTable table = WatermarkKeysAssignerTest.partitionedTable("map", Arrays.asList(WatermarkKeysAssignerTest.field(QueryPath.KEY, QueryDataType.INT), WatermarkKeysAssignerTest.field(QueryPath.VALUE, QueryDataType.INT)), 1L);
        List<QueryDataType> parameterTypes = Arrays.asList(QueryDataType.INT, QueryDataType.INT);
        String sql = "SELECT window_end FROM TABLE(HOP(  (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(__key), 1))), DESCRIPTOR(__key), 2, 1)) GROUP BY window_start, window_end, __key, this";
        PhysicalRel optimizedPhysicalRel = this.optimizePhysical("SELECT window_end FROM TABLE(HOP(  (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(__key), 1))), DESCRIPTOR(__key), 2, 1)) GROUP BY window_start, window_end, __key, this", parameterTypes, table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optimizedPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, AggregateCombineByKeyPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, AggregateAccumulateByKeyPhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(4, SlidingWindowPhysicalRel.class), WatermarkKeysAssignerTest.planRow(5, FullScanPhysicalRel.class)));
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(optimizedPhysicalRel);
        keysAssigner.assignWatermarkKeys();
        Assertions.assertThat((Object)optimizedPhysicalRel).isInstanceOf(CalcPhysicalRel.class);
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)optimizedPhysicalRel);
        Assertions.assertThat((Map)map).isNull();
        RelNode rel = optimizedPhysicalRel.getInput(0);
        while (!(rel instanceof CalcPhysicalRel)) {
            rel = rel.getInput(0);
        }
        Assertions.assertThat((Object)rel).isInstanceOf(CalcPhysicalRel.class);
        map = keysAssigner.getWatermarkedFieldsKey(rel);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((int)map.size()).isEqualTo(1);
        Assertions.assertThat((byte)((MutableByte)map.get(2)).getValue()).isEqualTo((byte)0);
    }

    @Test
    public void when_unionIsPresent_then_keyWasPropagated() {
        NodeEngineImpl nodeEngine = Util.getNodeEngine((HazelcastInstance)WatermarkKeysAssignerTest.instance());
        TableResolverImpl resolver = new TableResolverImpl((NodeEngine)nodeEngine, new RelationsStorage((NodeEngine)nodeEngine), new SqlConnectorCache((NodeEngine)nodeEngine));
        TestStreamSqlConnector.create(WatermarkKeysAssignerTest.instance().getSql(), "s", Collections.singletonList("a"), Collections.singletonList(QueryDataTypeFamily.BIGINT), new Object[][]{WatermarkKeysAssignerTest.row(1L)});
        WatermarkKeysAssignerTest.assertInstanceOf(TestAbstractSqlConnector.TestTable.class, resolver.getTables().get(0));
        HazelcastTable table = WatermarkKeysAssignerTest.streamingTable((Table)resolver.getTables().get(0), 1L);
        String sql = "(SELECT * FROM TABLE(IMPOSE_ORDER((SELECT * FROM s), DESCRIPTOR(a), 1))) UNION ALL (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT * FROM s), DESCRIPTOR(a), 1)))";
        PhysicalRel optPhysicalRel = this.optimizePhysical(sql, Collections.singletonList(QueryDataType.BIGINT), table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, DropLateItemsPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, UnionPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, FullScanPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, FullScanPhysicalRel.class)));
        Assertions.assertThat((boolean)OptUtils.isUnbounded((RelNode)optPhysicalRel)).isTrue();
        PhysicalRel finalOptRel = CalciteSqlOptimizerImpl.postOptimizationRewrites((PhysicalRel)optPhysicalRel);
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(finalOptRel);
        keysAssigner.assignWatermarkKeys();
        Assertions.assertThat((Object)finalOptRel.getInput(0)).isInstanceOf(UnionPhysicalRel.class);
        UnionPhysicalRel unionRel = (UnionPhysicalRel)finalOptRel.getInput(0);
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)unionRel);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((Map)map).isNotEmpty();
        Assertions.assertThat((Object)((MutableByte)map.get(0))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(0)).getValue()).isEqualTo((byte)0);
        Map leftInputKeys = keysAssigner.getWatermarkedFieldsKey(unionRel.getInput(0));
        Map rightInputKeys = keysAssigner.getWatermarkedFieldsKey(unionRel.getInput(1));
        Assertions.assertThat((byte)((MutableByte)leftInputKeys.values().iterator().next()).getValue()).isEqualTo((byte)0);
        Assertions.assertThat((byte)((MutableByte)rightInputKeys.values().iterator().next()).getValue()).isEqualTo((byte)0);
    }

    @Test
    public void when_streamToStreamJoinIsPresent_then_keyWasPropagated() {
        NodeEngineImpl nodeEngine = Util.getNodeEngine((HazelcastInstance)WatermarkKeysAssignerTest.instance());
        TableResolverImpl resolver = new TableResolverImpl((NodeEngine)nodeEngine, new RelationsStorage((NodeEngine)nodeEngine), new SqlConnectorCache((NodeEngine)nodeEngine));
        String stream = "s";
        TestStreamSqlConnector.create(WatermarkKeysAssignerTest.instance().getSql(), stream, Collections.singletonList("a"), Collections.singletonList(QueryDataTypeFamily.BIGINT), new Object[][]{WatermarkKeysAssignerTest.row(1L)});
        HazelcastTable table = WatermarkKeysAssignerTest.streamingTable((Table)resolver.getTables().get(0), 1L);
        String sql = "SELECT * FROM (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))) s1  INNER JOIN (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))) s2  ON s1.a = s2.a";
        PhysicalRel optPhysicalRel = this.optimizePhysical(sql, Collections.singletonList(QueryDataType.BIGINT), table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, StreamToStreamJoinPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, FullScanPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, FullScanPhysicalRel.class)));
        Assertions.assertThat((boolean)OptUtils.isUnbounded((RelNode)optPhysicalRel)).isTrue();
        PhysicalRel finalOptRel = CalciteSqlOptimizerImpl.postOptimizationRewrites((PhysicalRel)optPhysicalRel);
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(finalOptRel);
        keysAssigner.assignWatermarkKeys();
        Assertions.assertThat((Object)finalOptRel).isInstanceOf(StreamToStreamJoinPhysicalRel.class);
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)finalOptRel);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((Map)map).isNotEmpty();
        Assertions.assertThat((Object)((MutableByte)map.get(0))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(0)).getValue()).isEqualTo((byte)0);
    }

    @Test
    public void when_s2sJoinWithWindowAggregationIsPresent_then_keyWasPropagated() {
        NodeEngineImpl nodeEngine = Util.getNodeEngine((HazelcastInstance)WatermarkKeysAssignerTest.instance());
        TableResolverImpl resolver = new TableResolverImpl((NodeEngine)nodeEngine, new RelationsStorage((NodeEngine)nodeEngine), new SqlConnectorCache((NodeEngine)nodeEngine));
        String stream = "s";
        TestStreamSqlConnector.create(WatermarkKeysAssignerTest.instance().getSql(), stream, Collections.singletonList("a"), Collections.singletonList(QueryDataTypeFamily.BIGINT), new Object[][]{WatermarkKeysAssignerTest.row(1L)});
        HazelcastTable table = WatermarkKeysAssignerTest.streamingTable((Table)resolver.getTables().get(0), 1L);
        String sql = "SELECT * FROM ( SELECT window_end, AVG(a) AS price FROM     TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))    GROUP BY window_end, a) s1  RIGHT JOIN ( SELECT window_end, AVG(a) AS price FROM     TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))    GROUP BY window_end, a) s2 ON s1.window_end = s2.window_end";
        PhysicalRel optPhysicalRel = this.optimizePhysical(sql, Collections.singletonList(QueryDataType.BIGINT), table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, StreamToStreamJoinPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(4, FullScanPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(4, FullScanPhysicalRel.class)));
        Assertions.assertThat((boolean)OptUtils.isUnbounded((RelNode)optPhysicalRel)).isTrue();
        PhysicalRel finalOptRel = CalciteSqlOptimizerImpl.postOptimizationRewrites((PhysicalRel)optPhysicalRel);
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(finalOptRel);
        keysAssigner.assignWatermarkKeys();
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)finalOptRel);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((Map)map).isNotEmpty();
        Assertions.assertThat((int)map.size()).isEqualTo(2);
        Assertions.assertThat((Object)((MutableByte)map.get(0))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(0)).getValue()).isEqualTo((byte)0);
        Assertions.assertThat((Object)((MutableByte)map.get(2))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(2)).getValue()).isEqualTo((byte)1);
        SlidingWindowAggregatePhysicalRel leftAgg = (SlidingWindowAggregatePhysicalRel)finalOptRel.getInput(0).getInput(0);
        SlidingWindowAggregatePhysicalRel rightAgg = (SlidingWindowAggregatePhysicalRel)finalOptRel.getInput(0).getInput(1);
        Map leftAggMap = keysAssigner.getWatermarkedFieldsKey((RelNode)leftAgg);
        Map rightAggMap = keysAssigner.getWatermarkedFieldsKey((RelNode)rightAgg);
        Assertions.assertThat((Map)leftAggMap).isNotNull();
        Assertions.assertThat((Map)rightAggMap).isNotNull();
        Assertions.assertThat((int)leftAggMap.size()).isOne();
        Assertions.assertThat((int)rightAggMap.size()).isOne();
        Assertions.assertThat((byte)((MutableByte)leftAggMap.get(0)).getValue()).isEqualTo((byte)0);
        Assertions.assertThat((byte)((MutableByte)rightAggMap.get(0)).getValue()).isEqualTo((byte)1);
    }

    @Test
    public void when_slidingWindowAggregationIsPresent_andWindowEndProjects_then_keyWasPropagated() {
        NodeEngineImpl nodeEngine = Util.getNodeEngine((HazelcastInstance)WatermarkKeysAssignerTest.instance());
        TableResolverImpl resolver = new TableResolverImpl((NodeEngine)nodeEngine, new RelationsStorage((NodeEngine)nodeEngine), new SqlConnectorCache((NodeEngine)nodeEngine));
        String stream = "s";
        TestStreamSqlConnector.create(WatermarkKeysAssignerTest.instance().getSql(), stream, Collections.singletonList("a"), Collections.singletonList(QueryDataTypeFamily.BIGINT), new Object[][]{WatermarkKeysAssignerTest.row(1L)});
        HazelcastTable table = WatermarkKeysAssignerTest.streamingTable((Table)resolver.getTables().get(0), 1L);
        String sql = "SELECT window_end, AVG(a) AS price FROM      TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))     GROUP BY window_end";
        PhysicalRel optPhysicalRel = this.optimizePhysical(sql, Collections.singletonList(QueryDataType.BIGINT), table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, FullScanPhysicalRel.class)));
        Assertions.assertThat((boolean)OptUtils.isUnbounded((RelNode)optPhysicalRel)).isTrue();
        PhysicalRel finalOptRel = CalciteSqlOptimizerImpl.postOptimizationRewrites((PhysicalRel)optPhysicalRel);
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(finalOptRel);
        keysAssigner.assignWatermarkKeys();
        SlidingWindowAggregatePhysicalRel swaRel = (SlidingWindowAggregatePhysicalRel)finalOptRel;
        Map rootKeyMap = keysAssigner.getWatermarkedFieldsKey((RelNode)finalOptRel);
        Assertions.assertThat((Map)rootKeyMap).isNotEmpty();
        Assertions.assertThat((byte)((MutableByte)rootKeyMap.get(swaRel.watermarkedFields().findFirst())).getValue()).isEqualTo((byte)0);
    }

    @Test
    public void when_slidingWindowAggregationIsPresent_andWindowEndDoesNotProject_then_keyWasNotPropagated() {
        NodeEngineImpl nodeEngine = Util.getNodeEngine((HazelcastInstance)WatermarkKeysAssignerTest.instance());
        TableResolverImpl resolver = new TableResolverImpl((NodeEngine)nodeEngine, new RelationsStorage((NodeEngine)nodeEngine), new SqlConnectorCache((NodeEngine)nodeEngine));
        String stream = "s";
        TestStreamSqlConnector.create(WatermarkKeysAssignerTest.instance().getSql(), stream, Collections.singletonList("a"), Collections.singletonList(QueryDataTypeFamily.BIGINT), new Object[][]{WatermarkKeysAssignerTest.row(1L)});
        HazelcastTable table = WatermarkKeysAssignerTest.streamingTable((Table)resolver.getTables().get(0), 1L);
        String sql = "SELECT window_start, AVG(a) AS price FROM      TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))     GROUP BY window_start";
        PhysicalRel optPhysicalRel = this.optimizePhysical(sql, Collections.singletonList(QueryDataType.BIGINT), table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, FullScanPhysicalRel.class)));
        Assertions.assertThat((boolean)OptUtils.isUnbounded((RelNode)optPhysicalRel)).isTrue();
        PhysicalRel finalOptRel = CalciteSqlOptimizerImpl.postOptimizationRewrites((PhysicalRel)optPhysicalRel);
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(finalOptRel);
        keysAssigner.assignWatermarkKeys();
        Assertions.assertThat((Map)keysAssigner.getWatermarkedFieldsKey((RelNode)finalOptRel)).isEmpty();
    }

    @Test
    public void when_slidingWindowAggregationsAreUnionized_andWindowEndDoesNotProject_then_keyWasNotPropagated() {
        NodeEngineImpl nodeEngine = Util.getNodeEngine((HazelcastInstance)WatermarkKeysAssignerTest.instance());
        TableResolverImpl resolver = new TableResolverImpl((NodeEngine)nodeEngine, new RelationsStorage((NodeEngine)nodeEngine), new SqlConnectorCache((NodeEngine)nodeEngine));
        String stream = "s";
        TestStreamSqlConnector.create(WatermarkKeysAssignerTest.instance().getSql(), stream, Collections.singletonList("a"), Collections.singletonList(QueryDataTypeFamily.BIGINT), new Object[][]{WatermarkKeysAssignerTest.row(1L)});
        HazelcastTable table = WatermarkKeysAssignerTest.streamingTable((Table)resolver.getTables().get(0), 1L);
        String sql = "(SELECT window_start, AVG(a) AS price FROM      TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))     GROUP BY window_start) UNION ALL (SELECT window_start, AVG(a) AS price FROM      TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))     GROUP BY window_start) UNION ALL (SELECT window_start, AVG(a) AS price FROM      TABLE(HOP((SELECT * FROM TABLE(IMPOSE_ORDER(TABLE s, DESCRIPTOR(a), 1))), DESCRIPTOR(a), 4, 1))     GROUP BY window_start)";
        PhysicalRel optPhysicalRel = this.optimizePhysical(sql, Collections.singletonList(QueryDataType.BIGINT), table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, UnionPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, FullScanPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, FullScanPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, SlidingWindowAggregatePhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, FullScanPhysicalRel.class)));
        Assertions.assertThat((boolean)OptUtils.isUnbounded((RelNode)optPhysicalRel)).isTrue();
        PhysicalRel finalOptRel = CalciteSqlOptimizerImpl.postOptimizationRewrites((PhysicalRel)optPhysicalRel);
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(finalOptRel);
        keysAssigner.assignWatermarkKeys();
        Assertions.assertThat((Map)keysAssigner.getWatermarkedFieldsKey((RelNode)finalOptRel)).isEmpty();
    }

    @Test
    public void when_slidingWindowIsPresent_then_inputWatermarkedFieldWatermarked() {
        HazelcastTable table = WatermarkKeysAssignerTest.partitionedTable("map", Arrays.asList(WatermarkKeysAssignerTest.field(QueryPath.KEY, QueryDataType.INT), WatermarkKeysAssignerTest.field(QueryPath.VALUE, QueryDataType.INT)), 1L);
        List<QueryDataType> parameterTypes = Arrays.asList(QueryDataType.INT, QueryDataType.INT);
        String sql = "SELECT window_start, window_end FROM TABLE(HOP(  (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(__key), 1))), DESCRIPTOR(__key), 2, 1)) GROUP BY window_start, window_end, __key, this";
        PhysicalRel optimizedPhysicalRel = this.optimizePhysical("SELECT window_start, window_end FROM TABLE(HOP(  (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(__key), 1))), DESCRIPTOR(__key), 2, 1)) GROUP BY window_start, window_end, __key, this", parameterTypes, table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optimizedPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, AggregateCombineByKeyPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, AggregateAccumulateByKeyPhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(4, SlidingWindowPhysicalRel.class), WatermarkKeysAssignerTest.planRow(5, FullScanPhysicalRel.class)));
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(optimizedPhysicalRel);
        keysAssigner.assignWatermarkKeys();
        PhysicalRel sw = optimizedPhysicalRel;
        while (!(sw instanceof SlidingWindowPhysicalRel)) {
            sw = (PhysicalRel)sw.getInput(0);
        }
        Assertions.assertThat((Object)sw).isInstanceOf(SlidingWindowPhysicalRel.class);
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)sw);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((int)map.size()).isEqualTo(1);
        Assertions.assertThat((byte)((MutableByte)map.get(0)).getValue()).isEqualTo((byte)0);
    }

    @Test
    public void when_upperRelsDoNotSupportWatermarks_then_justPartOfTreeIsWatermarked() {
        HazelcastTable table = WatermarkKeysAssignerTest.partitionedTable("map", Arrays.asList(WatermarkKeysAssignerTest.field(QueryPath.KEY, QueryDataType.INT), WatermarkKeysAssignerTest.field(QueryPath.VALUE, QueryDataType.INT)), 1L);
        List<QueryDataType> parameterTypes = Arrays.asList(QueryDataType.INT, QueryDataType.INT);
        String sql = "SELECT window_start, window_end FROM TABLE(HOP(  (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(__key), 1))), DESCRIPTOR(__key), 2, 1)) GROUP BY window_start, window_end, __key, this";
        PhysicalRel optimizedPhysicalRel = this.optimizePhysical("SELECT window_start, window_end FROM TABLE(HOP(  (SELECT * FROM TABLE(IMPOSE_ORDER((SELECT __key, this FROM map), DESCRIPTOR(__key), 1))), DESCRIPTOR(__key), 2, 1)) GROUP BY window_start, window_end, __key, this", parameterTypes, table).getPhysical();
        WatermarkKeysAssignerTest.assertPlan((RelNode)optimizedPhysicalRel, WatermarkKeysAssignerTest.plan(WatermarkKeysAssignerTest.planRow(0, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(1, AggregateCombineByKeyPhysicalRel.class), WatermarkKeysAssignerTest.planRow(2, AggregateAccumulateByKeyPhysicalRel.class), WatermarkKeysAssignerTest.planRow(3, CalcPhysicalRel.class), WatermarkKeysAssignerTest.planRow(4, SlidingWindowPhysicalRel.class), WatermarkKeysAssignerTest.planRow(5, FullScanPhysicalRel.class)));
        WatermarkKeysAssigner keysAssigner = new WatermarkKeysAssigner(optimizedPhysicalRel);
        keysAssigner.assignWatermarkKeys();
        Map map = keysAssigner.getWatermarkedFieldsKey((RelNode)optimizedPhysicalRel);
        Assertions.assertThat((Map)map).isNull();
        PhysicalRel aggAccumRel = optimizedPhysicalRel;
        while (!(aggAccumRel instanceof AggregateAccumulateByKeyPhysicalRel)) {
            aggAccumRel = (PhysicalRel)aggAccumRel.getInput(0);
        }
        Assertions.assertThat((Object)aggAccumRel).isInstanceOf(AggregateAccumulateByKeyPhysicalRel.class);
        map = keysAssigner.getWatermarkedFieldsKey((RelNode)aggAccumRel);
        Assertions.assertThat((Map)map).isNull();
        PhysicalRel nextRelContainsWm = (PhysicalRel)((AggregateAccumulateByKeyPhysicalRel)aggAccumRel).getInput();
        Assertions.assertThat((Object)nextRelContainsWm).isInstanceOf(CalcPhysicalRel.class);
        map = keysAssigner.getWatermarkedFieldsKey((RelNode)nextRelContainsWm);
        Assertions.assertThat((Map)map).isNotNull();
        Assertions.assertThat((Map)map).isNotEmpty();
        Assertions.assertThat((Object)((MutableByte)map.get(2))).isNotNull();
        Assertions.assertThat((byte)((MutableByte)map.get(2)).getValue()).isEqualTo((byte)0);
    }
}

