/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateChunkReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.InputChannelRecoveredStateHandler;
import org.apache.flink.runtime.checkpoint.channel.RecoveredChannelStateHandler;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionRecoveredStateHandler;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.ChannelStateHelper;
import org.apache.flink.runtime.state.StreamStateHandle;

public class SequentialChannelStateReaderImpl
implements SequentialChannelStateReader {
    private final TaskStateSnapshot taskStateSnapshot;
    private final ChannelStateSerializer serializer;
    private final ChannelStateChunkReader chunkReader;

    public SequentialChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot) {
        this.taskStateSnapshot = taskStateSnapshot;
        this.serializer = new ChannelStateSerializerImpl();
        this.chunkReader = new ChannelStateChunkReader(this.serializer);
    }

    @Override
    public void readInputData(InputGate[] inputGates) throws IOException, InterruptedException {
        try (InputChannelRecoveredStateHandler stateHandler = new InputChannelRecoveredStateHandler(inputGates, this.taskStateSnapshot.getInputRescalingDescriptor());){
            this.read(stateHandler, SequentialChannelStateReaderImpl.groupByDelegate(this.streamSubtaskStates(), ChannelStateHelper::extractUnmergedInputHandles));
        }
    }

    @Override
    public void readOutputData(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion) throws IOException, InterruptedException {
        try (ResultSubpartitionRecoveredStateHandler stateHandler = new ResultSubpartitionRecoveredStateHandler(writers, notifyAndBlockOnCompletion, this.taskStateSnapshot.getOutputRescalingDescriptor());){
            this.read(stateHandler, SequentialChannelStateReaderImpl.groupByDelegate(this.streamSubtaskStates(), ChannelStateHelper::extractUnmergedOutputHandles));
        }
    }

    private <Info, Context, Handle extends AbstractChannelStateHandle<Info>> void read(RecoveredChannelStateHandler<Info, Context> stateHandler, Map<StreamStateHandle, List<Handle>> streamStateHandleListMap) throws IOException, InterruptedException {
        for (Map.Entry<StreamStateHandle, List<Handle>> delegateAndHandles : streamStateHandleListMap.entrySet()) {
            this.readSequentially(delegateAndHandles.getKey(), delegateAndHandles.getValue(), stateHandler);
        }
    }

    private <Info, Context, Handle extends AbstractChannelStateHandle<Info>> void readSequentially(StreamStateHandle streamStateHandle, List<Handle> channelStateHandles, RecoveredChannelStateHandler<Info, Context> stateHandler) throws IOException, InterruptedException {
        try (FSDataInputStream is = streamStateHandle.openInputStream();){
            this.serializer.readHeader((InputStream)is);
            for (RescaledOffset<Info> offsetAndChannelInfo : SequentialChannelStateReaderImpl.extractOffsetsSorted(channelStateHandles)) {
                this.chunkReader.readChunk(is, offsetAndChannelInfo.offset, stateHandler, offsetAndChannelInfo.channelInfo, offsetAndChannelInfo.oldSubtaskIndex);
            }
        }
    }

    private Stream<OperatorSubtaskState> streamSubtaskStates() {
        return this.taskStateSnapshot.getSubtaskStateMappings().stream().map(Map.Entry::getValue);
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Map<StreamStateHandle, List<Handle>> groupByDelegate(Stream<OperatorSubtaskState> states, Function<OperatorSubtaskState, StateObjectCollection<Handle>> stateHandleExtractor) {
        return states.map(stateHandleExtractor).flatMap(Collection::stream).peek(SequentialChannelStateReaderImpl.validate()).collect(Collectors.groupingBy(AbstractChannelStateHandle::getDelegate));
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Consumer<Handle> validate() {
        HashSet seen = new HashSet();
        return handle -> {
            if (!seen.add(new Tuple2(handle.getInfo(), (Object)handle.getSubtaskIndex()))) {
                throw new IllegalStateException("Duplicate channel info: " + String.valueOf(handle));
            }
        };
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> List<RescaledOffset<Info>> extractOffsetsSorted(List<Handle> channelStateHandles) {
        return channelStateHandles.stream().flatMap(SequentialChannelStateReaderImpl::extractOffsets).sorted(Comparator.comparingLong(offsetAndInfo -> offsetAndInfo.offset)).collect(Collectors.toList());
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Stream<RescaledOffset<Info>> extractOffsets(Handle handle) {
        return handle.getOffsets().stream().map(offset -> new RescaledOffset((Long)offset, handle.getInfo(), handle.getSubtaskIndex()));
    }

    @Override
    public void close() throws Exception {
    }

    static class RescaledOffset<Info> {
        final Long offset;
        final Info channelInfo;
        final int oldSubtaskIndex;

        RescaledOffset(Long offset, Info channelInfo, int oldSubtaskIndex) {
            this.offset = offset;
            this.channelInfo = channelInfo;
            this.oldSubtaskIndex = oldSubtaskIndex;
        }
    }
}

