package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.class */
public class RestoreStreamTaskTest extends TestLogger {
    private static final Map<OperatorID, Long> RESTORED_OPERATORS = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest$CounterOperator.class */
    private static class CounterOperator extends RestoreWatchOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813243L;
        private ListState<Long> counterState;
        private long counter;

        private CounterOperator() {
            super();
            this.counter = 0L;
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.counter++;
            this.output.collect(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.tasks.RestoreStreamTaskTest.RestoreWatchOperator
        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.counterState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("counter-state", LongSerializer.INSTANCE));
            if (stateInitializationContext.isRestored()) {
                Iterator it = ((Iterable) this.counterState.get()).iterator();
                while (it.hasNext()) {
                    this.counter += ((Long) it.next()).longValue();
                }
                this.counterState.clear();
            }
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            this.counterState.add(Long.valueOf(this.counter));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest$RestoreWatchOperator.class */
    private static abstract class RestoreWatchOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
        private RestoreWatchOperator() {
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            Assert.assertEquals("Restored context id should be set iff is restored", Boolean.valueOf(stateInitializationContext.isRestored()), Boolean.valueOf(stateInitializationContext.getRestoredCheckpointId().isPresent()));
            if (stateInitializationContext.isRestored()) {
                RestoreStreamTaskTest.RESTORED_OPERATORS.put(getOperatorID(), Long.valueOf(stateInitializationContext.getRestoredCheckpointId().getAsLong()));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest$StatelessOperator.class */
    private static class StatelessOperator extends RestoreWatchOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813244L;

        private StatelessOperator() {
            super();
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        }
    }

    @Before
    public void setup() {
        RESTORED_OPERATORS.clear();
    }

    @Test
    public void testRestore() throws Exception {
        OperatorID operatorID = new OperatorID(42L, 42L);
        OperatorID operatorID2 = new OperatorID(44L, 44L);
        JobManagerTaskRestore createRunAndCheckpointOperatorChain = createRunAndCheckpointOperatorChain(operatorID, new CounterOperator(), operatorID2, new CounterOperator(), Optional.empty());
        Assert.assertEquals(2L, createRunAndCheckpointOperatorChain.getTaskStateSnapshot().getSubtaskStateMappings().size());
        createRunAndCheckpointOperatorChain(operatorID, new CounterOperator(), operatorID2, new CounterOperator(), Optional.of(createRunAndCheckpointOperatorChain));
        Assert.assertEquals(new HashSet(Arrays.asList(operatorID, operatorID2)), RESTORED_OPERATORS.keySet());
        MatcherAssert.assertThat(new HashSet(RESTORED_OPERATORS.values()), Matchers.contains(new Long[]{Long.valueOf(createRunAndCheckpointOperatorChain.getRestoreCheckpointId())}));
    }

    @Test
    public void testRestoreHeadWithNewId() throws Exception {
        OperatorID operatorID = new OperatorID(44L, 44L);
        JobManagerTaskRestore createRunAndCheckpointOperatorChain = createRunAndCheckpointOperatorChain(new OperatorID(42L, 42L), new CounterOperator(), operatorID, new CounterOperator(), Optional.empty());
        Assert.assertEquals(2L, createRunAndCheckpointOperatorChain.getTaskStateSnapshot().getSubtaskStateMappings().size());
        createRunAndCheckpointOperatorChain(new OperatorID(4242L, 4242L), new CounterOperator(), operatorID, new CounterOperator(), Optional.of(createRunAndCheckpointOperatorChain));
        Assert.assertEquals(Collections.singleton(operatorID), RESTORED_OPERATORS.keySet());
        MatcherAssert.assertThat(new HashSet(RESTORED_OPERATORS.values()), Matchers.contains(new Long[]{Long.valueOf(createRunAndCheckpointOperatorChain.getRestoreCheckpointId())}));
    }

    @Test
    public void testRestoreTailWithNewId() throws Exception {
        OperatorID operatorID = new OperatorID(42L, 42L);
        JobManagerTaskRestore createRunAndCheckpointOperatorChain = createRunAndCheckpointOperatorChain(operatorID, new CounterOperator(), new OperatorID(44L, 44L), new CounterOperator(), Optional.empty());
        Assert.assertEquals(2L, createRunAndCheckpointOperatorChain.getTaskStateSnapshot().getSubtaskStateMappings().size());
        createRunAndCheckpointOperatorChain(operatorID, new CounterOperator(), new OperatorID(4444L, 4444L), new CounterOperator(), Optional.of(createRunAndCheckpointOperatorChain));
        Assert.assertEquals(Collections.singleton(operatorID), RESTORED_OPERATORS.keySet());
        MatcherAssert.assertThat(new HashSet(RESTORED_OPERATORS.values()), Matchers.contains(new Long[]{Long.valueOf(createRunAndCheckpointOperatorChain.getRestoreCheckpointId())}));
    }

    @Test
    public void testRestoreAfterScaleUp() throws Exception {
        OperatorID operatorID = new OperatorID(42L, 42L);
        OperatorID operatorID2 = new OperatorID(44L, 44L);
        JobManagerTaskRestore createRunAndCheckpointOperatorChain = createRunAndCheckpointOperatorChain(operatorID, new CounterOperator(), operatorID2, new CounterOperator(), Optional.empty());
        TaskStateSnapshot taskStateSnapshot = createRunAndCheckpointOperatorChain.getTaskStateSnapshot();
        Assert.assertEquals(2L, taskStateSnapshot.getSubtaskStateMappings().size());
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, OperatorSubtaskState.builder().build());
        createRunAndCheckpointOperatorChain(operatorID, new CounterOperator(), operatorID2, new CounterOperator(), Optional.of(createRunAndCheckpointOperatorChain));
        Assert.assertEquals(new HashSet(Arrays.asList(operatorID, operatorID2)), RESTORED_OPERATORS.keySet());
        MatcherAssert.assertThat(new HashSet(RESTORED_OPERATORS.values()), Matchers.contains(new Long[]{Long.valueOf(createRunAndCheckpointOperatorChain.getRestoreCheckpointId())}));
    }

    @Test
    public void testRestoreWithoutState() throws Exception {
        OperatorID operatorID = new OperatorID(42L, 42L);
        OperatorID operatorID2 = new OperatorID(44L, 44L);
        JobManagerTaskRestore createRunAndCheckpointOperatorChain = createRunAndCheckpointOperatorChain(operatorID, new StatelessOperator(), operatorID2, new CounterOperator(), Optional.empty());
        Assert.assertEquals(2L, createRunAndCheckpointOperatorChain.getTaskStateSnapshot().getSubtaskStateMappings().size());
        createRunAndCheckpointOperatorChain(operatorID, new StatelessOperator(), operatorID2, new CounterOperator(), Optional.of(createRunAndCheckpointOperatorChain));
        Assert.assertEquals(new HashSet(Arrays.asList(operatorID, operatorID2)), RESTORED_OPERATORS.keySet());
        MatcherAssert.assertThat(new HashSet(RESTORED_OPERATORS.values()), Matchers.contains(new Long[]{Long.valueOf(createRunAndCheckpointOperatorChain.getRestoreCheckpointId())}));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private JobManagerTaskRestore createRunAndCheckpointOperatorChain(OperatorID operatorID, OneInputStreamOperator<String, String> oneInputStreamOperator, OperatorID operatorID2, OneInputStreamOperator<String, String> oneInputStreamOperator2, Optional<JobManagerTaskRestore> optional) throws Exception {
        OneInputStreamTaskTestHarness<String, String> oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamTaskTestHarness.setupOperatorChain(operatorID, (StreamOperator<?>) oneInputStreamOperator).chain(operatorID2, (OneInputStreamOperator) oneInputStreamOperator2, (TypeSerializer) StringSerializer.INSTANCE).finish();
        if (optional.isPresent()) {
            JobManagerTaskRestore jobManagerTaskRestore = optional.get();
            oneInputStreamTaskTestHarness.setTaskStateSnapshot(jobManagerTaskRestore.getRestoreCheckpointId(), jobManagerTaskRestore.getTaskStateSnapshot());
        }
        oneInputStreamTaskTestHarness.invoke(new StreamMockEnvironment(oneInputStreamTaskTestHarness.jobConfig, oneInputStreamTaskTestHarness.taskConfig, oneInputStreamTaskTestHarness.executionConfig, oneInputStreamTaskTestHarness.memorySize, new MockInputSplitProvider(), oneInputStreamTaskTestHarness.bufferSize, oneInputStreamTaskTestHarness.taskStateManager));
        oneInputStreamTaskTestHarness.waitForTaskRunning();
        OneInputStreamTask<String, String> mo166getTask = oneInputStreamTaskTestHarness.mo166getTask();
        processRecords(oneInputStreamTaskTestHarness);
        triggerCheckpoint(oneInputStreamTaskTestHarness, mo166getTask);
        TestTaskStateManager testTaskStateManager = oneInputStreamTaskTestHarness.taskStateManager;
        JobManagerTaskRestore jobManagerTaskRestore2 = new JobManagerTaskRestore(testTaskStateManager.getReportedCheckpointId(), testTaskStateManager.getLastJobManagerTaskStateSnapshot());
        oneInputStreamTaskTestHarness.endInput();
        oneInputStreamTaskTestHarness.waitForTaskCompletion();
        return jobManagerTaskRestore2;
    }

    private void triggerCheckpoint(OneInputStreamTaskTestHarness<String, String> oneInputStreamTaskTestHarness, OneInputStreamTask<String, String> oneInputStreamTask) throws Exception {
        oneInputStreamTask.triggerCheckpointAsync(new CheckpointMetaData(1L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
        oneInputStreamTaskTestHarness.taskStateManager.getWaitForReportLatch().await();
        Assert.assertEquals(1L, oneInputStreamTaskTestHarness.taskStateManager.getReportedCheckpointId());
    }

    private void processRecords(OneInputStreamTaskTestHarness<String, String> oneInputStreamTaskTestHarness) throws Exception {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("10"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("20"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("30"), 0, 0);
        oneInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(new StreamRecord("10"));
        concurrentLinkedQueue.add(new StreamRecord("20"));
        concurrentLinkedQueue.add(new StreamRecord("30"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamTaskTestHarness.getOutput());
    }
}
