/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.Enumerable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.Linq4j;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.tree.Expression;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.tree.Expressions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.ConventionTraitDef;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCost;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;

/**
 * Converter rel node that exposes a {@link BeamRelNode} as a Calcite {@link EnumerableRel}.
 *
 * <p>The generated code calls {@link #toEnumerable(PipelineOptions, BeamRelNode)}, which builds
 * and runs a Beam pipeline for the input node, waits for it to finish and returns either the
 * produced rows or, for {@link BeamIOSinkRel} inputs, the number of rows that were processed.
 */
public class BeamEnumerableConverter
extends ConverterImpl
implements EnumerableRel {
    /** Name of the metric counter used to count rows for sink nodes. */
    private static final String ROW_COUNTER_NAME = "rows";

    /** Pipeline options handed to every pipeline started by this converter. */
    private final PipelineOptions options = PipelineOptionsFactory.create();

    /**
     * Creates a converter over {@code input}.
     *
     * @param cluster the cluster this node belongs to
     * @param traits the trait set of this node
     * @param input the relational expression to convert; cast to {@link BeamRelNode} in
     *     {@link #implement}
     */
    public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
        super(cluster, ConventionTraitDef.INSTANCE, traits, input);
    }

    /** Returns a new converter with the given traits over the single node in {@code inputs}. */
    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        return new BeamEnumerableConverter(getCluster(), traitSet, sole(inputs));
    }

    /** Always reports the cost factory's "huge" cost for this node. */
    @Override
    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        return planner.getCostFactory().makeHugeCost();
    }

    /**
     * Emits a block consisting of a single call to
     * {@link #toEnumerable(PipelineOptions, BeamRelNode)}, with the pipeline options and the input
     * node stashed in the implementor so the generated code can reference them.
     */
    @Override
    public EnumerableRel.Result implement(EnumerableRelImplementor implementor, EnumerableRel.Prefer prefer) {
        BlockBuilder block = new BlockBuilder();
        PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), prefer.preferArray());
        Expression optionsExpr = implementor.stash(this.options, PipelineOptions.class);
        Expression nodeExpr = implementor.stash((BeamRelNode)getInput(), BeamRelNode.class);
        block.add(Expressions.call(BeamEnumerableConverter.class, "toEnumerable", optionsExpr, nodeExpr));
        return implementor.result(physType, block.toBlock());
    }

    /**
     * Runs {@code node} as a Beam pipeline and returns its result as an {@link Enumerable}.
     *
     * @param options options used to create the pipeline
     * @param node the Beam rel node to execute
     * @return a single-element enumerable holding the row count when {@code node} is a
     *     {@link BeamIOSinkRel}; otherwise the collected row values
     */
    public static Enumerable<Object> toEnumerable(PipelineOptions options, BeamRelNode node) {
        if (node instanceof BeamIOSinkRel) {
            return count(options, node);
        }
        return collect(options, node);
    }

    /**
     * Builds a pipeline that feeds the output of {@code node} into {@code doFn}, runs it and
     * blocks until it has finished.
     *
     * @return the result of the finished pipeline run
     */
    private static PipelineResult run(PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn) {
        Pipeline pipeline = Pipeline.create(options);
        PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn));
        PipelineResult result = pipeline.run();
        result.waitUntilFinish();
        return result;
    }

    /**
     * Runs {@code node} and gathers every output row into an in-memory queue.
     *
     * <p>The queue is handed to the {@link Collector} through a static map keyed by the options
     * id. Only the direct runner is accepted, presumably because that hand-off requires the
     * {@link DoFn} to execute in this JVM.
     *
     * @throws IllegalArgumentException if the configured runner is not the direct runner
     */
    private static Enumerable<Object> collect(PipelineOptions options, BeamRelNode node) {
        Preconditions.checkArgument(
            options.getRunner().getCanonicalName().equals("org.apache.beam.runners.direct.DirectRunner"),
            "collect() is only available in direct runner.");
        long id = options.getOptionsId();
        Queue<Object> values = new ConcurrentLinkedQueue<Object>();
        Collector.globalValues.put(id, values);
        try {
            run(options, node, new Collector());
        } finally {
            // Always unregister, even when the pipeline fails, so the static map does not
            // retain the queue (and every buffered row) of a failed run.
            Collector.globalValues.remove(id);
        }
        return Linq4j.asEnumerable(values);
    }

    /**
     * Runs {@code node} with a {@link RowCounter} attached and returns the attempted value of the
     * row counter as a single-element enumerable.
     */
    private static Enumerable<Object> count(PipelineOptions options, BeamRelNode node) {
        PipelineResult result = run(options, node, new RowCounter());
        MetricsFilter filter = MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(BeamEnumerableConverter.class, ROW_COUNTER_NAME))
            .build();
        MetricQueryResults metrics = result.metrics().queryMetrics(filter);
        // The counter is only incremented per processed element, so the query may return no
        // counters when nothing was processed; report zero instead of failing on next().
        long count = 0L;
        if (metrics.getCounters().iterator().hasNext()) {
            count = metrics.getCounters().iterator().next().getAttempted();
        }
        return Linq4j.singletonEnumerable(count);
    }

    /** {@link DoFn} that increments the row counter once per input element and emits nothing. */
    private static class RowCounter
    extends DoFn<Row, Void> {
        final Counter rows = Metrics.counter(BeamEnumerableConverter.class, ROW_COUNTER_NAME);

        private RowCounter() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            this.rows.inc();
        }
    }

    /**
     * {@link DoFn} that appends the values of each input row to a queue registered by
     * {@link BeamEnumerableConverter#collect} and emits nothing.
     */
    private static class Collector
    extends DoFn<Row, Void> {
        /** Output queues of the in-flight {@code collect} calls, keyed by pipeline options id. */
        private static final Map<Long, Queue<Object>> globalValues = new ConcurrentHashMap<Long, Queue<Object>>();
        /** Queue for the current pipeline; looked up at the start of each bundle. */
        @Nullable
        private volatile Queue<Object> values;

        private Collector() {
        }

        /** Resolves the output queue registered under this pipeline's options id. */
        @DoFn.StartBundle
        public void startBundle(DoFn.StartBundleContext context) {
            long id = context.getPipelineOptions().getOptionsId();
            this.values = globalValues.get(id);
        }

        /**
         * Adds the row to the queue: a single-field row is added as its bare field value, any
         * other row as an {@code Object[]} of its field values.
         */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            Object[] fields = ((Row)context.element()).getValues().toArray();
            if (fields.length == 1) {
                this.values.add(fields[0]);
            } else {
                this.values.add(fields);
            }
        }
    }
}

