/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelWriter;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

public class BeamAggregationRel
extends Aggregate
implements BeamRelNode {
    private @Nullable WindowFn<Row, IntervalWindow> windowFn;
    private final int windowFieldIndex;

    public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, @Nullable WindowFn<Row, IntervalWindow> windowFn, int windowFieldIndex) {
        super(cluster, traits, child, groupSet, groupSets, aggCalls);
        assert (this.getGroupType() == Aggregate.Group.SIMPLE);
        this.windowFn = windowFn;
        this.windowFieldIndex = windowFieldIndex;
    }

    @Override
    public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
        inputStat = this.computeWindowingCostEffect(inputStat);
        float multiplier = 1.0f + (float)this.aggCalls.size() * 0.125f;
        for (AggregateCall aggCall : this.aggCalls) {
            if (!aggCall.getAggregation().getName().equals("SUM")) continue;
            multiplier += 0.0125f;
        }
        return BeamCostModel.FACTORY.makeCost(inputStat.getRowCount() * (double)multiplier, inputStat.getRate() * (double)multiplier);
    }

    @Override
    public NodeStats estimateNodeStats(RelMetadataQuery mq) {
        NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(this.input, mq);
        inputEstimate = this.computeWindowingCostEffect(inputEstimate);
        int groupCount = this.groupSet.cardinality() - (this.windowFn == null ? 0 : 1);
        return groupCount == 0 ? NodeStats.create(Math.min(inputEstimate.getRowCount(), 1.0), inputEstimate.getRate() / inputEstimate.getWindow(), 1.0) : inputEstimate.multiply(1.0 - Math.pow(0.5, groupCount));
    }

    private NodeStats computeWindowingCostEffect(NodeStats inputStat) {
        if (this.windowFn == null) {
            return inputStat;
        }
        WindowFn<Row, IntervalWindow> w = this.windowFn;
        double multiplicationFactor = 1.0;
        if (w instanceof SlidingWindows) {
            multiplicationFactor = (double)((SlidingWindows)w).getSize().getStandardSeconds() / (double)((SlidingWindows)w).getPeriod().getStandardSeconds();
        }
        return NodeStats.create(inputStat.getRowCount() * multiplicationFactor, inputStat.getRate() * multiplicationFactor, 10.0);
    }

    public RelWriter explainTerms(RelWriter pw) {
        super.explainTerms(pw);
        if (this.windowFn != null) {
            WindowFn<Row, IntervalWindow> windowFn = this.windowFn;
            String window = windowFn.getClass().getSimpleName() + "($" + String.valueOf(this.windowFieldIndex);
            if (windowFn instanceof FixedWindows) {
                FixedWindows fn = (FixedWindows)windowFn;
                window = window + ", " + fn.getSize().toString() + ", " + fn.getOffset().toString();
            } else if (windowFn instanceof SlidingWindows) {
                SlidingWindows fn = (SlidingWindows)windowFn;
                window = window + ", " + fn.getPeriod().toString() + ", " + fn.getSize().toString() + ", " + fn.getOffset().toString();
            } else if (windowFn instanceof Sessions) {
                Sessions fn = (Sessions)windowFn;
                window = window + ", " + fn.getGapDuration().toString();
            } else {
                throw new UnsupportedOperationException("Unknown window function " + windowFn.getClass().getSimpleName());
            }
            window = window + ")";
            pw.item("window", (Object)window);
        }
        return pw;
    }

    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        Schema outputSchema = CalciteUtils.toSchema(this.getRowType());
        List aggregationAdapters = this.getNamedAggCalls().stream().map(aggCall -> new FieldAggregation((AggregateCall)aggCall.getKey(), (String)aggCall.getValue())).collect(Collectors.toList());
        return new Transform(this.windowFn, this.windowFieldIndex, this.getGroupSet(), aggregationAdapters, outputSchema);
    }

    public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
        return new BeamAggregationRel(this.getCluster(), traitSet, input, groupSet, groupSets, aggCalls, this.windowFn, this.windowFieldIndex);
    }

    private static class Transform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private final List<Integer> keyFieldsIds;
        private Schema outputSchema;
        private WindowFn<Row, IntervalWindow> windowFn;
        private int windowFieldIndex;
        private List<FieldAggregation> fieldAggregations;

        private Transform(WindowFn<Row, IntervalWindow> windowFn, int windowFieldIndex, ImmutableBitSet groupSet, List<FieldAggregation> fieldAggregations, Schema outputSchema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.fieldAggregations = fieldAggregations;
            this.outputSchema = outputSchema;
            this.keyFieldsIds = groupSet.asList().stream().filter(i -> i != windowFieldIndex).collect(Collectors.toList());
        }

        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            PCollection<Row> upstream;
            Preconditions.checkArgument((pinput.size() == 1 ? 1 : 0) != 0, (String)"Wrong number of inputs for %s: %s", (Object[])new Object[]{BeamAggregationRel.class.getSimpleName(), pinput});
            PCollection<Row> windowedStream = upstream = pinput.get(0);
            if (this.windowFn != null) {
                windowedStream = this.assignTimestampsAndWindow(upstream);
            }
            this.validateWindowIsSupported(windowedStream);
            Group.ByFields byFields = Group.byFieldIds(this.keyFieldsIds);
            Group.CombineFieldsByFields combined = null;
            for (FieldAggregation fieldAggregation : this.fieldAggregations) {
                List<Integer> inputs = fieldAggregation.inputs;
                Combine.CombineFn combineFn = fieldAggregation.combineFn;
                if (inputs.size() > 1 || inputs.isEmpty()) {
                    combined = combined == null ? byFields.aggregateFieldsById(inputs, combineFn, fieldAggregation.outputField) : combined.aggregateFieldsById(inputs, combineFn, fieldAggregation.outputField);
                    continue;
                }
                combined = combined == null ? byFields.aggregateField(inputs.get(0).intValue(), combineFn, fieldAggregation.outputField) : combined.aggregateField(inputs.get(0).intValue(), combineFn, fieldAggregation.outputField);
            }
            Group.CombineFieldsByFields combiner = combined;
            boolean ignoreValues = false;
            if (combiner == null) {
                combiner = byFields.aggregateField("*", AggregationCombineFnAdapter.createConstantCombineFn(), Schema.Field.of((String)"e", (Schema.FieldType)Schema.FieldType.row((Schema)AggregationCombineFnAdapter.EMPTY_SCHEMA).withNullable(true)));
                ignoreValues = true;
            }
            boolean verifyRowValues = ((BeamSqlPipelineOptions)pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class)).getVerifyRowValues();
            return ((PCollection)((PCollection)windowedStream.apply((PTransform)combiner)).apply("mergeRecord", (PTransform)ParDo.of(Transform.mergeRecord(this.outputSchema, this.windowFieldIndex, ignoreValues, verifyRowValues)))).setRowSchema(this.outputSchema);
        }

        private PCollection<Row> assignTimestampsAndWindow(PCollection<Row> upstream) {
            PCollection windowedStream = (PCollection)((PCollection)upstream.apply("assignEventTimestamp", (PTransform)WithTimestamps.of((SerializableFunction & Serializable)row -> row.getDateTime(this.windowFieldIndex).toInstant()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))).setCoder(upstream.getCoder()).apply((PTransform)Window.into(this.windowFn));
            return windowedStream;
        }

        private void validateWindowIsSupported(PCollection<Row> upstream) {
            WindowingStrategy windowingStrategy = upstream.getWindowingStrategy();
            if (windowingStrategy.getWindowFn() instanceof GlobalWindows && windowingStrategy.getTrigger() instanceof DefaultTrigger && upstream.isBounded() != PCollection.IsBounded.BOUNDED) {
                throw new UnsupportedOperationException("Please explicitly specify windowing in SQL query using HOP/TUMBLE/SESSION functions (default trigger will be used in this case). Unbounded input with global windowing and default trigger is not supported in Beam SQL aggregations. See GroupByKey section in Beam Programming Guide");
            }
        }

        static DoFn<Row, Row> mergeRecord(final Schema outputSchema, final int windowStartFieldIndex, final boolean ignoreValues, final boolean verifyRowValues) {
            return new DoFn<Row, Row>(){

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element Row kvRow, BoundedWindow window, DoFn.OutputReceiver<Row> o) {
                    int capacity = kvRow.getRow(0).getFieldCount() + (!ignoreValues ? kvRow.getRow(1).getFieldCount() : 0);
                    ArrayList fieldValues = Lists.newArrayListWithCapacity((int)capacity);
                    fieldValues.addAll(kvRow.getRow(0).getValues());
                    if (!ignoreValues) {
                        fieldValues.addAll(kvRow.getRow(1).getValues());
                    }
                    if (windowStartFieldIndex != -1) {
                        fieldValues.add(windowStartFieldIndex, ((IntervalWindow)window).start());
                    }
                    Row row = verifyRowValues ? Row.withSchema((Schema)outputSchema).addValues((List)fieldValues).build() : Row.withSchema((Schema)outputSchema).attachValues((List)fieldValues);
                    o.output((Object)row);
                }
            };
        }
    }

    private static class FieldAggregation
    implements Serializable {
        final List<Integer> inputs;
        final Combine.CombineFn combineFn;
        final Schema.Field outputField;

        FieldAggregation(AggregateCall call, String alias) {
            this.inputs = call.getArgList();
            this.outputField = CalciteUtils.toField(alias, call.getType());
            this.combineFn = AggregationCombineFnAdapter.createCombineFn(call, this.outputField, call.getAggregation().getName());
        }
    }
}

