/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.temporal;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/**
 * Two-input stream operator implementing an event-time (row-time) temporal join.
 *
 * <p>Rows from the first (left) input are buffered in keyed state under a monotonically
 * increasing sequence number; rows from the second (right) input are buffered under their row
 * time. When the event-time timer fires, every buffered left row whose row time is not greater
 * than the current watermark is joined with the right row that has the greatest row time not
 * greater than the left row's time, and is then removed from state. Right rows that can no longer
 * be the latest version for any future left row are removed as well.
 *
 * <p>Results are emitted in the arrival order of the left rows. For a left outer join, a left row
 * without a usable right row is emitted padded with an all-null right side.
 *
 * <p>NOTE(review): processing-time state retention ({@code registerProcessingCleanupTimer()},
 * {@code cleanupLastTimer()}, {@code stateCleaningEnabled}) is inherited from
 * {@link BaseTwoInputStreamOperatorWithStateRetention}; its exact semantics are not visible in
 * this file.
 */
public class TemporalRowTimeJoinOperator
extends BaseTwoInputStreamOperatorWithStateRetention {
    private static final long serialVersionUID = 6642514795175288193L;

    // Names under which the keyed states and the timer service are registered.
    private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
    private static final String LEFT_STATE_NAME = "left";
    private static final String RIGHT_STATE_NAME = "right";
    private static final String REGISTERED_TIMER_STATE_NAME = "timer";
    private static final String TIMERS_STATE_NAME = "timers";

    private final boolean isLeftOuterJoin;
    private final InternalTypeInfo<RowData> leftType;
    private final InternalTypeInfo<RowData> rightType;
    private final GeneratedJoinCondition generatedJoinCondition;
    /** Field index of the row-time attribute in left rows. */
    private final int leftTimeAttribute;
    /** Field index of the row-time attribute in right rows. */
    private final int rightTimeAttribute;
    private final RowtimeComparator rightRowtimeComparator;

    /** Sequence number that will be assigned to the next incoming left row. */
    private transient ValueState<Long> nextLeftIndex;
    /** Buffered left rows, keyed by their arrival sequence number. */
    private transient MapState<Long, RowData> leftState;
    /** Buffered right rows, keyed by their row time. */
    private transient MapState<Long, RowData> rightState;
    /** Timestamp of the event-time timer currently registered by this operator, if any. */
    private transient ValueState<Long> registeredTimer;

    private transient TimestampedCollector<RowData> collector;
    private transient InternalTimerService<VoidNamespace> timerService;
    private transient JoinCondition joinCondition;
    /** Reused output row; its two halves are replaced for every emitted result. */
    private transient JoinedRowData outRow;
    /** All-null right side used to pad unmatched left rows of a left outer join. */
    private transient GenericRowData rightNullRow;

    /**
     * Creates the operator.
     *
     * @param leftType type information of left input rows
     * @param rightType type information of right input rows
     * @param generatedJoinCondition generated join condition, instantiated in {@link #open()}
     * @param leftTimeAttribute field index of the row time in left rows
     * @param rightTimeAttribute field index of the row time in right rows
     * @param minRetentionTime passed through to the superclass constructor
     * @param maxRetentionTime passed through to the superclass constructor
     * @param isLeftOuterJoin whether unmatched left rows are emitted padded with nulls
     */
    public TemporalRowTimeJoinOperator(InternalTypeInfo<RowData> leftType, InternalTypeInfo<RowData> rightType, GeneratedJoinCondition generatedJoinCondition, int leftTimeAttribute, int rightTimeAttribute, long minRetentionTime, long maxRetentionTime, boolean isLeftOuterJoin) {
        super(minRetentionTime, maxRetentionTime);
        this.leftType = leftType;
        this.rightType = rightType;
        this.generatedJoinCondition = generatedJoinCondition;
        this.leftTimeAttribute = leftTimeAttribute;
        this.rightTimeAttribute = rightTimeAttribute;
        this.rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute);
        this.isLeftOuterJoin = isLeftOuterJoin;
    }

    /**
     * Instantiates and opens the join condition, obtains the state handles and the timer service,
     * and creates the reusable output objects.
     */
    @Override
    public void open() throws Exception {
        super.open();

        joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
        joinCondition.setRuntimeContext(getRuntimeContext());
        joinCondition.open(new Configuration());

        nextLeftIndex = getRuntimeContext().getState(new ValueStateDescriptor<>(NEXT_LEFT_INDEX_STATE_NAME, Types.LONG));
        leftState = getRuntimeContext().getMapState(new MapStateDescriptor<>(LEFT_STATE_NAME, Types.LONG, leftType));
        rightState = getRuntimeContext().getMapState(new MapStateDescriptor<>(RIGHT_STATE_NAME, Types.LONG, rightType));
        registeredTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(REGISTERED_TIMER_STATE_NAME, Types.LONG));

        timerService = getInternalTimerService(TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this);

        outRow = new JoinedRowData();
        rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount());
        collector = new TimestampedCollector<>(output);
    }

    /**
     * Buffers a left row under the next sequence number and makes sure an event-time timer is
     * registered no later than the row's time.
     */
    @Override
    public void processElement1(StreamRecord<RowData> element) throws Exception {
        RowData row = element.getValue();
        leftState.put(getNextLeftIndex(), row);
        registerSmallestTimer(getLeftTime(row));
        registerProcessingCleanupTimer();
    }

    /**
     * Buffers a right row under its row time (a later row with the same time replaces the earlier
     * one) and makes sure an event-time timer is registered no later than that time.
     */
    @Override
    public void processElement2(StreamRecord<RowData> element) throws Exception {
        RowData row = element.getValue();
        long rowTime = getRightTime(row);
        rightState.put(rowTime, row);
        registerSmallestTimer(rowTime);
        registerProcessingCleanupTimer();
    }

    /**
     * Emits all results that are complete at the current watermark and re-registers the
     * event-time timer for the earliest left row that is still pending, if there is one.
     */
    @Override
    public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
        registeredTimer.clear();
        long lastUnprocessedTime = emitResultAndCleanUpState(timerService.currentWatermark());
        boolean hasPendingLeftRows = lastUnprocessedTime < Long.MAX_VALUE;
        if (hasPendingLeftRows) {
            registerTimer(lastUnprocessedTime);
        }

        if (!stateCleaningEnabled) {
            return;
        }
        // Keep the retention timer alive while either side still holds state; otherwise drop it
        // together with the sequence counter.
        if (hasPendingLeftRows || !rightState.isEmpty()) {
            registerProcessingCleanupTimer();
        } else {
            cleanupLastTimer();
            nextLeftIndex.clear();
        }
    }

    /** Closes the join condition (if it was created) and then the superclass. */
    @Override
    public void close() throws Exception {
        if (joinCondition != null) {
            joinCondition.close();
        }
        super.close();
    }

    /**
     * Joins and emits every buffered left row whose time is {@code <= currentWatermark}, removes
     * those rows from state, and then prunes outdated right rows.
     *
     * @return the smallest row time among the left rows that remain buffered, or
     *     {@link Long#MAX_VALUE} if none remain
     */
    private long emitResultAndCleanUpState(long currentWatermark) throws Exception {
        List<RowData> rightRowsSorted = getRightRowSorted(rightRowtimeComparator);
        long lastUnprocessedTime = Long.MAX_VALUE;

        // Sorted by sequence number so that output follows the arrival order of left rows.
        TreeMap<Long, RowData> orderedLeftRecords = new TreeMap<>();
        Iterator<Map.Entry<Long, RowData>> leftIterator = leftState.entries().iterator();
        while (leftIterator.hasNext()) {
            Map.Entry<Long, RowData> entry = leftIterator.next();
            RowData leftRow = entry.getValue();
            long leftTime = getLeftTime(leftRow);
            if (leftTime <= currentWatermark) {
                orderedLeftRecords.put(entry.getKey(), leftRow);
                leftIterator.remove();
            } else {
                lastUnprocessedTime = Math.min(lastUnprocessedTime, leftTime);
            }
        }

        for (RowData leftRow : orderedLeftRecords.values()) {
            joinAndEmit(leftRow, rightRowsSorted);
        }

        cleanupExpiredVersionInState(currentWatermark, rightRowsSorted);
        return lastUnprocessedTime;
    }

    /**
     * Emits the join result for one left row: the joined row if a right version exists, is an
     * accumulate message and satisfies the join condition; otherwise a null-padded row when this
     * is a left outer join; otherwise nothing.
     */
    private void joinAndEmit(RowData leftRow, List<RowData> rightRowsSorted) {
        Optional<RowData> rightRow = latestRightRowToJoin(rightRowsSorted, getLeftTime(leftRow));
        boolean matched = rightRow.isPresent()
                && RowDataUtil.isAccumulateMsg(rightRow.get())
                && joinCondition.apply(leftRow, rightRow.get());
        if (matched) {
            collectJoinedRow(leftRow, rightRow.get());
        } else if (isLeftOuterJoin) {
            collectJoinedRow(leftRow, rightNullRow);
        }
    }

    /** Emits {@code leftSideRow} joined with {@code rightRow}, carrying the left row's kind. */
    private void collectJoinedRow(RowData leftSideRow, RowData rightRow) {
        outRow.setRowKind(leftSideRow.getRowKind());
        outRow.replace(leftSideRow, rightRow);
        collector.collect(outRow);
    }

    /**
     * Removes from right state every row that precedes the latest row whose time is
     * {@code <= currentWatermark}; that latest row itself is kept.
     */
    private void cleanupExpiredVersionInState(long currentWatermark, List<RowData> rightRowsSorted) throws Exception {
        int indexToKeep = firstIndexToKeep(currentWatermark, rightRowsSorted);
        for (int i = 0; i < indexToKeep; i++) {
            rightState.remove(getRightTime(rightRowsSorted.get(i)));
        }
    }

    /** Clears all state held by this operator for the current key. */
    @Override
    public void cleanupState(long time) {
        leftState.clear();
        rightState.clear();
        nextLeftIndex.clear();
        registeredTimer.clear();
    }

    /**
     * Returns the index of the last right row whose time is {@code <= timerTimestamp}, or
     * {@code -1} if there is no such row (including when the list is empty).
     */
    private int firstIndexToKeep(long timerTimestamp, List<RowData> rightRowsSorted) {
        int firstIndexNewerThanTimer = indexOfFirstElementNewerThanTimer(timerTimestamp, rightRowsSorted);
        if (firstIndexNewerThanTimer < 0) {
            // Nothing is newer than the timer: the last element is the one to keep.
            return rightRowsSorted.size() - 1;
        }
        return firstIndexNewerThanTimer - 1;
    }

    /**
     * Returns the index of the first row whose time is {@code > timerTimestamp}, or {@code -1} if
     * no row is newer than the timer.
     */
    private int indexOfFirstElementNewerThanTimer(long timerTimestamp, List<RowData> list) {
        ListIterator<RowData> iter = list.listIterator();
        while (iter.hasNext()) {
            if (getRightTime(iter.next()) > timerTimestamp) {
                return iter.previousIndex();
            }
        }
        return -1;
    }

    /**
     * Finds the right row with the greatest row time that is {@code <= leftTime}.
     *
     * @param rightRowsSorted right rows in ascending row-time order
     * @return the matching row, or empty if every right row is newer than {@code leftTime}
     */
    private Optional<RowData> latestRightRowToJoin(List<RowData> rightRowsSorted, long leftTime) {
        return latestRightRowToJoin(rightRowsSorted, 0, rightRowsSorted.size() - 1, leftTime);
    }

    /** Binary search over {@code rightRowsSorted[low..high]}; see the two-argument overload. */
    private Optional<RowData> latestRightRowToJoin(List<RowData> rightRowsSorted, int low, int high, long leftTime) {
        while (low <= high) {
            int mid = (low + high) >>> 1;
            RowData midRow = rightRowsSorted.get(mid);
            int cmp = Long.compare(getRightTime(midRow), leftTime);
            if (cmp < 0) {
                low = mid + 1;
            } else if (cmp > 0) {
                high = mid - 1;
            } else {
                return Optional.of(midRow);
            }
        }
        // No exact match: low is the insertion point, so low - 1 is the latest older row.
        if (low - 1 < 0) {
            return Optional.empty();
        }
        return Optional.of(rightRowsSorted.get(low - 1));
    }

    /**
     * Registers an event-time timer for {@code timestamp} unless an earlier or equal one is
     * already registered; a later registered timer is deleted and replaced.
     */
    private void registerSmallestTimer(long timestamp) throws IOException {
        Long currentRegisteredTimer = registeredTimer.value();
        if (currentRegisteredTimer == null) {
            registerTimer(timestamp);
        } else if (currentRegisteredTimer > timestamp) {
            timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, currentRegisteredTimer);
            registerTimer(timestamp);
        }
    }

    /** Records {@code timestamp} as the registered timer and registers it with the service. */
    private void registerTimer(long timestamp) throws IOException {
        registeredTimer.update(timestamp);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
    }

    /** Returns all buffered right rows as a new list sorted with {@code rowtimeComparator}. */
    private List<RowData> getRightRowSorted(RowtimeComparator rowtimeComparator) throws Exception {
        List<RowData> rightRows = new ArrayList<>();
        for (RowData row : rightState.values()) {
            rightRows.add(row);
        }
        rightRows.sort(rowtimeComparator);
        return rightRows;
    }

    /** Returns the current left sequence number (starting at 0) and advances the counter. */
    private long getNextLeftIndex() throws IOException {
        Long index = nextLeftIndex.value();
        if (index == null) {
            index = 0L;
        }
        nextLeftIndex.update(index + 1L);
        return index;
    }

    private long getLeftTime(RowData leftRow) {
        return leftRow.getLong(leftTimeAttribute);
    }

    private long getRightTime(RowData rightRow) {
        return rightRow.getLong(rightTimeAttribute);
    }

    @VisibleForTesting
    static String getNextLeftIndexStateName() {
        return NEXT_LEFT_INDEX_STATE_NAME;
    }

    @VisibleForTesting
    static String getRegisteredTimerStateName() {
        return REGISTERED_TIMER_STATE_NAME;
    }

    /** Orders rows ascending by the {@code long} value of their time attribute field. */
    private static class RowtimeComparator
    implements Comparator<RowData>,
    Serializable {
        private static final long serialVersionUID = 8160134014590716914L;
        private final int timeAttribute;

        private RowtimeComparator(int timeAttribute) {
            this.timeAttribute = timeAttribute;
        }

        @Override
        public int compare(RowData o1, RowData o2) {
            return Long.compare(o1.getLong(timeAttribute), o2.getLong(timeAttribute));
        }
    }
}

