/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;

public class AbstractStreamOperatorTest {
    /**
     * Creates a keyed test harness with max parallelism 1, one subtask and subtask index 0 by
     * delegating to {@link #createTestHarness(int, int, int)}.
     *
     * @return the harness produced by the three-argument overload
     * @throws Exception if the harness cannot be created
     */
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness() throws Exception {
        final int maxParallelism = 1;
        final int numSubtasks = 1;
        final int subtaskIndex = 0;
        return createTestHarness(maxParallelism, numSubtasks, subtaskIndex);
    }

    /**
     * Creates a keyed test harness around a new {@link TestOperator}, keyed by the first tuple
     * field through {@link TestKeySelector}.
     *
     * @param maxParalelism max parallelism handed to the harness
     * @param numSubtasks number of parallel subtasks handed to the harness
     * @param subtaskIndex index of the subtask this harness represents
     * @return a harness that has been constructed but not yet set up or opened
     * @throws Exception if the harness cannot be created
     */
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                new TestOperator(),
                new TestKeySelector(),
                BasicTypeInfo.INT_TYPE_INFO,
                maxParalelism,
                numSubtasks,
                subtaskIndex);
    }

    /**
     * Creates a keyed test harness around a caller-supplied operator.
     *
     * @param maxParalelism max parallelism handed to the harness
     * @param numSubtasks number of parallel subtasks handed to the harness
     * @param subtaskIndex index of the subtask this harness represents
     * @param testOperator operator under test
     * @param keySelector extracts the key from each input element
     * @param keyTypeInfo type information of the key
     * @param <K> key type
     * @param <IN> input element type
     * @param <OUT> output element type
     * @return a harness that has been constructed but not yet set up or opened
     * @throws Exception if the harness cannot be created
     */
    protected <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex, OneInputStreamOperator<IN, OUT> testOperator, KeySelector<IN, K> keySelector, TypeInformation<K> keyTypeInfo) throws Exception {
        final KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        testOperator,
                        keySelector,
                        keyTypeInfo,
                        maxParalelism,
                        numSubtasks,
                        subtaskIndex);
        return harness;
    }

    /**
     * Writes a different state value under keys 0 and 1, then asks the operator to emit the
     * state for each key and checks that every key sees only its own value.
     */
    @Test
    void testStateDoesNotInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();

            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);

            testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0L);
            testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0L);

            // Exact, ordered match: a subset check would let a spurious extra emission through.
            Assertions.assertThat(extractResult(testHarness))
                    .containsExactly("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO");
        }
    }

    /**
     * Registers event-time timers for two keys (key 1 at 20, key 0 at 10) alongside per-key
     * state and checks that each watermark fires only the timer that is due, emitting the state
     * of the key that owns that timer.
     */
    @Test
    void testEventTimeTimersDontInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.processWatermark(0L);

            testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0L);

            // Only key 0's timer is due at 10; the exact match also rejects an early firing for key 1.
            testHarness.processWatermark(10L);
            Assertions.assertThat(extractResult(testHarness)).containsExactly("ON_EVENT_TIME:HELLO");

            testHarness.processWatermark(20L);
            Assertions.assertThat(extractResult(testHarness)).containsExactly("ON_EVENT_TIME:CIAO");
        }
    }

    /**
     * Registers processing-time timers for two keys (key 1 at 20, key 0 at 10) alongside per-key
     * state and checks that advancing processing time fires only the timer that is due, emitting
     * the state of the key that owns that timer.
     */
    @Test
    void testProcessingTimeTimersDontInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

            // Only key 0's timer is due at 10; the exact match also rejects an early firing for key 1.
            testHarness.setProcessingTime(10L);
            Assertions.assertThat(extractResult(testHarness)).containsExactly("ON_PROC_TIME:HELLO");

            testHarness.setProcessingTime(20L);
            Assertions.assertThat(extractResult(testHarness)).containsExactly("ON_PROC_TIME:CIAO");
        }
    }

    /**
     * Snapshots an operator that holds pending processing-time timers, restores the snapshot
     * into a second harness and checks that the restored timers still fire when processing time
     * advances.
     */
    @Test
    void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
        final OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);

            snapshot = testHarness.snapshot(0L, 0L);
        }

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = createTestHarness()) {
            // Processing time is reset before the state is restored and the operator is opened.
            testHarness1.setProcessingTime(0L);
            testHarness1.setup();
            testHarness1.initializeState(snapshot);
            testHarness1.open();

            testHarness1.setProcessingTime(10L);
            Assertions.assertThat(extractResult(testHarness1)).containsExactly("ON_PROC_TIME:HELLO");

            testHarness1.setProcessingTime(20L);
            Assertions.assertThat(extractResult(testHarness1)).containsExactly("ON_PROC_TIME:CIAO");
        }
    }

    /**
     * Registers a processing-time timer (10) and an event-time timer (20) for the same key and
     * checks that a watermark fires only the event-time timer while advancing processing time
     * fires only the processing-time timer.
     */
    @Test
    void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();
            testHarness.setProcessingTime(0L);
            testHarness.processWatermark(0L);

            testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0L);

            // The watermark passes both timestamps, yet only the event-time timer may fire.
            testHarness.processWatermark(20L);
            Assertions.assertThat(extractResult(testHarness)).containsExactly("ON_EVENT_TIME:HELLO");

            testHarness.setProcessingTime(10L);
            Assertions.assertThat(extractResult(testHarness)).containsExactly("ON_PROC_TIME:HELLO");
        }
    }

    /**
     * Takes a snapshot from a single subtask that owns all ten key groups, repartitions it for
     * two subtasks and checks that each restored subtask fires only the timers (and sees only
     * the state) of the key whose key group falls into its half of the range.
     */
    @Test
    void testStateAndTimerStateShufflingScalingUp() throws Exception {
        final int maxParallelism = 10;

        // One key per half of the key-group range, so each key lands on a different subtask after scale-up.
        final KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, 4);
        final KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);
        final int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
        final int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

        final OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness(maxParallelism, 1, 0)) {
            testHarness.open();
            testHarness.processWatermark(0L);
            testHarness.setProcessingTime(0L);

            testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0L);
            testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0L);
            testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0L);
            testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0L);
            testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0L);

            Assertions.assertThat(extractResult(testHarness)).isEmpty();

            snapshot = testHarness.snapshot(0L, 0L);
        }

        // Restore into subtask 0 of 2: only key1's timers and state are expected here.
        final OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 1, 2, 0);
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = createTestHarness(maxParallelism, 2, 0)) {
            testHarness1.setup();
            testHarness1.initializeState(initState1);
            testHarness1.open();

            testHarness1.processWatermark(10L);
            Assertions.assertThat(extractResult(testHarness1)).containsExactly("ON_EVENT_TIME:HELLO");
            Assertions.assertThat(extractResult(testHarness1)).isEmpty();

            // key2's event-time timer must not have been assigned to this subtask.
            testHarness1.processWatermark(20L);
            Assertions.assertThat(extractResult(testHarness1)).isEmpty();

            testHarness1.setProcessingTime(10L);
            Assertions.assertThat(extractResult(testHarness1)).containsExactly("ON_PROC_TIME:HELLO");
            Assertions.assertThat(extractResult(testHarness1)).isEmpty();

            // key2's processing-time timer must not have been assigned to this subtask.
            testHarness1.setProcessingTime(20L);
            Assertions.assertThat(extractResult(testHarness1)).isEmpty();
        }

        // Restore into subtask 1 of 2: only key2's timers and state are expected here.
        final OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 1, 2, 1);
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 = createTestHarness(maxParallelism, 2, 1)) {
            testHarness2.setup();
            testHarness2.initializeState(initState2);
            testHarness2.open();

            // key1's event-time timer must not have been assigned to this subtask.
            testHarness2.processWatermark(10L);
            Assertions.assertThat(extractResult(testHarness2)).isEmpty();

            testHarness2.processWatermark(20L);
            Assertions.assertThat(extractResult(testHarness2)).containsExactly("ON_EVENT_TIME:CIAO");

            // key1's processing-time timer must not have been assigned to this subtask.
            testHarness2.setProcessingTime(10L);
            Assertions.assertThat(extractResult(testHarness2)).isEmpty();

            testHarness2.setProcessingTime(20L);
            Assertions.assertThat(extractResult(testHarness2)).containsExactly("ON_PROC_TIME:CIAO");
            Assertions.assertThat(extractResult(testHarness2)).isEmpty();
        }
    }

    /**
     * Takes one snapshot from each of two subtasks, merges and repartitions them for a single
     * subtask and checks that the restored operator fires the timers and sees the state of both
     * keys.
     */
    @Test
    void testStateAndTimerStateShufflingScalingDown() throws Exception {
        final int maxParallelism = 10;

        // One key per half of the key-group range, so each key is owned by a different subtask before scale-down.
        final KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, 4);
        final KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1);
        final int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
        final int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

        final OperatorSubtaskState snapshot1;
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 = createTestHarness(maxParallelism, 2, 0)) {
            testHarness1.setup();
            testHarness1.open();
            testHarness1.processWatermark(0L);
            testHarness1.setProcessingTime(0L);

            testHarness1.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:30"), 0L);
            testHarness1.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:30"), 0L);
            testHarness1.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0L);

            snapshot1 = testHarness1.snapshot(0L, 0L);
        }

        final OperatorSubtaskState snapshot2;
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 = createTestHarness(maxParallelism, 2, 1)) {
            testHarness2.setup();
            testHarness2.open();
            testHarness2.processWatermark(0L);
            testHarness2.setProcessingTime(0L);

            testHarness2.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:40"), 0L);
            testHarness2.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:40"), 0L);
            testHarness2.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0L);

            snapshot2 = testHarness2.snapshot(0L, 0L);
        }

        // Merge both subtask snapshots, then repartition from parallelism 2 down to 1.
        final OperatorSubtaskState repackagedState = AbstractStreamOperatorTestHarness.repackageState(snapshot1, snapshot2);
        final OperatorSubtaskState initSubTaskState = AbstractStreamOperatorTestHarness.repartitionOperatorState(repackagedState, maxParallelism, 2, 1, 0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness3 = createTestHarness(maxParallelism, 1, 0)) {
            testHarness3.setup();
            testHarness3.initializeState(initSubTaskState);
            testHarness3.open();

            testHarness3.processWatermark(30L);
            Assertions.assertThat(extractResult(testHarness3)).containsExactly("ON_EVENT_TIME:HELLO");
            Assertions.assertThat(extractResult(testHarness3)).isEmpty();

            testHarness3.processWatermark(40L);
            Assertions.assertThat(extractResult(testHarness3)).containsExactly("ON_EVENT_TIME:CIAO");
            Assertions.assertThat(extractResult(testHarness3)).isEmpty();

            testHarness3.setProcessingTime(30L);
            Assertions.assertThat(extractResult(testHarness3)).containsExactly("ON_PROC_TIME:HELLO");
            Assertions.assertThat(extractResult(testHarness3)).isEmpty();

            testHarness3.setProcessingTime(40L);
            Assertions.assertThat(extractResult(testHarness3)).containsExactly("ON_PROC_TIME:CIAO");
            Assertions.assertThat(extractResult(testHarness3)).isEmpty();
        }
    }

    /**
     * Snapshots an operator that writes custom raw keyed state for key groups 2, 3 and 8, then
     * restores the snapshot and checks that exactly those key groups come back with the bytes
     * that were written.
     */
    @Test
    void testCustomRawKeyedStateSnapshotAndRestore() throws Exception {
        final int maxParallelism = 10;
        final int numSubtasks = 1;
        final int subtaskIndex = 0;

        final List<Integer> keyGroupsToWrite = Arrays.asList(2, 3, 8);
        final byte[] testSnapshotData = "TEST".getBytes(StandardCharsets.UTF_8);
        final CustomRawKeyedStateTestOperator testOperator = new CustomRawKeyedStateTestOperator(testSnapshotData, keyGroupsToWrite);
        final KeySelector<String, String> identityKeySelector = input -> input;

        final OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = createTestHarness(maxParallelism, numSubtasks, subtaskIndex, testOperator, identityKeySelector, BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            snapshot = testHarness.snapshot(0L, 0L);
        }

        // The same operator instance is reused, so its restored map reflects this second run.
        try (KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = createTestHarness(maxParallelism, numSubtasks, subtaskIndex, testOperator, identityKeySelector, BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(snapshot);
            testHarness.open();
        }

        Assertions.assertThat(testOperator.restoredRawKeyedState)
                .is(HamcrestCondition.matching(hasRestoredKeyGroupsWith(testSnapshotData, keyGroupsToWrite)));
    }

    @Test
    void testIdleWatermarkHandling() throws Exception {
        WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        try (KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator<Long, Long, Long>)testOperator, dummyKeySelector, dummyKeySelector, BasicTypeInfo.INT_TYPE_INFO);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement1(1L, 1L);
            testHarness.processElement1(3L, 3L);
            testHarness.processElement1(4L, 4L);
            testHarness.processWatermark1(new Watermark(1L));
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord((Object)1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
            testHarness.processWatermark1(new Watermark(3L));
            expectedOutput.add(new StreamRecord((Object)3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
            testHarness.processWatermarkStatus2(WatermarkStatus.ACTIVE);
            testHarness.processWatermark1(new Watermark(4L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testIdlenessForwarding() throws Exception {
        WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
        ConcurrentLinkedQueue<WatermarkStatus> expectedOutput = new ConcurrentLinkedQueue<WatermarkStatus>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        try (KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator<Long, Long, Long>)testOperator, dummyKeySelector, dummyKeySelector, BasicTypeInfo.INT_TYPE_INFO);){
            testHarness.setup();
            testHarness.open();
            testHarness.processWatermarkStatus1(WatermarkStatus.IDLE);
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);
            expectedOutput.add(WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testTwoInputsRecordAttributesForwarding() throws Exception {
        WatermarkTestingOperator testOperator = new WatermarkTestingOperator();
        ConcurrentLinkedQueue<RecordAttributes> expectedOutput = new ConcurrentLinkedQueue<RecordAttributes>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        try (KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator<Long, Long, Long>)testOperator, dummyKeySelector, dummyKeySelector, BasicTypeInfo.INT_TYPE_INFO);){
            testHarness.setup();
            testHarness.open();
            RecordAttributes backlogRecordAttributes = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
            RecordAttributes nonBacklogRecordAttributes = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
            testHarness.processRecordAttributes1(backlogRecordAttributes);
            testHarness.processRecordAttributes2(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
            testHarness.processRecordAttributes1(nonBacklogRecordAttributes);
            testHarness.processRecordAttributes2(nonBacklogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(nonBacklogRecordAttributes);
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Checks that a one-input operator forwards backlog and non-backlog {@link RecordAttributes}
     * in the order they were received.
     */
    @Test
    void testOneInputRecordAttributesForwarding() throws Exception {
        // Typed as Object so it lines up with the harness output queue in the comparison below.
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = createTestHarness()) {
            testHarness.open();

            final RecordAttributes backlogRecordAttributes = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
            final RecordAttributes nonBacklogRecordAttributes = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();

            testHarness.processRecordAttributes(backlogRecordAttributes);
            testHarness.processRecordAttributes(nonBacklogRecordAttributes);

            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(nonBacklogRecordAttributes);
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Collects the values of all stream records currently in the harness output and then clears
     * the output queue, so consecutive calls return only what was emitted in between.
     *
     * @param testHarness harness whose output is drained
     * @param <T> output element type of the harness
     * @return the record values in emission order; empty if no records were emitted
     */
    private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) {
        // The harness already filters its output down to stream records, so no instanceof check is needed.
        final List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords();
        final List<T> result = new ArrayList<>(streamRecords.size());
        for (StreamRecord<? extends T> streamRecord : streamRecords) {
            result.add(streamRecord.getValue());
        }
        // Clearing also drops non-record output such as watermarks.
        testHarness.getOutput().clear();
        return result;
    }

    /**
     * Draws random integers until one is assigned to a key group inside {@code range}.
     *
     * @param range key-group range the returned key must map into
     * @param maxParallelism max parallelism used for the key-group assignment
     * @return a key whose key group lies within {@code range}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
        final Random random = new Random(System.currentTimeMillis());
        int candidate;
        do {
            candidate = random.nextInt();
        } while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(candidate, maxParallelism)));
        return candidate;
    }

    /**
     * Builds a matcher that accepts a restored key-group map only if it has exactly as many
     * entries as {@code writtenKeyGroups} and every written key group maps to bytes equal to
     * {@code testSnapshotData}.
     *
     * @param testSnapshotData bytes expected for every written key group
     * @param writtenKeyGroups ids of the key groups that were written
     * @return the matcher described above
     */
    private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(final byte[] testSnapshotData, final List<Integer> writtenKeyGroups) {
        return new TypeSafeMatcher<Map<Integer, byte[]>>() {

            @Override
            protected boolean matchesSafely(Map<Integer, byte[]> restored) {
                boolean sameSize = restored.size() == writtenKeyGroups.size();
                if (!sameSize) {
                    return false;
                }
                for (int writtenKeyGroupId : writtenKeyGroups) {
                    byte[] restoredBytes = restored.get(writtenKeyGroupId);
                    if (!Arrays.equals(restoredBytes, testSnapshotData)) {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public void describeTo(Description description) {
                String text = "Key groups: " + writtenKeyGroups + " with snapshot data " + Arrays.toString(testSnapshotData);
                description.appendText(text);
            }
        };
    }

    private static class CustomRawKeyedStateTestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;
        private final byte[] snapshotBytes;
        private final List<Integer> keyGroupsToWrite;
        private Map<Integer, byte[]> restoredRawKeyedState;

        CustomRawKeyedStateTestOperator(byte[] snapshotBytes, List<Integer> keyGroupsToWrite) {
            this.snapshotBytes = Arrays.copyOf(snapshotBytes, snapshotBytes.length);
            this.keyGroupsToWrite = (List)Preconditions.checkNotNull(keyGroupsToWrite);
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }

        protected boolean isUsingCustomRawKeyedState() {
            return true;
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            KeyedStateCheckpointOutputStream rawKeyedStateStream = context.getRawKeyedOperatorStateOutput();
            for (int keyGroupId : this.keyGroupsToWrite) {
                rawKeyedStateStream.startNewKeyGroup(keyGroupId);
                rawKeyedStateStream.write(this.snapshotBytes);
            }
            rawKeyedStateStream.close();
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.restoredRawKeyedState = new HashMap<Integer, byte[]>();
            for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
                byte[] readBuffer = new byte[this.snapshotBytes.length];
                int ignored = streamProvider.getStream().read(readBuffer);
                this.restoredRawKeyedState.put(streamProvider.getKeyGroupId(), readBuffer);
            }
        }
    }

    private static class TestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        private transient InternalTimerService<VoidNamespace> timerService;
        private final ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor("state", (TypeSerializer)StringSerializer.INSTANCE);

        private TestOperator() {
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            String[] command = ((String)((Tuple2)element.getValue()).f1).split(":");
            switch (command[0]) {
                case "SET_STATE": {
                    ((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).update((Object)command[1]);
                    break;
                }
                case "DELETE_STATE": {
                    ((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).clear();
                    break;
                }
                case "SET_EVENT_TIME_TIMER": {
                    this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                }
                case "SET_PROC_TIME_TIMER": {
                    this.timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                    break;
                }
                case "EMIT_STATE": {
                    String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
                    this.output.collect((Object)new StreamRecord((Object)("ON_ELEMENT:" + ((Tuple2)element.getValue()).f0 + ":" + stateValue)));
                    break;
                }
                default: {
                    throw new IllegalArgumentException();
                }
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
            this.output.collect((Object)new StreamRecord((Object)("ON_EVENT_TIME:" + stateValue)));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
            this.output.collect((Object)new StreamRecord((Object)("ON_PROC_TIME:" + stateValue)));
        }
    }

    private static class WatermarkTestingOperator
    extends AbstractStreamOperator<Long>
    implements TwoInputStreamOperator<Long, Long, Long>,
    Triggerable<Integer, VoidNamespace> {
        private transient InternalTimerService<VoidNamespace> timerService;

        private WatermarkTestingOperator() {
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            this.output.collect((Object)new StreamRecord((Object)timer.getTimestamp()));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
        }

        public void processElement1(StreamRecord<Long> element) throws Exception {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, ((Long)element.getValue()).longValue());
        }

        public void processElement2(StreamRecord<Long> element) throws Exception {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, ((Long)element.getValue()).longValue());
        }
    }

    protected static class TestKeySelector
    implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1L;

        protected TestKeySelector() {
        }

        public Integer getKey(Tuple2<Integer, String> value) throws Exception {
            return (Integer)value.f0;
        }
    }
}

