package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.RowType;

@ExecNodeMetadata(name = "stream-exec-lookup-join", version = 1, producedTransformations = {CommonExecLookupJoin.LOOKUP_JOIN_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.class */
public class StreamExecLookupJoin extends CommonExecLookupJoin implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
    public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
    public static final String FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY = "lookupKeyContainsPrimaryKey";
    public static final String STATE_NAME = "lookupJoinState";

    @JsonProperty(FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY)
    private final boolean lookupKeyContainsPrimaryKey;

    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    @JsonProperty("requireUpsertMaterialize")
    private final boolean upsertMaterialize;

    @Nullable
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @JsonProperty(ExecNode.FIELD_NAME_STATE)
    private final List<StateMetadata> stateMetadataList;

    public StreamExecLookupJoin(ReadableConfig readableConfig, FlinkJoinType flinkJoinType, @Nullable RexNode rexNode, @Nullable RexNode rexNode2, TemporalTableSourceSpec temporalTableSourceSpec, Map<Integer, LookupJoinUtil.LookupKey> map, @Nullable List<RexNode> list, @Nullable RexNode rexNode3, boolean z, boolean z2, @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions, @Nullable LookupJoinUtil.RetryLookupOptions retryLookupOptions, ChangelogMode changelogMode, InputProperty inputProperty, RowType rowType, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecLookupJoin.class), ExecNodeContext.newPersistedConfig(StreamExecLookupJoin.class, readableConfig), flinkJoinType, rexNode, rexNode2, temporalTableSourceSpec, map, list, rexNode3, z, z2, asyncLookupOptions, retryLookupOptions, changelogMode, z2 ? StateMetadata.getOneInputOperatorDefaultMeta(readableConfig, STATE_NAME) : null, Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecLookupJoin(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("joinType") FlinkJoinType flinkJoinType, @Nullable @JsonProperty("preFilterCondition") RexNode rexNode, @Nullable @JsonProperty("joinCondition") RexNode rexNode2, @JsonProperty("temporalTable") TemporalTableSourceSpec temporalTableSourceSpec, @JsonProperty("lookupKeys") Map<Integer, LookupJoinUtil.LookupKey> map, @Nullable @JsonProperty("projectionOnTemporalTable") List<RexNode> list, @Nullable @JsonProperty("filterOnTemporalTable") RexNode rexNode3, @JsonProperty("lookupKeyContainsPrimaryKey") boolean z, @JsonProperty("requireUpsertMaterialize") boolean z2, @Nullable @JsonProperty("asyncOptions") LookupJoinUtil.AsyncLookupOptions asyncLookupOptions, @Nullable @JsonProperty("retryOptions") LookupJoinUtil.RetryLookupOptions retryLookupOptions, @Nullable @JsonProperty("inputChangelogMode") ChangelogMode changelogMode, @Nullable @JsonProperty("state") List<StateMetadata> list2, @JsonProperty("inputProperties") List<InputProperty> list3, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, flinkJoinType, rexNode, rexNode2, temporalTableSourceSpec, map, list, rexNode3, asyncLookupOptions, retryLookupOptions, changelogMode, list3, rowType, str);
        this.lookupKeyContainsPrimaryKey = z;
        this.upsertMaterialize = z2;
        this.stateMetadataList = list2;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    public Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        return createJoinTransformation(plannerBase, execNodeConfig, this.upsertMaterialize, this.lookupKeyContainsPrimaryKey);
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin
    protected Transformation<RowData> createSyncLookupJoinWithState(Transformation<RowData> transformation, RelOptTable relOptTable, ExecNodeConfig execNodeConfig, ClassLoader classLoader, Map<Integer, LookupJoinUtil.LookupKey> map, TableFunction<?> tableFunction, RelBuilder relBuilder, RowType rowType, RowType rowType2, RowType rowType3, boolean z, boolean z2, boolean z3) {
        EmptyRowDataKeySelector rowDataSelector;
        long stateTtlForOneInputOperator = StateMetadata.getStateTtlForOneInputOperator(execNodeConfig, this.stateMetadataList);
        KeyedProcessOperator keyedProcessOperator = new KeyedProcessOperator(new KeyedLookupJoinWrapper(createSyncLookupJoinFunction(relOptTable, execNodeConfig, classLoader, map, tableFunction, relBuilder, rowType, rowType2, rowType3, z, z2), StateConfigUtil.createTtlConfig(stateTtlForOneInputOperator), InternalSerializers.create(getRightOutputRowType(getProjectionOutputRelDataType(relBuilder), rowType2)), z3));
        List list = (List) map.values().stream().filter(lookupKey -> {
            return lookupKey instanceof LookupJoinUtil.FieldRefLookupKey;
        }).map(lookupKey2 -> {
            return Integer.valueOf(((LookupJoinUtil.FieldRefLookupKey) lookupKey2).index);
        }).collect(Collectors.toList());
        boolean isEmpty = list.isEmpty();
        if (isEmpty) {
            rowDataSelector = EmptyRowDataKeySelector.INSTANCE;
        } else {
            Collections.sort(list);
            rowDataSelector = KeySelectorUtil.getRowDataSelector(classLoader, list.stream().mapToInt((v0) -> {
                return v0.intValue();
            }).toArray(), InternalTypeInfo.of(rowType));
        }
        PartitionTransformation partitionTransformation = new PartitionTransformation(transformation, new KeyGroupStreamPartitioner(rowDataSelector, 128));
        createTransformationMeta(CommonExecSink.PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", execNodeConfig).fill(partitionTransformation);
        if (isEmpty) {
            setSingletonParallelism(partitionTransformation);
        } else {
            partitionTransformation.setParallelism(transformation.getParallelism(), false);
        }
        OneInputTransformation createOneInputTransformation = ExecNodeUtil.createOneInputTransformation((Transformation) partitionTransformation, createTransformationMeta(CommonExecLookupJoin.LOOKUP_JOIN_MATERIALIZE_TRANSFORMATION, execNodeConfig), (StreamOperator) keyedProcessOperator, (TypeInformation) InternalTypeInfo.of(rowType3), partitionTransformation.getParallelism(), false);
        createOneInputTransformation.setStateKeySelector(rowDataSelector);
        createOneInputTransformation.setStateKeyType(rowDataSelector.getProducedType());
        if (isEmpty) {
            setSingletonParallelism(createOneInputTransformation);
        }
        return createOneInputTransformation;
    }

    private void setSingletonParallelism(Transformation<RowData> transformation) {
        transformation.setParallelism(1);
        transformation.setMaxParallelism(1);
    }
}
