/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;

/**
 * Handler that forwards checkpoint-related messages, partition state queries and input split
 * requests to the wrapped {@link ExecutionGraph}.
 *
 * <p>Checkpoint coordinator messages are validated on the caller's thread (which is asserted to
 * be the main thread) and then processed on the supplied I/O executor.
 */
public class ExecutionGraphHandler {
    private final ExecutionGraph executionGraph;
    private final Logger log;
    private final Executor ioExecutor;
    private final ComponentMainThreadExecutor mainThreadExecutor;

    /**
     * Creates a handler for the given execution graph.
     *
     * @param executionGraph the graph that all requests are answered from
     * @param log logger used for all diagnostics emitted by this handler
     * @param ioExecutor executor on which checkpoint coordinator messages are processed
     * @param mainThreadExecutor executor used to assert that checkpoint messages arrive on the
     *     main thread
     */
    public ExecutionGraphHandler(ExecutionGraph executionGraph, Logger log, Executor ioExecutor, ComponentMainThreadExecutor mainThreadExecutor) {
        this.executionGraph = executionGraph;
        this.log = log;
        this.ioExecutor = ioExecutor;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    /**
     * Forwards checkpoint metrics of a task attempt to the checkpoint coordinator.
     *
     * @param attemptId the execution attempt the metrics belong to
     * @param id the checkpoint id
     * @param metrics the reported metrics
     */
    public void reportCheckpointMetrics(ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics) {
        processCheckpointCoordinatorMessage(
                "ReportCheckpointStats",
                coordinator -> coordinator.reportCheckpointMetrics(id, attemptId, metrics));
    }

    /**
     * Forwards sub-task initialization metrics to the checkpoint coordinator. The report is
     * dropped (with a debug log) when the graph has no checkpoint configuration or checkpointing
     * is not enabled.
     *
     * @param executionAttemptId the execution attempt the metrics belong to
     * @param initializationMetrics the reported initialization metrics
     */
    public void reportInitializationMetrics(ExecutionAttemptID executionAttemptId, SubTaskInitializationMetrics initializationMetrics) {
        CheckpointCoordinatorConfiguration checkpointConfig = executionGraph.getCheckpointCoordinatorConfiguration();
        if (checkpointConfig == null || !checkpointConfig.isCheckpointingEnabled()) {
            log.debug("Ignoring reportInitializationMetrics if checkpointing is not present");
            return;
        }
        processCheckpointCoordinatorMessage(
                "ReportInitializationMetrics",
                coordinator -> coordinator.reportInitializationMetrics(executionAttemptId, initializationMetrics));
    }

    /**
     * Wraps the arguments into an {@link AcknowledgeCheckpoint} message and hands it to the
     * checkpoint coordinator together with a textual description of the task's location.
     *
     * @param jobID the job the checkpoint belongs to
     * @param executionAttemptID the acknowledging execution attempt
     * @param checkpointId the acknowledged checkpoint id
     * @param checkpointMetrics metrics collected for the checkpoint
     * @param checkpointState the state snapshot reported by the task
     */
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        processCheckpointCoordinatorMessage(
                "AcknowledgeCheckpoint",
                coordinator ->
                        coordinator.receiveAcknowledgeMessage(
                                new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState),
                                retrieveTaskManagerLocation(executionAttemptID)));
    }

    /**
     * Hands a decline message to the checkpoint coordinator together with a textual description
     * of the declining task's location.
     *
     * @param decline the decline message received from a task
     */
    public void declineCheckpoint(DeclineCheckpoint decline) {
        processCheckpointCoordinatorMessage(
                "DeclineCheckpoint",
                coordinator ->
                        coordinator.receiveDeclineMessage(
                                decline, retrieveTaskManagerLocation(decline.getTaskExecutionId())));
    }

    /**
     * Runs {@code process} against the graph's checkpoint coordinator on the I/O executor.
     *
     * <p>Exceptions thrown by {@code process} are logged at WARN level and not propagated. If the
     * graph has no checkpoint coordinator, the message is dropped and logged: at ERROR level when
     * the job is {@link JobStatus#RUNNING}, at DEBUG level otherwise.
     *
     * @param messageType name of the message, used only in log output
     * @param process the action to apply to the checkpoint coordinator
     */
    private void processCheckpointCoordinatorMessage(String messageType, ThrowingConsumer<CheckpointCoordinator, Exception> process) {
        mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            ioExecutor.execute(() -> {
                try {
                    process.accept(checkpointCoordinator);
                } catch (Exception t) {
                    log.warn("Error while processing " + messageType + " message", t);
                }
            });
        } else {
            // "{}" is an SLF4J placeholder that is filled with the job id below.
            String errorMessage = "Received " + messageType + " message for job {} with no CheckpointCoordinator";
            if (executionGraph.getState() == JobStatus.RUNNING) {
                log.error(errorMessage, executionGraph.getJobID());
            } else {
                log.debug(errorMessage, executionGraph.getJobID());
            }
        }
    }

    /**
     * Returns the string form of the location assigned to the given execution attempt.
     *
     * @param executionAttemptID the attempt to look up among the registered executions
     * @return the location's {@code toString()}, or {@code "Unknown location"} when the attempt
     *     is not registered or has no assigned location
     */
    private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
        Optional<Execution> currentExecution = Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID));
        return currentExecution
                .map(Execution::getAssignedResourceLocation)
                .map(TaskManagerLocation::toString)
                .orElse("Unknown location");
    }

    /**
     * Returns the state of the execution that produces the given result partition.
     *
     * <p>The registered executions are consulted first. If the producer is not registered, the
     * current execution attempt of the partition's producer vertex is used, provided its attempt
     * id matches the requested producer id.
     *
     * @param intermediateResultId id of the intermediate result containing the partition
     * @param resultPartitionId id of the partition whose producer state is requested
     * @return the state of the producing execution
     * @throws PartitionProducerDisposedException if the producer is neither registered nor the
     *     current attempt of the producing vertex
     * @throws IllegalArgumentException if no intermediate result with the given id exists
     */
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
        Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
        if (execution != null) {
            return execution.getState();
        }
        IntermediateResult intermediateResult = executionGraph.getAllIntermediateResults().get(intermediateResultId);
        if (intermediateResult == null) {
            throw new IllegalArgumentException("Intermediate data set with ID " + intermediateResultId + " not found.");
        }
        Execution producerExecution =
                intermediateResult
                        .getPartitionById(resultPartitionId.getPartitionId())
                        .getProducer()
                        .getCurrentExecutionAttempt();
        if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
            return producerExecution.getState();
        }
        throw new PartitionProducerDisposedException(resultPartitionId);
    }

    /**
     * Obtains the next input split for the given execution attempt and returns it in serialized
     * form. When the execution has no further split, a serialized {@code null} is returned.
     *
     * @param vertexID id of the job vertex the attempt belongs to
     * @param executionAttempt the execution attempt requesting a split
     * @return the serialized next input split (or serialized {@code null})
     * @throws IllegalArgumentException if the attempt is not registered or the vertex is unknown
     * @throws IllegalStateException if the vertex has no split assigner
     * @throws IOException if the split cannot be serialized; the vertex is failed with the same
     *     exception before it is thrown
     */
    public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
        Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
        if (execution == null) {
            log.debug("Can not find Execution for attempt {}.", executionAttempt);
            throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttempt);
        }
        ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + vertexID);
        }
        if (vertex.getSplitAssigner() == null) {
            throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
        }

        InputSplit nextInputSplit;
        Optional<InputSplit> optionalNextInputSplit = execution.getNextInputSplit();
        if (optionalNextInputSplit.isPresent()) {
            nextInputSplit = optionalNextInputSplit.get();
            log.debug("Send next input split {}.", nextInputSplit);
        } else {
            nextInputSplit = null;
            log.debug("No more input splits available");
        }

        try {
            byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
            return new SerializedInputSplit(serializedInputSplit);
        } catch (Exception ex) {
            // nextInputSplit is null when no split is left; guard the class lookup so that a
            // serialization failure is not masked by a NullPointerException.
            Class<?> splitClass = nextInputSplit == null ? null : nextInputSplit.getClass();
            IOException reason = new IOException("Could not serialize the next input split of class " + splitClass + ".", ex);
            vertex.fail(reason);
            throw reason;
        }
    }
}

