package org.apache.flink.streaming.api.operators.co;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.SourceOperatorTestContext;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.class */
public class IntervalJoinOperatorTest {
    private final boolean lhsFasterThanRhs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$JoinTestBuilder.class */
    /**
     * Fluent driver around a {@link TestHarness} and the {@link IntervalJoinOperator} it wraps.
     *
     * <p>Every step returns {@code this} so a test can be written as a single chain of
     * "feed input / assert" calls. The builder is a non-static inner class because it reads the
     * enclosing test's {@code lhsFasterThanRhs} parameter and delegates to its assertion helpers.
     */
    public class JoinTestBuilder {
        private final IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
        private final TestHarness testHarness;

        /**
         * Wraps the given harness and operator and brings the harness into a runnable state.
         *
         * @param testHarness harness used to feed records and watermarks
         * @param intervalJoinOperator the operator under test, kept to inspect its buffers
         * @throws Exception if the harness cannot be set up or opened
         */
        public JoinTestBuilder(TestHarness testHarness, IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> intervalJoinOperator) throws Exception {
            this.testHarness = testHarness;
            this.operator = intervalJoinOperator;
            // Same lifecycle order as the tests that drive a TestHarness directly:
            // setup() first, then open().
            testHarness.setup();
            testHarness.open();
        }

        /** Returns the wrapped harness. */
        public TestHarness get() {
            return this.testHarness;
        }

        /** Feeds one "lhs" element whose payload timestamp and record timestamp are both {@code ts}. */
        public JoinTestBuilder processElement1(int ts) throws Exception {
            this.testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(ts, "lhs"));
            return this;
        }

