/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.types.logical.RowType;

/**
 * Stream exec node registered as {@code stream-exec-limit}.
 *
 * <p>The node reuses {@link StreamExecRank}: it always hands the parent
 * {@link RankType#ROW_NUMBER}, {@link PartitionSpec#ALL_IN_ONE} and {@link SortSpec#ANY},
 * together with a {@link ConstantRankRange} that encodes the requested limit window.
 */
@ExecNodeMetadata(name="stream-exec-limit", version=1, consumedOptions={"table.exec.rank.topn-cache-size"}, producedTransformations={"rank"}, minPlanVersion=FlinkVersion.v1_15, minStateVersion=FlinkVersion.v1_15)
public class StreamExecLimit
extends StreamExecRank {
    /** Upper bound of the rank range, captured so it can be validated at translation time. */
    private final long limitEnd;

    /**
     * Creates a new limit node from planner-level arguments and delegates to the
     * JSON-creator constructor.
     *
     * @param tableConfig configuration used to derive the persisted config and the default state metadata
     * @param limitStart offset of the limit; the rank range handed to the parent starts at {@code limitStart + 1}
     *     (presumably because rank numbers are 1-based while the offset is 0-based - confirm against the planner rule)
     * @param limitEnd end of the rank range
     * @param generateUpdateBefore forwarded unchanged to the parent rank node
     * @param needRetraction selects the retract strategy when {@code true}, the append-fast strategy otherwise
     * @param inputProperty the single input property of this node
     * @param outputType output row type of this node
     * @param description textual description of this node
     */
    public StreamExecLimit(
            ReadableConfig tableConfig,
            long limitStart,
            long limitEnd,
            boolean generateUpdateBefore,
            boolean needRetraction,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecLimit.class),
                ExecNodeContext.newPersistedConfig(StreamExecLimit.class, tableConfig),
                new ConstantRankRange(limitStart + 1L, limitEnd),
                getRankStrategy(needRetraction),
                generateUpdateBefore,
                StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, "rankState"),
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Constructor used for JSON deserialization; every argument is bound to the JSON property
     * named in its {@link JsonProperty} annotation.
     *
     * <p>Besides forwarding to {@link StreamExecRank}, it remembers the end of {@code rankRange}
     * for the check performed in {@link #translateToPlanInternal(PlannerBase, ExecNodeConfig)}.
     */
    @JsonCreator
    public StreamExecLimit(
            @JsonProperty(value="id") int id,
            @JsonProperty(value="type") ExecNodeContext context,
            @JsonProperty(value="configuration") ReadableConfig persistedConfig,
            @JsonProperty(value="rankRange") ConstantRankRange rankRange,
            @JsonProperty(value="rankStrategy") RankProcessStrategy rankStrategy,
            @JsonProperty(value="generateUpdateBefore") boolean generateUpdateBefore,
            @Nullable @JsonProperty(value="state") List<StateMetadata> stateMetadataList,
            @JsonProperty(value="inputProperties") List<InputProperty> inputProperties,
            @JsonProperty(value="outputType") RowType outputType,
            @JsonProperty(value="description") String description) {
        super(
                id,
                context,
                persistedConfig,
                RankType.ROW_NUMBER,
                PartitionSpec.ALL_IN_ONE,
                SortSpec.ANY,
                (RankRange)rankRange,
                rankStrategy,
                // NOTE(review): presumably the parent's "output rank number" flag - confirm in StreamExecRank.
                false,
                generateUpdateBefore,
                stateMetadataList,
                inputProperties,
                outputType,
                description);
        limitEnd = rankRange.getRankEnd();
    }

    /**
     * Maps the retraction requirement onto a rank process strategy.
     *
     * @param needRetraction whether the retract strategy is required
     * @return {@link RankProcessStrategy#RETRACT_STRATEGY} when {@code needRetraction} is set,
     *     {@link RankProcessStrategy#APPEND_FAST_STRATEGY} otherwise
     */
    private static RankProcessStrategy getRankStrategy(boolean needRetraction) {
        return needRetraction
                ? RankProcessStrategy.RETRACT_STRATEGY
                : RankProcessStrategy.APPEND_FAST_STRATEGY;
    }

    /**
     * Translates this node by delegating to the parent rank translation, after rejecting a
     * limit whose end is {@link Long#MAX_VALUE}.
     *
     * @throws TableException if the stored limit end equals {@link Long#MAX_VALUE}, which this
     *     node reports as a missing FETCH
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        final boolean fetchMissing = limitEnd == Long.MAX_VALUE;
        if (!fetchMissing) {
            return super.translateToPlanInternal(planner, config);
        }
        throw new TableException("FETCH is missed, which on streaming table is not supported currently.");
    }
}

