/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.dispatcher.DispatcherCachedOperationsHandler;
import org.apache.flink.runtime.dispatcher.TriggerCheckpointFunction;
import org.apache.flink.runtime.dispatcher.TriggerSavepointFunction;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class DispatcherCachedOperationsHandlerTest
extends TestLogger {
    private static final Duration TIMEOUT = Duration.ofMinutes(10L);
    private CompletedOperationCache<AsynchronousJobOperationKey, Long> checkpointTriggerCache;
    private CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache;
    private DispatcherCachedOperationsHandler handler;
    private TriggerCheckpointSpyFunction triggerCheckpointFunction;
    private TriggerSavepointSpyFunction triggerSavepointFunction;
    private TriggerSavepointSpyFunction stopWithSavepointFunction;
    private CompletableFuture<Long> checkpointIdFuture = new CompletableFuture();
    private CompletableFuture<String> savepointLocationFuture = new CompletableFuture();
    private final JobID jobID = new JobID();
    private final String targetDirectory = "dummyDirectory";
    private AsynchronousJobOperationKey operationKey;

    @BeforeEach
    public void setup() {
        this.checkpointIdFuture = new CompletableFuture();
        this.triggerCheckpointFunction = TriggerCheckpointSpyFunction.wrap(new TriggerCheckpointSpyFunction(){

            @Override
            CompletableFuture<Long> applyWrappedFunction(JobID jobID, CheckpointType checkpointType, Duration timeout) {
                return DispatcherCachedOperationsHandlerTest.this.checkpointIdFuture;
            }
        });
        this.savepointLocationFuture = new CompletableFuture();
        this.triggerSavepointFunction = TriggerSavepointSpyFunction.wrap((jobID, targetDirectory, formatType, savepointMode, timeout) -> this.savepointLocationFuture);
        this.stopWithSavepointFunction = TriggerSavepointSpyFunction.wrap((jobID, targetDirectory, formatType, savepointMode, timeout) -> this.savepointLocationFuture);
        this.checkpointTriggerCache = new CompletedOperationCache((Duration)RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue());
        this.savepointTriggerCache = new CompletedOperationCache((Duration)RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue());
        this.handler = new DispatcherCachedOperationsHandler((TriggerCheckpointFunction)this.triggerCheckpointFunction, this.checkpointTriggerCache, (TriggerSavepointFunction)this.triggerSavepointFunction, (TriggerSavepointFunction)this.stopWithSavepointFunction, this.savepointTriggerCache);
        this.operationKey = AsynchronousJobOperationKey.of((TriggerId)new TriggerId(), (JobID)this.jobID);
    }

    @Test
    public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedException {
        CompletableFuture firstAcknowledge = this.handler.triggerSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.SAVEPOINT, TIMEOUT);
        CompletableFuture secondAcknowledge = this.handler.triggerSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.SAVEPOINT, TIMEOUT);
        Assertions.assertThat((int)this.triggerSavepointFunction.getNumberOfInvocations()).isOne();
        Assertions.assertThat(this.triggerSavepointFunction.getInvocationParameters().get(0)).isEqualTo((Object)new Tuple4((Object)this.jobID, (Object)"dummyDirectory", (Object)SavepointFormatType.CANONICAL, (Object)TriggerSavepointMode.SAVEPOINT));
        Assertions.assertThat((Object)((Acknowledge)firstAcknowledge.get())).isEqualTo((Object)Acknowledge.get());
        Assertions.assertThat((Object)((Acknowledge)secondAcknowledge.get())).isEqualTo((Object)Acknowledge.get());
    }

    @Test
    public void stopWithSavepointRepeatedly() throws ExecutionException, InterruptedException {
        CompletableFuture firstAcknowledge = this.handler.stopWithSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT, TIMEOUT);
        CompletableFuture secondAcknowledge = this.handler.stopWithSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT, TIMEOUT);
        Assertions.assertThat((int)this.stopWithSavepointFunction.getNumberOfInvocations()).isOne();
        Assertions.assertThat(this.stopWithSavepointFunction.getInvocationParameters().get(0)).isEqualTo((Object)new Tuple4((Object)this.jobID, (Object)"dummyDirectory", (Object)SavepointFormatType.CANONICAL, (Object)TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT));
        Assertions.assertThat((Object)((Acknowledge)firstAcknowledge.get())).isEqualTo((Object)Acknowledge.get());
        Assertions.assertThat((Object)((Acknowledge)secondAcknowledge.get())).isEqualTo((Object)Acknowledge.get());
    }

    @Test
    public void retryingCompletedOperationDoesNotMarkCacheEntryAsAccessed() throws ExecutionException, InterruptedException {
        this.handler.triggerSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.SAVEPOINT, TIMEOUT).get();
        this.savepointLocationFuture.complete("");
        this.handler.triggerSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.SAVEPOINT, TIMEOUT).get();
        FlinkAssertions.assertThatFuture((CompletableFuture)this.savepointTriggerCache.closeAsync()).willNotCompleteWithin(Duration.ofMillis(10L));
    }

    @Test
    public void throwsIfCacheIsShuttingDown() {
        this.savepointTriggerCache.closeAsync();
        Assertions.assertThatThrownBy(() -> this.handler.triggerSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.SAVEPOINT, TIMEOUT)).isInstanceOf(IllegalStateException.class);
    }

    @Test
    public void getStatus() throws ExecutionException, InterruptedException {
        this.handler.triggerSavepoint(this.operationKey, "dummyDirectory", SavepointFormatType.CANONICAL, TriggerSavepointMode.SAVEPOINT, TIMEOUT);
        String savepointLocation = "location";
        this.savepointLocationFuture.complete(savepointLocation);
        CompletableFuture statusFuture = this.handler.getSavepointStatus(this.operationKey);
        Assertions.assertThat((Object)((OperationResult)statusFuture.get())).isEqualTo((Object)OperationResult.success((Serializable)((Object)savepointLocation)));
    }

    @Test
    public void getStatusFailsIfKeyUnknown() {
        CompletableFuture statusFuture = this.handler.getSavepointStatus(this.operationKey);
        FlinkAssertions.assertThatFuture((CompletableFuture)statusFuture).eventuallyFails().withCauseOfType(UnknownOperationKeyException.class);
    }

    private static abstract class TriggerSavepointSpyFunction
    implements TriggerSavepointFunction {
        private final List<Tuple4<JobID, String, SavepointFormatType, TriggerSavepointMode>> invocations = new ArrayList<Tuple4<JobID, String, SavepointFormatType, TriggerSavepointMode>>();

        private TriggerSavepointSpyFunction() {
        }

        public CompletableFuture<String> apply(JobID jobID, String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, Duration timeout) {
            this.invocations.add((Tuple4<JobID, String, SavepointFormatType, TriggerSavepointMode>)new Tuple4((Object)jobID, (Object)targetDirectory, (Object)formatType, (Object)savepointMode));
            return this.applyWrappedFunction(jobID, targetDirectory, formatType, savepointMode, timeout);
        }

        abstract CompletableFuture<String> applyWrappedFunction(JobID var1, String var2, SavepointFormatType var3, TriggerSavepointMode var4, Duration var5);

        public List<Tuple4<JobID, String, SavepointFormatType, TriggerSavepointMode>> getInvocationParameters() {
            return this.invocations;
        }

        public int getNumberOfInvocations() {
            return this.invocations.size();
        }

        public static TriggerSavepointSpyFunction wrap(final TriggerSavepointFunction wrappedFunction) {
            return new TriggerSavepointSpyFunction(){

                @Override
                CompletableFuture<String> applyWrappedFunction(JobID jobID, String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, Duration timeout) {
                    return wrappedFunction.apply(jobID, targetDirectory, formatType, savepointMode, timeout);
                }
            };
        }
    }

    private static abstract class TriggerCheckpointSpyFunction
    implements TriggerCheckpointFunction {
        private final List<Tuple2<JobID, CheckpointType>> invocations = new ArrayList<Tuple2<JobID, CheckpointType>>();

        private TriggerCheckpointSpyFunction() {
        }

        public CompletableFuture<Long> apply(JobID jobID, CheckpointType checkpointType, Duration timeout) {
            this.invocations.add((Tuple2<JobID, CheckpointType>)new Tuple2((Object)jobID, (Object)checkpointType));
            return this.applyWrappedFunction(jobID, checkpointType, timeout);
        }

        abstract CompletableFuture<Long> applyWrappedFunction(JobID var1, CheckpointType var2, Duration var3);

        public List<Tuple2<JobID, CheckpointType>> getInvocationParameters() {
            return this.invocations;
        }

        public int getNumberOfInvocations() {
            return this.invocations.size();
        }

        public static TriggerCheckpointSpyFunction wrap(final TriggerCheckpointSpyFunction wrappedFunction) {
            return new TriggerCheckpointSpyFunction(){

                @Override
                CompletableFuture<Long> applyWrappedFunction(JobID jobID, CheckpointType checkpointType, Duration timeout) {
                    return wrappedFunction.apply(jobID, checkpointType, timeout);
                }
            };
        }
    }
}

