/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DiskIOScheduler
implements Runnable,
BufferRecycler,
NettyServiceProducer {
    private static final Logger LOG = LoggerFactory.getLogger(DiskIOScheduler.class);
    private final Object lock = new Object();
    private final TieredStoragePartitionId partitionId;
    private final ScheduledExecutorService ioExecutor;
    private final BatchShuffleReadBufferPool bufferPool;
    private final int maxRequestedBuffers;
    private final Duration bufferRequestTimeout;
    private final BiFunction<Integer, Integer, Integer> segmentIdGetter;
    private final PartitionFileReader partitionFileReader;
    @GuardedBy(value="lock")
    private final Map<NettyConnectionId, ScheduledSubpartitionReader> allScheduledReaders = new HashMap<NettyConnectionId, ScheduledSubpartitionReader>();
    @GuardedBy(value="lock")
    private boolean isRunning;
    @GuardedBy(value="lock")
    private int numRequestedBuffers;
    @GuardedBy(value="lock")
    private boolean isReleased;

    public DiskIOScheduler(TieredStoragePartitionId partitionId, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, int maxRequestedBuffers, Duration bufferRequestTimeout, BiFunction<Integer, Integer, Integer> segmentIdGetter, PartitionFileReader partitionFileReader) {
        this.partitionId = partitionId;
        this.bufferPool = (BatchShuffleReadBufferPool)Preconditions.checkNotNull((Object)bufferPool);
        this.ioExecutor = (ScheduledExecutorService)Preconditions.checkNotNull((Object)ioExecutor);
        this.maxRequestedBuffers = maxRequestedBuffers;
        this.bufferRequestTimeout = (Duration)Preconditions.checkNotNull((Object)bufferRequestTimeout);
        this.segmentIdGetter = segmentIdGetter;
        this.partitionFileReader = partitionFileReader;
        bufferPool.registerRequester(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void run() {
        int numBuffersRead = this.readBuffersFromFile();
        Object object = this.lock;
        synchronized (object) {
            this.numRequestedBuffers += numBuffersRead;
            this.isRunning = false;
        }
        if (numBuffersRead == 0) {
            try {
                this.ioExecutor.schedule(this::triggerScheduling, 5L, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException e) {
                this.ignoreRejectedExecutionOnShutdown(e);
            }
        } else {
            this.triggerScheduling();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void connectionEstablished(TieredStorageSubpartitionId subpartitionId, NettyConnectionWriter nettyConnectionWriter) {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isReleased ? 1 : 0) != 0, (Object)"DiskIOScheduler is already released.");
            ScheduledSubpartitionReader scheduledSubpartitionReader = new ScheduledSubpartitionReader(subpartitionId, nettyConnectionWriter);
            this.allScheduledReaders.put(nettyConnectionWriter.getNettyConnectionId(), scheduledSubpartitionReader);
            this.triggerScheduling();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void connectionBroken(NettyConnectionId id) {
        Object object = this.lock;
        synchronized (object) {
            this.allScheduledReaders.remove(id);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void recycle(MemorySegment segment) {
        Object object = this.lock;
        synchronized (object) {
            this.bufferPool.recycle(segment);
            --this.numRequestedBuffers;
            this.triggerScheduling();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void release() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            this.allScheduledReaders.clear();
            this.partitionFileReader.release();
            this.bufferPool.unregisterRequester(this);
        }
    }

    private int readBuffersFromFile() {
        Queue<MemorySegment> buffers;
        List<ScheduledSubpartitionReader> scheduledReaders = this.sortScheduledReaders();
        if (scheduledReaders.isEmpty()) {
            return 0;
        }
        try {
            buffers = this.allocateBuffers();
        }
        catch (Exception e) {
            this.notifyDownstreamSubpartitionFailed(scheduledReaders, e, "Failed to request buffers for data reading.");
            return 0;
        }
        int numBuffersAllocated = buffers.size();
        if (numBuffersAllocated <= 0) {
            return 0;
        }
        for (ScheduledSubpartitionReader scheduledReader : scheduledReaders) {
            if (buffers.isEmpty()) break;
            try {
                scheduledReader.loadDiskDataToBuffers(buffers, this);
            }
            catch (IOException e) {
                this.notifyDownstreamSubpartitionFailed(Collections.singletonList(scheduledReader), e, "Failed to read shuffle data.");
            }
        }
        int numBuffersRead = numBuffersAllocated - buffers.size();
        this.releaseBuffers(buffers);
        return numBuffersRead;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<ScheduledSubpartitionReader> sortScheduledReaders() {
        ArrayList<ScheduledSubpartitionReader> scheduledReaders;
        Iterator iterator = this.lock;
        synchronized (iterator) {
            if (this.isReleased) {
                return new ArrayList<ScheduledSubpartitionReader>();
            }
            scheduledReaders = new ArrayList<ScheduledSubpartitionReader>(this.allScheduledReaders.values());
        }
        for (ScheduledSubpartitionReader reader : scheduledReaders) {
            try {
                reader.prepareForScheduling();
            }
            catch (IOException e) {
                this.notifyDownstreamSubpartitionFailed(Collections.singletonList(reader), e, "Failed to prepare for scheduling.");
            }
        }
        Collections.sort(scheduledReaders);
        return scheduledReaders;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private Queue<MemorySegment> allocateBuffers() throws Exception {
        timeoutTime = this.getBufferRequestTimeoutTime();
        do lbl-1000:
        // 3 sources

        {
            if (!(buffers = this.bufferPool.requestBuffers()).isEmpty()) {
                return new ArrayDeque<MemorySegment>(buffers);
            }
            var4_3 = this.lock;
            synchronized (var4_3) {
                if (this.isReleased) {
                    return new ArrayDeque<MemorySegment>();
                }
            }
            if (System.currentTimeMillis() < timeoutTime) ** GOTO lbl-1000
            timeoutTime = this.getBufferRequestTimeoutTime();
        } while (System.currentTimeMillis() < timeoutTime);
        throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", new Object[]{TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyDownstreamSubpartitionFailed(List<ScheduledSubpartitionReader> scheduledReaders, Throwable failureCause, String errorLog) {
        for (ScheduledSubpartitionReader scheduledReader : scheduledReaders) {
            Object object = this.lock;
            synchronized (object) {
                this.allScheduledReaders.remove(scheduledReader.getId());
            }
            scheduledReader.failReader(failureCause);
        }
        LOG.error(errorLog);
    }

    private void releaseBuffers(Queue<MemorySegment> buffers) {
        if (!buffers.isEmpty()) {
            this.bufferPool.recycle(buffers);
            buffers.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void triggerScheduling() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.isRunning && !this.allScheduledReaders.isEmpty() && this.numRequestedBuffers + this.bufferPool.getNumBuffersPerRequest() <= this.maxRequestedBuffers && this.numRequestedBuffers < this.bufferPool.getAverageBuffersPerRequester()) {
                this.isRunning = true;
                try {
                    this.ioExecutor.execute(() -> {
                        try {
                            this.run();
                        }
                        catch (Throwable t) {
                            LOG.error("Failed to read data.", t);
                            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), t);
                        }
                    });
                }
                catch (RejectedExecutionException e) {
                    this.ignoreRejectedExecutionOnShutdown(e);
                }
            }
        }
    }

    private long getBufferRequestTimeoutTime() {
        return this.bufferPool.getLastBufferOperationTimestamp() + this.bufferRequestTimeout.toMillis();
    }

    private void ignoreRejectedExecutionOnShutdown(RejectedExecutionException e) {
        LOG.warn("Attempt to submit a task to the shut down batch read thread pool should be ignored. No more tasks should be accepted.", (Throwable)e);
    }

    private class ScheduledSubpartitionReader
    implements Comparable<ScheduledSubpartitionReader> {
        private final TieredStorageSubpartitionId subpartitionId;
        private final NettyConnectionWriter nettyConnectionWriter;
        private int nextSegmentId = -1;
        private int nextBufferIndex;
        private long priority;
        private boolean isFailed;
        @Nullable
        private PartitionFileReader.ReadProgress readProgress;

        private ScheduledSubpartitionReader(TieredStorageSubpartitionId subpartitionId, NettyConnectionWriter nettyConnectionWriter) {
            this.subpartitionId = subpartitionId;
            this.nettyConnectionWriter = nettyConnectionWriter;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void loadDiskDataToBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler) throws IOException {
            if (this.isFailed) {
                throw new IOException("The scheduled subpartition reader for " + this.subpartitionId + " has already been failed.");
            }
            CompositeBuffer partialBuffer = null;
            boolean shouldContinueRead = true;
            try {
                while (!buffers.isEmpty() && shouldContinueRead && this.nextSegmentId >= 0) {
                    PartitionFileReader.ReadBufferResult readBufferResult;
                    MemorySegment memorySegment;
                    block9: {
                        memorySegment = buffers.poll();
                        try {
                            readBufferResult = DiskIOScheduler.this.partitionFileReader.readBuffer(DiskIOScheduler.this.partitionId, this.subpartitionId, this.nextSegmentId, this.nextBufferIndex, memorySegment, recycler, this.readProgress, partialBuffer);
                            if (readBufferResult != null) break block9;
                            buffers.add(memorySegment);
                            break;
                        }
                        catch (IOException exception) {
                            buffers.add(memorySegment);
                            throw exception;
                        }
                    }
                    List<Buffer> readBuffers = readBufferResult.getReadBuffers();
                    shouldContinueRead = readBufferResult.continuousReadSuggested();
                    this.readProgress = readBufferResult.getReadProgress();
                    if (readBuffers.isEmpty()) {
                        buffers.add(memorySegment);
                        break;
                    }
                    partialBuffer = this.writeFullBuffersAndGetPartialBuffer(readBuffers);
                }
            }
            finally {
                if (partialBuffer != null) {
                    partialBuffer.recycleBuffer();
                }
            }
        }

        @Override
        public int compareTo(ScheduledSubpartitionReader reader) {
            Preconditions.checkArgument((reader != null ? 1 : 0) != 0);
            return Long.compare(this.getPriority(), reader.getPriority());
        }

        private void prepareForScheduling() throws IOException {
            if (this.nextSegmentId < 0) {
                this.updateSegmentId();
            }
            this.priority = this.nextSegmentId < 0 ? Long.MAX_VALUE : DiskIOScheduler.this.partitionFileReader.getPriority(DiskIOScheduler.this.partitionId, this.subpartitionId, this.nextSegmentId, this.nextBufferIndex, this.readProgress);
        }

        private CompositeBuffer writeFullBuffersAndGetPartialBuffer(List<Buffer> readBuffers) {
            CompositeBuffer partialBuffer = null;
            for (int i = 0; i < readBuffers.size(); ++i) {
                Buffer readBuffer = readBuffers.get(i);
                if (i == readBuffers.size() - 1 && this.isPartialBuffer(readBuffer)) {
                    partialBuffer = (CompositeBuffer)readBuffer;
                    continue;
                }
                this.writeNettyBufferAndUpdateSegmentId(readBuffer);
            }
            return partialBuffer;
        }

        private boolean isPartialBuffer(Buffer readBuffer) {
            return readBuffer instanceof CompositeBuffer && ((CompositeBuffer)readBuffer).missingLength() > 0;
        }

        private void writeNettyBufferAndUpdateSegmentId(Buffer readBuffer) {
            this.writeToNettyConnectionWriter(NettyPayload.newBuffer(readBuffer, this.nextBufferIndex++, this.subpartitionId.getSubpartitionId()));
            if (readBuffer.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
                this.nextSegmentId = -1;
                this.updateSegmentId();
            }
        }

        private void writeToNettyConnectionWriter(NettyPayload nettyPayload) {
            this.nettyConnectionWriter.writeNettyPayload(nettyPayload);
            if (this.nettyConnectionWriter.numQueuedPayloads() <= 1 || this.nettyConnectionWriter.numQueuedBufferPayloads() <= 1) {
                this.notifyAvailable();
            }
        }

        private long getPriority() {
            return this.priority;
        }

        private void notifyAvailable() {
            this.nettyConnectionWriter.notifyAvailable();
        }

        private void failReader(Throwable failureCause) {
            if (this.isFailed) {
                return;
            }
            this.isFailed = true;
            this.nettyConnectionWriter.close(failureCause);
            this.nettyConnectionWriter.notifyAvailable();
        }

        private void updateSegmentId() {
            Integer segmentId = DiskIOScheduler.this.segmentIdGetter.apply(this.subpartitionId.getSubpartitionId(), this.nextBufferIndex);
            if (segmentId != null) {
                this.nextSegmentId = segmentId;
                this.writeToNettyConnectionWriter(NettyPayload.newSegment(segmentId));
            }
        }

        private NettyConnectionId getId() {
            return this.nettyConnectionWriter.getNettyConnectionId();
        }
    }
}

