/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreUtil;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingExecutionPlanListener;
import org.apache.flink.runtime.jobmanager.TestingExecutionPlanStoreWatcher;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DefaultExecutionPlanStore}.
 *
 * <p>Each test stubs only the {@link TestingStateHandleStore} operations it needs, starts a store
 * on top of it, and then observes either the arguments passed to the stubbed operations or the
 * notifications recorded by {@link TestingExecutionPlanListener}. Add/remove notifications are
 * simulated through {@link TestingExecutionPlanStoreWatcher}.
 */
public class DefaultExecutionPlanStoreTest extends TestLogger {

    /** Upper bound, in milliseconds, when waiting on futures completed by stubbed operations. */
    private static final long TIMEOUT_MILLIS = 100L;

    private final ExecutionPlan testingExecutionPlan = JobGraphTestUtils.emptyJobGraph();

    private TestingStateHandleStore.Builder<ExecutionPlan> builder;
    private TestingRetrievableStateStorageHelper<ExecutionPlan> jobGraphStorageHelper;
    private TestingExecutionPlanStoreWatcher testingExecutionPlanStoreWatcher;
    private TestingExecutionPlanListener testingExecutionPlanListener;

    /** Creates a fresh builder, watcher, listener and storage helper before every test. */
    @Before
    public void setup() {
        builder = TestingStateHandleStore.newBuilder();
        testingExecutionPlanStoreWatcher = new TestingExecutionPlanStoreWatcher();
        testingExecutionPlanListener = new TestingExecutionPlanListener();
        jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>();
    }

    /** Stops the watcher created in {@link #setup()}, if any. */
    @After
    public void teardown() {
        if (testingExecutionPlanStoreWatcher != null) {
            testingExecutionPlanStoreWatcher.stop();
        }
    }

    /** Recovering a plan whose handle is returned by the get function yields that plan. */
    @Test
    public void testRecoverExecutionPlan() throws Exception {
        final RetrievableStateHandle<ExecutionPlan> stateHandle =
                jobGraphStorageHelper.store(testingExecutionPlan);
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(ignored -> stateHandle).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        final ExecutionPlan recoveredExecutionPlan =
                executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(recoveredExecutionPlan).isNotNull();
        Assertions.assertThat(recoveredExecutionPlan.getJobID())
                .isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Recovering returns {@code null} when the get function throws a NotExistException. */
    @Test
    public void testRecoverExecutionPlanWhenNotExist() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(
                                ignored -> {
                                    throw new StateHandleStore.NotExistException(
                                            "Not exist exception.");
                                })
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        final ExecutionPlan recoveredExecutionPlan =
                executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(recoveredExecutionPlan).isNull();
    }

    /**
     * When the get function fails with a generic exception, recovery fails with that exception as
     * its cause and the release consumer is called with the job id's string form.
     */
    @Test
    public void testRecoverExecutionPlanFailedShouldReleaseHandle() throws Exception {
        final CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        final FlinkException testException = new FlinkException("Test exception.");
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(
                                ignored -> {
                                    throw testException;
                                })
                        .setReleaseConsumer(releaseFuture::complete)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThatThrownBy(
                        () ->
                                executionPlanStore.recoverExecutionPlan(
                                        testingExecutionPlan.getJobID()))
                .hasCause(testException);

        final String releasedName = releaseFuture.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(testingExecutionPlan.getJobID()).hasToString(releasedName);
    }

    /** Putting a plan that the exists function reports as absent goes through the add function. */
    @Test
    public void testPutExecutionPlanWhenNotExist() throws Exception {
        final CompletableFuture<ExecutionPlan> addFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setExistsFunction(ignored -> IntegerResourceVersion.notExisting())
                        .setAddFunction(
                                (ignored, state) -> {
                                    addFuture.complete(state);
                                    return jobGraphStorageHelper.store(state);
                                })
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        final ExecutionPlan added = addFuture.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(added.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
    }

    /**
     * Putting the same plan twice: the first put sees "not existing", the second sees an existing
     * resource version and must go through the replace consumer with name, version and plan.
     */
    @Test
    public void testPutExecutionPlanWhenAlreadyExist() throws Exception {
        final CompletableFuture<Tuple3<String, IntegerResourceVersion, ExecutionPlan>>
                replaceFuture = new CompletableFuture<>();
        final int resourceVersion = 100;
        final AtomicBoolean alreadyExist = new AtomicBoolean(false);
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setExistsFunction(
                                // First call reports "not existing" and flips the flag; every
                                // later call reports the fixed resource version.
                                ignored ->
                                        alreadyExist.getAndSet(true)
                                                ? IntegerResourceVersion.valueOf(resourceVersion)
                                                : IntegerResourceVersion.notExisting())
                        .setAddFunction(this::storeInStorageHelper)
                        .setReplaceConsumer(replaceFuture::complete)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        // Replace
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        final Tuple3<String, IntegerResourceVersion, ExecutionPlan> replaced =
                replaceFuture.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(replaced.f0).isEqualTo(testingExecutionPlan.getJobID().toString());
        Assertions.assertThat(replaced.f1).isEqualTo(IntegerResourceVersion.valueOf(resourceVersion));
        Assertions.assertThat(replaced.f2.getJobID()).isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Global cleanup of a previously put plan calls the remove function with the plan's name. */
    @Test
    public void testGlobalCleanup() throws Exception {
        final CompletableFuture<JobID> removeFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(this::storeInStorageHelper)
                        .setRemoveFunction(
                                name -> removeFuture.complete(JobID.fromHexString(name)))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore
                .globalCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor())
                .join();

        final JobID removed = removeFuture.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(removed).isEqualTo(testingExecutionPlan.getJobID());
    }

    /** Global cleanup still calls the remove function when the plan was never put. */
    @Test
    public void testGlobalCleanupWithNonExistName() throws Exception {
        final CompletableFuture<JobID> removeFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setRemoveFunction(
                                name -> removeFuture.complete(JobID.fromHexString(name)))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore
                .globalCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor())
                .join();

        Assertions.assertThat(removeFuture).isDone();
    }

