/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class KeyedCoProcessOperatorTest {
    /** Package-private no-argument constructor; performs no initialization. */
    KeyedCoProcessOperatorTest() {}

    /**
     * Verifies that elements on both inputs can read the current watermark and their own
     * timestamp.
     *
     * <p>Watermarks 17 and 42 are sent to both inputs, each pair followed by one timestamped
     * element. The expected output is each watermark once, followed by the element rendered as
     * {@code <value>WM:<watermark> TS:<timestamp>}.
     */
    @Test
    void testTimestampAndWatermarkQuerying() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processWatermark1(new Watermark(17L));
            testHarness.processWatermark2(new Watermark(17L));
            testHarness.processElement1(new StreamRecord<>(5, 12L));

            testHarness.processWatermark1(new Watermark(42L));
            testHarness.processWatermark2(new Watermark(42L));
            testHarness.processElement2(new StreamRecord<>("6", 13L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new Watermark(17L));
            expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
            expectedOutput.add(new Watermark(42L));
            expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that elements on both inputs can read the current processing time, and that the
     * context timestamp is {@code null} for records created without a timestamp.
     *
     * <p>Processing time is set to 17 before the first-input element and to 42 before the
     * second-input element; each is expected as {@code <value>PT:<time> TS:null}.
     */
    @Test
    void testTimestampAndProcessingTimeQuerying() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.setProcessingTime(17L);
            testHarness.processElement1(new StreamRecord<>(5));

            testHarness.setProcessingTime(42L);
            testHarness.processElement2(new StreamRecord<>("6"));

            // Element type is Object so the queue matches the type of testHarness.getOutput().
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
            expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies event-time timers registered from either input.
     *
     * <p>{@code EventTimeTriggeringProcessFunction} registers a timer at 5 for the first-input
     * element and at 6 for the second-input element. After watermarks 5 and 6 are sent to both
     * inputs, the expected output is the two {@code INPUT} records, then for each watermark the
     * timer record {@code <key>:1777} followed by the watermark itself.
     */
    @Test
    void testEventTimeTimers() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(new StreamRecord<>(17, 42L));
            testHarness.processElement2(new StreamRecord<>("18", 42L));

            testHarness.processWatermark1(new Watermark(5L));
            testHarness.processWatermark2(new Watermark(5L));

            testHarness.processWatermark1(new Watermark(6L));
            testHarness.processWatermark2(new Watermark(6L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
            expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
            expectedOutput.add(new StreamRecord<>("17:1777", 5L));
            expectedOutput.add(new Watermark(5L));
            expectedOutput.add(new StreamRecord<>("18:1777", 6L));
            expectedOutput.add(new Watermark(6L));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies processing-time timers registered from either input.
     *
     * <p>{@code ProcessingTimeTriggeringProcessFunction} registers a timer at 5 for the
     * first-input element and at 6 for the second-input element. After processing time is
     * advanced to 5 and then 6, the expected output is the two {@code INPUT} records followed by
     * two {@code 1777} timer records.
     */
    @Test
    void testProcessingTimeTimers() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(new StreamRecord<>(17));
            testHarness.processElement2(new StreamRecord<>("18"));

            testHarness.setProcessingTime(5L);
            testHarness.setProcessingTime(6L);

            // Element type is Object so the queue matches the type of testHarness.getOutput().
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("INPUT1:17"));
            expectedOutput.add(new StreamRecord<>("INPUT2:18"));
            expectedOutput.add(new StreamRecord<>("1777"));
            expectedOutput.add(new StreamRecord<>("1777"));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that event-time timers can read value state, and that a timer deleted by a later
     * element does not fire.
     *
     * <p>Drives {@code EventTimeTriggeringStatefulProcessFunction}: elements 17 and 13 arrive at
     * watermark 1, a second 13 and a second-input "42" arrive at watermark 2, and watermarks 6
     * and 7 follow. The expected output contains {@code STATE:17} at 6 and {@code STATE:42} at 7,
     * and no {@code STATE} record for 13.
     */
    @Test
    void testEventTimeTimerWithState() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processWatermark1(new Watermark(1L));
            testHarness.processWatermark2(new Watermark(1L));
            testHarness.processElement1(new StreamRecord<>(17, 0L)); // registers timer at 6
            testHarness.processElement1(new StreamRecord<>(13, 0L)); // registers timer at 6

            testHarness.processWatermark1(new Watermark(2L));
            testHarness.processWatermark2(new Watermark(2L));
            testHarness.processElement1(new StreamRecord<>(13, 1L)); // deletes timer at 6
            testHarness.processElement2(new StreamRecord<>("42", 1L)); // registers timer at 7

            testHarness.processWatermark1(new Watermark(6L));
            testHarness.processWatermark2(new Watermark(6L));

            testHarness.processWatermark1(new Watermark(7L));
            testHarness.processWatermark2(new Watermark(7L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new Watermark(1L));
            expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
            expectedOutput.add(new StreamRecord<>("INPUT1:13", 0L));
            expectedOutput.add(new Watermark(2L));
            expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
            expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
            expectedOutput.add(new Watermark(6L));
            expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
            expectedOutput.add(new Watermark(7L));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that processing-time timers can read value state, and that a timer deleted by a
     * later element does not fire.
     *
     * <p>Drives {@code ProcessingTimeTriggeringStatefulProcessFunction}: elements 17 and 13
     * arrive at processing time 1, a second 13 and a second-input "42" arrive at time 2, and time
     * is then advanced to 6 and 7. The expected output contains {@code STATE:17} and
     * {@code STATE:42}, and no {@code STATE} record for 13.
     */
    @Test
    void testProcessingTimeTimerWithState() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(
                        new ProcessingTimeTriggeringStatefulProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.setProcessingTime(1L);
            testHarness.processElement1(new StreamRecord<>(17)); // registers timer at 6
            testHarness.processElement1(new StreamRecord<>(13)); // registers timer at 6

            testHarness.setProcessingTime(2L);
            testHarness.processElement1(new StreamRecord<>(13)); // deletes timer at 6
            testHarness.processElement2(new StreamRecord<>("42")); // registers timer at 7

            testHarness.setProcessingTime(6L);
            testHarness.setProcessingTime(7L);

            // Element type is Object so the queue matches the type of testHarness.getOutput().
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("INPUT1:17"));
            expectedOutput.add(new StreamRecord<>("INPUT1:13"));
            expectedOutput.add(new StreamRecord<>("INPUT2:42"));
            expectedOutput.add(new StreamRecord<>("STATE:17"));
            expectedOutput.add(new StreamRecord<>("STATE:42"));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that timers registered before a snapshot fire after restoring from it.
     *
     * <p>The first harness processes one element per input with
     * {@code BothTriggeringProcessFunction}, which leaves a processing-time timer at 5 and an
     * event-time timer at 6 registered, then takes a snapshot and is closed. A second harness
     * with a fresh operator is initialized from that snapshot; advancing processing time to 5 and
     * the watermark to 6 is expected to produce {@code PROC:1777}, {@code EVENT:1777} and the
     * watermark.
     */
    @Test
    void testSnapshotAndRestore() throws Exception {
        final OperatorSubtaskState snapshot;

        // Each harness is scoped by try-with-resources so it is closed even if a step throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction()),
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(new StreamRecord<>(5, 12L));
            testHarness.processElement2(new StreamRecord<>("5", 12L));

            // Snapshot while the timers are still pending.
            snapshot = testHarness.snapshot(0L, 0L);
        }

        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String>
                restoredHarness =
                        new KeyedTwoInputStreamOperatorTestHarness<>(
                                new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction()),
                                new IntToStringKeySelector<>(),
                                new IdentityKeySelector<String>(),
                                BasicTypeInfo.STRING_TYPE_INFO)) {
            restoredHarness.setup();
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            restoredHarness.setProcessingTime(5L);
            restoredHarness.processWatermark1(new Watermark(6L));
            restoredHarness.processWatermark2(new Watermark(6L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("PROC:1777"));
            expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
            expectedOutput.add(new Watermark(6L));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, restoredHarness.getOutput());
        }
    }

    /**
     * Verifies that the current key is available from the context on both inputs.
     *
     * <p>{@code AppendCurrentKeyProcessFunction} emits {@code <value>,<currentKey>}. With the key
     * selectors used here, each expected record repeats the element's string form on both sides
     * of the comma.
     */
    @Test
    void testGetCurrentKeyFromContext() throws Exception {
        KeyedCoProcessOperator<String, Integer, String, String> operator =
                new KeyedCoProcessOperator<>(new AppendCurrentKeyProcessFunction());

        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        new IntToStringKeySelector<>(),
                        new IdentityKeySelector<String>(),
                        BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(new StreamRecord<>(5));
            testHarness.processElement1(new StreamRecord<>(6));
            testHarness.processElement2(new StreamRecord<>("hello"));
            testHarness.processElement2(new StreamRecord<>("world"));

            // Element type is Object so the queue matches the type of testHarness.getOutput().
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("5,5"));
            expectedOutput.add(new StreamRecord<>("6,6"));
            expectedOutput.add(new StreamRecord<>("hello,hello"));
            expectedOutput.add(new StreamRecord<>("world,world"));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    private static class AppendCurrentKeyProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private AppendCurrentKeyProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "," + (String)ctx.getCurrentKey()));
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "," + (String)ctx.getCurrentKey()));
        }
    }

    private static class BothTriggeringProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private BothTriggeringProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerProcessingTimeTimer(3L);
            ctx.timerService().registerEventTimeTimer(6L);
            ctx.timerService().deleteProcessingTimeTimer(3L);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerEventTimeTimer(4L);
            ctx.timerService().registerProcessingTimeTimer(5L);
            ctx.timerService().deleteEventTimeTimer(4L);
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            if (TimeDomain.EVENT_TIME.equals((Object)ctx.timeDomain())) {
                out.collect((Object)"EVENT:1777");
            } else {
                out.collect((Object)"PROC:1777");
            }
        }
    }

    private static class ProcessingTimeTriggeringStatefulProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;
        private final ValueStateDescriptor<String> state = new ValueStateDescriptor("seen-element", (TypeSerializer)StringSerializer.INSTANCE);

        private ProcessingTimeTriggeringStatefulProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 1);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 2);
        }

        private void handleValue(Object value, Collector<String> out, TimerService timerService, int channel) throws IOException {
            ValueState state = this.getRuntimeContext().getState(this.state);
            if (state.value() == null) {
                out.collect((Object)("INPUT" + channel + ":" + value));
                state.update((Object)String.valueOf(value));
                timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5L);
            } else {
                state.clear();
                timerService.deleteProcessingTimeTimer(timerService.currentProcessingTime() + 4L);
            }
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.PROCESSING_TIME);
            out.collect((Object)("STATE:" + (String)this.getRuntimeContext().getState(this.state).value()));
        }
    }

    private static class ProcessingTimeQueryingProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private ProcessingTimeQueryingProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class ProcessingTimeTriggeringProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private ProcessingTimeTriggeringProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT1:" + value));
            ctx.timerService().registerProcessingTimeTimer(5L);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT2:" + value));
            ctx.timerService().registerProcessingTimeTimer(6L);
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.PROCESSING_TIME);
            out.collect((Object)"1777");
        }
    }

    private static class EventTimeTriggeringStatefulProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;
        private final ValueStateDescriptor<String> state = new ValueStateDescriptor("seen-element", (TypeSerializer)StringSerializer.INSTANCE);

        private EventTimeTriggeringStatefulProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 1);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 2);
        }

        private void handleValue(Object value, Collector<String> out, TimerService timerService, int channel) throws IOException {
            ValueState state = this.getRuntimeContext().getState(this.state);
            if (state.value() == null) {
                out.collect((Object)("INPUT" + channel + ":" + value));
                state.update((Object)String.valueOf(value));
                timerService.registerEventTimeTimer(timerService.currentWatermark() + 5L);
            } else {
                state.clear();
                timerService.deleteEventTimeTimer(timerService.currentWatermark() + 4L);
            }
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.EVENT_TIME);
            out.collect((Object)("STATE:" + (String)this.getRuntimeContext().getState(this.state).value()));
        }
    }

    private static class EventTimeTriggeringProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private EventTimeTriggeringProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT1:" + value));
            ctx.timerService().registerEventTimeTimer(5L);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT2:" + value));
            ctx.timerService().registerEventTimeTimer(6L);
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.EVENT_TIME);
            out.collect((Object)((String)ctx.getCurrentKey() + ":1777"));
        }
    }

    private static class WatermarkQueryingProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private WatermarkQueryingProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class IntToStringKeySelector<T>
    implements KeySelector<Integer, String> {
        private static final long serialVersionUID = 1L;

        private IntToStringKeySelector() {
        }

        public String getKey(Integer value) throws Exception {
            return "" + value;
        }
    }
}

