/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListenerManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the {@link ResultPartition}s registered with it and hands out
 * {@link ResultSubpartitionView}s for them.
 *
 * <p>A consumer may ask for a partition before it has been registered. In that case a
 * {@link PartitionRequestListener} is parked per partition id and is either notified when the
 * partition gets registered, or notified of a timeout when the partition is released, the
 * listener expires, or the manager shuts down.
 *
 * <p>All mutable state is accessed while holding the monitor of {@code registeredPartitions}.
 * Listener callbacks are invoked after that monitor has been released.
 */
public class ResultPartitionManager implements ResultPartitionProvider {

    private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);

    /** Registered partitions by id. This map doubles as the lock object for the whole manager. */
    private final Map<ResultPartitionID, ResultPartition> registeredPartitions =
            CollectionUtil.newHashMapWithExpectedSize(16);

    /** Listeners waiting for a partition that has not been registered yet, grouped by partition id. */
    @GuardedBy("registeredPartitions")
    private final Map<ResultPartitionID, PartitionRequestListenerManager> listenerManagers =
            new HashMap<>();

    /** Handle of the periodic listener-expiration task; {@code null} if none is scheduled. */
    @Nullable
    private ScheduledFuture<?> partitionListenerTimeoutChecker;

    /** Listener timeout in milliseconds; also used as the period of the expiration check. */
    private final int partitionListenerTimeout;

    /** Set once {@link #shutdown()} has run; only read and written under the lock. */
    private boolean isShutdown;

    /** Creates a manager without a listener timeout and without a periodic expiration check. */
    @VisibleForTesting
    public ResultPartitionManager() {
        this(0, null);
    }

    /**
     * Creates a manager.
     *
     * @param partitionListenerTimeout timeout for parked listeners, in milliseconds
     * @param scheduledExecutor executor used to run the periodic expiration check; the check is
     *     only scheduled when this is non-null and the timeout is positive
     */
    public ResultPartitionManager(int partitionListenerTimeout, ScheduledExecutor scheduledExecutor) {
        this.partitionListenerTimeout = partitionListenerTimeout;
        if (partitionListenerTimeout > 0 && scheduledExecutor != null) {
            this.partitionListenerTimeoutChecker =
                    scheduledExecutor.scheduleWithFixedDelay(
                            this::checkRequestPartitionListeners,
                            partitionListenerTimeout,
                            partitionListenerTimeout,
                            TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers a partition and notifies every listener that was waiting for it.
     *
     * @param partition the partition to register
     * @throws IllegalStateException if the manager is shut down or a partition with the same id
     *     is already registered; in the latter case the existing registration is left untouched
     * @throws IOException declared by the signature; presumably raised by listener notification
     */
    public void registerResultPartition(ResultPartition partition) throws IOException {
        final PartitionRequestListenerManager listenerManager;
        synchronized (registeredPartitions) {
            Preconditions.checkState(!isShutdown, "Result partition manager already shut down.");

            // putIfAbsent instead of put: a rejected duplicate must not replace (and thereby
            // orphan) the partition that is already registered under this id.
            ResultPartition previous =
                    registeredPartitions.putIfAbsent(partition.getPartitionId(), partition);
            if (previous != null) {
                throw new IllegalStateException("Result partition already registered.");
            }

            listenerManager = listenerManagers.remove(partition.getPartitionId());
        }

        // Notify outside the lock.
        if (listenerManager != null) {
            for (PartitionRequestListener listener : listenerManager.getPartitionRequestListeners()) {
                listener.notifyPartitionCreated(partition);
            }
        }

        LOG.debug("Registered {}.", partition);
    }

    /**
     * Creates a view on the requested subpartitions of a registered partition.
     *
     * @param partitionId id of the partition to read from
     * @param subpartitionIndexSet the subpartitions to create the view for
     * @param availabilityListener listener handed to the created view
     * @return the created view
     * @throws PartitionNotFoundException if no partition is registered under {@code partitionId}
     * @throws IOException declared by the signature; presumably raised while creating the view
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(
            ResultPartitionID partitionId,
            ResultSubpartitionIndexSet subpartitionIndexSet,
            BufferAvailabilityListener availabilityListener)
            throws IOException {
        synchronized (registeredPartitions) {
            ResultPartition partition = registeredPartitions.get(partitionId);
            if (partition == null) {
                throw new PartitionNotFoundException(partitionId);
            }

            LOG.debug("Requesting subpartitions {} of {}.", subpartitionIndexSet, partition);
            return partition.createSubpartitionView(subpartitionIndexSet, availabilityListener);
        }
    }

    /**
     * Creates a view if the partition is registered; otherwise parks the given listener so it
     * can be notified later.
     *
     * @param partitionId id of the partition to read from
     * @param subpartitionIndexSet the subpartitions to create the view for
     * @param availabilityListener listener handed to the created view
     * @param partitionRequestListener listener to park when the partition is not registered
     * @return the created view, or empty if the listener was parked instead
     * @throws IOException declared by the signature; presumably raised while creating the view
     */
    @Override
    public Optional<ResultSubpartitionView> createSubpartitionViewOrRegisterListener(
            ResultPartitionID partitionId,
            ResultSubpartitionIndexSet subpartitionIndexSet,
            BufferAvailabilityListener availabilityListener,
            PartitionRequestListener partitionRequestListener)
            throws IOException {
        final ResultSubpartitionView subpartitionView;
        synchronized (registeredPartitions) {
            ResultPartition partition = registeredPartitions.get(partitionId);
            if (partition == null) {
                listenerManagers
                        .computeIfAbsent(partitionId, key -> new PartitionRequestListenerManager())
                        .registerListener(partitionRequestListener);
                subpartitionView = null;
            } else {
                LOG.debug("Requesting subpartitions {} of {}.", subpartitionIndexSet, partition);
                subpartitionView =
                        partition.createSubpartitionView(subpartitionIndexSet, availabilityListener);
            }
        }

        return subpartitionView == null ? Optional.empty() : Optional.of(subpartitionView);
    }

    /**
     * Removes a parked listener, dropping the per-partition manager once it becomes empty.
     *
     * @param listener the listener to remove; identified by its partition id and receiver id
     */
    @Override
    public void releasePartitionRequestListener(PartitionRequestListener listener) {
        synchronized (registeredPartitions) {
            PartitionRequestListenerManager listenerManager =
                    listenerManagers.get(listener.getResultPartitionId());
            if (listenerManager != null) {
                listenerManager.remove(listener.getReceiverId());
                if (listenerManager.isEmpty()) {
                    listenerManagers.remove(listener.getResultPartitionId());
                }
            }
        }
    }

    /**
     * Unregisters and releases the partition with the given id, if present, and sends a timeout
     * notification to every listener still parked for that id.
     *
     * @param partitionId id of the partition to release
     * @param cause the cause passed on to {@link ResultPartition#release(Throwable)}
     */
    public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
        final PartitionRequestListenerManager listenerManager;
        synchronized (registeredPartitions) {
            ResultPartition resultPartition = registeredPartitions.remove(partitionId);
            if (resultPartition != null) {
                resultPartition.release(cause);

                LOG.debug(
                        "Released partition {} produced by {}.",
                        partitionId.getPartitionId(),
                        partitionId.getProducerId());
            }
            listenerManager = listenerManagers.remove(partitionId);
        }

        // Notify outside the lock.
        if (listenerManager != null && !listenerManager.isEmpty()) {
            for (PartitionRequestListener listener : listenerManager.getPartitionRequestListeners()) {
                listener.notifyPartitionCreatedTimeout();
            }
        }
    }

    /**
     * Releases all registered partitions, stops the periodic expiration check, marks the manager
     * as shut down, and finally sends a timeout notification to every parked listener.
     */
    public void shutdown() {
        final Collection<PartitionRequestListener> pendingListeners;
        synchronized (registeredPartitions) {
            LOG.debug(
                    "Releasing {} partitions because of shutdown.",
                    registeredPartitions.values().size());

            for (ResultPartition partition : registeredPartitions.values()) {
                partition.release();
            }
            registeredPartitions.clear();

            pendingListeners = drainPendingListeners();

            // Stop the expiration task and flip the flag before any listener callback runs, so a
            // failing callback cannot leave the manager half shut down.
            if (partitionListenerTimeoutChecker != null) {
                partitionListenerTimeoutChecker.cancel(false);
                partitionListenerTimeoutChecker = null;
            }
            isShutdown = true;
        }

        // Notify outside the lock, like every other notification path in this class.
        for (PartitionRequestListener listener : pendingListeners) {
            listener.notifyPartitionCreatedTimeout();
        }

        LOG.debug("Successful shutdown.");
    }

    /**
     * Collects every parked listener into a new collection and clears {@code listenerManagers}.
     * Must be called while holding the lock on {@code registeredPartitions}.
     */
    private Collection<PartitionRequestListener> drainPendingListeners() {
        final Collection<PartitionRequestListener> drained = new ArrayList<>();
        for (PartitionRequestListenerManager listenerManager : listenerManagers.values()) {
            for (PartitionRequestListener listener : listenerManager.getPartitionRequestListeners()) {
                drained.add(listener);
            }
        }
        listenerManagers.clear();
        return drained;
    }

    /**
     * Periodic task: removes expired listeners from every per-partition manager, drops managers
     * that became empty, and sends the expired listeners a timeout notification. Does nothing
     * once the manager is shut down.
     */
    private void checkRequestPartitionListeners() {
        LinkedList<PartitionRequestListener> timeoutPartitionRequestListeners = new LinkedList<>();
        synchronized (registeredPartitions) {
            if (isShutdown) {
                return;
            }

            long now = System.currentTimeMillis();
            Iterator<Map.Entry<ResultPartitionID, PartitionRequestListenerManager>> iterator =
                    listenerManagers.entrySet().iterator();
            while (iterator.hasNext()) {
                PartitionRequestListenerManager partitionRequestListeners =
                        iterator.next().getValue();
                partitionRequestListeners.removeExpiration(
                        now, partitionListenerTimeout, timeoutPartitionRequestListeners);
                if (partitionRequestListeners.isEmpty()) {
                    iterator.remove();
                }
            }
        }

        // Notify outside the lock.
        for (PartitionRequestListener partitionRequestListener : timeoutPartitionRequestListeners) {
            partitionRequestListener.notifyPartitionCreatedTimeout();
        }
    }

    /** Exposes the internal listener-manager map itself (not a copy) for tests. */
    @VisibleForTesting
    public Map<ResultPartitionID, PartitionRequestListenerManager> getListenerManagers() {
        return listenerManagers;
    }

    /**
     * Handles a consume notification: unregisters whatever is registered under the partition's
     * id and releases {@code partition} if it was that registered instance.
     *
     * @param partition the partition that reported being consumed
     * @throws IllegalStateException if listeners are still parked for the partition's id
     */
    void onConsumedPartition(ResultPartition partition) {
        LOG.debug("Received consume notification from {}.", partition);

        synchronized (registeredPartitions) {
            ResultPartition previous = registeredPartitions.remove(partition.getPartitionId());
            // Release only if the removed entry is this very instance.
            if (partition == previous) {
                partition.release();
                ResultPartitionID partitionId = partition.getPartitionId();
                LOG.debug(
                        "Released partition {} produced by {}.",
                        partitionId.getPartitionId(),
                        partitionId.getProducerId());
            }

            PartitionRequestListenerManager listenerManager =
                    listenerManagers.remove(partition.getPartitionId());
            Preconditions.checkState(
                    listenerManager == null || listenerManager.isEmpty(),
                    "The partition request listeners is not empty for "
                            + String.valueOf(partition.getPartitionId()));
        }
    }

    /**
     * Returns the ids of all currently registered partitions.
     *
     * @return a snapshot taken under the lock; it does not reflect later registrations or
     *     releases and can be iterated without holding the lock
     */
    public Collection<ResultPartitionID> getUnreleasedPartitions() {
        synchronized (registeredPartitions) {
            // Copy: the key set itself is a live view of the map and must not escape the lock.
            return new ArrayList<>(registeredPartitions.keySet());
        }
    }

    /**
     * Returns a metrics snapshot for the given partition.
     *
     * @param partitionId id of the partition to query
     * @return metrics built from a snapshot of the partition's result-partition bytes, or empty
     *     if no partition is registered under {@code partitionId}
     */
    public Optional<ShuffleMetrics> getMetricsOfPartition(ResultPartitionID partitionId) {
        synchronized (registeredPartitions) {
            ResultPartition partition = registeredPartitions.get(partitionId);
            if (partition == null) {
                return Optional.empty();
            }
            return Optional.of(
                    new DefaultShuffleMetrics(partition.getResultPartitionBytes().createSnapshot()));
        }
    }
}

