/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelWriter;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.core.AggregateCall;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BeamAggregationRel
extends Aggregate
implements BeamRelNode {
    /**
     * Window function applied to the input before grouping, or {@code null} when the
     * aggregation is not windowed. When {@code null}, no timestamps/windows are assigned
     * and no {@code window} term is added to the plan explanation.
     */
    @Nullable
    private final WindowFn<Row, IntervalWindow> windowFn;

    /**
     * Index of the window field. It is the field event timestamps are read from when a
     * window function is present, it is excluded from the grouping key, and it is the
     * position at which the window start is inserted into each output row. The value
     * {@code -1} disables that insertion.
     */
    private final int windowFieldIndex;

    /**
     * Creates an aggregation node.
     *
     * <p>The first seven arguments are forwarded unchanged to the {@link Aggregate}
     * constructor.
     *
     * @param cluster cluster forwarded to {@link Aggregate}
     * @param traits trait set forwarded to {@link Aggregate}
     * @param child input relational expression
     * @param indicator indicator flag forwarded to {@link Aggregate}
     * @param groupSet grouping fields
     * @param groupSets grouping sets forwarded to {@link Aggregate}
     * @param aggCalls aggregate calls to evaluate per group
     * @param windowFn window function to apply before grouping, or {@code null} for none
     * @param windowFieldIndex index of the window field (see the field documentation)
     */
    public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, @Nullable WindowFn<Row, IntervalWindow> windowFn, int windowFieldIndex) {
        super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
        this.windowFn = windowFn;
        this.windowFieldIndex = windowFieldIndex;
    }

    /**
     * Adds this node's terms to the plan explanation.
     *
     * <p>After the inherited terms, a {@code window} item is appended when a window
     * function is configured. Its value has the form
     * {@code <FnSimpleName>($<windowFieldIndex>, <parameters>)} where the parameters are:
     * <ul>
     *   <li>{@link FixedWindows}: size, offset</li>
     *   <li>{@link SlidingWindows}: period, size, offset</li>
     *   <li>{@link Sessions}: gap duration</li>
     * </ul>
     *
     * @param pw writer receiving the terms
     * @return the same writer, for chaining
     * @throws RuntimeException if the configured window function is none of the three
     *     supported kinds
     */
    @Override
    public RelWriter explainTerms(RelWriter pw) {
        super.explainTerms(pw);

        // Widened to wildcards so that the instanceof checks and casts below are not
        // rejected by the compiler as inconvertible generic types.
        final WindowFn<?, ?> fn = this.windowFn;
        if (fn == null) {
            // Not a windowed aggregation: nothing to add beyond the inherited terms.
            return pw;
        }

        final String fnName = fn.getClass().getSimpleName();
        final StringBuilder window = new StringBuilder(fnName).append("($").append(this.windowFieldIndex);
        if (fn instanceof FixedWindows) {
            final FixedWindows fixed = (FixedWindows) fn;
            window.append(", ").append(fixed.getSize())
                    .append(", ").append(fixed.getOffset());
        } else if (fn instanceof SlidingWindows) {
            final SlidingWindows sliding = (SlidingWindows) fn;
            window.append(", ").append(sliding.getPeriod())
                    .append(", ").append(sliding.getSize())
                    .append(", ").append(sliding.getOffset());
        } else if (fn instanceof Sessions) {
            final Sessions sessions = (Sessions) fn;
            window.append(", ").append(sessions.getGapDuration());
        } else {
            throw new RuntimeException("Unknown window function " + fnName);
        }
        window.append(")");

        pw.item("window", window.toString());
        return pw;
    }

    /**
     * Builds the Beam transform that evaluates this aggregation.
     *
     * <p>The output schema is derived from this node's row type, and every named
     * aggregate call is wrapped in a {@link FieldAggregation} pairing the call with its
     * alias.
     *
     * @return a transform from the (single-element) input list to the aggregated rows
     */
    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        Schema outputSchema = CalciteUtils.toSchema(this.getRowType());
        // Keep the element type on the list so the Transform constructor receives a
        // checked List<FieldAggregation> rather than a raw List.
        List<FieldAggregation> aggregations = this.getNamedAggCalls().stream()
                .map(namedCall -> new FieldAggregation(namedCall.getKey(), namedCall.getValue()))
                .collect(Collectors.toList());
        return new Transform(this.windowFn, this.windowFieldIndex, this.getGroupSet(), aggregations, outputSchema);
    }

    /**
     * Creates a new {@code BeamAggregationRel} from the supplied traits, input, grouping
     * and aggregate calls, reusing this node's cluster, window function and window field
     * index.
     *
     * @return the newly constructed node
     */
    @Override
    public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
        final RelOptCluster ownCluster = this.getCluster();
        final BeamAggregationRel duplicate = new BeamAggregationRel(
                ownCluster,
                traitSet,
                input,
                indicator,
                groupSet,
                groupSets,
                aggCalls,
                this.windowFn,
                this.windowFieldIndex);
        return duplicate;
    }

    private static class Transform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        /** Ids of the grouping fields, with the window field filtered out. */
        private final List<Integer> keyFieldsIds;
        /** Schema applied to the rows produced by this transform. */
        private final Schema outputSchema;
        /** Window function to apply before grouping; {@code null} means none. */
        private final WindowFn<Row, IntervalWindow> windowFn;
        /** Index of the window field; {@code -1} disables window-start insertion. */
        private final int windowFieldIndex;
        /** One entry per aggregate call to evaluate for each group. */
        private final List<FieldAggregation> fieldAggregations;

        /**
         * Captures the aggregation configuration.
         *
         * @param windowFn window function to apply, or {@code null}
         * @param windowFieldIndex index of the window field
         * @param groupSet grouping fields; the window field is removed from the key
         * @param fieldAggregations aggregate calls to evaluate
         * @param outputSchema schema of the produced rows
         */
        private Transform(WindowFn<Row, IntervalWindow> windowFn, int windowFieldIndex, ImmutableBitSet groupSet, List<FieldAggregation> fieldAggregations, Schema outputSchema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.fieldAggregations = fieldAggregations;
            this.outputSchema = outputSchema;
            // The window field is handled through windowing, not as part of the group key.
            this.keyFieldsIds = groupSet.asList().stream()
                    .filter(fieldId -> fieldId != windowFieldIndex)
                    .collect(Collectors.toList());
        }

        /**
         * Expands the aggregation over the single input collection.
         *
         * <p>Steps: optionally assign event timestamps and windows, reject unsupported
         * windowing, group by the key fields while registering one combiner per
         * aggregate call, then merge each key/value pair into a row of the output schema.
         *
         * @param pinput list expected to hold exactly one input collection
         * @return the aggregated rows, tagged with the output schema
         */
        public PCollection<Row> expand(PCollectionList<Row> pinput) {
            Preconditions.checkArgument(pinput.size() == 1, "Wrong number of inputs for %s: %s", (Object)BeamAggregationRel.class.getSimpleName(), pinput);

            final PCollection<Row> source = pinput.get(0);
            final PCollection<Row> windowed = this.windowFn == null ? source : this.assignTimestampsAndWindow(source);
            this.validateWindowIsSupported(windowed);

            Group.ByFields grouping = Group.byFieldIds(this.keyFieldsIds);
            Group.CombineByFields aggregated = null;
            for (FieldAggregation aggregation : this.fieldAggregations) {
                List<Integer> argIds = aggregation.inputs;
                Combine.CombineFn fn = aggregation.combineFn;
                if (argIds.size() == 1) {
                    // Exactly one argument: aggregate over that single field.
                    if (aggregated == null) {
                        aggregated = grouping.aggregateField(argIds.get(0).intValue(), fn, aggregation.outputField);
                    } else {
                        aggregated = aggregated.aggregateField(argIds.get(0).intValue(), fn, aggregation.outputField);
                    }
                } else {
                    // Zero or several arguments: aggregate over the listed field ids.
                    if (aggregated == null) {
                        aggregated = grouping.aggregateFieldsById(argIds, fn, aggregation.outputField);
                    } else {
                        aggregated = aggregated.aggregateFieldsById(argIds, fn, aggregation.outputField);
                    }
                }
            }
            if (aggregated == null) {
                // No aggregate calls at all: fall back to the constant combine function.
                aggregated = grouping.aggregate(AggregationCombineFnAdapter.createConstantCombineFn());
            }

            PCollection combinedRows = (PCollection)windowed.apply(aggregated);
            PCollection mergedRows = (PCollection)combinedRows.apply("mergeRecord", (PTransform)ParDo.of(Transform.mergeRecord(this.outputSchema, this.windowFieldIndex)));
            return mergedRows.setRowSchema(this.outputSchema);
        }

        /**
         * Stamps every row with the instant read from the window field and then applies
         * the configured window function.
         *
         * <p>The allowed timestamp skew is set to {@code Long.MAX_VALUE} milliseconds.
         * The coder of {@code upstream} is reused for the re-stamped collection.
         *
         * @param upstream rows to re-stamp and window
         * @return the windowed collection
         */
        private PCollection<Row> assignTimestampsAndWindow(PCollection<Row> upstream) {
            // Copy the index to a local so the lambda captures just the int rather than
            // the enclosing transform instance.
            final int timestampFieldIndex = this.windowFieldIndex;
            final WithTimestamps<Row> eventTimestamps = WithTimestamps
                    .<Row>of(row -> row.getDateTime(timestampFieldIndex).toInstant())
                    .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE));
            return upstream
                    .apply("assignEventTimestamp", eventTimestamps)
                    .setCoder(upstream.getCoder())
                    .apply(Window.into(this.windowFn));
        }

        /**
         * Rejects the one windowing configuration this aggregation does not support:
         * global windows combined with the default trigger on a collection that is not
         * bounded.
         *
         * @param upstream collection whose windowing strategy is inspected
         * @throws UnsupportedOperationException if all three conditions hold
         */
        private void validateWindowIsSupported(PCollection<Row> upstream) {
            final WindowingStrategy<?, ?> strategy = upstream.getWindowingStrategy();
            if (!(strategy.getWindowFn() instanceof GlobalWindows)) {
                return;
            }
            if (!(strategy.getTrigger() instanceof DefaultTrigger)) {
                return;
            }
            if (upstream.isBounded() == PCollection.IsBounded.BOUNDED) {
                return;
            }
            throw new UnsupportedOperationException("Please explicitly specify windowing in SQL query using HOP/TUMBLE/SESSION functions (default trigger will be used in this case). Unbounded input with global windowing and default trigger is not supported in Beam SQL aggregations. See GroupByKey section in Beam Programming Guide");
        }

        /**
         * Creates the function that flattens a grouped key/value pair into one output row.
         *
         * <p>The output values are the key row's values followed by the value row's
         * values. Unless {@code windowStartFieldIndex} is {@code -1}, the start of the
         * element's window is inserted at that position; the window is cast to
         * {@link IntervalWindow} for this, so a different window type fails with a
         * {@link ClassCastException}.
         *
         * @param outputSchema schema the merged row is built against
         * @param windowStartFieldIndex position for the window start, or {@code -1} to omit it
         * @return a {@link DoFn} emitting one merged row per input pair
         */
        static DoFn<KV<Row, Row>, Row> mergeRecord(final Schema outputSchema, final int windowStartFieldIndex) {
            return new DoFn<KV<Row, Row>, Row>(){

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<Row, Row> kvRow, BoundedWindow window, DoFn.OutputReceiver<Row> o) {
                    final List<Object> keyValues = kvRow.getKey().getValues();
                    final List<Object> aggregateValues = kvRow.getValue().getValues();

                    // Values are heterogeneous (keys, aggregates, optionally an Instant),
                    // so the merged list must be typed as List<Object>.
                    final List<Object> fieldValues = new ArrayList<>(keyValues.size() + aggregateValues.size());
                    fieldValues.addAll(keyValues);
                    fieldValues.addAll(aggregateValues);
                    if (windowStartFieldIndex != -1) {
                        fieldValues.add(windowStartFieldIndex, ((IntervalWindow) window).start());
                    }
                    o.output(Row.withSchema(outputSchema).addValues(fieldValues).build());
                }
            };
        }
    }

    /**
     * Serializable description of one aggregate call: the argument field indexes, the
     * combine function that evaluates it, and the output field it produces.
     */
    private static class FieldAggregation
    implements Serializable {
        /** Argument list of the aggregate call. */
        final List<Integer> inputs;
        /** Combine function created for the call by {@link AggregationCombineFnAdapter}. */
        final Combine.CombineFn combineFn;
        /** Output field built from the alias and the call's type. */
        final Schema.Field outputField;

        /**
         * @param call aggregate call to describe
         * @param alias name given to the output field
         */
        FieldAggregation(AggregateCall call, String alias) {
            final List<Integer> argList = call.getArgList();
            final Schema.Field field = CalciteUtils.toField(alias, call.getType());
            final String functionName = call.getAggregation().getName();

            this.inputs = argList;
            this.outputField = field;
            this.combineFn = AggregationCombineFnAdapter.createCombineFn(call, field, functionName);
        }
    }
}

