/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.CachedShuffleDescriptors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

public class TaskDeploymentDescriptorFactory {
    /**
     * Config option holding the threshold that decides whether serialized shuffle descriptors
     * are offloaded to the blob server. The default is {@code 2048 * 2048} (0x400000).
     */
    @Experimental
    public static final ConfigOption<Integer> OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
            ConfigOptions.key("jobmanager.task-deployment.offload-shuffle-descriptors-to-blob-server.threshold-num")
                    .intType()
                    .defaultValue(2048 * 2048)
                    .withDescription(
                            "Threshold for offloading shuffle descriptors to blob server. "
                                    + "Once the number of shuffle descriptors exceeds this value, "
                                    + "we will offload the shuffle descriptors to blob server. "
                                    + "This default value means JobManager need to serialize and transport "
                                    + "2048 shuffle descriptors (almost 32KB) to 2048 consumers (64MB in total)");

    /** Job information, either inline (serialized) or as a reference to a blob key. */
    private final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;

    /** ID of the job that the created deployment descriptors belong to. */
    private final JobID jobID;

    /** Whether an unknown location of a consumed partition is tolerated. */
    private final PartitionLocationConstraint partitionDeploymentConstraint;

    /** If set, hybrid partitions whose producer has not finished get an unknown shuffle descriptor. */
    private final boolean nonFinishedHybridPartitionShouldBeUnknown;

    /** Serializes shuffle descriptor groups and decides whether to offload them. */
    private final ShuffleDescriptorSerializer shuffleDescriptorSerializer;

    /**
     * Creates a factory for the task deployment descriptors of one job.
     *
     * @param jobInformationOrBlobKey the serialized job information, or the blob key it was
     *     offloaded under
     * @param jobID ID of the job
     * @param partitionDeploymentConstraint whether consumed partitions may have an unknown location
     * @param blobWriter writer handed to the shuffle descriptor serializer for offloading
     * @param nonFinishedHybridPartitionShouldBeUnknown whether hybrid partitions of unfinished
     *     producers are reported as unknown
     * @param offloadShuffleDescriptorsThreshold threshold handed to the shuffle descriptor
     *     serializer
     */
    public TaskDeploymentDescriptorFactory(
            Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey,
            JobID jobID,
            PartitionLocationConstraint partitionDeploymentConstraint,
            BlobWriter blobWriter,
            boolean nonFinishedHybridPartitionShouldBeUnknown,
            int offloadShuffleDescriptorsThreshold) {
        this.serializedJobInformation = getSerializedJobInformation(jobInformationOrBlobKey);
        this.jobID = jobID;
        this.partitionDeploymentConstraint = partitionDeploymentConstraint;
        this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown;
        this.shuffleDescriptorSerializer =
                new DefaultShuffleDescriptorSerializer(
                        jobID, blobWriter, offloadShuffleDescriptorsThreshold);
    }

    /**
     * Returns the job information wrapper that was built in the constructor.
     *
     * @return the (possibly offloaded) serialized job information
     */
    public TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation() {
        return serializedJobInformation;
    }

    /**
     * Builds the deployment descriptor for one execution attempt.
     *
     * @param execution the execution attempt to deploy
     * @param allocationID ID of the slot allocation the task is deployed into
     * @param taskRestore state to restore from, or {@code null}
     * @param producedPartitions descriptors of the partitions the task produces; copied into a
     *     new list
     * @return the assembled task deployment descriptor
     * @throws IOException propagated from building the input gate descriptors
     * @throws ClusterDatasetCorruptedException if resolving consumed cluster partitions fails
     */
    public TaskDeploymentDescriptor createDeploymentDescriptor(
            Execution execution,
            AllocationID allocationID,
            @Nullable JobManagerTaskRestore taskRestore,
            Collection<ResultPartitionDeploymentDescriptor> producedPartitions)
            throws IOException, ClusterDatasetCorruptedException {
        final ExecutionVertex vertex = execution.getVertex();

        final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation =
                getSerializedTaskInformation(vertex.getJobVertex().getTaskInformationOrBlobKey());
        final List<ResultPartitionDeploymentDescriptor> producedPartitionsCopy =
                new ArrayList<>(producedPartitions);
        final List<InputGateDeploymentDescriptor> inputGates =
                createInputGateDeploymentDescriptors(vertex);

        return new TaskDeploymentDescriptor(
                jobID,
                serializedJobInformation,
                serializedTaskInformation,
                execution.getAttemptId(),
                allocationID,
                taskRestore,
                producedPartitionsCopy,
                inputGates);
    }

