/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.RecordCounter;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/**
 * Bundle function that buffers the input rows of each group key and, when the bundle is
 * finished, applies all buffered rows of a key to that key's accumulators in one pass.
 *
 * <p>Per key, the accumulators are read from keyed {@link ValueState} once, every buffered row is
 * accumulated or retracted, and the resulting change (INSERT, UPDATE_BEFORE/UPDATE_AFTER or
 * DELETE) is emitted as the key joined with the aggregate value.
 */
public class MiniBatchGroupAggFunction
extends MapBundleFunction<RowData, List<RowData>, RowData, RowData> {
    private static final long serialVersionUID = 7455939331036508477L;

    /** Generated code of the aggregate handler; instantiated in {@link #open}. */
    private final GeneratedAggsHandleFunction genAggsHandler;

    /** Generated code of the equaliser used to compare previous and new aggregate values. */
    private final GeneratedRecordEqualiser genRecordEqualiser;

    /** Field types of the accumulator row stored in state. */
    private final LogicalType[] accTypes;

    /** Type of the input rows; used to create the serializer that copies buffered rows. */
    private final RowType inputType;

    /** Tells whether the accumulators report a record count of zero. */
    private final RecordCounter recordCounter;

    /** Whether an UPDATE_BEFORE row is emitted ahead of every UPDATE_AFTER row. */
    private final boolean generateUpdateBefore;

    /** State retention time handed to {@link StateConfigUtil#createTtlConfig}. */
    private final long stateRetentionTime;

    /** Reused output row: group key joined with the aggregate value. */
    private transient JoinedRowData resultRow = new JoinedRowData();

    /** Serializer used to copy input rows before they are buffered. */
    private transient TypeSerializer<RowData> inputRowSerializer;

    private transient AggsHandleFunction function = null;
    private transient RecordEqualiser equaliser = null;

    /** Keyed state holding the accumulators of the current key. */
    private transient ValueState<RowData> accState = null;

    /**
     * Creates the function.
     *
     * @param genAggsHandler generated aggregate handler
     * @param genRecordEqualiser generated equaliser for aggregate values
     * @param accTypes field types of the accumulator row
     * @param inputType type of the input rows
     * @param indexOfCountStar index passed to {@link RecordCounter#of(int)}
     * @param generateUpdateBefore whether UPDATE_BEFORE rows are emitted on updates
     * @param stateRetentionTime retention time used to build the state TTL configuration
     */
    public MiniBatchGroupAggFunction(GeneratedAggsHandleFunction genAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, LogicalType[] accTypes, RowType inputType, int indexOfCountStar, boolean generateUpdateBefore, long stateRetentionTime) {
        this.genAggsHandler = genAggsHandler;
        this.genRecordEqualiser = genRecordEqualiser;
        this.recordCounter = RecordCounter.of(indexOfCountStar);
        this.accTypes = accTypes;
        this.inputType = inputType;
        this.generateUpdateBefore = generateUpdateBefore;
        this.stateRetentionTime = stateRetentionTime;
    }

    /**
     * Instantiates the generated handler and equaliser, registers the accumulator state (with
     * TTL when the TTL configuration is enabled) and creates the input row serializer.
     */
    @Override
    public void open(ExecutionContext ctx) throws Exception {
        super.open(ctx);
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(this.stateRetentionTime);
        ClassLoader userCodeClassLoader = ctx.getRuntimeContext().getUserCodeClassLoader();

        this.function = this.genAggsHandler.newInstance(userCodeClassLoader);
        this.function.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext(), ttlConfig));
        this.equaliser = this.genRecordEqualiser.newInstance(userCodeClassLoader);

        InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(this.accTypes);
        ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);
        if (ttlConfig.isEnabled()) {
            accDesc.enableTimeToLive(ttlConfig);
        }
        this.accState = ctx.getRuntimeContext().getState(accDesc);

        this.inputRowSerializer = InternalSerializers.create(this.inputType);
        this.resultRow = new JoinedRowData();
    }

    /**
     * Appends a copy of {@code input} to the rows buffered for its key.
     *
     * @param value rows buffered so far for the key, or {@code null} if there are none yet
     * @param input the incoming row
     * @return the buffer (newly created when {@code value} was {@code null}) containing the copy
     */
    @Override
    public List<RowData> addInput(@Nullable List<RowData> value, RowData input) throws Exception {
        List<RowData> bufferedRows = value == null ? new ArrayList<>() : value;
        // Buffer a copy rather than the passed-in instance.
        // NOTE(review): presumably because the caller may reuse the input object - confirm.
        bufferedRows.add(this.inputRowSerializer.copy(input));
        return bufferedRows;
    }

    /**
     * Applies the buffered rows of every key to that key's accumulators and emits the resulting
     * changes.
     *
     * @param buffer buffered input rows per group key
     * @param out collector receiving the change rows
     */
    @Override
    public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out) throws Exception {
        for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
            RowData currentKey = entry.getKey();
            List<RowData> inputRows = entry.getValue();
            boolean firstRow = false;

            // State access below is scoped to the key set here.
            this.ctx.setCurrentKey(currentKey);
            RowData acc = this.accState.value();
            if (acc == null) {
                // No accumulators exist for this key, so retractions arriving before any
                // accumulate message have nothing to retract from and are dropped.
                dropLeadingRetractions(inputRows);
                if (inputRows.isEmpty()) {
                    // Nothing left for this key; move on to the next key. Returning here
                    // would discard the buffered rows of all remaining keys in the bundle.
                    continue;
                }
                acc = this.function.createAccumulators();
                firstRow = true;
            }

            this.function.setAccumulators(acc);
            RowData prevAggValue = this.function.getValue();

            for (RowData input : inputRows) {
                if (RowDataUtil.isAccumulateMsg(input)) {
                    this.function.accumulate(input);
                } else {
                    this.function.retract(input);
                }
            }

            RowData newAggValue = this.function.getValue();
            acc = this.function.getAccumulators();

            if (!this.recordCounter.recordCountIsZero(acc)) {
                this.accState.update(acc);
                if (firstRow) {
                    this.resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
                    out.collect(this.resultRow);
                    continue;
                }
                // An unchanged result is only suppressed when no positive retention time is set.
                // NOTE(review): presumably the result is re-emitted otherwise so that downstream
                // state does not expire - confirm.
                if (this.stateRetentionTime <= 0L && this.equaliser.equals(prevAggValue, newAggValue)) {
                    continue;
                }
                if (this.generateUpdateBefore) {
                    this.resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(this.resultRow);
                }
                this.resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                out.collect(this.resultRow);
                continue;
            }

            // The record counter reports zero: retract the previous result (unless this key
            // was first seen in this bundle and never emitted) and clear state for the key.
            if (!firstRow) {
                this.resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
                out.collect(this.resultRow);
            }
            this.accState.clear();
            this.function.cleanup();
        }
    }

    /** Closes the aggregate handler if it was instantiated. */
    @Override
    public void close() throws Exception {
        if (this.function != null) {
            this.function.close();
        }
    }

    /** Removes retraction rows from the head of {@code rows}, stopping at the first non-retraction row. */
    private static void dropLeadingRetractions(List<RowData> rows) {
        Iterator<RowData> iter = rows.iterator();
        while (iter.hasNext()) {
            if (!RowDataUtil.isRetractMsg(iter.next())) {
                break;
            }
            iter.remove();
        }
    }
}