    /** Global cleanup completes exceptionally when the remove function returns {@code false}. */
    @Test
    public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setRemoveFunction(name -> false).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThatThrownBy(
                        () ->
                                executionPlanStore
                                        .globalCleanupAsync(
                                                testingExecutionPlan.getJobID(),
                                                Executors.directExecutor())
                                        .get())
                .isInstanceOf(ExecutionException.class);
    }

    /** The job ids reported by the store include every handle name returned by the stub. */
    @Test
    public void testGetJobIds() throws Exception {
        final List<JobID> existingJobIds = Arrays.asList(new JobID(0L, 0L), new JobID(0L, 1L));
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetAllHandlesSupplier(
                                () ->
                                        existingJobIds.stream()
                                                .map(AbstractID::toString)
                                                .collect(Collectors.toList()))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        final Collection<JobID> jobIds = executionPlanStore.getJobIds();

        Assertions.assertThat(jobIds).containsAll(existingJobIds);
    }

    /** An "added" notification for a plan that was put through this store is not forwarded. */
    @Test
    public void testOnAddedExecutionPlanShouldNotProcessKnownExecutionPlans() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(this::storeInStorageHelper).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
    }

    /**
     * After recovering one plan, "added" notifications are forwarded only for the job id that the
     * store has not seen before.
     */
    @Test
    public void testOnAddedExecutionPlanShouldOnlyProcessUnknownExecutionPlans() throws Exception {
        final RetrievableStateHandle<ExecutionPlan> stateHandle =
                jobGraphStorageHelper.store(testingExecutionPlan);
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(ignored -> stateHandle)
                        .setAddFunction(this::storeInStorageHelper)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID());

        // Known recovered job
        testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());
        // Unknown job
        final JobID unknownJobId = JobID.generate();
        testingExecutionPlanStoreWatcher.addExecutionPlan(unknownJobId);

        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).hasSize(1);
        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans())
                .contains(unknownJobId);
    }

    /** "Removed" notifications are forwarded only for the plan that was put through this store. */
    @Test
    public void testOnRemovedExecutionPlanShouldOnlyProcessKnownExecutionPlans() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(this::storeInStorageHelper).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);

        // Unknown job
        testingExecutionPlanStoreWatcher.removeExecutionPlan(JobID.generate());
        // Known job
        testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).hasSize(1);
        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans())
                .contains(testingExecutionPlan.getJobID());
    }

    /** A "removed" notification for a plan the store never saw is not forwarded. */
    @Test
    public void testOnRemovedExecutionPlanShouldNotProcessUnknownExecutionPlans()
            throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(this::storeInStorageHelper).build();
        createAndStartExecutionPlanStore(stateHandleStore);

        testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
    }

    /** After the store is stopped, "added" notifications are no longer forwarded. */
    @Test
    public void testOnAddedExecutionPlanIsIgnoredAfterBeingStop() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(this::storeInStorageHelper).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.stop();

        testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty();
    }

    /** After the store is stopped, "removed" notifications are no longer forwarded. */
    @Test
    public void testOnRemovedExecutionPlanIsIgnoredAfterBeingStop() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(this::storeInStorageHelper).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore.stop();

        testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID());

        Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty();
    }

    /** Stopping the store invokes the "release all handles" runnable of the handle store. */
    @Test
    public void testStoppingExecutionPlanStoreShouldReleaseAllHandles() throws Exception {
        final CompletableFuture<Void> releaseAllFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setReleaseAllHandlesRunnable(() -> releaseAllFuture.complete(null))
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.stop();

        Assertions.assertThat(releaseAllFuture).isDone();
    }

    /** Local cleanup calls the release consumer with the job id's string form. */
    @Test
    public void testLocalCleanupShouldReleaseHandle() throws Exception {
        final CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setReleaseConsumer(releaseFuture::complete).build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore
                .localCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor())
                .join();

        // Bounded wait, like the other tests: a missing release call fails instead of hanging.
        final String releasedName = releaseFuture.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assertions.assertThat(testingExecutionPlan.getJobID()).hasToString(releasedName);
    }

    /**
     * Requirements written with {@code putJobResourceRequirements} can be read back from the
     * recovered plan, both after the first write and after overwriting them.
     */
    @Test
    public void testRecoverPersistedJobResourceRequirements() throws Exception {
        // Handles keyed by the name passed to the add function, so that get can serve them back.
        final HashMap<String, RetrievableStateHandle<ExecutionPlan>> handles = new HashMap<>();
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setAddFunction(
                                (key, state) -> {
                                    final RetrievableStateHandle<ExecutionPlan> handle =
                                            jobGraphStorageHelper.store(state);
                                    handles.put(key, handle);
                                    return handle;
                                })
                        .setGetFunction(
                                key -> {
                                    final RetrievableStateHandle<ExecutionPlan> handle =
                                            handles.get(key);
                                    if (handle == null) {
                                        throw new StateHandleStore.NotExistException(
                                                "Does not exist.");
                                    }
                                    return handle;
                                })
                        .build();

        final JobResourceRequirements jobResourceRequirements =
                JobResourceRequirements.newBuilder()
                        .setParallelismForJobVertex(new JobVertexID(), 1, 1)
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);
        executionPlanStore.putExecutionPlan(testingExecutionPlan);
        executionPlanStore.putJobResourceRequirements(
                testingExecutionPlan.getJobID(), jobResourceRequirements);

        assertStoredRequirementsAre(
                executionPlanStore, testingExecutionPlan.getJobID(), jobResourceRequirements);

        final JobResourceRequirements updatedJobResourceRequirements =
                JobResourceRequirements.newBuilder()
                        .setParallelismForJobVertex(new JobVertexID(), 1, 1)
                        .build();

        executionPlanStore.putJobResourceRequirements(
                testingExecutionPlan.getJobID(), updatedJobResourceRequirements);

        assertStoredRequirementsAre(
                executionPlanStore,
                testingExecutionPlan.getJobID(),
                updatedJobResourceRequirements);
    }

    /**
     * Recovers the plan for {@code jobId} from the store and asserts that the requirements read
     * from it equal {@code expected}.
     *
     * @param executionPlanStore store to recover from
     * @param jobId id of the plan to recover
     * @param expected requirements that must be present on the recovered plan
     * @throws Exception if recovery or reading the requirements fails
     */
    private static void assertStoredRequirementsAre(
            ExecutionPlanStore executionPlanStore, JobID jobId, JobResourceRequirements expected)
            throws Exception {
        final Optional<JobResourceRequirements> maybeRecovered =
                JobResourceRequirements.readFromExecutionPlan(
                        Objects.requireNonNull(
                                (JobGraph) executionPlanStore.recoverExecutionPlan(jobId)));
        Assertions.assertThat(maybeRecovered).get().isEqualTo(expected);
    }

    /** Writing requirements for a job whose plan does not exist fails with NoSuchElementException. */
    @Test
    public void testPutJobResourceRequirementsOfNonExistentJob() throws Exception {
        final TestingStateHandleStore<ExecutionPlan> stateHandleStore =
                builder.setGetFunction(
                                ignored -> {
                                    throw new StateHandleStore.NotExistException(
                                            "Does not exist.");
                                })
                        .build();

        final ExecutionPlanStore executionPlanStore =
                createAndStartExecutionPlanStore(stateHandleStore);

        Assertions.assertThatThrownBy(
                        () ->
                                executionPlanStore.putJobResourceRequirements(
                                        new JobID(), JobResourceRequirements.empty()))
                .isInstanceOf(NoSuchElementException.class);
    }

    /**
     * Add-function stub shared by several tests: stores the plan in the storage helper and returns
     * the resulting handle. The name argument is not used.
     */
    private RetrievableStateHandle<ExecutionPlan> storeInStorageHelper(
            String ignoredName, ExecutionPlan state) throws Exception {
        return jobGraphStorageHelper.store(state);
    }

    /**
     * Builds a {@link DefaultExecutionPlanStore} on top of the given handle store and the test
     * watcher, using the job id's string form as the handle name, and starts it with the test
     * listener.
     *
     * @param stateHandleStore stubbed handle store backing the plan store
     * @return the started store
     * @throws Exception if starting the store fails
     */
    private ExecutionPlanStore createAndStartExecutionPlanStore(
            TestingStateHandleStore<ExecutionPlan> stateHandleStore) throws Exception {
        // NOTE(review): raw constructor call kept as in the decompiled source; if
        // DefaultExecutionPlanStore is generic, prefer the diamond form here.
        final ExecutionPlanStore executionPlanStore =
                new DefaultExecutionPlanStore(
                        stateHandleStore,
                        testingExecutionPlanStoreWatcher,
                        new ExecutionPlanStoreUtil() {

                            @Override
                            public String jobIDToName(JobID jobId) {
                                return jobId.toString();
                            }

                            @Override
                            public JobID nameToJobID(String name) {
                                return JobID.fromHexString(name);
                            }
                        });
        executionPlanStore.start(testingExecutionPlanListener);
        return executionPlanStore;
    }
}

