package org.apache.flink.table.runtime.operators.aggregate.window.combines;

import java.util.Iterator;
import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowState;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowValueState;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.class */
public class GlobalAggCombiner implements RecordsCombiner {
    private final WindowTimerService<Long> timerService;
    private final StateKeyContext keyContext;
    private final WindowValueState<Long> accState;
    private final NamespaceAggsHandleFunction<Long> localAggregator;
    private final NamespaceAggsHandleFunction<Long> globalAggregator;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner$Factory.class */
    public static final class Factory implements RecordsCombiner.Factory {
        private static final long serialVersionUID = 1;
        private final GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler;
        private final GeneratedNamespaceAggsHandleFunction<Long> genGlobalAggsHandler;

        public Factory(GeneratedNamespaceAggsHandleFunction<Long> generatedNamespaceAggsHandleFunction, GeneratedNamespaceAggsHandleFunction<Long> generatedNamespaceAggsHandleFunction2) {
            this.genLocalAggsHandler = generatedNamespaceAggsHandleFunction;
            this.genGlobalAggsHandler = generatedNamespaceAggsHandleFunction2;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner.Factory
        public RecordsCombiner createRecordsCombiner(RuntimeContext runtimeContext, WindowTimerService<Long> windowTimerService, KeyedStateBackend<RowData> keyedStateBackend, WindowState<Long> windowState, boolean z) throws Exception {
            NamespaceAggsHandleFunction newInstance = this.genLocalAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
            NamespaceAggsHandleFunction newInstance2 = this.genGlobalAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
            newInstance.open(new PerWindowStateDataViewStore(keyedStateBackend, LongSerializer.INSTANCE, runtimeContext));
            newInstance2.open(new PerWindowStateDataViewStore(keyedStateBackend, LongSerializer.INSTANCE, runtimeContext));
            Objects.requireNonNull(keyedStateBackend);
            return new GlobalAggCombiner(windowTimerService, (v1) -> {
                r3.setCurrentKey(v1);
            }, (WindowValueState) windowState, newInstance, newInstance2);
        }
    }

    public GlobalAggCombiner(WindowTimerService<Long> windowTimerService, StateKeyContext stateKeyContext, WindowValueState<Long> windowValueState, NamespaceAggsHandleFunction<Long> namespaceAggsHandleFunction, NamespaceAggsHandleFunction<Long> namespaceAggsHandleFunction2) {
        this.timerService = windowTimerService;
        this.keyContext = stateKeyContext;
        this.accState = windowValueState;
        this.localAggregator = namespaceAggsHandleFunction;
        this.globalAggregator = namespaceAggsHandleFunction2;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner
    public void combine(WindowKey windowKey, Iterator<RowData> it) throws Exception {
        Long valueOf = Long.valueOf(windowKey.getWindow());
        this.localAggregator.setAccumulators(valueOf, this.localAggregator.createAccumulators());
        while (it.hasNext()) {
            this.localAggregator.merge(valueOf, it.next());
        }
        combineAccumulator(windowKey, this.localAggregator.getAccumulators());
    }

    private void combineAccumulator(WindowKey windowKey, RowData rowData) throws Exception {
        this.keyContext.setCurrentKey(windowKey.getKey());
        Long valueOf = Long.valueOf(windowKey.getWindow());
        RowData value = this.accState.value(valueOf);
        if (value == null) {
            value = this.globalAggregator.createAccumulators();
        }
        this.globalAggregator.setAccumulators(valueOf, value);
        this.globalAggregator.merge(valueOf, rowData);
        this.accState.update(valueOf, this.globalAggregator.getAccumulators());
        if (TimeWindowUtil.isWindowFired(valueOf.longValue(), this.timerService.currentWatermark(), this.timerService.getShiftTimeZone())) {
            return;
        }
        this.timerService.registerEventTimeWindowTimer(valueOf);
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner
    public void close() throws Exception {
        this.localAggregator.close();
        this.globalAggregator.close();
    }
}
