/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowJoin;
import org.apache.flink.table.planner.plan.rules.physical.stream.ImmutableStreamPhysicalWindowJoinRule;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.trait.RelWindowProperties;
import org.apache.flink.table.planner.plan.utils.WindowJoinUtil;
import org.immutables.value.Value;
import scala.Tuple2;
import scala.Tuple7;

/**
 * Planner rule that rewrites a {@link FlinkLogicalJoin} accepted by
 * {@link WindowJoinUtil#satisfyWindowJoin} into a {@link StreamPhysicalWindowJoin}.
 *
 * <p>The nested config type is referenced by its qualified name in the {@code extends} clause
 * because member types are only in scope inside the class body, not in the class header.
 */
@Value.Enclosing
public class StreamPhysicalWindowJoinRule
        extends RelRule<StreamPhysicalWindowJoinRule.StreamPhysicalWindowJoinRuleConfig> {

    /** Shared rule instance built from {@link StreamPhysicalWindowJoinRuleConfig#DEFAULT}. */
    public static final StreamPhysicalWindowJoinRule INSTANCE =
            StreamPhysicalWindowJoinRuleConfig.DEFAULT.toRule();

    /**
     * Creates the rule from the given configuration.
     *
     * @param config rule configuration passed to {@link RelRule}
     */
    protected StreamPhysicalWindowJoinRule(StreamPhysicalWindowJoinRuleConfig config) {
        super(config);
    }

    /**
     * Accepts the call only when the matched join qualifies as a window join.
     *
     * @param call rule call whose first operand is the {@link FlinkLogicalJoin}
     * @return the result of {@link WindowJoinUtil#satisfyWindowJoin} for that join
     */
    @Override
    public boolean matches(RelOptRuleCall call) {
        final FlinkLogicalJoin join = call.rel(0);
        return WindowJoinUtil.satisfyWindowJoin(join);
    }

    /**
     * Derives the trait set required from a join input: the stream-physical convention plus a
     * distribution on the given columns.
     *
     * @param columns join key column indexes; an empty array requests a singleton distribution
     * @param inputTraitSet trait set of the input to derive from
     * @return {@code inputTraitSet} with convention and distribution replaced
     */
    private RelTraitSet toHashTraitByColumns(int[] columns, RelTraitSet inputTraitSet) {
        final FlinkRelDistribution distribution;
        if (columns.length == 0) {
            // No keys to hash on.
            distribution = FlinkRelDistribution.SINGLETON();
        } else {
            // NOTE(review): the boolean is presumably the "require strict" flag of
            // FlinkRelDistribution.hash - confirm against its declaration.
            distribution = FlinkRelDistribution.hash(columns, true);
        }
        return inputTraitSet
                .replace(FlinkConventions.STREAM_PHYSICAL())
                .replace(distribution);
    }

    /**
     * Converts a join input to the traits computed by {@link #toHashTraitByColumns}.
     *
     * @param input logical input of the join
     * @param columns join key column indexes of that input
     * @return the input converted via {@link RelOptRule#convert(RelNode, RelTraitSet)}
     */
    private RelNode convertInput(RelNode input, int[] columns) {
        final RelTraitSet requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet());
        return RelOptRule.convert(input, requiredTraitSet);
    }

    /**
     * Builds the {@link StreamPhysicalWindowJoin} replacing the matched logical join and
     * registers it with the planner.
     *
     * @param call rule call carrying the join (operand 0) and its two inputs (operands 1 and 2)
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
        final FlinkLogicalJoin join = call.rel(0);

        // Split the join condition into window start/end equality keys (left and right),
        // the remaining equi-join keys (left and right) and the remaining condition.
        final Tuple7<int[], int[], int[], int[], int[], int[], RexNode> splitCondition =
                WindowJoinUtil.excludeWindowStartEqualityAndEndEqualityFromWindowJoinCondition(
                        join);
        final int[] windowStartEqualityLeftKeys = splitCondition._1();
        final int[] windowEndEqualityLeftKeys = splitCondition._2();
        final int[] windowStartEqualityRightKeys = splitCondition._3();
        final int[] windowEndEqualityRightKeys = splitCondition._4();
        final int[] remainLeftKeys = splitCondition._5();
        final int[] remainRightKeys = splitCondition._6();
        final RexNode remainCondition = splitCondition._7();

        final RelTraitSet providedTraitSet =
                join.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());

        // Both inputs are distributed on the remaining (non-window) join keys.
        final FlinkLogicalRel left = call.rel(1);
        final FlinkLogicalRel right = call.rel(2);
        final RelNode newLeft = convertInput(left, remainLeftKeys);
        final RelNode newRight = convertInput(right, remainRightKeys);

        final Tuple2<RelWindowProperties, RelWindowProperties> childWindowProperties =
                WindowJoinUtil.getChildWindowProperties(join);
        final RelWindowProperties leftWindowProperties = childWindowProperties._1();
        final RelWindowProperties rightWindowProperties = childWindowProperties._2();

        // Only the first window start / window end key of each side is used for the
        // windowing strategy.
        final WindowAttachedWindowingStrategy leftWindowing =
                new WindowAttachedWindowingStrategy(
                        leftWindowProperties.getWindowSpec(),
                        leftWindowProperties.getTimeAttributeType(),
                        windowStartEqualityLeftKeys[0],
                        windowEndEqualityLeftKeys[0]);
        final WindowAttachedWindowingStrategy rightWindowing =
                new WindowAttachedWindowingStrategy(
                        rightWindowProperties.getWindowSpec(),
                        rightWindowProperties.getTimeAttributeType(),
                        windowStartEqualityRightKeys[0],
                        windowEndEqualityRightKeys[0]);

        final StreamPhysicalWindowJoin newWindowJoin =
                new StreamPhysicalWindowJoin(
                        join.getCluster(),
                        providedTraitSet,
                        newLeft,
                        newRight,
                        join.getJoinType(),
                        remainCondition,
                        leftWindowing,
                        rightWindowing);
        call.transformTo(newWindowJoin);
    }

    /** Configuration for {@link StreamPhysicalWindowJoinRule}. */
    @Value.Immutable(singleton = false)
    public interface StreamPhysicalWindowJoinRuleConfig extends RelRule.Config {

        /**
         * Default configuration: matches a {@link FlinkLogicalJoin} whose two inputs are
         * {@link FlinkLogicalRel} nodes with any inputs of their own.
         */
        StreamPhysicalWindowJoinRuleConfig DEFAULT =
                ImmutableStreamPhysicalWindowJoinRule.StreamPhysicalWindowJoinRuleConfig.builder()
                        .build()
                        .withOperandSupplier(
                                b0 ->
                                        b0.operand(FlinkLogicalJoin.class)
                                                .inputs(
                                                        b1 ->
                                                                b1.operand(FlinkLogicalRel.class)
                                                                        .anyInputs(),
                                                        b2 ->
                                                                b2.operand(FlinkLogicalRel.class)
                                                                        .anyInputs()))
                        .withDescription("StreamPhysicalWindowJoinRule")
                        .as(StreamPhysicalWindowJoinRuleConfig.class);

        /** Creates a {@link StreamPhysicalWindowJoinRule} backed by this configuration. */
        @Override
        default StreamPhysicalWindowJoinRule toRule() {
            return new StreamPhysicalWindowJoinRule(this);
        }
    }
}

