package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.class */
class TaskExecutorStateChangelogStoragesManagerTest {

    /* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest$TestStateChangelogStorage.class */
    private static class TestStateChangelogStorage implements StateChangelogStorage<ChangelogStateHandle> {
        public boolean closed;

        private TestStateChangelogStorage() {
            this.closed = false;
        }

        public StateChangelogWriter<ChangelogStateHandle> createWriter(String str, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
            return null;
        }

        public StateChangelogHandleReader<ChangelogStateHandle> createReader() {
            return null;
        }

        public void close() {
            this.closed = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest$TestStateChangelogStorageFactory.class */
    private static class TestStateChangelogStorageFactory implements StateChangelogStorageFactory {
        public static String identifier = "test-factory";
        public static PluginManager pluginManager = new PluginManager() { // from class: org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManagerTest.TestStateChangelogStorageFactory.1
            public <P> Iterator<P> load(Class<P> cls) {
                Preconditions.checkArgument(cls.equals(StateChangelogStorageFactory.class));
                return Collections.singletonList(new TestStateChangelogStorageFactory()).iterator();
            }
        };

        private TestStateChangelogStorageFactory() {
        }

        public String getIdentifier() {
            return identifier;
        }

        public StateChangelogStorage<?> createStorage(JobID jobID, Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup, LocalRecoveryConfig localRecoveryConfig) {
            return new TestStateChangelogStorage();
        }

        public StateChangelogStorageView<?> createStorageView(Configuration configuration) throws IOException {
            return new TestStateChangelogStorage();
        }
    }

    TaskExecutorStateChangelogStoragesManagerTest() {
    }

    @Test
    void testDuplicatedAllocation() throws IOException {
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        JobID jobID = new JobID(1L, 1L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isEqualTo(stateChangelogStorageForJob);
        Assertions.assertThat(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(new JobID(1L, 2L), configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isNotEqualTo(stateChangelogStorageForJob);
        taskExecutorStateChangelogStoragesManager.shutdown();
    }

    @Test
    void testReleaseForJob() throws IOException {
        StateChangelogStorageLoader.initialize(TestStateChangelogStorageFactory.pluginManager);
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, TestStateChangelogStorageFactory.identifier);
        JobID jobID = new JobID(1L, 1L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat(stateChangelogStorageForJob).isInstanceOf(TestStateChangelogStorage.class);
        Assertions.assertThat(((TestStateChangelogStorage) stateChangelogStorageForJob).closed).isFalse();
        taskExecutorStateChangelogStoragesManager.releaseResourcesForJob(jobID);
        Assertions.assertThat(((TestStateChangelogStorage) stateChangelogStorageForJob).closed).isTrue();
        Assertions.assertThat(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isNotEqualTo(stateChangelogStorageForJob);
        taskExecutorStateChangelogStoragesManager.shutdown();
        StateChangelogStorageLoader.initialize((PluginManager) null);
    }

    @Test
    void testConsistencyAmongTask() throws IOException {
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
        JobID jobID = new JobID(1L, 1L);
        Assertions.assertThat(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isNull();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, StateChangelogOptions.STATE_CHANGE_LOG_STORAGE.defaultValue());
        Assertions.assertThat(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isNull();
        JobID jobID2 = new JobID(1L, 2L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat(stateChangelogStorageForJob).isNotNull();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
        StateChangelogStorage stateChangelogStorageForJob2 = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat(stateChangelogStorageForJob2).isNotNull();
        Assertions.assertThat(stateChangelogStorageForJob2).isEqualTo(stateChangelogStorageForJob);
        taskExecutorStateChangelogStoragesManager.shutdown();
    }

    @Test
    void testShutdown() throws IOException {
        StateChangelogStorageLoader.initialize(TestStateChangelogStorageFactory.pluginManager);
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, TestStateChangelogStorageFactory.identifier);
        JobID jobID = new JobID(1L, 1L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat(stateChangelogStorageForJob).isInstanceOf(TestStateChangelogStorage.class);
        Assertions.assertThat(((TestStateChangelogStorage) stateChangelogStorageForJob).closed).isFalse();
        new JobID(1L, 2L);
        StateChangelogStorage stateChangelogStorageForJob2 = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat(stateChangelogStorageForJob2).isInstanceOf(TestStateChangelogStorage.class);
        Assertions.assertThat(((TestStateChangelogStorage) stateChangelogStorageForJob2).closed).isFalse();
        taskExecutorStateChangelogStoragesManager.shutdown();
        Assertions.assertThat(((TestStateChangelogStorage) stateChangelogStorageForJob).closed).isTrue();
        Assertions.assertThat(((TestStateChangelogStorage) stateChangelogStorageForJob2).closed).isTrue();
        StateChangelogStorageLoader.initialize((PluginManager) null);
    }
}
