package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.class */
class StreamTaskStateInitializerImplTest {
    StreamTaskStateInitializerImplTest() {
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    void testNoRestore() throws Exception {
        StreamTaskStateInitializer streamTaskStateManager = streamTaskStateManager((MemoryStateBackend) Mockito.spy(new MemoryStateBackend(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE)), null, new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis()), true);
        OperatorID operatorID = new OperatorID(47L, 11L);
        AbstractStreamOperator abstractStreamOperator = (AbstractStreamOperator) Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(abstractStreamOperator.getOperatorID()).thenReturn(operatorID);
        IntSerializer intSerializer = new IntSerializer();
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        StreamOperatorStateContext streamOperatorStateContext = streamTaskStateManager.streamOperatorStateContext(abstractStreamOperator.getOperatorID(), abstractStreamOperator.getClass().getSimpleName(), new TestProcessingTimeService(), abstractStreamOperator, intSerializer, closeableRegistry, new UnregisteredMetricsGroup(), 1.0d, false);
        OperatorStateBackend operatorStateBackend = streamOperatorStateContext.operatorStateBackend();
        CheckpointableKeyedStateBackend keyedStateBackend = streamOperatorStateContext.keyedStateBackend();
        InternalTimeServiceManager internalTimerServiceManager = streamOperatorStateContext.internalTimerServiceManager();
        CloseableIterable rawKeyedStateInputs = streamOperatorStateContext.rawKeyedStateInputs();
        CloseableIterable rawOperatorStateInputs = streamOperatorStateContext.rawOperatorStateInputs();
        ((AbstractBooleanAssert) Assertions.assertThat(streamOperatorStateContext.isRestored()).as("Expected the context to NOT be restored", new Object[0])).isFalse();
        Assertions.assertThat(operatorStateBackend).isNotNull();
        Assertions.assertThat(keyedStateBackend).isNotNull();
        Assertions.assertThat(internalTimerServiceManager).isNotNull();
        Assertions.assertThat(rawKeyedStateInputs).isNotNull();
        Assertions.assertThat(rawOperatorStateInputs).isNotNull();
        checkCloseablesRegistered(closeableRegistry, operatorStateBackend, keyedStateBackend, rawKeyedStateInputs, rawOperatorStateInputs);
        Assertions.assertThat(rawKeyedStateInputs.iterator()).isExhausted();
        Assertions.assertThat(rawOperatorStateInputs.iterator()).isExhausted();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    void testWithRestore() throws Exception {
        StateBackend stateBackend = (StateBackend) Mockito.spy(new StateBackend() { // from class: org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImplTest.1
            /* renamed from: createKeyedStateBackend, reason: merged with bridge method [inline-methods] */
            public <K> AbstractKeyedStateBackend<K> m51createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) throws Exception {
                return (AbstractKeyedStateBackend) Mockito.mock(AbstractKeyedStateBackend.class);
            }

            public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
                return (OperatorStateBackend) Mockito.mock(OperatorStateBackend.class);
            }
        });
        OperatorID operatorID = new OperatorID(47L, 11L);
        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
        Random random = new Random(66L);
        OperatorSubtaskState build = OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(Collections.singletonMap("a", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)), CheckpointTestUtils.createDummyStreamStateHandle(random, (String) null))).setRawOperatorState(new OperatorStreamStateHandle(Collections.singletonMap("_default_", new OperatorStateHandle.StateMetaInfo(new long[]{0, 20, 30}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)), CheckpointTestUtils.createDummyStreamStateHandle(random, (String) null))).setManagedKeyedState(CheckpointTestUtils.createDummyKeyGroupStateHandle(random, (String) null)).setRawKeyedState(CheckpointTestUtils.createDummyKeyGroupStateHandle(random, (String) null)).setInputChannelState(StateObjectCollection.singleton(StateHandleDummyUtil.createNewInputChannelStateHandle(10, random))).setResultSubpartitionState(StateObjectCollection.singleton(StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random))).build();
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, build);
        JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(42L, taskStateSnapshot);
        SubTaskInitializationMetricsBuilder subTaskInitializationMetricsBuilder = new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis());
        StreamTaskStateInitializer streamTaskStateManager = streamTaskStateManager(stateBackend, jobManagerTaskRestore, subTaskInitializationMetricsBuilder, false);
        AbstractStreamOperator abstractStreamOperator = (AbstractStreamOperator) Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(abstractStreamOperator.getOperatorID()).thenReturn(operatorID);
        IntSerializer intSerializer = new IntSerializer();
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        StreamOperatorStateContext streamOperatorStateContext = streamTaskStateManager.streamOperatorStateContext(abstractStreamOperator.getOperatorID(), abstractStreamOperator.getClass().getSimpleName(), new TestProcessingTimeService(), abstractStreamOperator, intSerializer, closeableRegistry, new UnregisteredMetricsGroup(), 1.0d, false);
        OperatorStateBackend operatorStateBackend = streamOperatorStateContext.operatorStateBackend();
        CheckpointableKeyedStateBackend keyedStateBackend = streamOperatorStateContext.keyedStateBackend();
        InternalTimeServiceManager internalTimerServiceManager = streamOperatorStateContext.internalTimerServiceManager();
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = streamOperatorStateContext.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = streamOperatorStateContext.rawOperatorStateInputs();
        ((AbstractBooleanAssert) Assertions.assertThat(streamOperatorStateContext.isRestored()).as("Expected the context to be restored", new Object[0])).isTrue();
        Assertions.assertThat(streamOperatorStateContext.getRestoredCheckpointId()).hasValue(42L);
        Assertions.assertThat(operatorStateBackend).isNotNull();
        Assertions.assertThat(keyedStateBackend).isNotNull();
        Assertions.assertThat(internalTimerServiceManager).isNull();
        Assertions.assertThat(rawKeyedStateInputs).isNotNull();
        Assertions.assertThat(rawOperatorStateInputs).isNotNull();
        int i = 0;
        for (KeyGroupStatePartitionStreamProvider keyGroupStatePartitionStreamProvider : rawKeyedStateInputs) {
            i++;
        }
        Assertions.assertThat(i).isOne();
        int i2 = 0;
        for (StatePartitionStreamProvider statePartitionStreamProvider : rawOperatorStateInputs) {
            i2++;
        }
        Assertions.assertThat(i2).isEqualTo(3);
        Assertions.assertThat(subTaskInitializationMetricsBuilder.build().getDurationMetrics()).hasSize(2).containsEntry("RestoredStateSizeBytes." + StateObject.StateObjectLocation.LOCAL_MEMORY.name(), Long.valueOf(Stream.of((Object[]) new Stream[]{build.getManagedOperatorState().stream(), build.getManagedKeyedState().stream(), build.getRawKeyedState().stream(), build.getRawOperatorState().stream()}).flatMap(stream -> {
            return stream;
        }).mapToLong((v0) -> {
            return v0.getStateSize();
        }).sum())).containsEntry("RestoredStateSizeBytes." + StateObject.StateObjectLocation.UNKNOWN.name(), Long.valueOf(Stream.concat(build.getInputChannelState().stream(), build.getResultSubpartitionState().stream()).mapToLong((v0) -> {
            return v0.getStateSize();
        }).sum()));
        checkCloseablesRegistered(closeableRegistry, operatorStateBackend, keyedStateBackend, rawKeyedStateInputs, rawOperatorStateInputs);
    }

    private static void checkCloseablesRegistered(CloseableRegistry closeableRegistry, Closeable... closeableArr) {
        for (Closeable closeable : closeableArr) {
            Assertions.assertThat(closeableRegistry.unregisterCloseable(closeable)).isTrue();
        }
    }

    private StreamTaskStateInitializer streamTaskStateManager(StateBackend stateBackend, JobManagerTaskRestore jobManagerTaskRestore, SubTaskInitializationMetricsBuilder subTaskInitializationMetricsBuilder, boolean z) {
        JobID jobID = new JobID(42L, 43L);
        ExecutionAttemptID createExecutionAttemptId = ExecutionGraphTestUtils.createExecutionAttemptId();
        TaskStateManagerImpl taskStateManagerImpl = new TaskStateManagerImpl(jobID, createExecutionAttemptId, new TestTaskLocalStateStore(), (FileMergingSnapshotManager) null, new InMemoryStateChangelogStorage(), new TaskExecutorStateChangelogStoragesManager(), jobManagerTaskRestore, new TestCheckpointResponder());
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test-task", 1, createExecutionAttemptId.getExecutionVertexId().getSubtaskIndex());
        dummyEnvironment.setTaskStateManager(taskStateManagerImpl);
        return z ? new StreamTaskStateInitializerImpl(dummyEnvironment, stateBackend) : new StreamTaskStateInitializerImpl(dummyEnvironment, stateBackend, subTaskInitializationMetricsBuilder, TtlTimeProvider.DEFAULT, new InternalTimeServiceManager.Provider() { // from class: org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImplTest.2
            public <K> InternalTimeServiceManager<K> create(TaskIOMetricGroup taskIOMetricGroup, CheckpointableKeyedStateBackend<K> checkpointableKeyedStateBackend, ClassLoader classLoader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> iterable, StreamTaskCancellationContext streamTaskCancellationContext) throws Exception {
                return null;
            }
        }, StreamTaskCancellationContext.alwaysRunning());
    }
}
