/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.hash;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.hash.HashPartition;
import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.runtime.operators.hash.ReOpenableHashPartition;
import org.apache.flink.util.MutableObjectIterator;

public class ReOpenableMutableHashTable<BT, PT>
extends MutableHashTable<BT, PT> {
    private final FileIOChannel.Enumerator spilledInMemoryPartitions;
    private List<HashPartition<BT, PT>> initialPartitions;
    private int initialBucketCount;
    private byte initialPartitionFanOut;
    private boolean spilled = false;

    public ReOpenableMutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, boolean useBitmapFilters) {
        super(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator, memorySegments, ioManager, useBitmapFilters);
        this.keepBuildSidePartitions = true;
        this.spilledInMemoryPartitions = ioManager.createChannelEnumerator();
    }

    @Override
    public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide, boolean buildSideOuterJoin) throws IOException {
        super.open(buildSide, probeSide, buildSideOuterJoin);
        this.initialPartitions = new ArrayList<HashPartition<BT, PT>>(this.partitionsBeingBuilt);
        this.initialPartitionFanOut = (byte)this.partitionsBeingBuilt.size();
        this.initialBucketCount = this.numBuckets;
    }

    public void reopenProbe(MutableObjectIterator<PT> probeInput) throws IOException {
        if (this.closed.get()) {
            throw new IllegalStateException("Cannot open probe input because hash join has already been closed");
        }
        this.partitionsBeingBuilt.clear();
        this.probeIterator = new MutableHashTable.ProbeIterator<Object>(probeInput, this.probeSideSerializer.createInstance());
        this.partitionsBeingBuilt.addAll(this.initialPartitions);
        if (this.spilled) {
            this.currentRecursionDepth = 0;
            this.initTable(this.initialBucketCount, this.initialPartitionFanOut);
            for (int i = 0; i < this.partitionsBeingBuilt.size(); ++i) {
                ReOpenableHashPartition part = (ReOpenableHashPartition)this.partitionsBeingBuilt.get(i);
                if (part.isInMemory()) {
                    this.ensureNumBuffersReturned(part.initialPartitionBuffersCount);
                    part.restorePartitionBuffers(this.ioManager, this.availableMemory);
                    HashPartition.PartitionIterator pIter = part.getPartitionIterator(this.buildSideComparator);
                    Object record = this.buildSideSerializer.createInstance();
                    while ((record = pIter.next(record)) != null) {
                        int hashCode2 = ReOpenableMutableHashTable.hash(pIter.getCurrentHashCode(), 0);
                        int posHashCode = hashCode2 % this.initialBucketCount;
                        long pointer = pIter.getPointer();
                        int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
                        int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << 7;
                        MemorySegment bucket = this.buckets[bucketArrayPos];
                        this.insertBucketEntry(part, bucket, bucketInSegmentPos, hashCode2, pointer, true);
                    }
                    continue;
                }
                --this.writeBehindBuffersAvailable;
                if (this.writeBehindBuffers.size() == 0) {
                    this.writeBehindBuffers.add(this.getNextBuffer());
                    ++this.writeBehindBuffersAvailable;
                }
                part.prepareProbePhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
            }
        } else {
            for (int partIdx = 0; partIdx < this.partitionsBeingBuilt.size(); ++partIdx) {
                HashPartition p = (HashPartition)this.partitionsBeingBuilt.get(partIdx);
                p.prepareProbePhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
            }
        }
    }

    void storeInitialHashTable() throws IOException {
        if (this.spilled) {
            return;
        }
        this.spilled = true;
        for (int partIdx = 0; partIdx < this.initialPartitions.size(); ++partIdx) {
            ReOpenableHashPartition p = (ReOpenableHashPartition)this.initialPartitions.get(partIdx);
            if (!p.isInMemory()) continue;
            this.writeBehindBuffersAvailable += p.spillInMemoryPartition(this.spilledInMemoryPartitions.next(), this.ioManager, this.writeBehindBuffers);
        }
    }

    @Override
    protected boolean prepareNextPartition() throws IOException {
        this.furtherPartitioning = false;
        for (int i = 0; i < this.partitionsBeingBuilt.size(); ++i) {
            HashPartition p = (HashPartition)this.partitionsBeingBuilt.get(i);
            if (p.isInMemory() || p.getProbeSideRecordCount() == 0L) continue;
            this.furtherPartitioning = true;
            break;
        }
        if (this.furtherPartitioning) {
            this.storeInitialHashTable();
        }
        return super.prepareNextPartition();
    }

    @Override
    protected void releaseTable() {
        if (this.furtherPartitioning | this.currentRecursionDepth > 0) {
            super.releaseTable();
        }
    }

    @Override
    protected HashPartition<BT, PT> getNewInMemoryPartition(int number, int recursionLevel) {
        return new ReOpenableHashPartition(this.buildSideSerializer, this.probeSideSerializer, number, recursionLevel, (MemorySegment)this.availableMemory.remove(this.availableMemory.size() - 1), this, this.segmentSize);
    }

    @Override
    public void close() {
        if (this.partitionsBeingBuilt.size() == 0) {
            this.partitionsBeingBuilt.addAll(this.initialPartitions);
        }
        this.furtherPartitioning = true;
        super.close();
    }
}

