/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.harness;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.harness.HarnessTestBase;
import org.apache.flink.table.planner.runtime.harness.WindowAggregateHarnessTest$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u0005\rg\u0001B\u0001\u0003\u0001E\u0011!dV5oI><\u0018iZ4sK\u001e\fG/\u001a%be:,7o\u001d+fgRT!a\u0001\u0003\u0002\u000f!\f'O\\3tg*\u0011QAB\u0001\beVtG/[7f\u0015\t9\u0001\"A\u0004qY\u0006tg.\u001a:\u000b\u0005%Q\u0011!\u0002;bE2,'BA\u0006\r\u0003\u00151G.\u001b8l\u0015\tia\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001f\u0005\u0019qN]4\u0004\u0001M\u0011\u0001A\u0005\t\u0003'Qi\u0011AA\u0005\u0003+\t\u0011q\u0002S1s]\u0016\u001c8\u000fV3ti\n\u000b7/\u001a\u0005\t/\u0001\u0011\t\u0011)A\u00051\u00059!-Y2lK:$\u0007CA\r0\u001d\tQBF\u0004\u0002\u001cU9\u0011A$\u000b\b\u0003;!r!AH\u0014\u000f\u0005}1cB\u0001\u0011&\u001d\t\tC%D\u0001#\u0015\t\u0019\u0003#\u0001\u0004=e>|GOP\u0005\u0002\u001f%\u0011QBD\u0005\u0003\u00171I!!\u0003\u0006\n\u0005\u001dA\u0011BA\u0003\u0007\u0013\tYC!A\u0003vi&d7/\u0003\u0002.]\u0005Q2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK*\u00111\u0006B\u0005\u0003aE\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u00055r\u0003\u0002C\u001a\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001b\u0002\u001bMD\u0017N\u001a;US6,'l\u001c8f!\t)$(D\u00017\u0015\t9\u0004(\u0001\u0003uS6,'\"A\u001d\u0002\t)\fg/Y\u0005\u0003wY\u0012aAW8oK&#\u0007\"B\u001f\u0001\t\u0003q\u0014A\u0002\u001fj]&$h\bF\u0002@\u0001\u0006\u0003\"a\u0005\u0001\t\u000b]a\u0004\u0019\u0001\r\t\u000bMb\u0004\u0019\u0001\u001b\t\u000f\r\u0003!\u0019!C\u0005\t\u0006YQ\u000bV\"`5>sUiX%E+\u0005!\u0004B\u0002$\u0001A\u0003%A'\u0001\u0007V)\u000e{&l\u0014(F?&#\u0005\u0005C\u0003I\u0001\u0011\u0005\u0013*\u0001\u0004cK\u001a|'/\u001a\u000b\u0002\u0015B\u00111JT\u0007\u0002\u0019*\tQ*A\u0003tG\u0006d\u0017-\u0003\u0002P\u0019\n!QK\\5uQ\t9\u0015\u000b\u0005\u0002S+6\t1K\u0003\u0002U\u001d\u0005)!.\u001e8ji&\u0011ak\u0015\u0002\u0007\u0005\u00164wN]3\t\u000ba\u0003A\u0011A%\u0002=Q,7\u000f\u001e)s_\u000e,7o]5oORKW.\u001a+v[\ndWmV5oI><\bFA,[!\t\u00116,\u0003\u0002]'\n!A+Z:u\u0011\u0015q\u0006\u0001\"\u0003`\u0003!\u001a'/Z1uKB\u0013xnY3tg&tw\rV5nKR+XN\u00197f/&tGm\\<Pa\u0016\u0014\u0018\r^8s)\u0005\u0001\u0007\u0003B&bGFL!A\u0019'\u0003\rQ+\b\u000f\\33!\u0015!\u0017n[6l\u001b\u0005)'B\u00014h\u0003\u0011)H/\u001b7\u000b\u0005!T\u0011!C:ue\u0016\fW.\u001b8h\u0013\tQWM\u0001\u0014LKf,Gm\u00148f\u0013:\u0004X\u000f^*ue\u0016\fWn\u00149fe\u0006$xN\u001d+fgRD\u0015M\u001d8fgN\u0004\"\u0001\\8\u000e\u00035T!A\u001c\u0005\u0002\t\u0011\fG/Y\u0005\u0003a6\u0014qAU8x\t\u0006$\u0018\rE\u0002LeRL!a\u001d'\u0003\u000b\u0005\u0013(/Y=\u0011\u0005UTX\"\u0001<\u000b\u0005]D\u0018a\u00027pO&\u001c\u0017\r\u001c\u0006\u0003s\"\tQ\u0001^=qKNL!a\u001f<\u0003\u00171{w-[2bYRK\b/\u001a\u0005\u0006{\u0002!\t!S\u0001\u001ci\u0016\u001cH\u000f\u0015:pG\u0016\u001c8/\u001b8h)&lW\rS8q/&tGm\\<)\u0005qT\u0006BBA\u0001\u0001\u0011\u0005\u0011*\u0001\u0011uKN$\bK]8dKN\u001c\u0018N\\4US6,7)^7vY\u0006$XmV5oI><\bFA@[\u0011\u0019\t9\u0001\u0001C\u0001\u0013\u0006!B/Z:u\u00072|7/Z,ji\"|W\u000f^(qK:D3!!\u0002[\u0011\u0019\ti\u0001\u0001C\u0001\u0013\u0006YC/Z:u)^|\u0007\u000b[1tK^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016\u001cEn\\:f/&$\bn\\;u\u001fB,g\u000eK\u0002\u0002\fiCq!a\u0005\u0001\t\u0013\t)\"\u0001\u0006j]\u001e,7\u000f\u001e#bi\u0006$2ASA\f\u0011\u001d\tI\"!\u0005A\u0002\r\f1\u0002^3ti\"\u000b'O\\3tg\"9\u0011Q\u0004\u0001\u0005\n\u0005}\u0011A\u0002:fG>\u0014H\r\u0006\u0003\u0002\"\u0005=\u0002#BA\u0012\u0003WYWBAA\u0013\u0015\u0011\t9#!\u000b\u0002\u0019M$(/Z1ne\u0016\u001cwN\u001d3\u000b\u0005\u00159\u0017\u0002BA\u0017\u0003K\u0011Ab\u0015;sK\u0006l'+Z2pe\u0012D\u0001\"!\r\u0002\u001c\u0001\u0007\u00111G\u0001\u0005CJ<7\u000fE\u0003L\u0003k\tI$C\u0002\u000281\u0013!\u0002\u0010:fa\u0016\fG/\u001a3?!\rY\u00151H\u0005\u0004\u0003{a%aA!os\"9\u0011\u0011\t\u0001\u0005\n\u0005\r\u0013A\u00037pG\u0006dW*\u001b7mgR!\u0011QIA&!\ra\u0017qI\u0005\u0004\u0003\u0013j'!\u0004+j[\u0016\u001cH/Y7q\t\u0006$\u0018\r\u0003\u0005\u0002N\u0005}\u0002\u0019AA(\u0003!!\u0017\r^3US6,\u0007\u0003BA)\u0003/r1aSA*\u0013\r\t)\u0006T\u0001\u0007!J,G-\u001a4\n\t\u0005e\u00131\f\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005UC\nK\u0004\u0001\u0003?\nY'!\u001c\u0011\t\u0005\u0005\u0014qM\u0007\u0003\u0003GR1!!\u001aT\u0003\u0019\u0011XO\u001c8fe&!\u0011\u0011NA2\u0005\u001d\u0011VO\\,ji\"\fQA^1mk\u0016\u001c#!a\u001c\u0011\t\u0005E\u0014qO\u0007\u0003\u0003gR1!!\u001eT\u0003\u001d\u0011XO\u001c8feNLA!!\u001f\u0002t\ti\u0001+\u0019:b[\u0016$XM]5{K\u0012<q!! \u0003\u0011\u0003\ty(\u0001\u000eXS:$wn^!hOJ,w-\u0019;f\u0011\u0006\u0014h.Z:t)\u0016\u001cH\u000fE\u0002\u0014\u0003\u00033a!\u0001\u0002\t\u0002\u0005\r5\u0003BAA\u0003\u000b\u00032aSAD\u0013\r\tI\t\u0014\u0002\u0007\u0003:L(+\u001a4\t\u000fu\n\t\t\"\u0001\u0002\u000eR\u0011\u0011q\u0010\u0005\t\u0003#\u000b\t\t\"\u0001\u0002\u0014\u0006Q\u0001/\u0019:b[\u0016$XM]:\u0015\u0005\u0005U\u0005CBAL\u00037\u000by*\u0004\u0002\u0002\u001a*\u0011a\rO\u0005\u0005\u0003;\u000bIJ\u0001\u0006D_2dWm\u0019;j_:\u0004Ba\u0013:\u0002\"B!\u00111UAU\u001b\t\t)KC\u0002\u0002(b\nA\u0001\\1oO&!\u00111VAS\u0005\u0019y%M[3di\"B\u0011qRAX\u0003{\u000by\f\u0005\u0003\u00022\u0006]f\u0002BA9\u0003gKA!!.\u0002t\u0005i\u0001+\u0019:b[\u0016$XM]5{K\u0012LA!!/\u0002<\nQ\u0001+\u0019:b[\u0016$XM]:\u000b\t\u0005U\u00161O\u0001\u0005]\u0006lW-\t\u0002\u0002B\u0006q2\u000b^1uK\n\u000b7m[3oIvZ\b' \u0017!)&lWMW8oKvZ\u0018' ")
public class WindowAggregateHarnessTest
extends HarnessTestBase {
    private final ZoneId shiftTimeZone;
    private final ZoneId UTC_ZONE_ID;

    @Parameterized.Parameters(name="StateBackend={0}, TimeZone={1}")
    public static Collection<Object[]> parameters() {
        return WindowAggregateHarnessTest$.MODULE$.parameters();
    }

    private ZoneId UTC_ZONE_ID() {
        return this.UTC_ZONE_ID;
    }

    @Override
    @Before
    public void before() {
        super.before();
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        this.tEnv().getConfig().setLocalTimeZone(this.shiftTimeZone);
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE T1 (\n         | `ts` STRING,\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | proctime AS PROCTIME()\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '", "'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataId})))).stripMargin());
    }

    @Test
    public void testProcessingTimeTumbleWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> tuple2 = this.createProcessingTimeTumbleWindowOperator();
        if (tuple2 != null) {
            Tuple2 tuple22;
            KeyedOneInputStreamOperatorTestHarness testHarness = (KeyedOneInputStreamOperatorTestHarness)tuple2._1();
            LogicalType[] outputTypes = (LogicalType[])tuple2._2();
            Tuple2 tuple23 = tuple22 = new Tuple2((Object)testHarness, (Object)outputTypes);
            KeyedOneInputStreamOperatorTestHarness testHarness2 = (KeyedOneInputStreamOperatorTestHarness)tuple23._1();
            LogicalType[] outputTypes2 = (LogicalType[])tuple23._2();
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputTypes2);
            testHarness2.open();
            this.ingestData((KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>)testHarness2);
            ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
            expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)4L), BoxesRunTime.boxToDouble((double)5.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")})));
            expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)1L), null, BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")})));
            expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToDouble((double)6.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")})));
            expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)4.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:20")})));
            expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)3.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")})));
            expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)7.0), BoxesRunTime.boxToLong((long)0L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")})));
            assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)testHarness2.getOutput());
            testHarness2.close();
            return;
        }
        throw new MatchError(tuple2);
    }

    private Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> createProcessingTimeTumbleWindowOperator() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  MAX(`double`),\n        |  COUNT(DISTINCT `string`)\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(proctime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        Table t1 = this.tEnv().sqlQuery(sql);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarnessTester(package$.MODULE$.tableConversions(t1).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "WindowAggregate");
        LogicalType[] outputTypes = (LogicalType[])((Object[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.DOUBLE().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType()});
        return new Tuple2(testHarness, (Object)outputTypes);
    }

    @Test
    public void testProcessingTimeHopWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  MAX(`double`),\n        |  COUNT(DISTINCT `string`)\n        |FROM TABLE(\n        |   HOP(TABLE T1, DESCRIPTOR(proctime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        Table t1 = this.tEnv().sqlQuery(sql);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarnessTester(package$.MODULE$.tableConversions(t1).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "WindowAggregate");
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor((LogicalType[])((Object[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.DOUBLE().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType()}));
        testHarness.open();
        this.ingestData(testHarness);
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)4L), BoxesRunTime.boxToDouble((double)5.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1969-12-31T23:59:55"), this.localMills("1970-01-01T00:00:05")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)5L), BoxesRunTime.boxToDouble((double)5.0), BoxesRunTime.boxToLong((long)3L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)1L), null, BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:15")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToDouble((double)6.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToDouble((double)6.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:15")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)4.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:10"), this.localMills("1970-01-01T00:00:20")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)4.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:25")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)3.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:25"), this.localMills("1970-01-01T00:00:35")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)3.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)7.0), BoxesRunTime.boxToLong((long)0L), this.localMills("1970-01-01T00:00:25"), this.localMills("1970-01-01T00:00:35")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)7.0), BoxesRunTime.boxToLong((long)0L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")})));
        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testProcessingTimeCumulateWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  MAX(`double`),\n        |  COUNT(DISTINCT `string`)\n        |FROM TABLE(\n        |   CUMULATE(\n        |     TABLE T1,\n        |     DESCRIPTOR(proctime),\n        |     INTERVAL '5' SECOND,\n        |     INTERVAL '15' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        Table t1 = this.tEnv().sqlQuery(sql);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarnessTester(package$.MODULE$.tableConversions(t1).toAppendStream(TypeExtractor.createTypeInfo(Row.class)), "WindowAggregate");
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor((LogicalType[])((Object[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.DOUBLE().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType()}));
        testHarness.open();
        this.ingestData(testHarness);
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)4L), BoxesRunTime.boxToDouble((double)5.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)5L), BoxesRunTime.boxToDouble((double)5.0), BoxesRunTime.boxToLong((long)3L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong((long)5L), BoxesRunTime.boxToDouble((double)5.0), BoxesRunTime.boxToLong((long)3L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:15")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToDouble((double)6.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToDouble((double)6.0), BoxesRunTime.boxToLong((long)2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:15")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)4.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:20")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)4.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:25")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)4.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:30")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)3.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)3.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)3.0), BoxesRunTime.boxToLong((long)1L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:45")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)7.0), BoxesRunTime.boxToLong((long)0L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)7.0), BoxesRunTime.boxToLong((long)0L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")})));
        expected.add(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)7.0), BoxesRunTime.boxToLong((long)0L), this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:45")})));
        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testCloseWithoutOpen() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> tuple2 = this.createProcessingTimeTumbleWindowOperator();
        if (tuple2 != null) {
            Tuple2 tuple22;
            KeyedOneInputStreamOperatorTestHarness testHarness = (KeyedOneInputStreamOperatorTestHarness)tuple2._1();
            LogicalType[] outputTypes = (LogicalType[])tuple2._2();
            Tuple2 tuple23 = tuple22 = new Tuple2((Object)testHarness, (Object)outputTypes);
            KeyedOneInputStreamOperatorTestHarness testHarness2 = (KeyedOneInputStreamOperatorTestHarness)tuple23._1();
            LogicalType[] outputTypes2 = (LogicalType[])tuple23._2();
            testHarness2.setup((TypeSerializer)new RowDataSerializer(outputTypes2));
            testHarness2.close();
            return;
        }
        throw new MatchError(tuple2);
    }

    @Test
    public void testTwoPhaseWindowAggregateCloseWithoutOpen() {
        String timestampDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE T2 (\n         | `ts` STRING,\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | TO_TIMESTAMP(`ts`),\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '", "',\n         | 'failing-source' = 'false'\n         |)\n         |"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{timestampDataId})))).stripMargin());
        this.tEnv().getConfig().getConfiguration().setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  MAX(`double`),\n        |  COUNT(DISTINCT `string`)\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        Table t1 = this.tEnv().sqlQuery(sql);
        DataStream stream = package$.MODULE$.tableConversions(t1).toAppendStream(TypeExtractor.createTypeInfo(Row.class));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createHarnessTesterForNoState(stream, "LocalWindowAggregate");
        LogicalType[] outputTypes = (LogicalType[])((Object[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.DOUBLE().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType(), DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType()});
        testHarness.setup((TypeSerializer)new RowDataSerializer(outputTypes));
        testHarness.close();
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness1 = this.createHarnessTester(stream, "GlobalWindowAggregate");
        testHarness1.setup((TypeSerializer)new RowDataSerializer(outputTypes));
        testHarness1.close();
    }

    private void ingestData(KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness) {
        testHarness.setProcessingTime(1000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble((double)1.0), "Hi", null})));
        testHarness.setProcessingTime(2000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble((double)2.0), "Comment#1", null})));
        testHarness.setProcessingTime(3000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble((double)2.0), "Comment#1", null})));
        testHarness.setProcessingTime(4000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble((double)5.0), null, null})));
        testHarness.setProcessingTime(6000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble((double)6.0), "Hi", null})));
        testHarness.setProcessingTime(7000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble((double)3.0), "Hello", null})));
        testHarness.setProcessingTime(8000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", null, "Comment#2", null})));
        testHarness.setProcessingTime(16000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble((double)4.0), "Hi", null})));
        testHarness.setProcessingTime(32000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToDouble((double)7.0), null, null})));
        testHarness.setProcessingTime(34000L);
        testHarness.processElement(this.record((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble((double)3.0), "Comment#3", null})));
        testHarness.setProcessingTime(50000L);
    }

    private StreamRecord<RowData> record(Seq<Object> args) {
        Object[] objs = (Object[])((TraversableOnce)args.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Object apply(Object x0$1) {
                Object object;
                Object object2 = x0$1;
                if (object2 instanceof Long) {
                    long l = BoxesRunTime.unboxToLong((Object)object2);
                    object = BoxesRunTime.boxToLong((long)l);
                } else if (object2 instanceof Double) {
                    double d = BoxesRunTime.unboxToDouble((Object)object2);
                    object = BoxesRunTime.boxToDouble((double)d);
                } else {
                    object = object2;
                }
                return object;
            }
        }, Seq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.Object());
        return StreamRecordUtils.binaryRecord((RowKind)RowKind.INSERT, (Object[])objs);
    }

    private TimestampData localMills(String dateTime) {
        ZonedDateTime windowDateTime = LocalDateTime.parse(dateTime).atZone(this.UTC_ZONE_ID());
        return TimestampData.fromEpochMillis((long)TimeWindowUtil.toUtcTimestampMills((long)windowDateTime.toInstant().toEpochMilli(), (ZoneId)this.shiftTimeZone));
    }

    public WindowAggregateHarnessTest(StreamingWithStateTestBase.StateBackendMode backend, ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
        super(backend);
        this.UTC_ZONE_ID = ZoneId.of("UTC");
    }
}

