/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingJobGraphListener;
import org.apache.flink.runtime.jobmanager.TestingJobGraphStoreWatcher;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DefaultJobGraphStore}, driven through a {@link TestingStateHandleStore},
 * a {@link TestingJobGraphStoreWatcher} and a {@link TestingJobGraphListener}.
 *
 * <p>Each test configures only the state-handle-store callbacks it needs via the shared
 * {@code builder}, starts a store with {@link #createAndStartJobGraphStore}, and then asserts on
 * what the callbacks or the listener observed.
 */
public class DefaultJobGraphStoreTest
extends TestLogger {
    private final JobGraph testingJobGraph = JobGraphTestUtils.emptyJobGraph();

    /** Upper bound, in milliseconds, for every wait on a callback future in this class. */
    private final long timeout = 100L;

    private TestingStateHandleStore.Builder<JobGraph> builder;
    private TestingRetrievableStateStorageHelper<JobGraph> jobGraphStorageHelper;
    private TestingJobGraphStoreWatcher testingJobGraphStoreWatcher;
    private TestingJobGraphListener testingJobGraphListener;

    /** Creates fresh test doubles before every test so no state leaks between tests. */
    @Before
    public void setup() {
        this.builder = TestingStateHandleStore.builder();
        this.testingJobGraphStoreWatcher = new TestingJobGraphStoreWatcher();
        this.testingJobGraphListener = new TestingJobGraphListener();
        this.jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>();
    }

    /** Stops the watcher created in {@link #setup()}, if any. */
    @After
    public void teardown() {
        if (this.testingJobGraphStoreWatcher != null) {
            this.testingJobGraphStoreWatcher.stop();
        }
    }

    /** Recovering a job graph whose handle the store can return yields that job graph. */
    @Test
    public void testRecoverJobGraph() throws Exception {
        RetrievableStateHandle<JobGraph> stateHandle =
                this.jobGraphStorageHelper.store(this.testingJobGraph);
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder.setGetFunction(ignore -> stateHandle).build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        JobGraph recoveredJobGraph = jobGraphStore.recoverJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(recoveredJobGraph, Matchers.is(Matchers.notNullValue()));
        Assert.assertThat(
                recoveredJobGraph.getJobID(), Matchers.is(this.testingJobGraph.getJobID()));
    }

    /** A {@link StateHandleStore.NotExistException} from the get function results in {@code null}. */
    @Test
    public void testRecoverJobGraphWhenNotExist() throws Exception {
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setGetFunction(
                                ignore -> {
                                    throw new StateHandleStore.NotExistException(
                                            "Not exist exception.");
                                })
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        JobGraph recoveredJobGraph = jobGraphStore.recoverJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(recoveredJobGraph, Matchers.is(Matchers.nullValue()));
    }

    /**
     * When the get function fails with an arbitrary exception, recovery must fail with that cause
     * and the release consumer must be called with the job's name.
     */
    @Test
    public void testRecoverJobGraphFailedShouldReleaseHandle() throws Exception {
        CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        FlinkException testException = new FlinkException("Test exception.");
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setGetFunction(
                                ignore -> {
                                    throw testException;
                                })
                        .setReleaseConsumer(releaseFuture::complete)
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        try {
            jobGraphStore.recoverJobGraph(this.testingJobGraph.getJobID());
            // Assert.fail throws an AssertionError, which the catch (Exception) below does not
            // intercept, so a missing failure is still reported.
            Assert.fail("recoverJobGraph should fail when there is exception in getting the state handle.");
        }
        catch (Exception ex) {
            Assert.assertThat(ex, FlinkMatchers.containsCause(testException));
            String actual = releaseFuture.get(this.timeout, TimeUnit.MILLISECONDS);
            Assert.assertThat(actual, Matchers.is(this.testingJobGraph.getJobID().toString()));
        }
    }

    /** Putting a job graph that the store reports as not existing goes through the add function. */
    @Test
    public void testPutJobGraphWhenNotExist() throws Exception {
        CompletableFuture<JobGraph> addFuture = new CompletableFuture<>();
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setExistsFunction(ignore -> IntegerResourceVersion.notExisting())
                        .setAddFunction(
                                (ignore, state) -> {
                                    addFuture.complete(state);
                                    return this.jobGraphStorageHelper.store(state);
                                })
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);

        JobGraph actual = addFuture.get(this.timeout, TimeUnit.MILLISECONDS);
        Assert.assertThat(actual.getJobID(), Matchers.is(this.testingJobGraph.getJobID()));
    }

    /**
     * The exists function reports "not existing" on its first call and a concrete resource version
     * afterwards; the second put must therefore reach the replace consumer with the job's name,
     * that resource version and the job graph.
     */
    @Test
    public void testPutJobGraphWhenAlreadyExist() throws Exception {
        CompletableFuture<Tuple3<String, IntegerResourceVersion, JobGraph>> replaceFuture =
                new CompletableFuture<>();
        int resourceVersion = 100;
        AtomicBoolean alreadyExist = new AtomicBoolean(false);
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setExistsFunction(
                                ignore -> {
                                    if (alreadyExist.get()) {
                                        return IntegerResourceVersion.valueOf(resourceVersion);
                                    }
                                    alreadyExist.set(true);
                                    return IntegerResourceVersion.notExisting();
                                })
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .setReplaceConsumer(replaceFuture::complete)
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);
        jobGraphStore.putJobGraph(this.testingJobGraph);

        Tuple3<String, IntegerResourceVersion, JobGraph> actual =
                replaceFuture.get(this.timeout, TimeUnit.MILLISECONDS);
        Assert.assertThat(actual.f0, Matchers.is(this.testingJobGraph.getJobID().toString()));
        Assert.assertThat(actual.f1, Matchers.is(IntegerResourceVersion.valueOf(resourceVersion)));
        Assert.assertThat(actual.f2.getJobID(), Matchers.is(this.testingJobGraph.getJobID()));
    }

    /** Removing a previously put job graph invokes the remove function with the job's name. */
    @Test
    public void testRemoveJobGraph() throws Exception {
        CompletableFuture<JobID> removeFuture = new CompletableFuture<>();
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .setRemoveFunction(
                                name -> removeFuture.complete(JobID.fromHexString(name)))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);
        jobGraphStore.removeJobGraph(this.testingJobGraph.getJobID());

        JobID actual = removeFuture.get(this.timeout, TimeUnit.MILLISECONDS);
        Assert.assertThat(actual, Matchers.is(this.testingJobGraph.getJobID()));
    }

    /** Removing a job graph that was never put must not invoke the remove function. */
    @Test
    public void testRemoveJobGraphWithNonExistName() throws Exception {
        CompletableFuture<JobID> removeFuture = new CompletableFuture<>();
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setRemoveFunction(
                                name -> removeFuture.complete(JobID.fromHexString(name)))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.removeJobGraph(this.testingJobGraph.getJobID());

        try {
            removeFuture.get(this.timeout, TimeUnit.MILLISECONDS);
            Assert.fail("We should get an expected timeout because we are removing a non-existed job graph.");
        }
        catch (TimeoutException expected) {
            // Expected: the remove function was never called, so the future never completes.
        }
        Assert.assertThat(removeFuture.isDone(), Matchers.is(false));
    }

    /** The job ids returned by the store are parsed from the names of all handles, in order. */
    @Test
    public void testGetJobIds() throws Exception {
        List<JobID> existingJobIds = Arrays.asList(new JobID(0L, 0L), new JobID(0L, 1L));
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setGetAllHandlesSupplier(
                                () ->
                                        existingJobIds.stream()
                                                .map(AbstractID::toString)
                                                .collect(Collectors.toList()))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        Collection<JobID> jobIds = jobGraphStore.getJobIds();

        Assert.assertThat(jobIds, Matchers.contains(existingJobIds.toArray()));
    }

    /** An "added" watcher event for a job graph this store itself put is not forwarded. */
    @Test
    public void testOnAddedJobGraphShouldNotProcessKnownJobGraphs() throws Exception {
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);
        this.testingJobGraphStoreWatcher.addJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(this.testingJobGraphListener.getAddedJobGraphs().size(), Matchers.is(0));
    }

    /**
     * After recovering one job graph, "added" events are forwarded to the listener only for a
     * job id the store has not seen, not for the recovered one.
     */
    @Test
    public void testOnAddedJobGraphShouldOnlyProcessUnknownJobGraphs() throws Exception {
        RetrievableStateHandle<JobGraph> stateHandle =
                this.jobGraphStorageHelper.store(this.testingJobGraph);
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setGetFunction(ignore -> stateHandle)
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.recoverJobGraph(this.testingJobGraph.getJobID());
        this.testingJobGraphStoreWatcher.addJobGraph(this.testingJobGraph.getJobID());
        JobID unknownJobId = JobID.generate();
        this.testingJobGraphStoreWatcher.addJobGraph(unknownJobId);

        Assert.assertThat(this.testingJobGraphListener.getAddedJobGraphs().size(), Matchers.is(1));
        Assert.assertThat(
                this.testingJobGraphListener.getAddedJobGraphs(), Matchers.contains(unknownJobId));
    }

    /**
     * "Removed" events are forwarded to the listener only for the job graph that was put, not for
     * a freshly generated job id.
     */
    @Test
    public void testOnRemovedJobGraphShouldOnlyProcessKnownJobGraphs() throws Exception {
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);
        this.testingJobGraphStoreWatcher.removeJobGraph(JobID.generate());
        this.testingJobGraphStoreWatcher.removeJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(
                this.testingJobGraphListener.getRemovedJobGraphs().size(), Matchers.is(1));
        Assert.assertThat(
                this.testingJobGraphListener.getRemovedJobGraphs(),
                Matchers.contains(this.testingJobGraph.getJobID()));
    }

    /** A "removed" event for a job graph that was never put is not forwarded to the listener. */
    @Test
    public void testOnRemovedJobGraphShouldNotProcessUnknownJobGraphs() throws Exception {
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .build();
        this.createAndStartJobGraphStore(stateHandleStore);

        this.testingJobGraphStoreWatcher.removeJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(
                this.testingJobGraphListener.getRemovedJobGraphs().size(), Matchers.is(0));
    }

    /** After {@code stop()}, "added" watcher events no longer reach the listener. */
    @Test
    public void testOnAddedJobGraphIsIgnoredAfterBeingStop() throws Exception {
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.stop();
        this.testingJobGraphStoreWatcher.addJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(this.testingJobGraphListener.getAddedJobGraphs().size(), Matchers.is(0));
    }

    /** After {@code stop()}, "removed" watcher events no longer reach the listener. */
    @Test
    public void testOnRemovedJobGraphIsIgnoredAfterBeingStop() throws Exception {
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setAddFunction(
                                (ignore, state) -> this.jobGraphStorageHelper.store(state))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);
        jobGraphStore.stop();
        this.testingJobGraphStoreWatcher.removeJobGraph(this.testingJobGraph.getJobID());

        Assert.assertThat(
                this.testingJobGraphListener.getRemovedJobGraphs().size(), Matchers.is(0));
    }

    /** Stopping the store invokes the release-all-handles callback. */
    @Test
    public void testStoppingJobGraphStoreShouldReleaseAllHandles() throws Exception {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder
                        .setReleaseAllHandlesRunnable(() -> completableFuture.complete(null))
                        .build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.stop();

        Assert.assertThat(completableFuture.isDone(), Matchers.is(true));
    }

    /** Releasing a put job graph invokes the release consumer with the job's name. */
    @Test
    public void testReleasingJobGraphShouldReleaseHandle() throws Exception {
        CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        TestingStateHandleStore<JobGraph> stateHandleStore =
                this.builder.setReleaseConsumer(releaseFuture::complete).build();
        JobGraphStore jobGraphStore = this.createAndStartJobGraphStore(stateHandleStore);

        jobGraphStore.putJobGraph(this.testingJobGraph);
        jobGraphStore.releaseJobGraph(this.testingJobGraph.getJobID());

        // Bounded wait: if the release consumer is never invoked the test fails with a
        // TimeoutException instead of blocking the whole test run indefinitely.
        String actual = releaseFuture.get(this.timeout, TimeUnit.MILLISECONDS);
        Assert.assertThat(actual, Matchers.is(this.testingJobGraph.getJobID().toString()));
    }

    /**
     * Builds a {@link DefaultJobGraphStore} on top of the given state handle store and the shared
     * watcher, using the job id's string form as the handle name, and starts it with the shared
     * listener.
     *
     * @param stateHandleStore the configured testing state handle store backing the job graph store
     * @return the started job graph store
     * @throws Exception if starting the store fails
     */
    private JobGraphStore createAndStartJobGraphStore(TestingStateHandleStore<JobGraph> stateHandleStore) throws Exception {
        JobGraphStore jobGraphStore = new DefaultJobGraphStore<>(stateHandleStore, this.testingJobGraphStoreWatcher, new JobGraphStoreUtil(){

            @Override
            public String jobIDToName(JobID jobId) {
                return jobId.toString();
            }

            @Override
            public JobID nameToJobID(String name) {
                return JobID.fromHexString(name);
            }
        });
        jobGraphStore.start(this.testingJobGraphListener);
        return jobGraphStore;
    }
}

