/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window.processors;

import java.time.ZoneId;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;

/**
 * Window aggregate processor for unsliced {@link TimeWindow}s whose windows are managed by a
 * {@link MergingWindowProcessFunction}.
 *
 * <p>For every incoming row the {@link UnsliceAssigner} decides which window namespace holds the
 * accumulator and which window the timer is registered for. The accumulator stored in {@code
 * windowState} is updated, and a timer is registered at the window's max timestamp. When the
 * timer fires, {@link #fireWindow(long, TimeWindow)} emits the aggregate value if the trigger
 * fires and {@code emptySupplier} does not report an empty window.
 */
public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWindow>
        implements UnslicingWindowProcessor<TimeWindow> {

    /** Assigns elements to windows and supplies the merging window assigner. */
    private final UnsliceAssigner<TimeWindow> unsliceAssigner;

    /** Event-time or processing-time "after end of window" trigger, chosen in the constructor. */
    private final Trigger<TimeWindow> trigger;

    /** Metric group taken from the runtime context in {@link #open}. */
    private transient MetricGroup metrics;

    /** Window function created in {@link #open}; passed to the assigner on every element. */
    protected transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction;

    /** Context through which the trigger is driven; created in {@link #open}. */
    private transient TriggerContextImpl triggerContext;

    /**
     * Creates the processor.
     *
     * @param genAggsHandler generated aggregate handler, forwarded to the super class
     * @param unsliceAssigner assigner used for window assignment; its {@code isEventTime()} value
     *     is forwarded to the super class
     * @param accSerializer serializer of the accumulator row, forwarded to the super class
     * @param indexOfCountStar forwarded to the super class
     * @param shiftTimeZone time zone used when converting window timestamps to timer timestamps
     */
    public UnsliceWindowAggProcessor(
            GeneratedNamespaceAggsHandleFunction<TimeWindow> genAggsHandler,
            UnsliceAssigner<TimeWindow> unsliceAssigner,
            TypeSerializer<RowData> accSerializer,
            int indexOfCountStar,
            ZoneId shiftTimeZone) {
        super(
                genAggsHandler,
                unsliceAssigner,
                accSerializer,
                unsliceAssigner.isEventTime(),
                indexOfCountStar,
                shiftTimeZone);
        this.unsliceAssigner = unsliceAssigner;
        if (isEventTime) {
            this.trigger = EventTimeTriggers.afterEndOfWindow();
        } else {
            this.trigger = ProcessingTimeTriggers.afterEndOfWindow();
        }
    }

    /**
     * Initializes the metric group, the merging window function and the trigger context.
     *
     * @param context processor context supplying the runtime context
     * @throws Exception if the super class, the trigger or the window function fails to open
     */
    @Override
    public void open(WindowProcessor.Context<TimeWindow> context) throws Exception {
        super.open(context);
        this.metrics = context.getRuntimeContext().getMetricGroup();
        // NOTE(review): the trailing 0L is presumably the allowed lateness - confirm against
        // the MergingWindowProcessFunction constructor.
        this.windowFunction =
                new MergingWindowProcessFunction<>(
                        unsliceAssigner.getMergingWindowAssigner(),
                        aggregator,
                        unsliceAssigner
                                .getMergingWindowAssigner()
                                .getWindowSerializer(new ExecutionConfig()),
                        0L);

        // The trigger context must exist before the window function is opened, because the
        // window context handed to it delegates to triggerContext.
        this.triggerContext = new TriggerContextImpl();
        this.triggerContext.open();

        WindowContextImpl windowContext = new WindowContextImpl();
        this.windowFunction.open(windowContext);
    }

    /**
     * Folds one row into the accumulator of its window and registers the window timer.
     *
     * @param key key of the element (not read by this method)
     * @param element the row to accumulate or retract
     * @return {@code true} if the assigner returned no state namespace for the element, i.e. the
     *     element was dropped; {@code false} otherwise
     * @throws IllegalArgumentException if exactly one of state namespace and actual window is
     *     present
     * @throws Exception if state access or aggregation fails
     */
    @Override
    public boolean processElement(RowData key, RowData element) throws Exception {
        Optional<TimeWindow> affectedWindowOp =
                unsliceAssigner.assignStateNamespace(element, clockService, windowFunction);
        boolean isElementDropped = !affectedWindowOp.isPresent();
        if (affectedWindowOp.isPresent()) {
            TimeWindow affectedWindow = affectedWindowOp.get();
            RowData acc = windowState.value(affectedWindow);
            if (acc == null) {
                // First element seen for this window namespace.
                acc = aggregator.createAccumulators();
            }
            aggregator.setAccumulators(affectedWindow, acc);
            if (RowDataUtil.isAccumulateMsg(element)) {
                aggregator.accumulate(element);
            } else {
                aggregator.retract(element);
            }
            windowState.update(affectedWindow, aggregator.getAccumulators());
        }

        // The actual window is resolved only after the accumulator update above; keep this order.
        Optional<TimeWindow> actualWindowOp =
                unsliceAssigner.assignActualWindow(element, clockService, windowFunction);
        Preconditions.checkArgument(
                affectedWindowOp.isPresent() == actualWindowOp.isPresent(),
                "State namespace and actual window must be either both present or both absent.");
        if (actualWindowOp.isPresent()) {
            TimeWindow actualWindow = actualWindowOp.get();
            triggerContext.setWindow(actualWindow);
            long triggerTime =
                    TimeWindowUtil.toEpochMillsForTimer(actualWindow.maxTimestamp(), shiftTimeZone);
            if (isEventTime) {
                triggerContext.registerEventTimeTimer(triggerTime);
            } else {
                triggerContext.registerProcessingTimeTimer(triggerTime);
            }
        }
        return isElementDropped;
    }

    /**
     * Emits the aggregate result of {@code window} if the trigger fires for {@code
     * timerTimestamp} and {@code emptySupplier} does not report an empty window.
     *
     * @param timerTimestamp timestamp of the timer that fired
     * @param window window the timer belongs to
     * @throws Exception if the window function, aggregator or trigger fails
     */
    @Override
    public void fireWindow(long timerTimestamp, TimeWindow window) throws Exception {
        windowFunction.prepareAggregateAccumulatorForEmit(window);
        RowData aggResult = aggregator.getValue(window);
        triggerContext.setWindow(window);
        final boolean isFired;
        if (isEventTime) {
            isFired = triggerContext.onEventTime(timerTimestamp);
        } else {
            isFired = triggerContext.onProcessingTime(timerTimestamp);
        }
        // emptySupplier is consulted only when the trigger fired (short-circuit evaluation).
        if (isFired && !emptySupplier.get()) {
            collect(aggResult);
        }
    }

    /**
     * Delegates cleanup of {@code window} to the window function.
     *
     * @param timerTimestamp timestamp of the timer that fired
     * @param window window to clean up if needed
     * @throws Exception if the window function fails
     */
    @Override
    public void clearWindow(long timerTimestamp, TimeWindow window) throws Exception {
        windowFunction.cleanWindowIfNeeded(window, timerTimestamp);
    }

    /** No-op in this processor. */
    @Override
    public void advanceProgress(long progress) throws Exception {
        // intentionally empty
    }

    /** No-op in this processor. */
    @Override
    public void prepareCheckpoint() throws Exception {
        // intentionally empty
    }

    /**
     * Returns the window serializer of the merging window assigner, created with a fresh {@link
     * ExecutionConfig}.
     */
    @Override
    public TypeSerializer<TimeWindow> createWindowSerializer() {
        return unsliceAssigner.getMergingWindowAssigner().getWindowSerializer(new ExecutionConfig());
    }

    /** Creates a new timer service wrapper around the context's internal timer service. */
    @Override
    protected WindowTimerService<TimeWindow> getWindowTimerService() {
        return new UnslicingWindowTimerServiceImpl(ctx.getTimerService(), shiftTimeZone);
    }

    /**
     * Trigger context bound to one "current" window at a time. Callers set the window via {@link
     * #setWindow(TimeWindow)} before invoking any window-dependent method.
     */
    private class TriggerContextImpl implements Trigger.OnMergeContext {

        /** Window that subsequent trigger and timer calls refer to. */
        private TimeWindow window;

        /** Windows merged into {@link #window}; read by {@link #mergePartitionedState}. */
        private Collection<TimeWindow> mergedWindows;

        private TriggerContextImpl() {}

        /** Opens the enclosing processor's trigger with this context. */
        public void open() throws Exception {
            trigger.open(this);
        }

        @Override
        public MetricGroup getMetricGroup() {
            return metrics;
        }

        /** Forwards a processing-time timer to the trigger for the current window. */
        public boolean onProcessingTime(long time) throws Exception {
            return trigger.onProcessingTime(time, window);
        }

        /** Forwards an event-time timer to the trigger for the current window. */
        public boolean onEventTime(long time) throws Exception {
            return trigger.onEventTime(time, window);
        }

        /** Notifies the trigger that windows were merged into the current window. */
        public void onMerge() throws Exception {
            trigger.onMerge(window, this);
        }

        public void setWindow(TimeWindow window) {
            this.window = window;
        }

        public void setMergedWindows(Collection<TimeWindow> mergedWindows) {
            this.mergedWindows = mergedWindows;
        }

        @Override
        public long getCurrentProcessingTime() {
            return ctx.getTimerService().currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return ctx.getTimerService().currentWatermark();
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            ctx.getTimerService().registerProcessingTimeTimer(window, time);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            ctx.getTimerService().registerEventTimeTimer(window, time);
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            ctx.getTimerService().deleteProcessingTimeTimer(window, time);
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            ctx.getTimerService().deleteEventTimeTimer(window, time);
        }

        @Override
        public ZoneId getShiftTimeZone() {
            return shiftTimeZone;
        }

        /** Clears the trigger for the current window. */
        public void clear() throws Exception {
            trigger.clear(window);
        }

        /**
         * Merges the state of the merged windows into the current window's namespace. Does
         * nothing when no merged windows are set.
         *
         * @throws RuntimeException wrapping any failure, including the {@link
         *     IllegalArgumentException} raised when the state is not an {@link
         *     InternalMergingState}
         */
        @Override
        public <S extends MergingState<?, ?>> void mergePartitionedState(
                StateDescriptor<S, ?> stateDescriptor) {
            if (mergedWindows == null || mergedWindows.isEmpty()) {
                return;
            }
            try {
                State state =
                        ctx.getKeyedStateBackend()
                                .getOrCreateKeyedState(createWindowSerializer(), stateDescriptor);
                if (!(state instanceof InternalMergingState)) {
                    // Thrown inside the try on purpose: it is wrapped by the catch below.
                    throw new IllegalArgumentException(
                            "The given state descriptor does not refer to a mergeable state (MergingState)");
                }
                // The state was created with the TimeWindow serializer as namespace serializer.
                @SuppressWarnings("unchecked")
                InternalMergingState<?, TimeWindow, ?, ?, ?> mergingState =
                        (InternalMergingState<?, TimeWindow, ?, ?, ?>) state;
                mergingState.mergeNamespaces(window, mergedWindows);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }

        /**
         * Looks up keyed state under the {@link VoidNamespace}.
         *
         * @throws RuntimeException wrapping any failure of the state backend
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return ctx.getKeyedStateBackend()
                        .getPartitionedState(
                                VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE,
                                stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }
    }

    /**
     * Context handed to the {@link MergingWindowProcessFunction}; exposes time, window state and
     * trigger operations of the enclosing processor.
     */
    private class WindowContextImpl
            implements MergingWindowProcessFunction.MergingContext<RowData, TimeWindow> {

        private WindowContextImpl() {}

        @Override
        public long currentProcessingTime() {
            return ctx.getTimerService().currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return ctx.getTimerService().currentWatermark();
        }

        @Override
        public ZoneId getShiftTimeZone() {
            return shiftTimeZone;
        }

        @Override
        public RowData getWindowAccumulators(TimeWindow window) throws Exception {
            return windowState.value(window);
        }

        @Override
        public void setWindowAccumulators(TimeWindow window, RowData acc) throws Exception {
            windowState.update(window, acc);
        }

        /** Clears the stored accumulator of {@code window}, then lets the aggregator clean up. */
        @Override
        public void clearWindowState(TimeWindow window) throws Exception {
            windowState.clear(window);
            aggregator.cleanup(window);
        }

        /** No-op in this processor. */
        @Override
        public void clearPreviousState(TimeWindow window) throws Exception {
            // intentionally empty
        }

        @Override
        public void clearTrigger(TimeWindow window) throws Exception {
            triggerContext.setWindow(window);
            triggerContext.clear();
        }

        /**
         * Deletes the timer registered at the max timestamp of {@code window}, unless that
         * timestamp converts to {@link Long#MAX_VALUE}.
         *
         * <p>The timer is deleted for whichever window is currently set on the trigger context;
         * this method does not set it.
         */
        @Override
        public void deleteCleanupTimer(TimeWindow window) throws Exception {
            long cleanupTime =
                    TimeWindowUtil.toEpochMillsForTimer(window.maxTimestamp(), shiftTimeZone);
            if (cleanupTime == Long.MAX_VALUE) {
                return;
            }
            if (unsliceAssigner.isEventTime()) {
                triggerContext.deleteEventTimeTimer(cleanupTime);
            } else {
                triggerContext.deleteProcessingTimeTimer(cleanupTime);
            }
        }

        /** Points the trigger context at the merge result and notifies the trigger. */
        @Override
        public void onMerge(TimeWindow newWindow, Collection<TimeWindow> mergedWindows)
                throws Exception {
            triggerContext.setWindow(newWindow);
            triggerContext.setMergedWindows(mergedWindows);
            triggerContext.onMerge();
        }

        /**
         * Looks up keyed state under the {@link VoidNamespace}.
         *
         * @throws NullPointerException if {@code stateDescriptor} is {@code null}
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)
                throws Exception {
            Objects.requireNonNull(stateDescriptor, "The state properties must not be null");
            return ctx.getKeyedStateBackend()
                    .getPartitionedState(
                            VoidNamespace.INSTANCE,
                            VoidNamespaceSerializer.INSTANCE,
                            stateDescriptor);
        }

        @Override
        public RowData currentKey() {
            return (RowData) ctx.getKeyedStateBackend().getCurrentKey();
        }

        @Override
        public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable>
                getWindowStateMergingConsumer() {
            return new MergingWindowProcessFunction.DefaultAccMergingConsumer<TimeWindow>(
                    this, aggregator);
        }
    }
}

