/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;

public class DiskTierConsumerAgent
implements TierConsumerAgent {
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CompletableFuture<NettyConnectionReader>>> nettyConnectionReaders = new HashMap<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CompletableFuture<NettyConnectionReader>>>();

    public DiskTierConsumerAgent(List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, TieredStorageNettyService nettyService) {
        for (TieredStorageConsumerSpec tieredStorageConsumerSpec : tieredStorageConsumerSpecs) {
            TieredStoragePartitionId partitionId = tieredStorageConsumerSpec.getPartitionId();
            for (int subpartitionId : tieredStorageConsumerSpec.getSubpartitionIds().values()) {
                this.nettyConnectionReaders.computeIfAbsent(partitionId, ignore -> new HashMap()).put(new TieredStorageSubpartitionId(subpartitionId), nettyService.registerConsumer(partitionId, new TieredStorageSubpartitionId(subpartitionId)));
            }
        }
    }

    @Override
    public void setup(TieredStorageMemoryManager memoryManager) {
    }

    @Override
    public void start() {
    }

    @Override
    public void registerAvailabilityNotifier(AvailabilityNotifier notifier) {
    }

    @Override
    public void updateTierShuffleDescriptor(TieredStoragePartitionId partitionId, TieredStorageInputChannelId inputChannelId, TieredStorageSubpartitionId subpartitionId, TierShuffleDescriptor tierShuffleDescriptor) {
    }

    @Override
    public int peekNextBufferSubpartitionId(TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet indexSet) throws IOException {
        for (CompletableFuture<NettyConnectionReader> readerFuture : this.nettyConnectionReaders.get(partitionId).values()) {
            int subpartitionId;
            try {
                subpartitionId = readerFuture.get().peekNextBufferSubpartitionId();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Failed to peek subpartition Id.", e);
            }
            if (!indexSet.contains(subpartitionId)) continue;
            return subpartitionId;
        }
        return -1;
    }

    @Override
    public Optional<Buffer> getNextBuffer(TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId) {
        try {
            return this.nettyConnectionReaders.get(partitionId).get(subpartitionId).get().readBuffer(subpartitionId.getSubpartitionId(), segmentId);
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to get next buffer.", e);
        }
    }

    @Override
    public void close() throws IOException {
    }
}

