/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonTimeRangeUnboundedPrecedingFunction<K>
extends KeyedProcessFunction<K, RowData, RowData> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(NonTimeRangeUnboundedPrecedingFunction.class);
    private final long stateRetentionTime;
    private final GeneratedAggsHandleFunction generatedAggsHandler;
    private final GeneratedRecordEqualiser generatedRecordEqualiser;
    private final GeneratedRecordEqualiser generatedSortKeyEqualiser;
    private final GeneratedRecordComparator generatedSortKeyComparator;
    private transient Comparator<RowData> sortKeyComparator;
    protected final KeySelector<RowData, RowData> sortKeySelector;
    private transient RecordEqualiser valueEqualiser;
    private transient RecordEqualiser sortKeyEqualiser;
    private final LogicalType[] accTypes;
    private final LogicalType[] inputFieldTypes;
    private final LogicalType[] sortKeyTypes;
    protected transient JoinedRowData output;
    private transient ValueState<Long> idState;
    @VisibleForTesting
    protected transient ValueStateDescriptor<Long> idStateDescriptor;
    private transient ValueState<List<Tuple2<RowData, List<Long>>>> sortedListState;
    @VisibleForTesting
    protected transient ValueStateDescriptor<List<Tuple2<RowData, List<Long>>>> sortedListStateDescriptor;
    private transient MapState<Long, RowData> valueMapState;
    @VisibleForTesting
    protected transient MapStateDescriptor<Long, RowData> valueStateDescriptor;
    private transient MapState<RowData, RowData> accMapState;
    @VisibleForTesting
    protected transient MapStateDescriptor<RowData, RowData> accStateDescriptor;
    protected transient AggsHandleFunction aggFuncs;
    private static final String IDS_NOT_FOUND_METRIC_NAME = "numOfIdsNotFound";
    private transient Counter numOfIdsNotFound;
    private static final String SORT_KEYS_NOT_FOUND_METRIC_NAME = "numOfSortKeysNotFound";
    private transient Counter numOfSortKeysNotFound;

    @VisibleForTesting
    protected Counter getNumOfIdsNotFound() {
        return this.numOfIdsNotFound;
    }

    @VisibleForTesting
    protected Counter getNumOfSortKeysNotFound() {
        return this.numOfSortKeysNotFound;
    }

    public NonTimeRangeUnboundedPrecedingFunction(long stateRetentionTime, GeneratedAggsHandleFunction genAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, GeneratedRecordEqualiser genSortKeyEqualiser, GeneratedRecordComparator genSortKeyComparator, LogicalType[] accTypes, LogicalType[] inputFieldTypes, LogicalType[] sortKeyTypes, RowDataKeySelector sortKeySelector) {
        this.stateRetentionTime = stateRetentionTime;
        this.generatedAggsHandler = genAggsHandler;
        this.generatedRecordEqualiser = genRecordEqualiser;
        this.generatedSortKeyEqualiser = genSortKeyEqualiser;
        this.generatedSortKeyComparator = genSortKeyComparator;
        this.accTypes = accTypes;
        this.inputFieldTypes = inputFieldTypes;
        this.sortKeyTypes = sortKeyTypes;
        this.sortKeySelector = sortKeySelector;
    }

    public void open(OpenContext openContext) throws Exception {
        this.aggFuncs = (AggsHandleFunction)this.generatedAggsHandler.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        this.aggFuncs.open(new PerKeyStateDataViewStore(this.getRuntimeContext()));
        this.output = new JoinedRowData();
        this.valueEqualiser = (RecordEqualiser)this.generatedRecordEqualiser.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        this.sortKeyEqualiser = (RecordEqualiser)this.generatedSortKeyEqualiser.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        this.sortKeyComparator = (Comparator)this.generatedSortKeyComparator.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(this.stateRetentionTime);
        this.idStateDescriptor = new ValueStateDescriptor("idState", Long.class);
        if (ttlConfig.isEnabled()) {
            this.idStateDescriptor.enableTimeToLive(ttlConfig);
        }
        this.idState = this.getRuntimeContext().getState(this.idStateDescriptor);
        InternalTypeInfo<RowData> inputRowTypeInfo = InternalTypeInfo.ofFields(this.inputFieldTypes);
        InternalTypeInfo<RowData> sortKeyRowTypeInfo = InternalTypeInfo.ofFields(this.sortKeyTypes);
        ListTypeInfo idListTypeInfo = new ListTypeInfo(Types.LONG);
        ListTypeInfo listTypeInfo = new ListTypeInfo((TypeInformation)new TupleTypeInfo(new TypeInformation[]{sortKeyRowTypeInfo, idListTypeInfo}));
        this.sortedListStateDescriptor = new ValueStateDescriptor("sortedListState", (TypeInformation)listTypeInfo);
        if (ttlConfig.isEnabled()) {
            this.sortedListStateDescriptor.enableTimeToLive(ttlConfig);
        }
        this.sortedListState = this.getRuntimeContext().getState(this.sortedListStateDescriptor);
        this.valueStateDescriptor = new MapStateDescriptor("valueMapState", Types.LONG, inputRowTypeInfo);
        if (ttlConfig.isEnabled()) {
            this.valueStateDescriptor.enableTimeToLive(ttlConfig);
        }
        this.valueMapState = this.getRuntimeContext().getMapState(this.valueStateDescriptor);
        InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(this.accTypes);
        this.accStateDescriptor = new MapStateDescriptor("accMapState", sortKeyRowTypeInfo, accTypeInfo);
        if (ttlConfig.isEnabled()) {
            this.accStateDescriptor.enableTimeToLive(ttlConfig);
        }
        this.accMapState = this.getRuntimeContext().getMapState(this.accStateDescriptor);
        this.numOfIdsNotFound = this.getRuntimeContext().getMetricGroup().counter(IDS_NOT_FOUND_METRIC_NAME);
        this.numOfSortKeysNotFound = this.getRuntimeContext().getMetricGroup().counter(SORT_KEYS_NOT_FOUND_METRIC_NAME);
    }

    public void processElement(RowData input, KeyedProcessFunction.Context ctx, Collector<RowData> out) throws Exception {
        RowKind rowKind = input.getRowKind();
        switch (rowKind) {
            case INSERT: 
            case UPDATE_AFTER: {
                this.insertIntoSortedList(input, out);
                break;
            }
            case DELETE: 
            case UPDATE_BEFORE: {
                this.removeFromSortedList(input, out);
            }
        }
        this.aggFuncs.resetAccumulators();
        this.aggFuncs.cleanup();
    }

    private void insertIntoSortedList(RowData insRow, Collector<RowData> out) throws Exception {
        Long id = this.getNextId();
        List<Tuple2<RowData, List<Long>>> sortedList = this.getSortedList();
        RowKind origRowKind = insRow.getRowKind();
        insRow.setRowKind(RowKind.INSERT);
        RowData inputSortKey = (RowData)this.sortKeySelector.getKey((Object)insRow);
        Tuple2<Integer, Boolean> indexForInsertOrUpdate = this.findIndexOfSortKey(sortedList, inputSortKey, false);
        boolean isInsert = (Boolean)indexForInsertOrUpdate.f1;
        int index = (Integer)indexForInsertOrUpdate.f0;
        if (isInsert) {
            if (index == -1) {
                sortedList.add((Tuple2<RowData, List<Long>>)new Tuple2((Object)inputSortKey, List.of(id)));
                index = sortedList.size() - 1;
            } else {
                sortedList.add(index, (Tuple2<RowData, List<Long>>)new Tuple2((Object)inputSortKey, List.of(id)));
            }
            this.setAccumulatorOfPrevRow(sortedList, index - 1);
            this.aggFuncs.accumulate(insRow);
            this.collectInsertOrUpdateAfter(out, insRow, origRowKind, this.aggFuncs.getValue());
        } else {
            ArrayList<Long> ids = new ArrayList<Long>((Collection)sortedList.get((int)index).f1);
            ids.add(id);
            sortedList.set(index, (Tuple2<RowData, List<Long>>)new Tuple2((Object)inputSortKey, ids));
            this.setAccumulatorOfPrevRow(sortedList, index - 1);
            this.reAccumulateIdsAfterInsert(this.aggFuncs.getAccumulators(), ids, insRow);
            this.emitUpdatesForIds(ids, ids.size() - 1, (RowData)this.accMapState.get((Object)inputSortKey), this.aggFuncs.getAccumulators(), origRowKind, insRow, out);
        }
        this.valueMapState.put((Object)id, (Object)insRow);
        this.accMapState.put((Object)inputSortKey, (Object)this.aggFuncs.getAccumulators());
        this.sortedListState.update(sortedList);
        id = id + 1L;
        this.idState.update((Object)id);
        this.processRemainingElements(sortedList, index + 1, this.aggFuncs.getAccumulators(), out);
    }

    private Long getNextId() throws IOException {
        Long id = (Long)this.idState.value();
        if (id == null) {
            id = Long.MIN_VALUE;
        }
        return id;
    }

    private List<Tuple2<RowData, List<Long>>> getSortedList() throws IOException {
        ArrayList sortedList = (ArrayList)this.sortedListState.value();
        if (sortedList == null) {
            sortedList = new ArrayList();
        }
        return sortedList;
    }

    private Tuple2<Integer, Boolean> findIndexOfSortKey(List<Tuple2<RowData, List<Long>>> sortedList, RowData inputSortKey, boolean isEquals) {
        for (int i = 0; i < sortedList.size(); ++i) {
            RowData curSortKey = (RowData)sortedList.get((int)i).f0;
            if (isEquals && this.sortKeyEqualiser.equals(curSortKey, inputSortKey)) {
                return new Tuple2((Object)i, (Object)true);
            }
            int compareResult = this.sortKeyComparator.compare(curSortKey, inputSortKey);
            if (compareResult == 0) {
                return new Tuple2((Object)i, (Object)false);
            }
            if (compareResult <= 0) continue;
            return new Tuple2((Object)i, (Object)true);
        }
        return new Tuple2((Object)-1, (Object)true);
    }

    private void setAccumulatorOfPrevRow(List<Tuple2<RowData, List<Long>>> sortedList, int prevIndex) throws Exception {
        if (prevIndex < 0) {
            this.aggFuncs.createAccumulators();
        } else {
            RowData prevAcc = (RowData)this.accMapState.get((Object)((RowData)sortedList.get((int)prevIndex).f0));
            if (prevAcc == null) {
                this.aggFuncs.createAccumulators();
            } else {
                this.aggFuncs.setAccumulators(prevAcc);
            }
        }
    }

    private void reAccumulateIdsAfterInsert(RowData currAcc, List<Long> ids, RowData insRow) throws Exception {
        this.aggFuncs.setAccumulators(currAcc);
        for (int j = 0; j < ids.size() - 1; ++j) {
            RowData value = (RowData)this.valueMapState.get((Object)ids.get(j));
            this.aggFuncs.accumulate(value);
        }
        this.aggFuncs.accumulate(insRow);
    }

    private RowData setAccumulatorAndGetValue(RowData accumulator) throws Exception {
        this.aggFuncs.setAccumulators(accumulator);
        return this.aggFuncs.getValue();
    }

    private void emitUpdatesForIds(List<Long> ids, int idxOfChangedRow, RowData prevAcc, RowData currAcc, RowKind rowKind, RowData changedRow, Collector<RowData> out) throws Exception {
        RowData prevAggValue = this.setAccumulatorAndGetValue(prevAcc);
        RowData currAggValue = this.setAccumulatorAndGetValue(currAcc);
        if (prevAcc.equals(currAcc)) {
            this.sendUpdateForChangedRow(out, rowKind, changedRow, prevAggValue, currAggValue);
            LOG.debug("Prev accumulator is same as curr accumulator. Skipping further updates.");
            return;
        }
        for (int j = 0; j < ids.size(); ++j) {
            if (j == idxOfChangedRow) {
                this.sendUpdateForChangedRow(out, rowKind, changedRow, prevAggValue, currAggValue);
                continue;
            }
            RowData value = (RowData)this.valueMapState.get((Object)ids.get(j));
            this.collectUpdateBefore(out, value, prevAggValue);
            this.collectUpdateAfter(out, value, currAggValue);
        }
    }

    private void processRemainingElements(List<Tuple2<RowData, List<Long>>> sortedList, int startPos, RowData currAcc, Collector<RowData> out) throws Exception {
        for (int i = startPos; i < sortedList.size(); ++i) {
            Tuple2<RowData, List<Long>> sortKeyAndIds = sortedList.get(i);
            RowData curSortKey = (RowData)sortKeyAndIds.f0;
            List ids = (List)sortKeyAndIds.f1;
            RowData lastValue = null;
            this.aggFuncs.setAccumulators(currAcc);
            for (int j = 0; j < ids.size(); ++j) {
                RowData value = (RowData)this.valueMapState.get((Object)((Long)ids.get(j)));
                this.aggFuncs.accumulate(value);
                lastValue = value;
            }
            currAcc = this.aggFuncs.getAccumulators();
            RowData prevAcc = (RowData)this.accMapState.get((Object)curSortKey);
            if (prevAcc.equals(currAcc)) {
                LOG.debug("Prev accumulator is same as curr accumulator. Skipping further updates.");
                return;
            }
            RowData prevAggValue = this.setAccumulatorAndGetValue(prevAcc);
            RowData currAggValue = this.setAccumulatorAndGetValue(currAcc);
            for (int j = 0; j < ids.size(); ++j) {
                RowData value = ids.size() - 1 == j ? lastValue : (RowData)this.valueMapState.get((Object)((Long)ids.get(j)));
                this.collectUpdateBefore(out, value, prevAggValue);
                this.collectUpdateAfter(out, value, currAggValue);
            }
            this.accMapState.put((Object)curSortKey, (Object)currAcc);
        }
    }

    private void sendUpdateForChangedRow(Collector<RowData> out, RowKind rowKind, RowData changedRow, RowData prevAggValue, RowData currAggValue) {
        if (rowKind == RowKind.DELETE) {
            this.collectDelete(out, changedRow, prevAggValue);
        } else {
            this.collectInsertOrUpdateAfter(out, changedRow, rowKind, currAggValue);
        }
    }

    private void collectInsertOrUpdateAfter(Collector<RowData> out, RowData value, RowKind rowKind, RowData currAggValue) {
        this.output.setRowKind(rowKind);
        this.output.replace(value, currAggValue);
        out.collect((Object)this.output);
    }

    private void collectUpdateBefore(Collector<RowData> out, RowData rowValue, RowData prevAggValue) {
        this.output.setRowKind(RowKind.UPDATE_BEFORE);
        this.output.replace(rowValue, prevAggValue);
        out.collect((Object)this.output);
    }

    private void collectUpdateAfter(Collector<RowData> out, RowData rowValue, RowData currAggValue) {
        this.output.setRowKind(RowKind.UPDATE_AFTER);
        this.output.replace(rowValue, currAggValue);
        out.collect((Object)this.output);
    }

    private void collectDelete(Collector<RowData> out, RowData rowValue, RowData prevAggValue) {
        this.output.setRowKind(RowKind.DELETE);
        this.output.replace(rowValue, prevAggValue);
        out.collect((Object)this.output);
    }

    private void removeFromSortedList(RowData delRow, Collector<RowData> out) throws Exception {
        delRow.setRowKind(RowKind.INSERT);
        RowData inputSortKey = (RowData)this.sortKeySelector.getKey((Object)delRow);
        List<Tuple2<RowData, List<Long>>> sortedList = this.getSortedList();
        int i = (Integer)this.findIndexOfSortKey(sortedList, (RowData)inputSortKey, (boolean)true).f0;
        if (i == -1) {
            LOG.debug("Could not find matching sort key. Skipping delete.");
            this.numOfSortKeysNotFound.inc();
            return;
        }
        RowData curSortKey = (RowData)sortedList.get((int)i).f0;
        ArrayList<Long> ids = new ArrayList<Long>((Collection)sortedList.get((int)i).f1);
        this.setAccumulatorOfPrevRow(sortedList, i - 1);
        int removeIndex = this.reAccumulateIdsAndGetRemoveIndex(ids, delRow);
        if (removeIndex == -1) {
            LOG.info("Could not find matching row to remove. Missing id from sortKey ids list.");
            this.numOfIdsNotFound.inc();
            return;
        }
        this.emitUpdatesForIds(ids, removeIndex, (RowData)this.accMapState.get((Object)curSortKey), this.aggFuncs.getAccumulators(), RowKind.DELETE, delRow, out);
        Long deletedId = (Long)ids.remove(removeIndex);
        i = this.removeIdFromSortedList(sortedList, i, ids, curSortKey);
        this.valueMapState.remove((Object)deletedId);
        if (ids.isEmpty()) {
            this.accMapState.remove((Object)curSortKey);
        } else {
            this.accMapState.put((Object)curSortKey, (Object)this.aggFuncs.getAccumulators());
        }
        this.sortedListState.update(sortedList);
        this.processRemainingElements(sortedList, i, this.aggFuncs.getAccumulators(), out);
    }

    private int reAccumulateIdsAndGetRemoveIndex(List<Long> ids, RowData delRow) throws Exception {
        int removeIndex = -1;
        for (int j = 0; j < ids.size(); ++j) {
            RowData curValue = (RowData)this.valueMapState.get((Object)ids.get(j));
            if (this.valueEqualiser.equals(curValue, delRow)) {
                removeIndex = j;
                continue;
            }
            this.aggFuncs.accumulate(curValue);
        }
        return removeIndex;
    }

    private int removeIdFromSortedList(List<Tuple2<RowData, List<Long>>> sortedList, int idx, List<Long> ids, RowData curSortKey) {
        if (ids.isEmpty()) {
            sortedList.remove(idx);
        } else {
            sortedList.set(idx, (Tuple2<RowData, List<Long>>)new Tuple2((Object)curSortKey, ids));
            ++idx;
        }
        return idx;
    }

    public void close() throws Exception {
        if (null != this.aggFuncs) {
            this.aggFuncs.close();
        }
    }
}

