/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.FileChannelInputView;
import org.apache.flink.runtime.io.disk.FileChannelOutputView;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes records to a spill file, writes each record's key fields together with the record's
 * file offset to a second spill file, sorts the keys, and finally reads the records back in key
 * order by seeking to the stored offsets.
 *
 * <p>Usage is in two phases: first any number of calls to {@link #addRecord(Object)}, then one
 * call to {@link #finishWriteAndSortKeys(List)}, which returns the sorted iterator. {@link
 * #close()} releases the files, the sorter and the memory passed to the constructor.
 *
 * <p>Only {@link #close()} is synchronized; the other methods perform no locking.
 *
 * @param <T> the type of the records handled
 */
public class LargeRecordHandler<T> {
    private static final Logger LOG = LoggerFactory.getLogger(LargeRecordHandler.class);

    /** Lower bound on the number of memory segments used for writing the key file. */
    private static final int MIN_SEGMENTS_FOR_KEY_SPILLING = 1;

    /** Upper bound on the number of memory segments used for writing the key file. */
    private static final int MAX_SEGMENTS_FOR_KEY_SPILLING = 4;

    private final TypeSerializer<T> serializer;
    private final TypeComparator<T> comparator;

    /** Serializer for the (key fields..., offset) tuples; created lazily on the first record. */
    private TupleSerializer<Tuple> keySerializer;

    /** Comparator over the key fields of the tuples; created lazily on the first record. */
    private TupleComparator<Tuple> keyComparator;

    private FileChannelOutputView recordsOutFile;
    private FileChannelOutputView keysOutFile;

    /** Reused tuple instance that is filled and serialized for every added record. */
    private Tuple keyTuple;

    private FileChannelInputView keysReader;
    private SeekableFileChannelInputView recordsReader;
    private FileIOChannel.ID recordsChannel;
    private FileIOChannel.ID keysChannel;
    private final IOManager ioManager;
    private final MemoryManager memManager;

    /** Memory used for the two output views; released to the memory manager in close(). */
    private final List<MemorySegment> memory;

    private Sorter<Tuple> keySorter;
    private final TaskInvokable memoryOwner;

    /** Number of records added since construction; reset to zero by close(). */
    private long recordCounter;

    /** Number of flat key fields; also the tuple position of the record offset. */
    private int numKeyFields;

    private final int maxFilehandles;
    private volatile boolean closed;
    private final ExecutionConfig executionConfig;

    /**
     * Creates a handler. No files are created until the first record is added.
     *
     * @param serializer serializer used to write and read the records
     * @param comparator comparator used to extract the key fields of each record
     * @param ioManager I/O manager used to create the spill channels, writers and readers
     * @param memManager memory manager passed to the I/O views and the key sorter
     * @param memory memory segments used for writing the record and key files
     * @param memoryOwner owner passed to the key sorter
     * @param maxFilehandles maximum number of file handles for the key sorter; must be at least 2
     * @param executionConfig configuration used for key serializer creation and the key sorter
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if {@code maxFilehandles} is smaller than 2
     */
    public LargeRecordHandler(
            TypeSerializer<T> serializer,
            TypeComparator<T> comparator,
            IOManager ioManager,
            MemoryManager memManager,
            List<MemorySegment> memory,
            TaskInvokable memoryOwner,
            int maxFilehandles,
            ExecutionConfig executionConfig) {
        this.serializer = Preconditions.checkNotNull(serializer);
        this.comparator = Preconditions.checkNotNull(comparator);
        this.ioManager = Preconditions.checkNotNull(ioManager);
        this.memManager = Preconditions.checkNotNull(memManager);
        this.memory = Preconditions.checkNotNull(memory);
        this.memoryOwner = Preconditions.checkNotNull(memoryOwner);
        this.maxFilehandles = maxFilehandles;
        this.executionConfig = Preconditions.checkNotNull(executionConfig);
        Preconditions.checkArgument(maxFilehandles >= 2);
    }

    /**
     * Appends a record to the record spill file and its key fields plus file offset to the key
     * spill file. The first call creates the key serializer/comparator and opens both files.
     *
     * @param record the record to add
     * @return the offset in the record file at which the record was written
     * @throws IOException if creating the spill files or writing to them fails
     * @throws IllegalStateException if the handler was closed, or if {@link
     *     #finishWriteAndSortKeys(List)} has already been called
     */
    public long addRecord(T record) throws IOException {
        if (this.recordsOutFile == null) {
            if (this.closed) {
                throw new IllegalStateException("The large record handler has been closed.");
            }
            if (this.recordsReader != null) {
                throw new IllegalStateException("The handler has already switched to sorting.");
            }
            LOG.debug("Initializing the large record spilling...");
            initializeSpilling(record);
        }

        final long offset = this.recordsOutFile.getWriteOffset();
        if (offset < 0L) {
            throw new RuntimeException("wrong offset");
        }

        // Fill the reused tuple: key fields first, the record's file offset in the last position.
        final Object[] keyHolder = new Object[this.numKeyFields];
        this.comparator.extractKeys(record, keyHolder, 0);
        for (int i = 0; i < this.numKeyFields; ++i) {
            this.keyTuple.setField(keyHolder[i], i);
        }
        this.keyTuple.setField(offset, this.numKeyFields);

        this.keySerializer.serialize(this.keyTuple, this.keysOutFile);
        this.serializer.serialize(record, this.recordsOutFile);
        ++this.recordCounter;
        return offset;
    }

    /**
     * Derives the key serializer and comparator from the first record's key fields, splits the
     * handler's memory between the key file and the record file, and opens both output views.
     */
    @SuppressWarnings("unchecked")
    private void initializeSpilling(T record) throws IOException {
        final TypeComparator<?>[] keyComps = this.comparator.getFlatComparators();
        this.numKeyFields = keyComps.length;

        final Object[] keyHolder = new Object[this.numKeyFields];
        this.comparator.extractKeys(record, keyHolder, 0);

        final TypeSerializer<?>[] keySers = new TypeSerializer<?>[this.numKeyFields];
        final TypeSerializer<?>[] tupleSers = new TypeSerializer<?>[this.numKeyFields + 1];
        final int[] keyPos = new int[this.numKeyFields];
        for (int i = 0; i < this.numKeyFields; ++i) {
            keyPos[i] = i;
            keySers[i] = createSerializer(keyHolder[i], i);
            tupleSers[i] = keySers[i];
        }
        // The extra trailing tuple field holds the record's offset in the record file.
        tupleSers[this.numKeyFields] = LongSerializer.INSTANCE;

        this.keySerializer =
                new TupleSerializer<Tuple>(
                        (Class<Tuple>) Tuple.getTupleClass(this.numKeyFields + 1), tupleSers);
        this.keyComparator = new TupleComparator<Tuple>(keyPos, keyComps, keySers);
        this.keyTuple = this.keySerializer.createInstance();

        // The first segmentsForKeys segments buffer the key file, the rest the record file.
        final int totalNumSegments = this.memory.size();
        final int segmentsForKeys =
                totalNumSegments >= 2 * MAX_SEGMENTS_FOR_KEY_SPILLING
                        ? MAX_SEGMENTS_FOR_KEY_SPILLING
                        : Math.max(
                                MIN_SEGMENTS_FOR_KEY_SPILLING,
                                totalNumSegments - MAX_SEGMENTS_FOR_KEY_SPILLING);

        final List<MemorySegment> recordsMemory = new ArrayList<MemorySegment>();
        final List<MemorySegment> keysMemory = new ArrayList<MemorySegment>();
        for (int i = 0; i < segmentsForKeys; ++i) {
            keysMemory.add(this.memory.get(i));
        }
        for (int i = segmentsForKeys; i < totalNumSegments; ++i) {
            recordsMemory.add(this.memory.get(i));
        }

        this.recordsChannel = this.ioManager.createChannel();
        this.keysChannel = this.ioManager.createChannel();
        this.recordsOutFile =
                new FileChannelOutputView(
                        this.ioManager.createBlockChannelWriter(this.recordsChannel),
                        this.memManager,
                        recordsMemory,
                        this.memManager.getPageSize());
        this.keysOutFile =
                new FileChannelOutputView(
                        this.ioManager.createBlockChannelWriter(this.keysChannel),
                        this.memManager,
                        keysMemory,
                        this.memManager.getPageSize());
    }

    /**
     * Closes both spill files, sorts the spilled keys and returns an iterator that yields the
     * records in key order.
     *
     * <p>Between 3 and 8 segments are removed from the end of {@code memory} and used for the key
     * reader and the record reader; the remaining segments are handed to the key sorter. The list
     * must therefore contain at least that many segments.
     *
     * @param memory memory segments for the readers and the key sorter; modified by this call
     * @return an iterator over all added records, ordered by the sorted keys
     * @throws IOException if file access fails, or if waiting for the sorted keys is interrupted
     * @throws IllegalStateException if no record has been added, or if the key sorter tries to
     *     allocate memory itself
     */
    public MutableObjectIterator<T> finishWriteAndSortKeys(List<MemorySegment> memory)
            throws IOException {
        if (this.recordsOutFile == null || this.keysOutFile == null) {
            throw new IllegalStateException("The LargeRecordHandler has not spilled any records");
        }

        this.recordsOutFile.close();
        this.keysOutFile.close();
        // The readers need to know how many bytes of the last block are valid.
        final int lastBlockBytesKeys = this.keysOutFile.getBytesInLatestSegment();
        final int lastBlockBytesRecords = this.recordsOutFile.getBytesInLatestSegment();
        this.recordsOutFile = null;
        this.keysOutFile = null;

        final int pagesForReaders = Math.max(3, Math.min(8, memory.size() / 50));
        final int pagesForKeyReader = Math.min(pagesForReaders - 1, 4);
        final int pagesForRecordReader = pagesForReaders - pagesForKeyReader;

        final List<MemorySegment> memForRecordReader = new ArrayList<MemorySegment>();
        final List<MemorySegment> memForKeysReader = new ArrayList<MemorySegment>();
        for (int i = 0; i < pagesForRecordReader; ++i) {
            memForRecordReader.add(memory.remove(memory.size() - 1));
        }
        for (int i = 0; i < pagesForKeyReader; ++i) {
            memForKeysReader.add(memory.remove(memory.size() - 1));
        }

        this.keysReader =
                new FileChannelInputView(
                        this.ioManager.createBlockChannelReader(this.keysChannel),
                        this.memManager,
                        memForKeysReader,
                        lastBlockBytesKeys);
        final InputViewIterator<Tuple> keyIterator =
                new InputViewIterator<Tuple>(this.keysReader, this.keySerializer);

        try {
            this.keySorter =
                    ExternalSorter.newBuilder(
                                    this.memManager,
                                    this.memoryOwner,
                                    this.keySerializer,
                                    this.keyComparator,
                                    this.executionConfig)
                            .maxNumFileHandles(this.maxFilehandles)
                            .sortBuffers(1)
                            .enableSpilling(this.ioManager, 1.0)
                            .memory(memory)
                            .objectReuse(this.executionConfig.isObjectReuseEnabled())
                            .largeRecords(false)
                            .build(keyIterator);
        } catch (MemoryAllocationException e) {
            throw new IllegalStateException(
                    "We should not try allocating memory. Instead the sorter should use the provided memory.",
                    e);
        }

        final MutableObjectIterator<Tuple> sortedKeys;
        try {
            sortedKeys = this.keySorter.getIterator();
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is not restored here; this keeps the existing
            // behavior. Confirm whether callers rely on the flag before changing it.
            throw new IOException(e);
        }

        this.recordsReader =
                new SeekableFileChannelInputView(
                        this.ioManager,
                        this.recordsChannel,
                        this.memManager,
                        memForRecordReader,
                        lastBlockBytesRecords);
        return new FetchingIterator<T>(
                this.serializer,
                sortedKeys,
                this.recordsReader,
                this.keySerializer,
                this.numKeyFields);
    }

    /**
     * Closes all open writers and readers, deletes both spill files, closes the key sorter and
     * releases the memory passed to the constructor. Every step is attempted even if an earlier
     * one fails. Calling this method again after it has run is a no-op.
     *
     * @throws IOException if any cleanup step failed; the cause is the first failure encountered
     */
    public void close() throws IOException {
        Throwable ex = null;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;

            if (this.recordsOutFile != null) {
                try {
                    this.recordsOutFile.close();
                    this.recordsOutFile = null;
                } catch (Throwable t) {
                    LOG.error("Cannot close the large records spill file.", t);
                    ex = keepFirst(ex, t);
                }
            }
            if (this.keysOutFile != null) {
                try {
                    this.keysOutFile.close();
                    this.keysOutFile = null;
                } catch (Throwable t) {
                    LOG.error("Cannot close the large records key spill file.", t);
                    ex = keepFirst(ex, t);
                }
            }
            if (this.recordsReader != null) {
                try {
                    this.recordsReader.close();
                    this.recordsReader = null;
                } catch (Throwable t) {
                    LOG.error("Cannot close the large records reader.", t);
                    ex = keepFirst(ex, t);
                }
            }
            if (this.keysReader != null) {
                try {
                    this.keysReader.close();
                    this.keysReader = null;
                } catch (Throwable t) {
                    LOG.error("Cannot close the large records key reader.", t);
                    ex = keepFirst(ex, t);
                }
            }
            if (this.recordsChannel != null) {
                try {
                    IOManager.deleteChannel(this.recordsChannel);
                    this.recordsChannel = null;
                } catch (Throwable t) {
                    LOG.error("Cannot delete the large records spill file.", t);
                    ex = keepFirst(ex, t);
                }
            }
            if (this.keysChannel != null) {
                try {
                    IOManager.deleteChannel(this.keysChannel);
                    this.keysChannel = null;
                } catch (Throwable t) {
                    LOG.error("Cannot delete the large records key spill file.", t);
                    ex = keepFirst(ex, t);
                }
            }
            if (this.keySorter != null) {
                try {
                    this.keySorter.close();
                    this.keySorter = null;
                } catch (Throwable t) {
                    LOG.error(
                            "Cannot properly dispose the key sorter and clean up its temporary files.",
                            t);
                    ex = keepFirst(ex, t);
                }
            }

            this.memManager.release(this.memory);
            this.recordCounter = 0L;
        }

        if (ex != null) {
            throw new IOException(
                    "An error occurred cleaning up spill files in the large record handler.", ex);
        }
    }

    /** Returns {@code first} if it is already set, otherwise {@code next}. */
    private static Throwable keepFirst(Throwable first, Throwable next) {
        return first != null ? first : next;
    }

    /**
     * Tells whether any record has been added since construction and before {@link #close()}.
     *
     * @return {@code true} if the record counter is greater than zero
     */
    public boolean hasData() {
        return this.recordCounter > 0L;
    }

    /**
     * Creates a serializer for one key field from a sample key value.
     *
     * @param key the key value taken from the first record
     * @param pos the position of the key field, reported if the key is null
     * @return a serializer for the runtime type of {@code key}
     * @throws NullKeyFieldException if {@code key} is null
     * @throws RuntimeException if type extraction or serializer creation fails
     */
    private TypeSerializer<Object> createSerializer(Object key, int pos) {
        if (key == null) {
            throw new NullKeyFieldException(pos);
        }
        try {
            TypeInformation<Object> info = TypeExtractor.getForObject(key);
            return info.createSerializer(this.executionConfig.getSerializerConfig());
        } catch (Throwable t) {
            // Chain the cause so the underlying failure is not lost.
            throw new RuntimeException("Could not create key serializer for type " + key, t);
        }
    }

    /**
     * Iterator that pulls (key fields..., offset) tuples from the sorted key input and, for each
     * one, seeks to the offset in the record file and deserializes the record found there.
     */
    private static final class FetchingIterator<T> implements MutableObjectIterator<T> {
        private final TypeSerializer<T> serializer;
        private final MutableObjectIterator<Tuple> tupleInput;
        private final SeekableFileChannelInputView recordsInputs;

        /** Tuple instance handed to the key input as the reuse object. */
        private Tuple value;

        /** Tuple position that holds the record's file offset. */
        private final int pointerPos;

        public FetchingIterator(
                TypeSerializer<T> serializer,
                MutableObjectIterator<Tuple> tupleInput,
                SeekableFileChannelInputView recordsInputs,
                TypeSerializer<Tuple> tupleSerializer,
                int pointerPos) {
            this.serializer = serializer;
            this.tupleInput = tupleInput;
            this.recordsInputs = recordsInputs;
            this.pointerPos = pointerPos;
            this.value = tupleSerializer.createInstance();
        }

        /** Ignores {@code reuse} and delegates to {@link #next()}. */
        @Override
        public T next(T reuse) throws IOException {
            return next();
        }

        /**
         * Returns the record referenced by the next sorted key tuple, or {@code null} when the
         * key input is exhausted.
         */
        @Override
        public T next() throws IOException {
            final Tuple nextKey = this.tupleInput.next(this.value);
            if (nextKey == null) {
                return null;
            }
            this.value = nextKey;
            final long pointer = nextKey.<Long>getField(this.pointerPos);
            this.recordsInputs.seek(pointer);
            return this.serializer.deserialize(this.recordsInputs);
        }
    }
}

