/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexCall;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rex.RexNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.Pair;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

public class BeamJoinRel
extends org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Join
implements BeamRelNode {
    public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
        super(cluster, traits, left, right, condition, variablesSet, joinType);
    }

    @Override
    public org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
        return new BeamJoinRel(this.getCluster(), traitSet, left, right, conditionExpr, this.variablesSet, joinType);
    }

    @Override
    public List<RelNode> getPCollectionInputs() {
        if (this.isSideInputJoin()) {
            return ImmutableList.of(BeamSqlRelUtils.getBeamRelInput(this.left));
        }
        return BeamRelNode.super.getPCollectionInputs();
    }

    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new Transform();
    }

    private boolean isSideInputJoin() {
        BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(this.left);
        BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(this.right);
        return !this.seekable(leftRelNode) && this.seekable(rightRelNode);
    }

    private void verifySupportedTrigger(PCollection<Row> pCollection) {
        WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
        if (PCollection.IsBounded.UNBOUNDED.equals((Object)pCollection.isBounded()) && !this.triggersOncePerWindow(windowingStrategy)) {
            throw new UnsupportedOperationException("Joining unbounded PCollections is currently only supported for non-global windows with triggers that are known to produce output once per window,such as the default trigger with zero allowed lateness. In these cases Beam can guarantee it joins all input elements once per window. " + windowingStrategy + " is not supported");
        }
    }

    private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
        Trigger trigger = windowingStrategy.getTrigger();
        return !(windowingStrategy.getWindowFn() instanceof GlobalWindows) && trigger instanceof DefaultTrigger && Duration.ZERO.equals((Object)windowingStrategy.getAllowedLateness());
    }

    private PCollection<Row> standardJoin(PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, Schema leftSchema, Schema rightSchema) {
        PCollection joinedRows = null;
        switch (this.joinType) {
            case LEFT: {
                Schema rigthNullSchema = this.buildNullSchema(rightSchema);
                Row rightNullRow = Row.nullRow((Schema)rigthNullSchema);
                extractedRightRows = BeamJoinRel.setValueCoder(extractedRightRows, SchemaCoder.of((Schema)rigthNullSchema));
                joinedRows = Join.leftOuterJoin(extractedLeftRows, extractedRightRows, (Object)rightNullRow);
                break;
            }
            case RIGHT: {
                Schema leftNullSchema = this.buildNullSchema(leftSchema);
                Row leftNullRow = Row.nullRow((Schema)leftNullSchema);
                extractedLeftRows = BeamJoinRel.setValueCoder(extractedLeftRows, SchemaCoder.of((Schema)leftNullSchema));
                joinedRows = Join.rightOuterJoin(extractedLeftRows, extractedRightRows, (Object)leftNullRow);
                break;
            }
            case FULL: {
                Schema leftNullSchema = this.buildNullSchema(leftSchema);
                Schema rightNullSchema = this.buildNullSchema(rightSchema);
                Row leftNullRow = Row.nullRow((Schema)leftNullSchema);
                Row rightNullRow = Row.nullRow((Schema)rightNullSchema);
                extractedLeftRows = BeamJoinRel.setValueCoder(extractedLeftRows, SchemaCoder.of((Schema)leftNullSchema));
                extractedRightRows = BeamJoinRel.setValueCoder(extractedRightRows, SchemaCoder.of((Schema)rightNullSchema));
                joinedRows = Join.fullOuterJoin(extractedLeftRows, extractedRightRows, (Object)leftNullRow, (Object)rightNullRow);
                break;
            }
            default: {
                joinedRows = Join.innerJoin(extractedLeftRows, extractedRightRows);
            }
        }
        Schema schema = CalciteUtils.toSchema(this.getRowType());
        PCollection ret = ((PCollection)joinedRows.apply("JoinParts2WholeRow", (PTransform)MapElements.via((SimpleFunction)new BeamJoinTransforms.JoinParts2WholeRow(schema)))).setRowSchema(schema);
        return ret;
    }

    public PCollection<Row> sideInputJoin(PCollection<KV<Row, Row>> extractedLeftRows, PCollection<KV<Row, Row>> extractedRightRows, Schema leftSchema, Schema rightSchema) {
        Row realRightNullRow;
        PCollection<KV<Row, Row>> realRightRows;
        boolean swapped = extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED;
        JoinRelType realJoinType = swapped && this.joinType != JoinRelType.INNER ? JoinRelType.LEFT : this.joinType;
        PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows : extractedLeftRows;
        PCollection<KV<Row, Row>> pCollection = realRightRows = swapped ? extractedLeftRows : extractedRightRows;
        if (swapped) {
            Schema leftNullSchema = this.buildNullSchema(leftSchema);
            realRightRows = BeamJoinRel.setValueCoder(realRightRows, SchemaCoder.of((Schema)leftNullSchema));
            realRightNullRow = Row.nullRow((Schema)leftNullSchema);
        } else {
            Schema rightNullSchema = this.buildNullSchema(rightSchema);
            realRightRows = BeamJoinRel.setValueCoder(realRightRows, SchemaCoder.of((Schema)rightNullSchema));
            realRightNullRow = Row.nullRow((Schema)rightNullSchema);
        }
        return this.sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
    }

    private PCollection<Row> sideInputJoinHelper(JoinRelType joinType, PCollection<KV<Row, Row>> leftRows, PCollection<KV<Row, Row>> rightRows, Row rightNullRow, boolean swapped) {
        PCollectionView rowsView = (PCollectionView)rightRows.apply((PTransform)View.asMultimap());
        Schema schema = CalciteUtils.toSchema(this.getRowType());
        PCollection ret = ((PCollection)leftRows.apply((PTransform)ParDo.of((DoFn)new BeamJoinTransforms.SideInputJoinDoFn(joinType, rightNullRow, (PCollectionView<Map<Row, Iterable<Row>>>)rowsView, swapped, schema)).withSideInputs(new PCollectionView[]{rowsView}))).setRowSchema(schema);
        return ret;
    }

    private Schema buildNullSchema(Schema schema) {
        Schema.Builder builder = Schema.builder();
        builder.addFields(schema.getFields().stream().map(f -> f.withNullable(true)).collect(Collectors.toList()));
        return builder.build();
    }

    private static <K, V> PCollection<KV<K, V>> setValueCoder(PCollection<KV<K, V>> kvs, Coder<V> valueCoder) {
        KvCoder coder = (KvCoder)kvs.getCoder();
        return kvs.setCoder((Coder)KvCoder.of((Coder)coder.getKeyCoder(), valueCoder));
    }

    private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
        if (this.condition instanceof RexLiteral && ((Boolean)((RexLiteral)this.condition).getValue()).booleanValue()) {
            throw new UnsupportedOperationException("CROSS JOIN is not supported!");
        }
        RexCall call = (RexCall)this.condition;
        ArrayList<Pair<Integer, Integer>> pairs = new ArrayList<Pair<Integer, Integer>>();
        if ("AND".equals(call.getOperator().getName())) {
            List<RexNode> operands = call.getOperands();
            for (RexNode rexNode : operands) {
                Pair<Integer, Integer> pair = this.extractOneJoinColumn((RexCall)rexNode, leftRowColumnCount);
                pairs.add(pair);
            }
        } else if ("=".equals(call.getOperator().getName())) {
            pairs.add(this.extractOneJoinColumn(call, leftRowColumnCount));
        } else {
            throw new UnsupportedOperationException("Operator " + call.getOperator().getName() + " is not supported in join condition");
        }
        return pairs;
    }

    private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition, int leftRowColumnCount) {
        List<RexNode> operands = oneCondition.getOperands();
        int leftIndex = Math.min(((RexInputRef)operands.get(0)).getIndex(), ((RexInputRef)operands.get(1)).getIndex());
        int rightIndex1 = Math.max(((RexInputRef)operands.get(0)).getIndex(), ((RexInputRef)operands.get(1)).getIndex());
        int rightIndex = rightIndex1 - leftRowColumnCount;
        return new Pair<Integer, Integer>(leftIndex, rightIndex);
    }

    private PCollection<Row> joinAsLookup(BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollection<Row> factStream, Schema outputSchema) {
        BeamIOSourceRel srcRel = (BeamIOSourceRel)rightRelNode;
        BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable)((Object)srcRel.getBeamSqlTable());
        return (PCollection)factStream.apply("join_as_lookup", (PTransform)new BeamJoinTransforms.JoinAsLookup(this.condition, seekableTable, CalciteUtils.toSchema(rightRelNode.getRowType()), outputSchema, CalciteUtils.toSchema(leftRelNode.getRowType()).getFieldCount()));
    }

    private boolean seekable(BeamRelNode relNode) {
        BeamIOSourceRel srcRel;
        BeamSqlTable sourceTable;
        return relNode instanceof BeamIOSourceRel && (sourceTable = (srcRel = (BeamIOSourceRel)relNode).getBeamSqlTable()) instanceof BeamSqlSeekableTable;
    }

    private class Transform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private Transform() {
        }

        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.left);
            BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(BeamJoinRel.this.right);
            if (BeamJoinRel.this.isSideInputJoin()) {
                Preconditions.checkArgument(pinput.size() == 1, "More than one input received for side input join");
                Schema schema = CalciteUtils.toSchema(BeamJoinRel.this.getRowType());
                return BeamJoinRel.this.joinAsLookup(leftRelNode, rightRelNode, (PCollection<Row>)pinput.get(0), schema).setRowSchema(schema);
            }
            Schema leftSchema = CalciteUtils.toSchema(BeamJoinRel.this.left.getRowType());
            Schema rightSchema = CalciteUtils.toSchema(BeamJoinRel.this.right.getRowType());
            assert (pinput.size() == 2);
            PCollection leftRows = pinput.get(0);
            PCollection rightRows = pinput.get(1);
            BeamJoinRel.this.verifySupportedTrigger((PCollection<Row>)leftRows);
            BeamJoinRel.this.verifySupportedTrigger((PCollection<Row>)rightRows);
            WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
            WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
            List pairs = BeamJoinRel.this.extractJoinColumns(leftRelNode.getRowType().getFieldCount());
            Schema extractKeySchemaLeft = (Schema)pairs.stream().map(pair -> leftSchema.getField(((Integer)pair.getKey()).intValue())).collect(Schema.toSchema());
            Schema extractKeySchemaRight = (Schema)pairs.stream().map(pair -> rightSchema.getField(((Integer)pair.getValue()).intValue())).collect(Schema.toSchema());
            SchemaCoder extractKeyRowCoder = SchemaCoder.of((Schema)extractKeySchemaLeft);
            PCollection extractedLeftRows = ((PCollection)leftRows.apply("left_ExtractJoinFields", (PTransform)MapElements.via((SimpleFunction)new BeamJoinTransforms.ExtractJoinFields(true, pairs, extractKeySchemaLeft)))).setCoder((Coder)KvCoder.of((Coder)extractKeyRowCoder, (Coder)leftRows.getCoder()));
            PCollection extractedRightRows = ((PCollection)rightRows.apply("right_ExtractJoinFields", (PTransform)MapElements.via((SimpleFunction)new BeamJoinTransforms.ExtractJoinFields(false, pairs, extractKeySchemaRight)))).setCoder((Coder)KvCoder.of((Coder)extractKeyRowCoder, (Coder)rightRows.getCoder()));
            if (leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED || leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) {
                try {
                    leftWinFn.verifyCompatibility(rightWinFn);
                }
                catch (IncompatibleWindowException e) {
                    throw new IllegalArgumentException("WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
                }
                return BeamJoinRel.this.standardJoin((PCollection<KV<Row, Row>>)extractedLeftRows, (PCollection<KV<Row, Row>>)extractedRightRows, leftSchema, rightSchema);
            }
            if (leftRows.isBounded() == PCollection.IsBounded.BOUNDED && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED || leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) {
                if (BeamJoinRel.this.joinType == JoinRelType.FULL) {
                    throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join a bounded table with an unbounded table.");
                }
                if (BeamJoinRel.this.joinType == JoinRelType.LEFT && leftRows.isBounded() == PCollection.IsBounded.BOUNDED || BeamJoinRel.this.joinType == JoinRelType.RIGHT && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) {
                    throw new UnsupportedOperationException("LEFT side of an OUTER JOIN must be Unbounded table.");
                }
                return BeamJoinRel.this.sideInputJoin((PCollection<KV<Row, Row>>)extractedLeftRows, (PCollection<KV<Row, Row>>)extractedRightRows, leftSchema, rightSchema);
            }
            throw new UnsupportedOperationException("The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
        }
    }
}

