/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.ManualTicker;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FileExecutionGraphInfoStoreTest
extends TestLogger {
    /** Every {@link JobStatus} value for which {@code isGloballyTerminalState()} returns true. */
    private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = Arrays.stream(JobStatus.values()).filter(JobStatus::isGloballyTerminalState).collect(Collectors.toList());
    /** Class-level JUnit rule; the tests call {@code newFolder()} on it to obtain a fresh root directory. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Puts an execution graph in state {@link JobStatus#FINISHED} into the store and verifies
     * that it is persisted and can be read back.
     */
    @Test
    public void testPut() throws IOException {
        assertPutJobGraphWithStatus(JobStatus.FINISHED);
    }

    /**
     * Puts an execution graph in state {@link JobStatus#SUSPENDED} into the store and verifies
     * that it is persisted and can be read back.
     */
    @Test
    public void testPutSuspendedJob() throws IOException {
        assertPutJobGraphWithStatus(JobStatus.SUSPENDED);
    }

    /** Looking up a job id that was never put into the store must yield {@code null}. */
    @Test
    public void testUnknownGet() throws IOException {
        final File rootDir = temporaryFolder.newFolder();
        try (FileExecutionGraphInfoStore store = createDefaultExecutionGraphInfoStore(rootDir)) {
            final ExecutionGraphInfo lookupResult = store.get(new JobID());
            Assert.assertThat(lookupResult, Matchers.nullValue());
        }
    }

    /**
     * Stores a batch of execution graphs in globally terminal states and verifies that the
     * overview reported by the store equals the overview computed directly from their states.
     */
    @Test
    public void testStoredJobsOverview() throws IOException {
        final int numberExecutionGraphs = 10;
        final Collection<ExecutionGraphInfo> executionGraphInfos =
                generateTerminalExecutionGraphInfos(numberExecutionGraphs);

        // Expected overview is derived from the job states of the generated graphs.
        final List<JobStatus> jobStatuses =
                executionGraphInfos.stream()
                        .map(ExecutionGraphInfo::getArchivedExecutionGraph)
                        .map(ArchivedExecutionGraph::getState)
                        .collect(Collectors.toList());
        final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);

        final File rootDir = temporaryFolder.newFolder();
        try (FileExecutionGraphInfoStore executionGraphInfoStore =
                createDefaultExecutionGraphInfoStore(rootDir)) {
            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
                executionGraphInfoStore.put(executionGraphInfo);
            }

            Assert.assertThat(
                    executionGraphInfoStore.getStoredJobsOverview(),
                    Matchers.equalTo(expectedJobsOverview));
        }
    }

    /**
     * Stores a batch of execution graphs in globally terminal states and verifies that the job
     * details reported by the store match, in any order, the details derived from those graphs.
     */
    @Test
    public void testAvailableJobDetails() throws IOException {
        final int numberExecutionGraphs = 10;
        final Collection<ExecutionGraphInfo> executionGraphInfos =
                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
        final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphInfos);

        final File rootDir = temporaryFolder.newFolder();
        try (FileExecutionGraphInfoStore executionGraphInfoStore =
                createDefaultExecutionGraphInfoStore(rootDir)) {
            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
                executionGraphInfoStore.put(executionGraphInfo);
            }

            Assert.assertThat(
                    executionGraphInfoStore.getAvailableJobDetails(),
                    Matchers.containsInAnyOrder(jobDetails.toArray()));
        }
    }

    /**
     * Verifies that a stored execution graph is gone from the store and from its storage
     * directory once the manual ticker has been advanced by the expiration time and the
     * scheduled tasks have been triggered.
     */
    @Test
    public void testExecutionGraphExpiration() throws Exception {
        final File rootDir = temporaryFolder.newFolder();
        final Time expirationTime = Time.milliseconds(1L);

        // Both time and scheduled work are driven by hand from this test.
        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final ManualTicker manualTicker = new ManualTicker();

        try (FileExecutionGraphInfoStore store =
                new FileExecutionGraphInfoStore(
                        rootDir,
                        expirationTime,
                        Integer.MAX_VALUE,
                        10000L,
                        scheduledExecutor,
                        manualTicker)) {
            final ExecutionGraphInfo finishedGraphInfo =
                    new ExecutionGraphInfo(
                            new ArchivedExecutionGraphBuilder()
                                    .setState(JobStatus.FINISHED)
                                    .build());

            store.put(finishedGraphInfo);
            Assert.assertThat(store.size(), Matchers.equalTo(1));

            // Move the clock forward by the expiration time, then run the scheduled tasks.
            manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS);
            scheduledExecutor.triggerScheduledTasks();

            Assert.assertThat(store.size(), Matchers.equalTo(0));
            Assert.assertThat(store.get(finishedGraphInfo.getJobId()), Matchers.nullValue());

            // The backing file must have been removed as well.
            final File storageDirectory = store.getStorageDir();
            Assert.assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
        }
    }

    /**
     * Verifies that closing the store leaves the root directory empty again, after the store
     * created one entry in it and wrote one file into its storage directory.
     */
    @Test
    public void testCloseCleansUp() throws IOException {
        final File rootDir = temporaryFolder.newFolder();
        Assert.assertThat(rootDir.listFiles().length, Matchers.equalTo(0));

        try (FileExecutionGraphInfoStore store = createDefaultExecutionGraphInfoStore(rootDir)) {
            // Creating the store adds exactly one entry to the root directory.
            Assert.assertThat(rootDir.listFiles().length, Matchers.equalTo(1));

            final File storageDirectory = store.getStorageDir();
            Assert.assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));

            final ExecutionGraphInfo finishedGraphInfo =
                    new ExecutionGraphInfo(
                            new ArchivedExecutionGraphBuilder()
                                    .setState(JobStatus.FINISHED)
                                    .build());
            store.put(finishedGraphInfo);

            Assert.assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
        }

        // After close() nothing may be left behind in the root directory.
        Assert.assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
    }

    /**
     * Keeps inserting execution graphs until the in-memory cache holds fewer entries than were
     * inserted, then verifies that every inserted graph has a file in the storage directory and
     * can still be retrieved through the store.
     */
    @Test
    public void testCacheLoading() throws IOException {
        final File rootDir = temporaryFolder.newFolder();

        try (FileExecutionGraphInfoStore executionGraphInfoStore =
                new FileExecutionGraphInfoStore(
                        rootDir,
                        Time.hours(1L),
                        Integer.MAX_VALUE,
                        100L * 1024L,
                        TestingUtils.defaultScheduledExecutor(),
                        Ticker.systemTicker())) {
            final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache =
                    executionGraphInfoStore.getExecutionGraphInfoCache();
            final List<ExecutionGraphInfo> executionGraphInfos = new ArrayList<>(64);

            // Stop as soon as the cache size diverges from the number of inserted graphs.
            boolean continueInserting = true;
            while (continueInserting) {
                final ExecutionGraphInfo executionGraphInfo =
                        new ExecutionGraphInfo(
                                new ArchivedExecutionGraphBuilder()
                                        .setState(JobStatus.FINISHED)
                                        .build());
                executionGraphInfoStore.put(executionGraphInfo);
                executionGraphInfos.add(executionGraphInfo);

                continueInserting = executionGraphInfoCache.size() == executionGraphInfos.size();
            }

            // Every inserted graph must have a file on disk ...
            final File storageDirectory = executionGraphInfoStore.getStorageDir();
            Assert.assertThat(
                    storageDirectory.listFiles().length,
                    Matchers.equalTo(executionGraphInfos.size()));

            // ... and must still be retrievable through the store.
            for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
                Assert.assertThat(
                        executionGraphInfoStore.get(executionGraphInfo.getJobId()),
                        matchesPartiallyWith(executionGraphInfo));
            }
        }
    }

    /**
     * Verifies that the store never holds more than its configured maximum capacity, and that
     * after a second full batch has been put only the details of that second batch are reported.
     */
    @Test
    public void testMaximumCapacity() throws IOException {
        final File rootDir = temporaryFolder.newFolder();

        final int maxCapacity = 10;
        final int numberExecutionGraphs = 10;

        final Collection<ExecutionGraphInfo> oldExecutionGraphInfos =
                generateTerminalExecutionGraphInfos(numberExecutionGraphs);
        final Collection<ExecutionGraphInfo> newExecutionGraphInfos =
                generateTerminalExecutionGraphInfos(numberExecutionGraphs);

        final Collection<JobDetails> jobDetails = generateJobDetails(newExecutionGraphInfos);

        try (FileExecutionGraphInfoStore executionGraphInfoStore =
                new FileExecutionGraphInfoStore(
                        rootDir,
                        Time.hours(1L),
                        maxCapacity,
                        10000L,
                        TestingUtils.defaultScheduledExecutor(),
                        Ticker.systemTicker())) {
            // While filling up, the size may grow but must never exceed the capacity.
            for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) {
                executionGraphInfoStore.put(executionGraphInfo);
                Assert.assertTrue(executionGraphInfoStore.size() <= maxCapacity);
            }

            // Once full, every further put must keep the size pinned at the capacity.
            for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) {
                executionGraphInfoStore.put(executionGraphInfo);
                Assert.assertEquals(maxCapacity, executionGraphInfoStore.size());
            }

            Assert.assertThat(
                    executionGraphInfoStore.getAvailableJobDetails(),
                    Matchers.containsInAnyOrder(jobDetails.toArray()));
        }
    }

    /**
     * Starts a mini cluster backed by a {@link FileExecutionGraphInfoStore}, submits a job whose
     * single task blocks, waits until that task is running and then lets try-with-resources
     * close the cluster. The test makes no explicit assertion; it passes when shutdown completes
     * without throwing.
     */
    @Test
    public void testPutSuspendedJobOnClusterShutdown() throws Exception {
        final MiniClusterConfiguration clusterConfiguration =
                new MiniClusterConfiguration.Builder().build();

        try (PersistingMiniCluster cluster = new PersistingMiniCluster(clusterConfiguration)) {
            cluster.start();

            final JobVertex blockingVertex = new JobVertex("blockingVertex");
            blockingVertex.setParallelism(1);
            blockingVertex.setInvokableClass(SignallingBlockingNoOpInvokable.class);

            final JobGraph blockingJob = JobGraphTestUtils.streamingJobGraph(blockingVertex);
            cluster.submitJob(blockingJob);

            // Wait until the task has actually started before shutting the cluster down.
            SignallingBlockingNoOpInvokable.LATCH.await();
        }
    }

    /**
     * Creates {@code number} execution graph infos, each in a state picked at random from
     * {@link #GLOBALLY_TERMINAL_JOB_STATUS}.
     *
     * @param number how many execution graph infos to create
     * @return the generated execution graph infos
     */
    private Collection<ExecutionGraphInfo> generateTerminalExecutionGraphInfos(int number) {
        final List<ExecutionGraphInfo> generated = new ArrayList<>(number);
        final int statusCount = GLOBALLY_TERMINAL_JOB_STATUS.size();

        for (int created = 0; created < number; created++) {
            final int randomIndex = ThreadLocalRandom.current().nextInt(statusCount);
            final JobStatus terminalState = GLOBALLY_TERMINAL_JOB_STATUS.get(randomIndex);
            final ArchivedExecutionGraph archivedGraph =
                    new ArchivedExecutionGraphBuilder().setState(terminalState).build();
            generated.add(new ExecutionGraphInfo(archivedGraph));
        }

        return generated;
    }

    /**
     * Creates a store rooted at the given directory with the defaults shared by most tests:
     * one hour expiration, {@link Integer#MAX_VALUE} capacity, a {@code 10000L} cache size
     * argument, the default test scheduled executor and the system ticker.
     *
     * @param storageDirectory root directory handed to the store
     * @return a new store; the caller is responsible for closing it
     * @throws IOException if the store constructor fails
     */
    private FileExecutionGraphInfoStore createDefaultExecutionGraphInfoStore(File storageDirectory) throws IOException {
        final Time expirationTime = Time.hours(1L);
        return new FileExecutionGraphInfoStore(
                storageDirectory,
                expirationTime,
                Integer.MAX_VALUE,
                10000L,
                TestingUtils.defaultScheduledExecutor(),
                Ticker.systemTicker());
    }

    /**
     * Puts an execution graph in the given state into a fresh default store and asserts that
     * exactly one file appears in the storage directory and that the stored graph can be read
     * back, compared via {@link PartialExecutionGraphInfoMatcher}.
     *
     * @param jobStatus state of the execution graph that is stored
     * @throws IOException if creating the temporary folder or the store fails
     */
    private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException {
        final ArchivedExecutionGraph archivedGraph =
                new ArchivedExecutionGraphBuilder().setState(jobStatus).build();
        final ExecutionGraphInfo expectedInfo = new ExecutionGraphInfo(archivedGraph);
        final File rootDir = temporaryFolder.newFolder();

        try (FileExecutionGraphInfoStore store = createDefaultExecutionGraphInfoStore(rootDir)) {
            final File storageDirectory = store.getStorageDir();

            // Nothing is on disk before the put ...
            Assert.assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));

            store.put(expectedInfo);

            // ... and exactly one file afterwards.
            Assert.assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
            Assert.assertThat(
                    store.get(expectedInfo.getJobId()),
                    new PartialExecutionGraphInfoMatcher(expectedInfo));
        }
    }

    /**
     * Factory for a matcher that compares an {@link ExecutionGraphInfo} against the given
     * expected instance using {@link PartialExecutionGraphInfoMatcher}.
     *
     * @param executionGraphInfo the expected execution graph info
     * @return a matcher bound to {@code executionGraphInfo}
     */
    private static Matcher<ExecutionGraphInfo> matchesPartiallyWith(ExecutionGraphInfo executionGraphInfo) {
        final Matcher<ExecutionGraphInfo> partialMatcher =
                new PartialExecutionGraphInfoMatcher(executionGraphInfo);
        return partialMatcher;
    }

    /**
     * Derives the {@link JobDetails} of every given execution graph info, in iteration order.
     *
     * @param executionGraphInfos execution graph infos to convert
     * @return one {@link JobDetails} per input element
     */
    private static Collection<JobDetails> generateJobDetails(Collection<ExecutionGraphInfo> executionGraphInfos) {
        final List<JobDetails> details = new ArrayList<>();
        for (ExecutionGraphInfo info : executionGraphInfos) {
            final ArchivedExecutionGraph archivedGraph = info.getArchivedExecutionGraph();
            details.add(JobDetails.createDetailsForJob(archivedGraph));
        }
        return details;
    }

    private static final class PartialExecutionGraphInfoMatcher
    extends BaseMatcher<ExecutionGraphInfo> {
        private final ExecutionGraphInfo expectedExecutionGraphInfo;

        private PartialExecutionGraphInfoMatcher(ExecutionGraphInfo expectedExecutionGraphInfo) {
            this.expectedExecutionGraphInfo = (ExecutionGraphInfo)Preconditions.checkNotNull((Object)expectedExecutionGraphInfo);
        }

        public boolean matches(Object o) {
            if (this.expectedExecutionGraphInfo == o) {
                return true;
            }
            if (o == null || this.expectedExecutionGraphInfo.getClass() != o.getClass()) {
                return false;
            }
            ExecutionGraphInfo that = (ExecutionGraphInfo)o;
            ArchivedExecutionGraph thisExecutionGraph = this.expectedExecutionGraphInfo.getArchivedExecutionGraph();
            ArchivedExecutionGraph thatExecutionGraph = that.getArchivedExecutionGraph();
            return thisExecutionGraph.isStoppable() == thatExecutionGraph.isStoppable() && Objects.equals(thisExecutionGraph.getJobID(), thatExecutionGraph.getJobID()) && Objects.equals(thisExecutionGraph.getJobName(), thatExecutionGraph.getJobName()) && thisExecutionGraph.getState() == thatExecutionGraph.getState() && Objects.equals(thisExecutionGraph.getJsonPlan(), thatExecutionGraph.getJsonPlan()) && Objects.equals(thisExecutionGraph.getAccumulatorsSerialized(), thatExecutionGraph.getAccumulatorsSerialized()) && Objects.equals(thisExecutionGraph.getCheckpointCoordinatorConfiguration(), thatExecutionGraph.getCheckpointCoordinatorConfiguration()) && thisExecutionGraph.getAllVertices().size() == thatExecutionGraph.getAllVertices().size() && Objects.equals(this.expectedExecutionGraphInfo.getExceptionHistory(), that.getExceptionHistory());
        }

        public void describeTo(Description description) {
            description.appendText("Matches against " + ExecutionGraphInfo.class.getSimpleName() + '.');
        }
    }

    private class PersistingMiniCluster
    extends MiniCluster {
        PersistingMiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
            super(miniClusterConfiguration);
        }

        protected Collection<? extends DispatcherResourceManagerComponent> createDispatcherResourceManagerComponents(Configuration configuration, MiniCluster.RpcServiceFactory rpcServiceFactory, HighAvailabilityServices haServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception {
            DefaultDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory((ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance());
            File rootDir = temporaryFolder.newFolder();
            FileExecutionGraphInfoStore executionGraphInfoStore = FileExecutionGraphInfoStoreTest.this.createDefaultExecutionGraphInfoStore(rootDir);
            return Collections.singleton(dispatcherResourceManagerComponentFactory.create(configuration, this.getIOExecutor(), rpcServiceFactory.createRpcService(), haServices, blobServer, heartbeatServices, metricRegistry, (ExecutionGraphInfoStore)executionGraphInfoStore, metricQueryServiceRetriever, fatalErrorHandler));
        }
    }

    public static class SignallingBlockingNoOpInvokable
    extends AbstractInvokable {
        public static final OneShotLatch LATCH = new OneShotLatch();

        public SignallingBlockingNoOpInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            LATCH.trigger();
            Thread.sleep(Long.MAX_VALUE);
        }
    }
}

