/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MiniClusterJobClient
implements JobClient,
CoordinationRequestGateway {
    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobClient.class);
    private final JobID jobID;
    private final MiniCluster miniCluster;
    private final ClassLoader classLoader;
    private final CompletableFuture<JobResult> jobResultFuture;

    public MiniClusterJobClient(JobID jobID, MiniCluster miniCluster, ClassLoader classLoader, JobFinalizationBehavior finalizationBehaviour) {
        this.jobID = jobID;
        this.miniCluster = miniCluster;
        this.classLoader = classLoader;
        if (finalizationBehaviour == JobFinalizationBehavior.SHUTDOWN_CLUSTER) {
            this.jobResultFuture = miniCluster.requestJobResult(jobID).whenComplete((result, throwable) -> MiniClusterJobClient.shutDownCluster(miniCluster));
        } else if (finalizationBehaviour == JobFinalizationBehavior.NOTHING) {
            this.jobResultFuture = miniCluster.requestJobResult(jobID);
        } else {
            throw new IllegalArgumentException("Unexpected shutdown behavior: " + String.valueOf((Object)finalizationBehaviour));
        }
    }

    @Override
    public JobID getJobID() {
        return this.jobID;
    }

    @Override
    public CompletableFuture<JobStatus> getJobStatus() {
        return this.miniCluster.getJobStatus(this.jobID);
    }

    @Override
    public CompletableFuture<Void> cancel() {
        return this.miniCluster.cancelJob(this.jobID).thenAccept(result -> {});
    }

    @Override
    public CompletableFuture<String> stopWithSavepoint(boolean terminate, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        return this.miniCluster.stopWithSavepoint(this.jobID, savepointDirectory, terminate, formatType);
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory, SavepointFormatType formatType) {
        return this.miniCluster.triggerSavepoint(this.jobID, savepointDirectory, false, formatType);
    }

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators() {
        CompletableFuture<JobExecutionResult> jobExecutionResult = this.getJobExecutionResult();
        if (jobExecutionResult.isDone()) {
            return jobExecutionResult.thenApply(JobExecutionResult::getAllAccumulatorResults);
        }
        return ((CompletableFuture)this.miniCluster.getExecutionGraph(this.jobID).thenApply(AccessExecutionGraph::getAccumulatorsSerialized)).thenApply(accumulators -> {
            try {
                return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, this.classLoader);
            }
            catch (Exception e) {
                throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
            }
        });
    }

    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
        return this.jobResultFuture.thenApply(result -> {
            try {
                return result.toJobExecutionResult(this.classLoader);
            }
            catch (Exception e) {
                throw new CompletionException("Failed to convert JobResult to JobExecutionResult.", e);
            }
        });
    }

    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(String operatorUid, CoordinationRequest request) {
        try {
            SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<CoordinationRequest>(request);
            return this.miniCluster.deliverCoordinationRequestToCoordinator(this.jobID, operatorUid, serializedRequest);
        }
        catch (IOException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override
    public void reportHeartbeat(long expiredTimestamp) {
        this.miniCluster.reportHeartbeat(this.jobID, expiredTimestamp);
    }

    private static void shutDownCluster(MiniCluster miniCluster) {
        miniCluster.closeAsync().whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                LOG.warn("Shutdown of MiniCluster failed.", throwable);
            }
        });
    }

    public static enum JobFinalizationBehavior {
        SHUTDOWN_CLUSTER,
        NOTHING;

    }
}