    /**
     * Creates one input gate descriptor per consumed partition group of the vertex, followed by
     * one per consumed cluster data set.
     *
     * @param executionVertex the consuming vertex
     * @return the input gate descriptors, consumed partition groups first
     * @throws IOException propagated from serializing the shuffle descriptors
     * @throws ClusterDatasetCorruptedException if looking up the cluster partition shuffle
     *     descriptors fails for any reason
     */
    private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors(ExecutionVertex executionVertex) throws IOException, ClusterDatasetCorruptedException {
        final List<ConsumedPartitionGroup> partitionGroups =
                executionVertex.getAllConsumedPartitionGroups();
        final List<InputGateDeploymentDescriptor> gates = new ArrayList<>(partitionGroups.size());

        for (ConsumedPartitionGroup group : partitionGroups) {
            // All partitions of a group belong to the same intermediate result, so the first
            // partition is used to look the result up.
            final IntermediateResult consumedResult =
                    executionVertex
                            .getExecutionGraphAccessor()
                            .getResultPartitionOrThrow(group.getFirst())
                            .getIntermediateResult();
            final IntermediateDataSetID resultId = consumedResult.getId();
            final ResultPartitionType partitionType = consumedResult.getResultType();
            final IndexRange subpartitionRange =
                    executionVertex.getExecutionVertexInputInfo(resultId).getSubpartitionIndexRange();
            final int groupSize = group.size();
            final List<TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup>> serializedDescriptors =
                    getConsumedPartitionShuffleDescriptors(
                            consumedResult, group, executionVertex.getExecutionGraphAccessor());

            gates.add(
                    new InputGateDeploymentDescriptor(
                            resultId, partitionType, subpartitionRange, groupSize, serializedDescriptors));
        }

        final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> clusterPartitionDescriptors;
        try {
            clusterPartitionDescriptors = getClusterPartitionShuffleDescriptors(executionVertex);
        } catch (Throwable t) {
            // Any failure here is reported as a corrupted cluster data set.
            throw new ClusterDatasetCorruptedException(
                    t, executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume());
        }

        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> clusterPartition :
                clusterPartitionDescriptors.entrySet()) {
            gates.add(
                    new InputGateDeploymentDescriptor(
                            clusterPartition.getKey(),
                            ResultPartitionType.BLOCKING_PERSISTENT,
                            0,
                            clusterPartition.getValue()));
        }
        return gates;
    }

    /**
     * Returns the serialized shuffle descriptor groups for a consumed partition group, computing
     * and caching the descriptors on the intermediate result if no cache entry exists yet.
     *
     * @param intermediateResult the consumed intermediate result holding the cache
     * @param consumedPartitionGroup the group of consumed partitions
     * @param internalExecutionGraphAccessor accessor used to resolve the partitions
     * @return all serialized shuffle descriptor groups of the cache entry
     * @throws IOException propagated from serializing the shuffle descriptors
     */
    private List<TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup>> getConsumedPartitionShuffleDescriptors(IntermediateResult intermediateResult, ConsumedPartitionGroup consumedPartitionGroup, InternalExecutionGraphAccessor internalExecutionGraphAccessor) throws IOException {
        CachedShuffleDescriptors cached =
                intermediateResult.getCachedShuffleDescriptors(consumedPartitionGroup);
        if (cached == null) {
            final ShuffleDescriptorAndIndex[] computed =
                    computeConsumedPartitionShuffleDescriptors(
                            consumedPartitionGroup, internalExecutionGraphAccessor);
            cached = intermediateResult.cacheShuffleDescriptors(consumedPartitionGroup, computed);
        }

        cached.serializeShuffleDescriptors(shuffleDescriptorSerializer);
        return cached.getAllSerializedShuffleDescriptorGroups();
    }

    /**
     * Resolves the shuffle descriptor of every partition in the group and pairs it with its
     * position in the group's iteration order.
     *
     * @param consumedPartitionGroup the group of consumed partitions
     * @param internalExecutionGraphAccessor accessor used to resolve each partition by ID
     * @return an array with one entry per partition of the group
     */
    private ShuffleDescriptorAndIndex[] computeConsumedPartitionShuffleDescriptors(ConsumedPartitionGroup consumedPartitionGroup, InternalExecutionGraphAccessor internalExecutionGraphAccessor) {
        final ShuffleDescriptorAndIndex[] result =
                new ShuffleDescriptorAndIndex[consumedPartitionGroup.size()];

        int position = 0;
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            final IntermediateResultPartition partition =
                    internalExecutionGraphAccessor.getResultPartitionOrThrow(partitionId);
            final ShuffleDescriptor descriptor =
                    getConsumedPartitionShuffleDescriptor(
                            partition,
                            partitionDeploymentConstraint,
                            nonFinishedHybridPartitionShouldBeUnknown);
            result[position] = new ShuffleDescriptorAndIndex(descriptor, position);
            position++;
        }
        return result;
    }

    /**
     * Looks up, for every cluster data set the job vertex consumes, the single shuffle descriptor
     * this subtask reads (the one at the subtask's own index).
     *
     * @param executionVertex the consuming vertex
     * @return a map from data set ID to a one-element array holding the descriptor with index 0
     * @throws IllegalStateException if the number of shuffle descriptors of a data set differs
     *     from the vertex parallelism
     */
    private static Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
        final InternalExecutionGraphAccessor graphAccessor =
                executionVertex.getExecutionGraphAccessor();
        final List<IntermediateDataSetID> dataSetIds =
                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();

        final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> result = new HashMap<>();
        for (IntermediateDataSetID dataSetId : dataSetIds) {
            final List<ShuffleDescriptor> descriptors =
                    graphAccessor.getClusterPartitionShuffleDescriptors(dataSetId);

            final int parallelism = executionVertex.getTotalNumberOfParallelSubtasks();
            final int descriptorCount = descriptors.size();
            Preconditions.checkState(
                    parallelism == descriptorCount,
                    "The parallelism (%s) of the cache consuming job vertex is different from the number of shuffle descriptors (%s) of the intermediate data set",
                    parallelism,
                    descriptorCount);

            final ShuffleDescriptor ownDescriptor =
                    descriptors.get(executionVertex.getParallelSubtaskIndex());
            result.put(
                    dataSetId,
                    new ShuffleDescriptorAndIndex[] {new ShuffleDescriptorAndIndex(ownDescriptor, 0)});
        }
        return result;
    }

    /**
     * Wraps the job information in the matching {@code MaybeOffloaded} variant.
     *
     * @param jobInformationOrBlobKey left: the serialized job information; right: its blob key
     * @return {@code NonOffloaded} for a left value, {@code Offloaded} for a right value
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation(Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey) {
        if (!jobInformationOrBlobKey.isLeft()) {
            final PermanentBlobKey blobKey = jobInformationOrBlobKey.right();
            return new TaskDeploymentDescriptor.Offloaded<JobInformation>(blobKey);
        }
        final SerializedValue<JobInformation> serialized = jobInformationOrBlobKey.left();
        return new TaskDeploymentDescriptor.NonOffloaded<JobInformation>(serialized);
    }

    /**
     * Wraps the task information in the matching {@code MaybeOffloaded} variant.
     *
     * <p>The wrappers are created with explicit type arguments instead of raw types, so the
     * compiler checks that the payload really is a {@code SerializedValue<TaskInformation>}.
     *
     * @param taskInfo left: the serialized task information; right: its blob key
     * @return {@code NonOffloaded} for a left value, {@code Offloaded} for a right value
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> getSerializedTaskInformation(Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInfo) {
        if (taskInfo.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<TaskInformation>(taskInfo.left());
        }
        return new TaskDeploymentDescriptor.Offloaded<TaskInformation>(taskInfo.right());
    }

    /**
     * Resolves the shuffle descriptor of a consumed partition from the current state of its
     * producing execution.
     *
     * @param consumedPartition the partition to consume
     * @param partitionDeploymentConstraint whether an unknown partition location is tolerated
     * @param nonFinishedHybridPartitionShouldBeUnknown whether hybrid partitions of unfinished
     *     producers are reported as unknown
     * @return the shuffle descriptor for the partition
     */
    public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(IntermediateResultPartition consumedPartition, PartitionLocationConstraint partitionDeploymentConstraint, boolean nonFinishedHybridPartitionShouldBeUnknown) {
        final Execution producer = consumedPartition.getProducer().getPartitionProducer();
        final ExecutionState producerState = producer.getState();

        final IntermediateResultPartitionID partitionId = consumedPartition.getPartitionId();
        // The producer may not have a deployment descriptor for this partition yet.
        final ResultPartitionDeploymentDescriptor producedDescriptor =
                producer.getResultPartitionDeploymentDescriptor(partitionId).orElse(null);
        final ResultPartitionID consumedPartitionId =
                new ResultPartitionID(partitionId, producer.getAttemptId());

        return getConsumedPartitionShuffleDescriptor(
                consumedPartitionId,
                consumedPartition.getResultType(),
                consumedPartition.hasDataAllProduced(),
                producerState,
                partitionDeploymentConstraint,
                producedDescriptor,
                nonFinishedHybridPartitionShouldBeUnknown);
    }

    /**
     * Decides which shuffle descriptor a consumer gets for a partition.
     *
     * <p>The real descriptor is returned when the partition can be consumed (pipelined
     * consumption is possible or all data is produced), a deployment descriptor exists and the
     * producer is in an available state. The exception is a hybrid partition of an unfinished
     * producer while {@code nonFinishedHybridPartitionShouldBeUnknown} is set, which yields an
     * unknown descriptor. In every other case an unknown descriptor is returned if the
     * constraint allows it, otherwise an exception is thrown.
     *
     * @param consumedPartitionId ID of the consumed partition
     * @param resultPartitionType type of the consumed partition
     * @param hasAllDataProduced whether the partition's data is completely produced
     * @param producerState current state of the producing execution
     * @param partitionDeploymentConstraint whether an unknown partition location is tolerated
     * @param consumedPartitionDescriptor the producer's descriptor for the partition, or
     *     {@code null}
     * @param nonFinishedHybridPartitionShouldBeUnknown whether hybrid partitions of unfinished
     *     producers are reported as unknown
     * @return the real or an unknown shuffle descriptor
     * @throws IllegalStateException if the location is required but cannot be provided, or if
     *     the hybrid rule applies while the constraint forbids unknown descriptors
     */
    @VisibleForTesting
    static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(ResultPartitionID consumedPartitionId, ResultPartitionType resultPartitionType, boolean hasAllDataProduced, ExecutionState producerState, PartitionLocationConstraint partitionDeploymentConstraint, @Nullable ResultPartitionDeploymentDescriptor consumedPartitionDescriptor, boolean nonFinishedHybridPartitionShouldBeUnknown) {
        final boolean descriptorUsable =
                (resultPartitionType.canBePipelinedConsumed() || hasAllDataProduced)
                        && consumedPartitionDescriptor != null
                        && isProducerAvailable(producerState);

        if (!descriptorUsable) {
            if (partitionDeploymentConstraint == PartitionLocationConstraint.CAN_BE_UNKNOWN) {
                return new UnknownShuffleDescriptor(consumedPartitionId);
            }
            throw handleConsumedPartitionShuffleDescriptorErrors(
                    consumedPartitionId, resultPartitionType, hasAllDataProduced, producerState);
        }

        final boolean hideUnfinishedHybridPartition =
                resultPartitionType.isHybridResultPartition()
                        && nonFinishedHybridPartitionShouldBeUnknown
                        && producerState != ExecutionState.FINISHED;
        if (!hideUnfinishedHybridPartition) {
            return consumedPartitionDescriptor.getShuffleDescriptor();
        }

        Preconditions.checkState(
                partitionDeploymentConstraint == PartitionLocationConstraint.CAN_BE_UNKNOWN,
                "partition location constraint should allow unknown shuffle descriptor when nonFinishedHybridPartitionShouldBeUnknown is true.");
        return new UnknownShuffleDescriptor(consumedPartitionId);
    }

    /**
     * Builds (but does not throw) the exception describing why a partition cannot be consumed.
     *
     * @param consumedPartitionId ID of the consumed partition
     * @param resultPartitionType type of the consumed partition
     * @param hasAllDataProduced whether the partition's data is completely produced
     * @param producerState current state of the producing execution
     * @return an {@link IllegalStateException} whose message depends on whether the producer is
     *     canceled/failed or merely not ready
     */
    private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(ResultPartitionID consumedPartitionId, ResultPartitionType resultPartitionType, boolean hasAllDataProduced, ExecutionState producerState) {
        final String message;
        if (isProducerFailedOrCanceled(producerState)) {
            message =
                    "Trying to consume an input partition whose producer has been canceled or failed. The producer is in state "
                            + producerState
                            + ".";
        } else {
            message =
                    String.format(
                            "Trying to consume an input partition whose producer is not ready (result type: %s, hasAllDataProduced: %s, producer state: %s, partition id: %s).",
                            resultPartitionType,
                            hasAllDataProduced,
                            producerState,
                            consumedPartitionId);
        }
        return new IllegalStateException(message);
    }

    /**
     * Tells whether the producer is in one of the states SCHEDULED, DEPLOYING, INITIALIZING,
     * RUNNING or FINISHED.
     *
     * @param producerState state of the producing execution
     * @return {@code true} for the states listed above, {@code false} otherwise
     */
    private static boolean isProducerAvailable(ExecutionState producerState) {
        if (producerState == ExecutionState.SCHEDULED || producerState == ExecutionState.DEPLOYING) {
            return true;
        }
        if (producerState == ExecutionState.INITIALIZING || producerState == ExecutionState.RUNNING) {
            return true;
        }
        return producerState == ExecutionState.FINISHED;
    }

    /**
     * Tells whether the producer is in one of the states CANCELING, CANCELED or FAILED.
     *
     * @param producerState state of the producing execution
     * @return {@code true} for the states listed above, {@code false} otherwise
     */
    private static boolean isProducerFailedOrCanceled(ExecutionState producerState) {
        if (producerState == ExecutionState.FAILED) {
            return true;
        }
        return producerState == ExecutionState.CANCELED || producerState == ExecutionState.CANCELING;
    }

    /**
     * Default serializer for shuffle descriptor groups. A group is always compressed and
     * serialized; it is additionally offloaded through the {@link BlobWriter} when
     * (number of descriptors x number of consumers) reaches the configured threshold.
     */
    private static class DefaultShuffleDescriptorSerializer
    implements ShuffleDescriptorSerializer {
        private final JobID jobID;
        private final BlobWriter blobWriter;
        private final int offloadShuffleDescriptorsThreshold;

        /**
         * @param jobID ID of the job the blobs are written for; must not be {@code null}
         * @param blobWriter writer used for offloading; must not be {@code null}
         * @param offloadShuffleDescriptorsThreshold threshold at or above which a group is
         *     offloaded
         */
        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter, int offloadShuffleDescriptorsThreshold) {
            this.jobID = Preconditions.checkNotNull(jobID);
            this.blobWriter = Preconditions.checkNotNull(blobWriter);
            this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
        }

        /**
         * Serializes the group and, if the offload condition holds, tries to offload it.
         *
         * @param shuffleDescriptorGroup the group to serialize
         * @param numConsumer number of consumers the group is shipped to
         * @return {@code NonOffloaded} holding the serialized value, or {@code Offloaded} holding
         *     the blob key
         * @throws IOException if serialization or offloading fails
         */
        @Override
        public TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException {
            final CompressedSerializedValue<ShuffleDescriptorGroup> compressedSerializedValue =
                    CompressedSerializedValue.fromObject(shuffleDescriptorGroup);

            final Either<SerializedValue<ShuffleDescriptorGroup>, PermanentBlobKey> serializedValueOrBlobKey =
                    shouldOffload(shuffleDescriptorGroup.getShuffleDescriptors(), numConsumer)
                            ? BlobWriter.offloadWithException(compressedSerializedValue, jobID, blobWriter)
                            : Either.Left(compressedSerializedValue);

            if (serializedValueOrBlobKey.isLeft()) {
                return new TaskDeploymentDescriptor.NonOffloaded<ShuffleDescriptorGroup>(serializedValueOrBlobKey.left());
            }
            return new TaskDeploymentDescriptor.Offloaded<ShuffleDescriptorGroup>(serializedValueOrBlobKey.right());
        }

        /**
         * Checks whether the total number of descriptors to transport reaches the threshold.
         *
         * <p>The product is computed as {@code long}: with {@code int} arithmetic a large group
         * shipped to many consumers would overflow to a negative value and never be offloaded.
         */
        private boolean shouldOffload(ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int numConsumers) {
            final long totalDescriptorsToTransport =
                    (long) shuffleDescriptorsToSerialize.length * numConsumers;
            return totalDescriptorsToTransport >= offloadShuffleDescriptorsThreshold;
        }
    }

    /** Serializes a shuffle descriptor group and may offload the serialized form. */
    interface ShuffleDescriptorSerializer {
        /**
         * Serializes the given group, possibly offloading the result.
         *
         * @param shuffleDescriptorGroup the group to serialize
         * @param numConsumer number of consumers of the group
         * @return the serialized group, inline or offloaded
         * @throws IOException if serialization or offloading fails
         */
        TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(
                ShuffleDescriptorGroup shuffleDescriptorGroup, int numConsumer) throws IOException;
    }

    /** Serializable holder for an array of {@link ShuffleDescriptorAndIndex} entries. */
    public static class ShuffleDescriptorGroup
    implements Serializable {
        private static final long serialVersionUID = 1L;

        private final ShuffleDescriptorAndIndex[] shuffleDescriptors;

        /**
         * @param shuffleDescriptors the entries of the group; must not be {@code null}
         */
        public ShuffleDescriptorGroup(ShuffleDescriptorAndIndex[] shuffleDescriptors) {
            this.shuffleDescriptors = Preconditions.checkNotNull(shuffleDescriptors);
        }

        /**
         * @return the array passed to the constructor (not a copy)
         */
        public ShuffleDescriptorAndIndex[] getShuffleDescriptors() {
            return shuffleDescriptors;
        }
    }

    /** Serializable pair of a {@link ShuffleDescriptor} and an integer index. */
    public static class ShuffleDescriptorAndIndex
    implements Serializable {
        private static final long serialVersionUID = 1L;

        private final ShuffleDescriptor shuffleDescriptor;
        private final int index;

        /**
         * @param shuffleDescriptor the shuffle descriptor
         * @param index the index associated with the descriptor
         */
        public ShuffleDescriptorAndIndex(ShuffleDescriptor shuffleDescriptor, int index) {
            this.shuffleDescriptor = shuffleDescriptor;
            this.index = index;
        }

        /**
         * @return the shuffle descriptor
         */
        public ShuffleDescriptor getShuffleDescriptor() {
            return shuffleDescriptor;
        }

        /**
         * @return the index associated with the descriptor
         */
        public int getIndex() {
            return index;
        }
    }

    public static enum PartitionLocationConstraint {
        MUST_BE_KNOWN,
        CAN_BE_UNKNOWN;


        public static PartitionLocationConstraint fromJobType(JobType jobType) {
            switch (jobType) {
                case BATCH: {
                    return CAN_BE_UNKNOWN;
                }
                case STREAMING: {
                    return MUST_BE_KNOWN;
                }
            }
            throw new IllegalArgumentException(String.format("Unknown JobType %s. Cannot derive partition location constraint for it.", new Object[]{jobType}));
        }
    }
}

