/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.List;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Correlate;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.SemiJoinType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamUncollectRel;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;

public class BeamUnnestRel
extends Correlate
implements BeamRelNode {
    public BeamUnnestRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType joinType) {
        super(cluster, traits, left, right, correlationId, requiredColumns, joinType);
    }

    @Override
    public Correlate copy(RelTraitSet relTraitSet, RelNode left, RelNode right, CorrelationId correlationId, ImmutableBitSet requireColumns, SemiJoinType joinType) {
        return new BeamUnnestRel(this.getCluster(), relTraitSet, left, right, correlationId, this.requiredColumns, joinType);
    }

    @Override
    public List<RelNode> getPCollectionInputs() {
        return ImmutableList.of(BeamSqlRelUtils.getBeamRelInput(this.left));
    }

    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new Transform();
    }

    private static class UnnestFn
    extends DoFn<Row, Row> {
        private final BeamSqlExpressionExecutor expr;
        private final int correlationId;
        private final Schema outputSchema;
        private final Schema.Field innerField;

        private UnnestFn(int correlationId, BeamSqlExpressionExecutor expr, Schema outputSchema, Schema.Field innerField) {
            this.correlationId = correlationId;
            this.expr = expr;
            this.outputSchema = outputSchema;
            this.innerField = innerField;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element Row row, BoundedWindow window, DoFn.OutputReceiver<Row> out) {
            Preconditions.checkState(this.correlationId == 0, "Only one level of correlation nesting is supported");
            BeamSqlExpressionEnvironment env = BeamSqlExpressionEnvironments.forRowAndCorrelVariables(row, window, ImmutableList.of(row));
            List<Object> rawValues = this.expr.execute(row, window, env);
            if (rawValues == null) {
                return;
            }
            Preconditions.checkState(rawValues.size() == 1, "%s expression to unnest %s resulted in more than one column", ((Object)((Object)this)).getClass(), (Object)this.expr);
            Preconditions.checkState(rawValues.get(0) instanceof Iterable, "%s expression to unnest %s not iterable", ((Object)((Object)this)).getClass(), (Object)this.expr);
            for (Object uncollectedValue : (Iterable)rawValues.get(0)) {
                Object coercedValue = BeamTableUtils.autoCastField(this.innerField, uncollectedValue);
                out.output((Object)Row.withSchema((Schema)this.outputSchema).addValues(row.getValues()).addValue(coercedValue).build());
            }
        }

        @DoFn.Teardown
        public void close() {
            this.expr.close();
        }
    }

    private class Transform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private Transform() {
        }

        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            PCollection outer = pinput.get(0);
            BeamUncollectRel uncollect = (BeamUncollectRel)BeamSqlRelUtils.getBeamRelInput(BeamUnnestRel.this.right);
            Schema innerSchema = CalciteUtils.toBeamSchema(uncollect.getRowType());
            Preconditions.checkArgument(innerSchema.getFieldCount() == 1, "Can only UNNEST a single column", ((Object)((Object)this)).getClass());
            BeamSqlFnExecutor expr = new BeamSqlFnExecutor(((BeamCalcRel)BeamSqlRelUtils.getBeamRelInput(uncollect.getInput())).getProgram());
            Schema joinedSchema = CalciteUtils.toBeamSchema(BeamUnnestRel.this.rowType);
            return ((PCollection)outer.apply((PTransform)ParDo.of((DoFn)new UnnestFn(BeamUnnestRel.this.correlationId.getId(), expr, joinedSchema, innerSchema.getField(0))))).setCoder((Coder)joinedSchema.getRowCoder());
        }
    }
}

