/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.BitSet;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.SortMergeFullOuterJoinIterator;
import org.apache.flink.table.runtime.operators.join.SortMergeInnerJoinIterator;
import org.apache.flink.table.runtime.operators.join.SortMergeOneSideOuterJoinIterator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;

public class SortMergeJoinOperator
extends TableStreamOperator<BaseRow>
implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow>,
BoundedMultiInput {
    private final long reservedSortMemory1;
    private final long reservedSortMemory2;
    private final long externalBufferMemory;
    private final FlinkJoinType type;
    private final boolean leftIsSmaller;
    private final boolean[] filterNulls;
    private GeneratedJoinCondition condFuncCode;
    private GeneratedProjection projectionCode1;
    private GeneratedProjection projectionCode2;
    private GeneratedNormalizedKeyComputer computer1;
    private GeneratedRecordComparator comparator1;
    private GeneratedNormalizedKeyComputer computer2;
    private GeneratedRecordComparator comparator2;
    private GeneratedRecordComparator genKeyComparator;
    private transient MemoryManager memManager;
    private transient IOManager ioManager;
    private transient BinaryRowSerializer serializer1;
    private transient BinaryRowSerializer serializer2;
    private transient BinaryExternalSorter sorter1;
    private transient BinaryExternalSorter sorter2;
    private transient Collector<BaseRow> collector;
    private transient boolean[] isFinished;
    private transient JoinCondition condFunc;
    private transient RecordComparator keyComparator;
    private transient Projection<BaseRow, BinaryRow> projection1;
    private transient Projection<BaseRow, BinaryRow> projection2;
    private transient BaseRow leftNullRow;
    private transient BaseRow rightNullRow;
    private transient JoinedRow joinedRow;

    @VisibleForTesting
    public SortMergeJoinOperator(long reservedSortMemory, long externalBufferMemory, FlinkJoinType type, boolean leftIsSmaller, GeneratedJoinCondition condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, GeneratedNormalizedKeyComputer computer1, GeneratedRecordComparator comparator1, GeneratedNormalizedKeyComputer computer2, GeneratedRecordComparator comparator2, GeneratedRecordComparator genKeyComparator, boolean[] filterNulls) {
        this(reservedSortMemory, reservedSortMemory, externalBufferMemory, type, leftIsSmaller, condFuncCode, projectionCode1, projectionCode2, computer1, comparator1, computer2, comparator2, genKeyComparator, filterNulls);
    }

    public SortMergeJoinOperator(long reservedSortMemory1, long reservedSortMemory2, long externalBufferMemory, FlinkJoinType type, boolean leftIsSmaller, GeneratedJoinCondition condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, GeneratedNormalizedKeyComputer computer1, GeneratedRecordComparator comparator1, GeneratedNormalizedKeyComputer computer2, GeneratedRecordComparator comparator2, GeneratedRecordComparator genKeyComparator, boolean[] filterNulls) {
        this.reservedSortMemory1 = reservedSortMemory1;
        this.reservedSortMemory2 = reservedSortMemory2;
        this.externalBufferMemory = externalBufferMemory;
        this.type = type;
        this.leftIsSmaller = leftIsSmaller;
        this.condFuncCode = condFuncCode;
        this.projectionCode1 = projectionCode1;
        this.projectionCode2 = projectionCode2;
        this.computer1 = (GeneratedNormalizedKeyComputer)Preconditions.checkNotNull((Object)computer1);
        this.comparator1 = (GeneratedRecordComparator)Preconditions.checkNotNull((Object)comparator1);
        this.computer2 = (GeneratedNormalizedKeyComputer)Preconditions.checkNotNull((Object)computer2);
        this.comparator2 = (GeneratedRecordComparator)Preconditions.checkNotNull((Object)comparator2);
        this.genKeyComparator = (GeneratedRecordComparator)Preconditions.checkNotNull((Object)genKeyComparator);
        this.filterNulls = filterNulls;
    }

    public void open() throws Exception {
        super.open();
        Configuration conf = this.getContainingTask().getJobConfiguration();
        this.isFinished = new boolean[]{false, false};
        this.collector = new StreamRecordCollector<BaseRow>(this.output);
        ClassLoader cl = this.getUserCodeClassloader();
        AbstractRowSerializer inputSerializer1 = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn1(cl);
        this.serializer1 = new BinaryRowSerializer(inputSerializer1.getArity());
        AbstractRowSerializer inputSerializer2 = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn2(cl);
        this.serializer2 = new BinaryRowSerializer(inputSerializer2.getArity());
        this.memManager = this.getContainingTask().getEnvironment().getMemoryManager();
        this.ioManager = this.getContainingTask().getEnvironment().getIOManager();
        this.sorter1 = new BinaryExternalSorter(this.getContainingTask(), this.memManager, this.reservedSortMemory1, this.ioManager, inputSerializer1, this.serializer1, (NormalizedKeyComputer)this.computer1.newInstance(cl), (RecordComparator)this.comparator1.newInstance(cl), conf);
        this.sorter1.startThreads();
        this.sorter2 = new BinaryExternalSorter(this.getContainingTask(), this.memManager, this.reservedSortMemory2, this.ioManager, inputSerializer2, this.serializer2, (NormalizedKeyComputer)this.computer2.newInstance(cl), (RecordComparator)this.comparator2.newInstance(cl), conf);
        this.sorter2.startThreads();
        this.keyComparator = (RecordComparator)this.genKeyComparator.newInstance(cl);
        this.condFunc = (JoinCondition)this.condFuncCode.newInstance(cl);
        this.condFunc.setRuntimeContext((RuntimeContext)this.getRuntimeContext());
        this.condFunc.open(new Configuration());
        this.projection1 = (Projection)this.projectionCode1.newInstance(cl);
        this.projection2 = (Projection)this.projectionCode2.newInstance(cl);
        this.leftNullRow = new GenericRow(this.serializer1.getArity());
        this.rightNullRow = new GenericRow(this.serializer2.getArity());
        this.joinedRow = new JoinedRow();
        this.condFuncCode = null;
        this.computer1 = null;
        this.comparator1 = null;
        this.computer2 = null;
        this.comparator2 = null;
        this.projectionCode1 = null;
        this.projectionCode2 = null;
        this.genKeyComparator = null;
        this.getMetricGroup().gauge("memoryUsedSizeInBytes", () -> this.sorter1.getUsedMemoryInBytes() + this.sorter2.getUsedMemoryInBytes());
        this.getMetricGroup().gauge("numSpillFiles", () -> this.sorter1.getNumSpillFiles() + this.sorter2.getNumSpillFiles());
        this.getMetricGroup().gauge("spillInBytes", () -> this.sorter1.getSpillInBytes() + this.sorter2.getSpillInBytes());
    }

    public void processElement1(StreamRecord<BaseRow> element) throws Exception {
        this.sorter1.write((BaseRow)element.getValue());
    }

    public void processElement2(StreamRecord<BaseRow> element) throws Exception {
        this.sorter2.write((BaseRow)element.getValue());
    }

    public void endInput(int inputId) throws Exception {
        this.isFinished[inputId - 1] = true;
        if (this.isAllFinished()) {
            this.doSortMergeJoin();
        }
    }

    private void doSortMergeJoin() throws Exception {
        MutableObjectIterator<BinaryRow> iterator1 = this.sorter1.getIterator();
        MutableObjectIterator<BinaryRow> iterator2 = this.sorter2.getIterator();
        if (this.type.equals((Object)FlinkJoinType.INNER)) {
            if (!this.leftIsSmaller) {
                try (SortMergeInnerJoinIterator joinIterator = new SortMergeInnerJoinIterator(this.serializer1, this.serializer2, (Projection)this.projection1, (Projection)this.projection2, this.keyComparator, (MutableObjectIterator<BaseRow>)iterator1, iterator2, this.newBuffer(this.serializer2), this.filterNulls);){
                    this.innerJoin(joinIterator, false);
                }
            } else {
                try (SortMergeInnerJoinIterator joinIterator = new SortMergeInnerJoinIterator(this.serializer2, this.serializer1, (Projection)this.projection2, (Projection)this.projection1, this.keyComparator, (MutableObjectIterator<BaseRow>)iterator2, iterator1, this.newBuffer(this.serializer1), this.filterNulls);){
                    this.innerJoin(joinIterator, true);
                }
            }
        } else {
            if (this.type.equals((Object)FlinkJoinType.LEFT)) {
                try (SortMergeOneSideOuterJoinIterator joinIterator = new SortMergeOneSideOuterJoinIterator(this.serializer1, this.serializer2, this.projection1, this.projection2, this.keyComparator, iterator1, iterator2, this.newBuffer(this.serializer2), this.filterNulls);){
                    this.oneSideOuterJoin(joinIterator, false, this.rightNullRow);
                }
            }
            if (this.type.equals((Object)FlinkJoinType.RIGHT)) {
                try (SortMergeOneSideOuterJoinIterator joinIterator = new SortMergeOneSideOuterJoinIterator(this.serializer2, this.serializer1, this.projection2, this.projection1, this.keyComparator, iterator2, iterator1, this.newBuffer(this.serializer1), this.filterNulls);){
                    this.oneSideOuterJoin(joinIterator, true, this.leftNullRow);
                }
            }
            if (this.type.equals((Object)FlinkJoinType.FULL)) {
                try (SortMergeFullOuterJoinIterator fullOuterJoinIterator = new SortMergeFullOuterJoinIterator(this.serializer1, this.serializer2, this.projection1, this.projection2, this.keyComparator, iterator1, iterator2, this.newBuffer(this.serializer1), this.newBuffer(this.serializer2), this.filterNulls);){
                    this.fullOuterJoin(fullOuterJoinIterator);
                }
            }
            if (this.type.equals((Object)FlinkJoinType.SEMI)) {
                try (SortMergeInnerJoinIterator joinIterator = new SortMergeInnerJoinIterator(this.serializer1, this.serializer2, (Projection)this.projection1, (Projection)this.projection2, this.keyComparator, (MutableObjectIterator<BaseRow>)iterator1, iterator2, this.newBuffer(this.serializer2), this.filterNulls);){
                    while (joinIterator.nextInnerJoin()) {
                        BaseRow probeRow = joinIterator.getProbeRow();
                        boolean matched = false;
                        try (ResettableExternalBuffer.BufferIterator iter = joinIterator.getMatchBuffer().newIterator();){
                            while (iter.advanceNext()) {
                                BinaryRow row = iter.getRow();
                                if (!this.condFunc.apply(probeRow, row)) continue;
                                matched = true;
                                break;
                            }
                        }
                        if (!matched) continue;
                        this.collector.collect((Object)probeRow);
                    }
                }
            }
            if (this.type.equals((Object)FlinkJoinType.ANTI)) {
                try (SortMergeOneSideOuterJoinIterator joinIterator = new SortMergeOneSideOuterJoinIterator(this.serializer1, this.serializer2, this.projection1, this.projection2, this.keyComparator, iterator1, iterator2, this.newBuffer(this.serializer2), this.filterNulls);){
                    while (joinIterator.nextOuterJoin()) {
                        BaseRow probeRow = joinIterator.getProbeRow();
                        ResettableExternalBuffer matchBuffer = joinIterator.getMatchBuffer();
                        boolean matched = false;
                        if (matchBuffer != null) {
                            try (ResettableExternalBuffer.BufferIterator iter = matchBuffer.newIterator();){
                                while (iter.advanceNext()) {
                                    BinaryRow row = iter.getRow();
                                    if (!this.condFunc.apply(probeRow, row)) continue;
                                    matched = true;
                                    break;
                                }
                            }
                        }
                        if (matched) continue;
                        this.collector.collect((Object)probeRow);
                    }
                }
            }
            throw new RuntimeException("Not support type: " + (Object)((Object)this.type));
        }
    }

    private void innerJoin(SortMergeInnerJoinIterator iterator, boolean reverseInvoke) throws Exception {
        while (iterator.nextInnerJoin()) {
            BaseRow probeRow = iterator.getProbeRow();
            ResettableExternalBuffer.BufferIterator iter = iterator.getMatchBuffer().newIterator();
            while (iter.advanceNext()) {
                BinaryRow row = iter.getRow();
                this.joinWithCondition(probeRow, row, reverseInvoke);
            }
            iter.close();
        }
    }

    private void oneSideOuterJoin(SortMergeOneSideOuterJoinIterator iterator, boolean reverseInvoke, BaseRow buildNullRow) throws Exception {
        while (iterator.nextOuterJoin()) {
            BaseRow probeRow = iterator.getProbeRow();
            boolean found = false;
            if (iterator.getMatchKey() != null) {
                ResettableExternalBuffer.BufferIterator iter = iterator.getMatchBuffer().newIterator();
                while (iter.advanceNext()) {
                    BinaryRow row = iter.getRow();
                    found |= this.joinWithCondition(probeRow, row, reverseInvoke);
                }
                iter.close();
            }
            if (found) continue;
            this.collect(probeRow, buildNullRow, reverseInvoke);
        }
    }

    private void fullOuterJoin(SortMergeFullOuterJoinIterator iterator) throws Exception {
        BitSet bitSet = new BitSet();
        while (iterator.nextOuterJoin()) {
            ResettableExternalBuffer.BufferIterator iter;
            bitSet.clear();
            BinaryRow matchKey = iterator.getMatchKey();
            ResettableExternalBuffer buffer1 = iterator.getBuffer1();
            ResettableExternalBuffer buffer2 = iterator.getBuffer2();
            if (matchKey == null && buffer1.size() > 0) {
                iter = buffer1.newIterator();
                while (iter.advanceNext()) {
                    BinaryRow row1 = iter.getRow();
                    this.collector.collect((Object)this.joinedRow.replace(row1, this.rightNullRow));
                }
                iter.close();
                continue;
            }
            if (matchKey == null && buffer2.size() > 0) {
                iter = buffer2.newIterator();
                while (iter.advanceNext()) {
                    BinaryRow row2 = iter.getRow();
                    this.collector.collect((Object)this.joinedRow.replace(this.leftNullRow, row2));
                }
                iter.close();
                continue;
            }
            if (matchKey != null) {
                ResettableExternalBuffer.BufferIterator iter1 = buffer1.newIterator();
                while (iter1.advanceNext()) {
                    BinaryRow row1 = iter1.getRow();
                    boolean found = false;
                    int index = 0;
                    ResettableExternalBuffer.BufferIterator iter2 = buffer2.newIterator();
                    while (iter2.advanceNext()) {
                        BinaryRow row2 = iter2.getRow();
                        if (this.condFunc.apply(row1, row2)) {
                            this.collector.collect((Object)this.joinedRow.replace(row1, row2));
                            found = true;
                            bitSet.set(index);
                        }
                        ++index;
                    }
                    iter2.close();
                    if (found) continue;
                    this.collector.collect((Object)this.joinedRow.replace(row1, this.rightNullRow));
                }
                iter1.close();
                int index = 0;
                ResettableExternalBuffer.BufferIterator iter2 = buffer2.newIterator();
                while (iter2.advanceNext()) {
                    BinaryRow row2 = iter2.getRow();
                    if (!bitSet.get(index)) {
                        this.collector.collect((Object)this.joinedRow.replace(this.leftNullRow, row2));
                    }
                    ++index;
                }
                iter2.close();
                continue;
            }
            throw new RuntimeException("There is a bug.");
        }
    }

    private boolean joinWithCondition(BaseRow row1, BaseRow row2, boolean reverseInvoke) throws Exception {
        if (reverseInvoke) {
            if (this.condFunc.apply(row2, row1)) {
                this.collector.collect((Object)this.joinedRow.replace(row2, row1));
                return true;
            }
        } else if (this.condFunc.apply(row1, row2)) {
            this.collector.collect((Object)this.joinedRow.replace(row1, row2));
            return true;
        }
        return false;
    }

    private void collect(BaseRow row1, BaseRow row2, boolean reverseInvoke) {
        if (reverseInvoke) {
            this.collector.collect((Object)this.joinedRow.replace(row2, row1));
        } else {
            this.collector.collect((Object)this.joinedRow.replace(row1, row2));
        }
    }

    private ResettableExternalBuffer newBuffer(BinaryRowSerializer serializer) throws MemoryAllocationException {
        List externalBufferSegments = this.memManager.allocatePages((Object)this.getContainingTask(), (int)(this.externalBufferMemory / (long)this.memManager.getPageSize()));
        return new ResettableExternalBuffer(this.memManager, this.ioManager, externalBufferSegments, serializer, false);
    }

    private boolean isAllFinished() {
        return this.isFinished[0] && this.isFinished[1];
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.sorter1 != null) {
            this.sorter1.close();
        }
        if (this.sorter2 != null) {
            this.sorter2.close();
        }
        this.condFunc.close();
    }
}