        /** Feeds one "rhs" element whose payload timestamp and record timestamp are both {@code ts}. */
        public JoinTestBuilder processElement2(int ts) throws Exception {
            this.testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(ts, "rhs"));
            return this;
        }

        /** Sends a watermark with the given timestamp on the first input. */
        public JoinTestBuilder processWatermark1(int ts) throws Exception {
            this.testHarness.processWatermark1(new Watermark(ts));
            return this;
        }

        /** Sends a watermark with the given timestamp on the second input. */
        public JoinTestBuilder processWatermark2(int ts) throws Exception {
            this.testHarness.processWatermark2(new Watermark(ts));
            return this;
        }

        /**
         * Feeds, for every timestamp in {@code [from, to]}, one element immediately followed by a
         * watermark of the same timestamp, on both inputs. Which input is drained completely first
         * is decided by the test parameter {@code lhsFasterThanRhs}.
         *
         * @param from first timestamp (inclusive)
         * @param to last timestamp (inclusive)
         */
        public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
            if (IntervalJoinOperatorTest.this.lhsFasterThanRhs) {
                feedLeft(from, to);
                feedRight(from, to);
            } else {
                feedRight(from, to);
                feedLeft(from, to);
            }
            return this;
        }

        /** Feeds element + watermark pairs for timestamps {@code from..to} on the first input. */
        private void feedLeft(int from, int to) throws Exception {
            for (int ts = from; ts <= to; ts++) {
                this.testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(ts, "lhs"));
                this.testHarness.processWatermark1(new Watermark(ts));
            }
        }

        /** Feeds element + watermark pairs for timestamps {@code from..to} on the second input. */
        private void feedRight(int from, int to) throws Exception {
            for (int ts = from; ts <= to; ts++) {
                this.testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(ts, "rhs"));
                this.testHarness.processWatermark2(new Watermark(ts));
            }
        }

        /** Asserts the harness' main output against the expected join results. */
        @SafeVarargs
        public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... streamRecordArr) {
            IntervalJoinOperatorTest.this.assertOutput(Lists.newArrayList(streamRecordArr), this.testHarness.getOutput());
            return this;
        }

        /**
         * Asserts that the operator's left buffer holds exactly the given keys.
         *
         * @throws RuntimeException wrapping any checked exception raised while reading the state
         */
        public JoinTestBuilder assertLeftBufferContainsOnly(long... jArr) {
            try {
                IntervalJoinOperatorTest.this.assertContainsOnly(this.operator.getLeftBuffer(), jArr);
                return this;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Asserts that the operator's right buffer holds exactly the given keys.
         *
         * @throws RuntimeException wrapping any checked exception raised while reading the state
         */
        public JoinTestBuilder assertRightBufferContainsOnly(long... jArr) {
            try {
                IntervalJoinOperatorTest.this.assertContainsOnly(this.operator.getRightBuffer(), jArr);
                return this;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Asserts that the operator's left buffer has no keys.
         *
         * @throws RuntimeException wrapping any checked exception raised while reading the state
         */
        public JoinTestBuilder assertLeftBufferEmpty() {
            try {
                IntervalJoinOperatorTest.this.assertEmpty(this.operator.getLeftBuffer());
                return this;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Asserts that the operator's right buffer has no keys.
         *
         * @throws RuntimeException wrapping any checked exception raised while reading the state
         */
        public JoinTestBuilder assertRightBufferEmpty() {
            try {
                IntervalJoinOperatorTest.this.assertEmpty(this.operator.getRightBuffer());
                return this;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Asserts the side output registered under {@code outputTag} against the expected records. */
        @SafeVarargs
        public final JoinTestBuilder expectLateRecords(OutputTag<TestElem> outputTag, StreamRecord<TestElem>... streamRecordArr) {
            IntervalJoinOperatorTest.this.assertOutput(Lists.newArrayList(streamRecordArr), this.testHarness.getSideOutput(outputTag));
            return this;
        }

        /** Delegates to {@code TestHarnessUtil.assertNoLateRecords} on the harness' main output. */
        public JoinTestBuilder noLateRecords() {
            TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
            return this;
        }

        /** Closes the wrapped harness; terminal step of a chain. */
        public void close() throws Exception {
            this.testHarness.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$PassthroughFunction.class */
    public static class PassthroughFunction extends ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
        private PassthroughFunction() {
        }

        public void processElement(TestElem testElem, TestElem testElem2, ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>.Context context, Collector<Tuple2<TestElem, TestElem>> collector) throws Exception {
            collector.collect(Tuple2.of(testElem, testElem2));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, Object obj2, ProcessJoinFunction.Context context, Collector collector) throws Exception {
            processElement((TestElem) obj, (TestElem) obj2, (ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>.Context) context, (Collector<Tuple2<TestElem, TestElem>>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem.class */
    /**
     * Test payload: a fixed key ({@code "key"}), a timestamp and the name of the input it came
     * from. Equality and hash code cover all three fields.
     */
    public static class TestElem {
        String key = "key";
        long ts;
        String source;

        /**
         * @param ts timestamp carried by the element
         * @param source label of the originating input (the tests use "lhs" / "rhs")
         */
        public TestElem(long ts, String source) {
            this.ts = ts;
            this.source = source;
        }

        /** Two elements are equal when they have the same class, timestamp, key and source. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            TestElem other = (TestElem) obj;
            return this.ts == other.ts
                    && sameText(this.key, other.key)
                    && sameText(this.source, other.source);
        }

        /** Combines key, timestamp and source with the usual 31-multiplier scheme. */
        @Override
        public int hashCode() {
            int hash = textHash(this.key);
            hash = 31 * hash + Long.hashCode(this.ts);
            hash = 31 * hash + textHash(this.source);
            return hash;
        }

        /** Renders as {@code source:ts}. */
        @Override
        public String toString() {
            return this.source + ":" + this.ts;
        }

        /** Creates a serializer for {@code TestElem} from its type information and a fresh serializer config. */
        public static TypeSerializer<TestElem> serializer() {
            TypeInformation<TestElem> typeInfo = TypeInformation.of(new TypeHint<TestElem>() {
            });
            return typeInfo.createSerializer(new SerializerConfigImpl());
        }

        /** Null-safe string equality. */
        private static boolean sameText(String a, String b) {
            return a == null ? b == null : a.equals(b);
        }

        /** Null-safe string hash; {@code null} hashes to 0. */
        private static int textHash(String s) {
            return s == null ? 0 : s.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestHarness.class */
    /**
     * Keyed two-input test harness specialised for this test: {@code String} keys,
     * {@link TestElem} on both inputs and {@code Tuple2<TestElem, TestElem>} as output.
     */
    public static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {

        /**
         * Passes all arguments straight to the superclass constructor.
         *
         * @param operator operator under test
         * @param keySelector1 key extractor for the first input
         * @param keySelector2 key extractor for the second input
         * @param keyType type information of the key
         */
        TestHarness(
                TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
                KeySelector<TestElem, String> keySelector1,
                KeySelector<TestElem, String> keySelector2,
                TypeInformation<String> keyType) throws Exception {
            super(operator, keySelector1, keySelector2, keyType);
        }
    }

    /**
     * Parameter sets for the runner: one run with {@code lhsFasterThanRhs == true} and one with
     * {@code false}.
     */
    @Parameterized.Parameters(name = "lhs faster than rhs: {0}")
    public static Collection<Object[]> data() {
        Object[] lhsFirst = {true};
        Object[] rhsFirst = {false};
        return Arrays.asList(lhsFirst, rhsFirst);
    }

    /**
     * @param lhsFasterThanRhs when {@code true}, helpers that feed both inputs push all left-hand
     *     elements and watermarks before the right-hand ones; otherwise the right side goes first
     */
    public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) {
        this.lhsFasterThanRhs = lhsFasterThanRhs;
    }

    /**
     * Runs the same input through two harnesses whose bounds are mirror images of each other
     * ({@code (1, 3]} versus {@code [-3, -1)}) and expects the second output to be the first one
     * with left and right timestamps swapped.
     */
    @Test
    public void testImplementationMirrorsCorrectly() throws Exception {
        JoinTestBuilder forward = setupHarness(1L, true, 3L, false);
        forward.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(1L, 2L),
                        streamRecordOf(1L, 3L),
                        streamRecordOf(2L, 3L),
                        streamRecordOf(2L, 4L),
                        streamRecordOf(3L, 4L))
                .noLateRecords()
                .close();

        // Mirrored configuration: bounds negated and swapped, inclusiveness swapped.
        JoinTestBuilder mirrored = setupHarness(-3L, false, -1L, true);
        mirrored.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(2L, 1L),
                        streamRecordOf(3L, 1L),
                        streamRecordOf(3L, 2L),
                        streamRecordOf(4L, 2L),
                        streamRecordOf(4L, 3L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code [-2, -1]}: with timestamps 1..4 on both inputs, the expected pairs are those
     * whose right timestamp is one or two less than the left one.
     */
    @Test
    public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-2L, true, -1L, true);
        builder.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(2L, 1L),
                        streamRecordOf(3L, 1L),
                        streamRecordOf(3L, 2L),
                        streamRecordOf(4L, 2L),
                        streamRecordOf(4L, 3L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code [-1, 1]}: with timestamps 1..4 on both inputs, the expected pairs are those
     * whose timestamps differ by at most one.
     */
    @Test
    public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-1L, true, 1L, true);
        builder.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(1L, 1L),
                        streamRecordOf(1L, 2L),
                        streamRecordOf(2L, 1L),
                        streamRecordOf(2L, 2L),
                        streamRecordOf(2L, 3L),
                        streamRecordOf(3L, 2L),
                        streamRecordOf(3L, 3L),
                        streamRecordOf(3L, 4L),
                        streamRecordOf(4L, 3L),
                        streamRecordOf(4L, 4L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code [1, 2]}: with timestamps 1..4 on both inputs, the expected pairs are those
     * whose right timestamp is one or two greater than the left one.
     */
    @Test
    public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
        JoinTestBuilder builder = setupHarness(1L, true, 2L, true);
        builder.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(1L, 2L),
                        streamRecordOf(1L, 3L),
                        streamRecordOf(2L, 3L),
                        streamRecordOf(2L, 4L),
                        streamRecordOf(3L, 4L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code (-3, -1)}: with timestamps 1..4 on both inputs, only pairs whose right
     * timestamp is exactly two less than the left one are expected.
     */
    @Test
    public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-3L, false, -1L, false);
        builder.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(3L, 1L),
                        streamRecordOf(4L, 2L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code (-1, 1)}: with timestamps 1..4 on both inputs, only pairs with identical
     * timestamps are expected.
     */
    @Test
    public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-1L, false, 1L, false);
        builder.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(1L, 1L),
                        streamRecordOf(2L, 2L),
                        streamRecordOf(3L, 3L),
                        streamRecordOf(4L, 4L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code (1, 3)}: with timestamps 1..4 on both inputs, only pairs whose right
     * timestamp is exactly two greater than the left one are expected.
     */
    @Test
    public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
        JoinTestBuilder builder = setupHarness(1L, false, 3L, false);
        builder.processElementsAndWatermarks(1, 4)
                .andExpect(
                        streamRecordOf(1L, 3L),
                        streamRecordOf(2L, 4L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code [-1, 0]}: buffers five elements per side, then advances both watermarks to
     * 1, 4 and 6 and checks which timestamps remain in each buffer after every step.
     */
    @Test
    public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-1L, true, 0L, true);

        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement2(ts);
        }

        builder.processWatermark1(1).processWatermark2(1);
        builder.assertLeftBufferContainsOnly(2, 3, 4, 5);
        builder.assertRightBufferContainsOnly(1, 2, 3, 4, 5);

        builder.processWatermark1(4).processWatermark2(4);
        builder.assertLeftBufferContainsOnly(5);
        builder.assertRightBufferContainsOnly(4, 5);

        builder.processWatermark1(6).processWatermark2(6);
        builder.assertLeftBufferEmpty();
        builder.assertRightBufferEmpty();

        builder.close();
    }

    /**
     * Bounds {@code (-2, 1)}: same input and the same buffer expectations as the
     * {@code [-1, 0]} cleanup test, here with exclusive bounds.
     */
    @Test
    public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-2L, false, 1L, false);

        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement2(ts);
        }

        builder.processWatermark1(1).processWatermark2(1);
        builder.assertLeftBufferContainsOnly(2, 3, 4, 5);
        builder.assertRightBufferContainsOnly(1, 2, 3, 4, 5);

        builder.processWatermark1(4).processWatermark2(4);
        builder.assertLeftBufferContainsOnly(5);
        builder.assertRightBufferContainsOnly(4, 5);

        builder.processWatermark1(6).processWatermark2(6);
        builder.assertLeftBufferEmpty();
        builder.assertRightBufferEmpty();

        builder.close();
    }

    /**
     * Bounds {@code [0, 1]}: buffers five elements per side, then advances both watermarks to
     * 1, 4 and 6 and checks which timestamps remain in each buffer after every step.
     */
    @Test
    public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
        JoinTestBuilder builder = setupHarness(0L, true, 1L, true);

        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement2(ts);
        }

        builder.processWatermark1(1).processWatermark2(1);
        builder.assertLeftBufferContainsOnly(1, 2, 3, 4, 5);
        builder.assertRightBufferContainsOnly(2, 3, 4, 5);

        builder.processWatermark1(4).processWatermark2(4);
        builder.assertLeftBufferContainsOnly(4, 5);
        builder.assertRightBufferContainsOnly(5);

        builder.processWatermark1(6).processWatermark2(6);
        builder.assertLeftBufferEmpty();
        builder.assertRightBufferEmpty();

        builder.close();
    }

    /**
     * Bounds {@code (-1, 2)}: same input and the same buffer expectations as the
     * {@code [0, 1]} cleanup test, here with exclusive bounds.
     */
    @Test
    public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
        JoinTestBuilder builder = setupHarness(-1L, false, 2L, false);

        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            builder.processElement2(ts);
        }

        builder.processWatermark1(1).processWatermark2(1);
        builder.assertLeftBufferContainsOnly(1, 2, 3, 4, 5);
        builder.assertRightBufferContainsOnly(2, 3, 4, 5);

        builder.processWatermark1(4).processWatermark2(4);
        builder.assertLeftBufferContainsOnly(4, 5);
        builder.assertRightBufferContainsOnly(5);

        builder.processWatermark1(6).processWatermark2(6);
        builder.assertLeftBufferEmpty();
        builder.assertRightBufferEmpty();

        builder.close();
    }

    /**
     * Feeds timestamps 1..3 through a first harness, snapshots it, then restores the snapshot
     * into a second harness and feeds timestamp 4. The second harness is expected to join the new
     * elements against the restored ones (pairs involving timestamp 3).
     *
     * <p>Both harnesses are managed with try-with-resources so they are closed on every path and
     * a failure while closing cannot replace the primary test failure.
     */
    @Test
    public void testRestoreFromSnapshot() throws Exception {
        OperatorSubtaskState snapshot;

        try (TestHarness firstHarness = createTestHarness(-1L, true, 1L, true)) {
            firstHarness.setup();
            firstHarness.open();

            // Element followed by a watermark of the same timestamp, alternating lhs / rhs.
            for (long ts = 1L; ts <= 3L; ts++) {
                firstHarness.processElement1(createStreamRecord(ts, "lhs"));
                firstHarness.processWatermark1(new Watermark(ts));
                firstHarness.processElement2(createStreamRecord(ts, "rhs"));
                firstHarness.processWatermark2(new Watermark(ts));
            }

            snapshot = firstHarness.snapshot(0L, 0L);
            // The harness is closed explicitly before its output is inspected; the implicit
            // close at the end of the try block then runs a second time.
            firstHarness.close();

            List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedBeforeRestore =
                    Lists.newArrayList(
                            streamRecordOf(1L, 1L),
                            streamRecordOf(1L, 2L),
                            streamRecordOf(2L, 1L),
                            streamRecordOf(2L, 2L),
                            streamRecordOf(2L, 3L),
                            streamRecordOf(3L, 2L),
                            streamRecordOf(3L, 3L));
            TestHarnessUtil.assertNoLateRecords(firstHarness.getOutput());
            assertOutput(expectedBeforeRestore, firstHarness.getOutput());
        }

        try (TestHarness restoredHarness = createTestHarness(-1L, true, 1L, true)) {
            restoredHarness.setup();
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            restoredHarness.processElement1(createStreamRecord(4L, "lhs"));
            restoredHarness.processWatermark1(new Watermark(4L));
            restoredHarness.processElement2(createStreamRecord(4L, "rhs"));
            restoredHarness.processWatermark2(new Watermark(4L));

            List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedAfterRestore =
                    Lists.newArrayList(
                            streamRecordOf(3L, 4L),
                            streamRecordOf(4L, 3L),
                            streamRecordOf(4L, 4L));
            TestHarnessUtil.assertNoLateRecords(restoredHarness.getOutput());
            assertOutput(expectedAfterRestore, restoredHarness.getOutput());
        }
    }

    /**
     * Verifies, for every joined pair, that {@code Context#getLeftTimestamp()} equals the
     * timestamp stored in the left element. The assertion runs inside the join function itself.
     */
    @Test
    public void testContextCorrectLeftTimestamp() throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
                new IntervalJoinOperator<>(
                        -1L,
                        1L,
                        true,
                        true,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
                            @Override
                            public void processElement(TestElem left, TestElem right, Context context, Collector<Tuple2<TestElem, TestElem>> collector) throws Exception {
                                Assert.assertEquals(left.ts, context.getLeftTimestamp());
                            }
                        });

        // try-with-resources closes the harness on success and failure alike.
        try (TestHarness testHarness = new TestHarness(operator, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class))) {
            testHarness.setup();
            testHarness.open();
            processElementsAndWatermarks(testHarness);
        }
    }

    /**
     * Verifies, for every joined pair, that {@code Context#getTimestamp()} equals the larger of
     * the two element timestamps. The assertion runs inside the join function itself.
     */
    @Test
    public void testReturnsCorrectTimestamp() throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
                new IntervalJoinOperator<>(
                        -1L,
                        1L,
                        true,
                        true,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public void processElement(TestElem left, TestElem right, Context context, Collector<Tuple2<TestElem, TestElem>> collector) throws Exception {
                                Assert.assertEquals(Math.max(left.ts, right.ts), context.getTimestamp());
                            }
                        });

        // try-with-resources closes the harness on success and failure alike.
        try (TestHarness testHarness = new TestHarness(operator, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class))) {
            testHarness.setup();
            testHarness.open();
            processElementsAndWatermarks(testHarness);
        }
    }

    /**
     * Verifies, for every joined pair, that {@code Context#getRightTimestamp()} equals the
     * timestamp stored in the right element. The assertion runs inside the join function itself.
     */
    @Test
    public void testContextCorrectRightTimestamp() throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
                new IntervalJoinOperator<>(
                        -1L,
                        1L,
                        true,
                        true,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
                            @Override
                            public void processElement(TestElem left, TestElem right, Context context, Collector<Tuple2<TestElem, TestElem>> collector) throws Exception {
                                Assert.assertEquals(right.ts, context.getRightTimestamp());
                            }
                        });

        // try-with-resources closes the harness on success and failure alike.
        try (TestHarness testHarness = new TestHarness(operator, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class))) {
            testHarness.setup();
            testHarness.open();
            processElementsAndWatermarks(testHarness);
        }
    }

    /**
     * Expects a {@link FlinkException} when a record without a timestamp arrives on the first
     * input. The harness is closed via try-with-resources, matching the right-hand variant of
     * this test, so it is not leaked when the expected exception is thrown.
     */
    @Test(expected = FlinkException.class)
    public void testFailsWithNoTimestampsLeft() throws Exception {
        try (TestHarness testHarness = createTestHarness(0L, true, 0L, true)) {
            testHarness.setup();
            testHarness.open();
            // Single-argument StreamRecord constructor: no record timestamp is supplied.
            testHarness.processElement1(new StreamRecord<>(new TestElem(0L, "lhs")));
        }
    }

    /**
     * Expects a {@link FlinkException} when a record without a timestamp arrives on the second
     * input. The harness is closed via try-with-resources on every path.
     */
    @Test(expected = FlinkException.class)
    public void testFailsWithNoTimestampsRight() throws Exception {
        try (TestHarness testHarness = createTestHarness(0L, true, 0L, true)) {
            testHarness.setup();
            testHarness.open();
            // Single-argument StreamRecord constructor: no record timestamp is supplied.
            testHarness.processElement2(new StreamRecord<>(new TestElem(0L, "rhs")));
        }
    }

    /**
     * Bounds {@code [-1, 1]} without late-data output tags. A second "lhs" element with
     * timestamp 1 is fed after both watermarks have reached 3; the 13 expected records contain
     * no additional pairs for it.
     */
    @Test
    public void testDiscardsLateData() throws Exception {
        JoinTestBuilder builder = setupHarness(-1L, true, 1L, true);

        for (int ts = 1; ts <= 3; ts++) {
            builder.processElement1(ts).processElement2(ts);
        }
        builder.processWatermark1(3).processWatermark2(3);

        // Fed after the watermarks above.
        builder.processElement1(1);

        for (int ts = 4; ts <= 5; ts++) {
            builder.processElement1(ts).processElement2(ts);
        }

        builder.andExpect(
                        streamRecordOf(1L, 1L),
                        streamRecordOf(1L, 2L),
                        streamRecordOf(2L, 1L),
                        streamRecordOf(2L, 2L),
                        streamRecordOf(2L, 3L),
                        streamRecordOf(3L, 2L),
                        streamRecordOf(3L, 3L),
                        streamRecordOf(3L, 4L),
                        streamRecordOf(4L, 3L),
                        streamRecordOf(4L, 4L),
                        streamRecordOf(4L, 5L),
                        streamRecordOf(5L, 4L),
                        streamRecordOf(5L, 5L))
                .noLateRecords()
                .close();
    }

    /**
     * Bounds {@code [-1, 1]} with one late-data output tag per input. After both watermarks
     * reach 3, "lhs" element 1 and "rhs" element 2 are fed; each is expected on the side output
     * of its own tag, and neither appears in the joined main output.
     */
    @Test
    public void testLateData() throws Exception {
        OutputTag<TestElem> leftLateTag = new OutputTag<TestElem>("left_late") {
        };
        OutputTag<TestElem> rightLateTag = new OutputTag<TestElem>("right_late") {
        };

        JoinTestBuilder builder = setupHarness(-1L, true, 1L, true, leftLateTag, rightLateTag);

        builder.processElement1(3).processElement2(3);
        builder.processWatermark1(3).processWatermark2(3);
        builder.processElement1(4).processElement2(4);
        // Fed after the watermarks above.
        builder.processElement1(1).processElement2(2);
        builder.processElement1(5).processElement2(5);

        builder.andExpect(
                        streamRecordOf(3L, 3L),
                        streamRecordOf(3L, 4L),
                        streamRecordOf(4L, 3L),
                        streamRecordOf(4L, 4L),
                        streamRecordOf(4L, 5L),
                        streamRecordOf(5L, 4L),
                        streamRecordOf(5L, 5L))
                .expectLateRecords(leftLateTag, createStreamRecord(1L, "lhs"))
                .expectLateRecords(rightLateTag, createStreamRecord(2L, "rhs"))
                .close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails with "state not empty" unless the given map state has zero keys.
     *
     * @param mapState state to inspect
     * @throws Exception if reading the state's keys fails
     */
    public void assertEmpty(MapState<Long, ?> mapState) throws Exception {
        int keyCount = Iterables.size(mapState.keys());
        Assert.assertTrue("state not empty", keyCount == 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asserts that the map state's key set is exactly the given timestamps: every expected key
     * must be present, and the number of keys must equal the number of expected values.
     *
     * @param mapState state to inspect
     * @param jArr expected keys
     * @throws Exception if reading the state fails
     */
    public void assertContainsOnly(MapState<Long, ?> mapState, long... jArr) throws Exception {
        String expectedKeys = Arrays.toString(jArr);

        for (int idx = 0; idx < jArr.length; idx++) {
            boolean present = mapState.contains(Long.valueOf(jArr[idx]));
            String missingMessage = "Keys not found in state. \n Expected: " + expectedKeys + "\n Actual:   " + mapState.keys();
            Assert.assertTrue(missingMessage, present);
        }

        String surplusMessage = "Too many objects in state. \n Expected: " + expectedKeys + "\n Actual:   " + mapState.keys();
        int actualKeyCount = Iterables.size(mapState.keys());
        Assert.assertEquals(surplusMessage, jArr.length, actualKeyCount);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Compares expected records with a harness output queue, ignoring order. The number of
     * {@link StreamRecord} instances in the queue (other entries are not counted) must equal the
     * number of expected records, and every expected record must be contained in the queue.
     *
     * @param iterable expected stream records
     * @param queue actual output, possibly mixed with non-record entries
     */
    public <T1, T2> void assertOutput(Iterable<StreamRecord<T1>> iterable, Queue<T2> queue) {
        long actualRecordCount = queue.stream().filter(StreamRecord.class::isInstance).count();
        Assert.assertEquals("Expected and actual size of stream records different", Iterables.size(iterable), actualRecordCount);

        for (StreamRecord<T1> expectedRecord : iterable) {
            Assert.assertTrue(queue.contains(expectedRecord));
        }
    }

    /**
     * Creates a test harness around a new {@code IntervalJoinOperator} that has no late-data
     * output tags; both inputs are keyed by the element's {@code key} field.
     *
     * @param j first bound handed to the operator (presumably the lower bound — confirm against
     *     the operator's constructor)
     * @param z flag paired with {@code j} (presumably whether that bound is inclusive)
     * @param j2 second bound handed to the operator (presumably the upper bound)
     * @param z2 flag paired with {@code j2} (presumably whether that bound is inclusive)
     * @return the harness wrapping the new operator
     * @throws Exception if constructing the harness fails
     */
    private TestHarness createTestHarness(long j, boolean z, long j2, boolean z2) throws Exception {
        // No side outputs for late records in this variant.
        final OutputTag<TestElem> noLateDataTag = null;
        final IntervalJoinOperator operator = new IntervalJoinOperator(
                j, j2, z, z2, noLateDataTag, noLateDataTag, TestElem.serializer(), TestElem.serializer(), new PassthroughFunction());
        final TypeInformation<String> keyType = TypeInformation.of(String.class);
        return new TestHarness(operator, left -> left.key, right -> right.key, keyType);
    }

    /**
     * Creates an {@code IntervalJoinOperator} with the given bounds and late-data output tags,
     * wraps it in a test harness keyed by the element's {@code key} field, and returns a
     * {@code JoinTestBuilder} holding both the harness and the operator.
     *
     * @param j first bound handed to the operator (presumably the lower bound — confirm against
     *     the operator's constructor)
     * @param z flag paired with {@code j} (presumably whether that bound is inclusive)
     * @param j2 second bound handed to the operator (presumably the upper bound)
     * @param z2 flag paired with {@code j2} (presumably whether that bound is inclusive)
     * @param outputTag first late-data tag handed to the operator; may be {@code null}
     * @param outputTag2 second late-data tag handed to the operator; may be {@code null}
     * @return a builder wrapping the new harness and operator
     * @throws Exception if constructing the harness fails
     */
    private JoinTestBuilder setupHarness(long j, boolean z, long j2, boolean z2, OutputTag<TestElem> outputTag, OutputTag<TestElem> outputTag2) throws Exception {
        final IntervalJoinOperator operator = new IntervalJoinOperator(
                j, j2, z, z2, outputTag, outputTag2, TestElem.serializer(), TestElem.serializer(), new PassthroughFunction());
        final TestHarness harness =
                new TestHarness(operator, left -> left.key, right -> right.key, TypeInformation.of(String.class));
        return new JoinTestBuilder(harness, operator);
    }

    /**
     * Convenience overload of the six-argument {@code setupHarness} that passes {@code null} for
     * both late-data output tags.
     *
     * @param j first bound, forwarded unchanged
     * @param z flag paired with {@code j}, forwarded unchanged
     * @param j2 second bound, forwarded unchanged
     * @param z2 flag paired with {@code j2}, forwarded unchanged
     * @return a builder wrapping the new harness and operator
     * @throws Exception if constructing the harness fails
     */
    private JoinTestBuilder setupHarness(long j, boolean z, long j2, boolean z2) throws Exception {
        final OutputTag<TestElem> noLateDataTag = null;
        return setupHarness(j, z, j2, z2, noLateDataTag, noLateDataTag);
    }

    /**
     * Builds the expected join result for a left element created from {@code j} and a right
     * element created from {@code j2}; the record's timestamp is the larger of the two values.
     *
     * @param j value used to construct the {@code "lhs"} element
     * @param j2 value used to construct the {@code "rhs"} element
     * @return a stream record carrying the pair of elements
     */
    private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(long j, long j2) {
        final Tuple2<TestElem, TestElem> joinedPair = Tuple2.of(new TestElem(j, "lhs"), new TestElem(j2, "rhs"));
        final long laterTimestamp = Math.max(j, j2);
        return new StreamRecord<>(joinedPair, laterTimestamp);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps a new {@code TestElem} built from the given arguments in a stream record whose
     * timestamp is {@code j}.
     *
     * @param j value passed to the element's constructor and used as the record timestamp
     * @param str string passed to the element's constructor (callers in this file use
     *     {@code "lhs"} or {@code "rhs"})
     * @return the stream record containing the new element
     */
    public static StreamRecord<TestElem> createStreamRecord(long j, String str) {
        final TestElem element = new TestElem(j, str);
        return new StreamRecord<>(element, j);
    }

    /**
     * Feeds timestamps 1 through 4 to both inputs of the harness, each element immediately
     * followed by a watermark carrying the same timestamp.
     *
     * <p>When {@code lhsFasterThanRhs} is set, the left input is fed completely before the right
     * one; otherwise the right input goes first.
     *
     * @param testHarness harness receiving the elements and watermarks
     * @throws Exception if the harness fails while processing
     */
    private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
        // One entry per pass: true feeds the left input, false feeds the right input.
        final boolean[] feedLeftPerPass = {this.lhsFasterThanRhs, !this.lhsFasterThanRhs};
        for (boolean feedLeft : feedLeftPerPass) {
            for (long timestamp = 1; timestamp <= 4; timestamp++) {
                if (feedLeft) {
                    testHarness.processElement1(createStreamRecord(timestamp, "lhs"));
                    testHarness.processWatermark1(new Watermark(timestamp));
                } else {
                    testHarness.processElement2(createStreamRecord(timestamp, "rhs"));
                    testHarness.processWatermark2(new Watermark(timestamp));
                }
            }
        }
    }

    /**
     * Resolves a {@link SerializedLambda} back to a key-selecting lambda.
     *
     * <p>The method first maps the serialized implementation-method name to a selector value and
     * then, for the selected case, validates the remaining serialized metadata (implementation
     * method kind, functional interface class/method/signature, implementing class and
     * implementation signature). On a full match it returns a lambda that yields the element's
     * {@code key}; all ten cases return an equivalent lambda.
     *
     * <p>NOTE(review): the {@code synthetic} marker and the {@code lambda$...} names suggest this
     * is decompiler output of a compiler-generated method. The {@code boolean z = -1} declaration,
     * the integer assignments to {@code z}, and the repeated {@code case true} labels are not
     * valid Java as written — confirm against the original sources before relying on this text.
     *
     * @param serializedLambda serialized form of the lambda to resolve
     * @return a lambda returning the {@code key} field of its argument
     * @throws IllegalArgumentException if the name or the metadata matches none of the cases
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: dispatch on the name's hash code, then confirm with equals() so that a hash
        // collision cannot select the wrong case. z stays at its initial value when nothing matches.
        switch (implMethodName.hashCode()) {
            case -1887697628:
                if (implMethodName.equals("lambda$testContextCorrectRightTimestamp$aeea360d$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1887697627:
                if (implMethodName.equals("lambda$testContextCorrectRightTimestamp$aeea360d$2")) {
                    z = 7;
                    break;
                }
                break;
            case -1180669982:
                if (implMethodName.equals("lambda$createTestHarness$bd59b2ee$1")) {
                    z = true;
                    break;
                }
                break;
            case -1180669981:
                if (implMethodName.equals("lambda$createTestHarness$bd59b2ee$2")) {
                    z = 3;
                    break;
                }
                break;
            case -688224600:
                if (implMethodName.equals("lambda$testReturnsCorrectTimestamp$aeea360d$1")) {
                    z = false;
                    break;
                }
                break;
            case -688224599:
                if (implMethodName.equals("lambda$testReturnsCorrectTimestamp$aeea360d$2")) {
                    z = 2;
                    break;
                }
                break;
            case -455548965:
                if (implMethodName.equals("lambda$setupHarness$7fba346a$1")) {
                    z = 4;
                    break;
                }
                break;
            case -455548964:
                if (implMethodName.equals("lambda$setupHarness$7fba346a$2")) {
                    z = 5;
                    break;
                }
                break;
            case 1932119139:
                if (implMethodName.equals("lambda$testContextCorrectLeftTimestamp$aeea360d$1")) {
                    z = 8;
                    break;
                }
                break;
            case 1932119140:
                if (implMethodName.equals("lambda$testContextCorrectLeftTimestamp$aeea360d$2")) {
                    z = 9;
                    break;
                }
                break;
        }
        // Stage 2: every case performs the same metadata check. Implementation method kind 6 is
        // MethodHandleInfo.REF_invokeStatic in the JDK; the functional interface must be
        // KeySelector#getKey and the implementation must live in this test class, taking a
        // TestElem and returning a String.
        // NOTE(review): the case labels below presumably stood for the distinct values 0..9 that
        // stage 1 assigns; the decompiler rendered them as boolean literals.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem -> {
                        return testElem.key;
                    };
                }
                break;
            // NOTE(review): the constant below is annotated with the value 1; it looks like the
            // decompiler substituted an unrelated constant for the literal — confirm.
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem2 -> {
                        return testElem2.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem22 -> {
                        return testElem22.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem23 -> {
                        return testElem23.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem3 -> {
                        return testElem3.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem24 -> {
                        return testElem24.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem4 -> {
                        return testElem4.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem25 -> {
                        return testElem25.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem5 -> {
                        return testElem5.key;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest$TestElem;)Ljava/lang/String;")) {
                    return testElem26 -> {
                        return testElem26.key;
                    };
                }
                break;
        }
        // Reached when the name matched no case or the metadata check of the matched case failed.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
