/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.process;

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.TableRuntimeException;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.runtime.dataview.DataViewUtils;
import org.apache.flink.table.runtime.generated.HashFunction;
import org.apache.flink.table.runtime.generated.ProcessTableRunner;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.process.ExternalTimeContext;
import org.apache.flink.table.runtime.operators.process.PassAllCollector;
import org.apache.flink.table.runtime.operators.process.PassPartitionKeysCollector;
import org.apache.flink.table.runtime.operators.process.PassThroughCollectorBase;
import org.apache.flink.table.runtime.operators.process.ReadableInternalTimeContext;
import org.apache.flink.table.runtime.operators.process.RuntimeChangelogMode;
import org.apache.flink.table.runtime.operators.process.RuntimeStateInfo;
import org.apache.flink.table.runtime.operators.process.RuntimeTableSemantics;
import org.apache.flink.table.runtime.operators.process.TimeConverter;
import org.apache.flink.table.runtime.operators.process.WritableInternalTimeContext;
import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.StringDataSerializer;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.logical.LogicalType;

@Internal
public abstract class AbstractProcessTableOperator
extends AbstractStreamOperatorV2<RowData>
implements Triggerable<RowData, Object> {
    protected final List<RuntimeTableSemantics> tableSemantics;
    protected final ProcessTableRunner processTableRunner;
    private final List<RuntimeStateInfo> stateInfos;
    private final HashFunction[] stateHashCode;
    private final RecordEqualiser[] stateEquals;
    private final RuntimeChangelogMode producedChangelogMode;
    private transient ChangelogMode changelogMode;
    private transient ReadableInternalTimeContext internalTimeContext;
    private transient PassThroughCollectorBase evalCollector;
    private transient PassAllCollector onTimerCollector;
    private transient StateDescriptor<?, ?>[] stateDescriptors;
    private transient State[] stateHandles;
    @Nullable
    private transient MapState<StringData, Long> namedTimersMapState;
    @Nullable
    private transient InternalTimerService<StringData> namedTimerService;
    @Nullable
    private transient InternalTimerService<VoidNamespace> unnamedTimerService;

    public AbstractProcessTableOperator(StreamOperatorParameters<RowData> parameters, List<RuntimeTableSemantics> tableSemantics, List<RuntimeStateInfo> stateInfos, ProcessTableRunner processTableRunner, HashFunction[] stateHashCode, RecordEqualiser[] stateEquals, RuntimeChangelogMode producedChangelogMode) {
        super(parameters, Math.max(tableSemantics.size(), 1));
        this.tableSemantics = tableSemantics;
        this.stateInfos = stateInfos;
        this.processTableRunner = processTableRunner;
        this.stateHashCode = stateHashCode;
        this.stateEquals = stateEquals;
        this.producedChangelogMode = producedChangelogMode;
    }

    public void open() throws Exception {
        super.open();
        RunnerContext runnerContext = new RunnerContext();
        RunnerOnTimerContext runnerOnTimerContext = new RunnerOnTimerContext();
        this.changelogMode = this.producedChangelogMode.deserialize();
        this.setTimerServices();
        this.setTimeContext();
        this.setCollectors();
        this.setStateDescriptors();
        this.setStateHandles();
        this.processTableRunner.initialize(this.stateHandles, this.stateHashCode, this.stateEquals, this.shouldEmitRowtime(), runnerContext, runnerOnTimerContext, this.evalCollector, this.onTimerCollector);
        FunctionUtils.setFunctionRuntimeContext((Function)this.processTableRunner, (RuntimeContext)this.getRuntimeContext());
        FunctionUtils.openFunction((Function)this.processTableRunner, (OpenContext)DefaultOpenContext.INSTANCE);
    }

    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.processTableRunner.ingestWatermarkEvent(mark.getTimestamp());
    }

    public void onEventTime(InternalTimer<RowData, Object> timer) throws Exception {
        Object namedTimer = timer.getNamespace();
        this.processTableRunner.ingestTimerEvent((RowData)timer.getKey(), namedTimer == VoidNamespace.INSTANCE ? null : (StringData)namedTimer, timer.getTimestamp());
        this.processTableRunner.processOnTimer();
    }

    public void onProcessingTime(InternalTimer<RowData, Object> timer) throws Exception {
    }

    private void setTimerServices() {
        if (this.shouldEnableTimers()) {
            KeyedStateStore keyedStateStore = (KeyedStateStore)this.getKeyedStateStore().orElseThrow(IllegalStateException::new);
            MapStateDescriptor namedTimersDescriptor = new MapStateDescriptor("internal-named-timers-map", (TypeSerializer)StringDataSerializer.INSTANCE, (TypeSerializer)LongSerializer.INSTANCE);
            this.namedTimersMapState = keyedStateStore.getMapState(namedTimersDescriptor);
            this.namedTimerService = this.getInternalTimerService("user-named-timers", (TypeSerializer)StringDataSerializer.INSTANCE, this);
            this.unnamedTimerService = this.getInternalTimerService("user-unnamed-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        } else {
            this.namedTimersMapState = null;
            this.namedTimerService = null;
            this.unnamedTimerService = null;
        }
    }

    private void setTimeContext() {
        this.internalTimeContext = this.shouldEnableTimers() ? new WritableInternalTimeContext(this.namedTimersMapState, this.namedTimerService, this.unnamedTimerService) : new ReadableInternalTimeContext();
    }

    private void setCollectors() {
        int tableCount = this.tableSemantics.size();
        if (tableCount == 0 || this.tableSemantics.stream().anyMatch(RuntimeTableSemantics::passColumnsThrough)) {
            assert (tableCount <= 1);
            this.evalCollector = new PassAllCollector((Output<StreamRecord<RowData>>)this.output, this.changelogMode, 1);
        } else {
            this.evalCollector = new PassPartitionKeysCollector((Output<StreamRecord<RowData>>)this.output, this.changelogMode, this.tableSemantics);
        }
        this.onTimerCollector = new PassAllCollector((Output<StreamRecord<RowData>>)this.output, this.changelogMode, tableCount);
    }

    private void setStateDescriptors() {
        StateDescriptor[] stateDescriptors = new StateDescriptor[this.stateInfos.size()];
        for (int i = 0; i < this.stateInfos.size(); ++i) {
            ValueStateDescriptor stateDescriptor;
            RuntimeStateInfo stateInfo = this.stateInfos.get(i);
            DataType dataType = stateInfo.getDataType();
            LogicalType type = dataType.getLogicalType();
            String stateName = stateInfo.getStateName();
            if (DataViewUtils.isDataView(type, ListView.class)) {
                CollectionDataType arrayDataType = (CollectionDataType)dataType.getChildren().get(0);
                DataType elementDataType = arrayDataType.getElementDataType();
                stateDescriptor = new ListStateDescriptor(stateName, ExternalSerializer.of(elementDataType));
            } else if (DataViewUtils.isDataView(type, MapView.class)) {
                KeyValueDataType mapDataType = (KeyValueDataType)dataType.getChildren().get(0);
                DataType keyDataType = mapDataType.getKeyDataType();
                DataType valueDataType = mapDataType.getValueDataType();
                stateDescriptor = new MapStateDescriptor(stateName, ExternalSerializer.of(keyDataType), ExternalSerializer.of(valueDataType));
            } else {
                stateDescriptor = new ValueStateDescriptor(stateName, InternalSerializers.create(type));
            }
            StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(stateInfo.getTimeToLive());
            if (ttlConfig.isEnabled()) {
                stateDescriptor.enableTimeToLive(ttlConfig);
            }
            stateDescriptors[i] = stateDescriptor;
        }
        this.stateDescriptors = stateDescriptors;
    }

    private void setStateHandles() {
        State[] stateHandles = new State[this.stateDescriptors.length];
        for (int i = 0; i < this.stateDescriptors.length; ++i) {
            ValueState stateHandle;
            KeyedStateStore keyedStateStore = (KeyedStateStore)this.getKeyedStateStore().orElseThrow(IllegalStateException::new);
            StateDescriptor<?, ?> stateDescriptor = this.stateDescriptors[i];
            if (stateDescriptor instanceof ValueStateDescriptor) {
                stateHandle = keyedStateStore.getState((ValueStateDescriptor)stateDescriptor);
            } else if (stateDescriptor instanceof ListStateDescriptor) {
                stateHandle = keyedStateStore.getListState((ListStateDescriptor)stateDescriptor);
            } else if (stateDescriptor instanceof MapStateDescriptor) {
                stateHandle = keyedStateStore.getMapState((MapStateDescriptor)stateDescriptor);
            } else {
                throw new IllegalStateException("Unknown state descriptor:" + stateDescriptor);
            }
            stateHandles[i] = stateHandle;
        }
        this.stateHandles = stateHandles;
    }

    private boolean shouldEmitRowtime() {
        return !this.tableSemantics.isEmpty() && this.tableSemantics.stream().allMatch(input -> input.timeColumn() != -1);
    }

    private boolean shouldEnableTimers() {
        return !this.tableSemantics.isEmpty() && this.tableSemantics.stream().allMatch(input -> input.hasSetSemantics() && !input.passColumnsThrough());
    }

    @Internal
    public class RunnerOnTimerContext
    extends RunnerContext
    implements ProcessTableFunction.OnTimerContext {
        @Nullable
        public String currentTimer() {
            StringData timerName = AbstractProcessTableOperator.this.processTableRunner.getTimerName();
            return timerName == null ? null : timerName.toString();
        }
    }

    @Internal
    public class RunnerContext
    implements ProcessTableFunction.Context {
        private final Map<String, RuntimeTableSemantics> tableSemanticsMap = this.createTableSemanticsMap();
        private final Map<String, Integer> stateNameToPosMap = this.createStateNameToPosMap();

        RunnerContext() {
        }

        private Map<String, RuntimeTableSemantics> createTableSemanticsMap() {
            return AbstractProcessTableOperator.this.tableSemantics.stream().map(inputSemantics -> Map.entry(inputSemantics.getArgName(), inputSemantics)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        private Map<String, Integer> createStateNameToPosMap() {
            HashMap<String, Integer> stateNameToPosMap = new HashMap<String, Integer>();
            for (int i = 0; i < AbstractProcessTableOperator.this.stateInfos.size(); ++i) {
                stateNameToPosMap.put(AbstractProcessTableOperator.this.stateInfos.get(i).getStateName(), i);
            }
            return stateNameToPosMap;
        }

        public <TimeType> ProcessTableFunction.TimeContext<TimeType> timeContext(Class<TimeType> conversionClass) {
            TimeConverter timeConverter;
            if (conversionClass == Instant.class) {
                timeConverter = TimeConverter.InstantTimeConverter.INSTANCE;
            } else if (conversionClass == LocalDateTime.class) {
                timeConverter = TimeConverter.LocalDateTimeConverter.INSTANCE;
            } else if (conversionClass == Long.class) {
                timeConverter = TimeConverter.LongTimeConverter.INSTANCE;
            } else {
                throw new TableRuntimeException("Unsupported conversion class for TimeContext: " + conversionClass.getName());
            }
            AbstractProcessTableOperator.this.internalTimeContext.setTime(AbstractProcessTableOperator.this.processTableRunner.getCurrentWatermark(), AbstractProcessTableOperator.this.processTableRunner.getTime());
            return new ExternalTimeContext<LocalDateTime>(AbstractProcessTableOperator.this.internalTimeContext, timeConverter);
        }

        public TableSemantics tableSemanticsFor(String argName) {
            RuntimeTableSemantics tableSemantics = this.tableSemanticsMap.get(argName);
            if (tableSemantics == null) {
                throw new TableRuntimeException("Unknown table argument: " + argName);
            }
            return tableSemantics;
        }

        public void clearState(String stateName) {
            Integer statePos = this.stateNameToPosMap.get(stateName);
            if (statePos == null) {
                throw new TableRuntimeException("Unknown state entry: " + stateName);
            }
            AbstractProcessTableOperator.this.processTableRunner.clearState(statePos);
        }

        public void clearAllState() {
            AbstractProcessTableOperator.this.processTableRunner.clearAllState();
        }

        public void clearAllTimers() {
            AbstractProcessTableOperator.this.internalTimeContext.clearAllTimers();
        }

        public void clearAll() {
            this.clearAllState();
            this.clearAllTimers();
        }

        public ChangelogMode getChangelogMode() {
            return AbstractProcessTableOperator.this.changelogMode;
        }

        @VisibleForTesting
        public StateDescriptor<?, ?> getStateDescriptor(String stateName) {
            Integer statePos = this.stateNameToPosMap.get(stateName);
            if (statePos == null) {
                throw new TableRuntimeException("Unknown state entry: " + stateName);
            }
            return AbstractProcessTableOperator.this.stateDescriptors[statePos];
        }
    }
}

