/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.shuffle;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.AllTieredShuffleMasterSnapshots;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMasterSnapshot;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.shuffle.EmptyShuffleMasterSnapshot;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshotContext;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.Preconditions;

public class NettyShuffleMaster
implements ShuffleMaster<NettyShuffleDescriptor> {
    private final int buffersPerInputChannel;
    private final int floatingBuffersPerGate;
    private final Optional<Integer> maxRequiredBuffersPerGate;
    private final int sortShuffleMinParallelism;
    private final int sortShuffleMinBuffers;
    private final int networkBufferSize;
    private final boolean enableJobMasterFailover;
    @Nullable
    private final TieredInternalShuffleMaster tieredInternalShuffleMaster;
    private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap<JobID, JobShuffleContext>();
    private final Map<JobID, Map<ResultPartitionID, ShuffleDescriptor>> jobShuffleDescriptors = new HashMap<JobID, Map<ResultPartitionID, ShuffleDescriptor>>();

    public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
        Configuration conf = shuffleMasterContext.getConfiguration();
        Preconditions.checkNotNull((Object)conf);
        this.buffersPerInputChannel = 2;
        this.floatingBuffersPerGate = 8;
        this.maxRequiredBuffersPerGate = conf.getOptional(NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);
        this.sortShuffleMinParallelism = 1;
        this.sortShuffleMinBuffers = (Integer)conf.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
        this.networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
        this.tieredInternalShuffleMaster = this.isHybridShuffleEnabled(conf) ? new TieredInternalShuffleMaster(shuffleMasterContext, this::getShuffleDescriptor) : null;
        this.enableJobMasterFailover = (Boolean)conf.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED) != false && this.supportsBatchSnapshot();
        Preconditions.checkArgument((!this.maxRequiredBuffersPerGate.isPresent() || this.maxRequiredBuffersPerGate.get() >= 1 ? 1 : 0) != 0, (Object)String.format("At least one buffer is required for each gate, please increase the value of %s.", NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE.key()));
    }

    @Override
    public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
        ResultPartitionID resultPartitionID = new ResultPartitionID(partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
        List<TierShuffleDescriptor> tierShuffleDescriptors = null;
        if (this.tieredInternalShuffleMaster != null) {
            tierShuffleDescriptors = this.tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor(jobID, partitionDescriptor.getNumberOfSubpartitions(), resultPartitionID);
        }
        NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor(producerDescriptor.getProducerLocation(), NettyShuffleMaster.createConnectionInfo(producerDescriptor, partitionDescriptor.getConnectionIndex()), resultPartitionID, tierShuffleDescriptors);
        if (this.enableJobMasterFailover) {
            Map shuffleDescriptorMap = this.jobShuffleDescriptors.computeIfAbsent(jobID, k -> new HashMap());
            shuffleDescriptorMap.put(resultPartitionID, shuffleDeploymentDescriptor);
        }
        return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
    }

    @Override
    public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.releasePartition(shuffleDescriptor);
        }
    }

    public Optional<ShuffleDescriptor> getShuffleDescriptor(JobID jobID, ResultPartitionID resultPartitionID) {
        return Optional.ofNullable(this.jobShuffleDescriptors.get(jobID)).map(descriptorMap -> (ShuffleDescriptor)descriptorMap.get(resultPartitionID));
    }

    private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(ProducerDescriptor producerDescriptor, int connectionIndex) {
        return producerDescriptor.getDataPort() >= 0 ? NettyShuffleDescriptor.NetworkPartitionConnectionInfo.fromProducerDescriptor(producerDescriptor, connectionIndex) : NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo.INSTANCE;
    }

    @Override
    public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor desc) {
        Preconditions.checkNotNull((Object)desc);
        int numRequiredNetworkBuffers = NettyShuffleUtils.computeNetworkBuffersForAnnouncing(this.buffersPerInputChannel, this.floatingBuffersPerGate, this.maxRequiredBuffersPerGate, this.sortShuffleMinParallelism, this.sortShuffleMinBuffers, desc.getInputChannelNums(), desc.getPartitionReuseCount(), desc.getSubpartitionNums(), desc.getInputPartitionTypes(), desc.getPartitionTypes());
        return new MemorySize((long)this.networkBufferSize * (long)numRequiredNetworkBuffers);
    }

    private boolean isHybridShuffleEnabled(Configuration conf) {
        return conf.get(ExecutionOptions.BATCH_SHUFFLE_MODE) == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL || conf.get(ExecutionOptions.BATCH_SHUFFLE_MODE) == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
    }

    @Override
    public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(JobID jobId, Duration timeout, Set<ResultPartitionID> expectedPartitions) {
        if (this.tieredInternalShuffleMaster != null) {
            return this.tieredInternalShuffleMaster.getPartitionWithMetrics(this.jobShuffleContexts.get(jobId), timeout, expectedPartitions);
        }
        return ((JobShuffleContext)Preconditions.checkNotNull((Object)this.jobShuffleContexts.get(jobId))).getPartitionWithMetrics(timeout, expectedPartitions);
    }

    @Override
    public void registerJob(JobShuffleContext context) {
        this.jobShuffleContexts.put(context.getJobId(), context);
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.registerJob(context);
        }
    }

    @Override
    public void unregisterJob(JobID jobId) {
        this.jobShuffleContexts.remove(jobId);
        if (this.tieredInternalShuffleMaster != null) {
            if (this.enableJobMasterFailover) {
                this.jobShuffleDescriptors.remove(jobId);
            }
            this.tieredInternalShuffleMaster.unregisterJob(jobId);
        }
    }

    @Override
    public boolean supportsBatchSnapshot() {
        if (this.tieredInternalShuffleMaster != null) {
            return this.tieredInternalShuffleMaster.supportsBatchSnapshot();
        }
        return true;
    }

    @Override
    public void snapshotState(CompletableFuture<ShuffleMasterSnapshot> snapshotFuture, ShuffleMasterSnapshotContext context, JobID jobId) {
        if (this.tieredInternalShuffleMaster != null) {
            Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptorMap = this.jobShuffleDescriptors.remove(jobId);
            CompletableFuture<AllTieredShuffleMasterSnapshots> allSnapshotFuture = new CompletableFuture<AllTieredShuffleMasterSnapshots>();
            this.tieredInternalShuffleMaster.snapshotState(allSnapshotFuture, context, jobId);
            allSnapshotFuture.thenAccept(allSnap -> snapshotFuture.complete(new TieredInternalShuffleMasterSnapshot(shuffleDescriptorMap, (AllTieredShuffleMasterSnapshots)allSnap)));
            return;
        }
        snapshotFuture.complete(EmptyShuffleMasterSnapshot.getInstance());
    }

    @Override
    public void snapshotState(CompletableFuture<ShuffleMasterSnapshot> snapshotFuture) {
        if (this.tieredInternalShuffleMaster != null) {
            CompletableFuture<AllTieredShuffleMasterSnapshots> allSnapshotFuture = new CompletableFuture<AllTieredShuffleMasterSnapshots>();
            this.tieredInternalShuffleMaster.snapshotState(allSnapshotFuture);
            allSnapshotFuture.thenAccept(allSnap -> snapshotFuture.complete(new TieredInternalShuffleMasterSnapshot(null, (AllTieredShuffleMasterSnapshots)allSnap)));
            return;
        }
        snapshotFuture.complete(EmptyShuffleMasterSnapshot.getInstance());
    }

    @Override
    public void restoreState(ShuffleMasterSnapshot snapshot) {
        if (this.tieredInternalShuffleMaster != null) {
            Preconditions.checkState((boolean)(snapshot instanceof TieredInternalShuffleMasterSnapshot));
            this.tieredInternalShuffleMaster.restoreState((TieredInternalShuffleMasterSnapshot)snapshot);
        }
    }

    @Override
    public void restoreState(List<ShuffleMasterSnapshot> snapshots, JobID jobId) {
        if (this.tieredInternalShuffleMaster != null) {
            List<TieredInternalShuffleMasterSnapshot> snapshotList = snapshots.stream().map(snap -> {
                Preconditions.checkState((boolean)(snap instanceof TieredInternalShuffleMasterSnapshot));
                Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors = ((TieredInternalShuffleMasterSnapshot)snap).getShuffleDescriptors();
                if (shuffleDescriptors != null) {
                    this.jobShuffleDescriptors.computeIfAbsent(jobId, k -> new HashMap()).putAll(shuffleDescriptors);
                }
                return (TieredInternalShuffleMasterSnapshot)snap;
            }).collect(Collectors.toList());
            this.tieredInternalShuffleMaster.restoreState(snapshotList, jobId);
        }
    }

    @Override
    public void notifyPartitionRecoveryStarted(JobID jobId) {
        ((JobShuffleContext)Preconditions.checkNotNull((Object)this.jobShuffleContexts.get(jobId))).notifyPartitionRecoveryStarted();
    }

    @Override
    public void close() throws Exception {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.close();
        }
    }
}

