/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.Enumerable;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.Linq4j;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.tree.Expression;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.linq4j.tree.Expressions;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.ConventionTraitDef;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptCost;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.RelNode;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSortRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamEnumerableConverter
extends ConverterImpl
implements EnumerableRel {
    private static final Logger LOG = LoggerFactory.getLogger(BeamEnumerableConverter.class);

    /**
     * Creates a converter node on top of {@code input}.
     *
     * <p>All arguments are forwarded to {@link ConverterImpl}, together with
     * {@link ConventionTraitDef#INSTANCE} as the trait definition being converted.
     *
     * @param cluster planner cluster this node belongs to
     * @param traits trait set of the converter node itself
     * @param input relational expression whose output is converted
     */
    public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
        super(cluster, ConventionTraitDef.INSTANCE, traits, input);
    }

    /**
     * Creates a new converter with the given traits over the single element of {@code inputs},
     * reusing this node's cluster.
     *
     * @param traitSet trait set for the copy
     * @param inputs replacement inputs; reduced to one node via {@code sole}
     * @return the freshly created converter
     */
    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        RelNode onlyInput = sole(inputs);
        RelOptCluster cluster = getCluster();
        return new BeamEnumerableConverter(cluster, traitSet, onlyInput);
    }

    /**
     * Reports the cost of this converter as the "huge" cost produced by the planner's cost
     * factory; the metadata query is not consulted.
     *
     * <p>NOTE(review): presumably this keeps the planner from choosing this converter whenever
     * an alternative exists - confirm against the planner rules.
     *
     * @param planner planner whose cost factory supplies the cost value
     * @param mq metadata query (unused)
     * @return the cost factory's huge cost
     */
    @Override
    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        return planner.getCostFactory().makeHugeCost();
    }

    /**
     * Generates the enumerable implementation of this node: a block consisting of a single call
     * to the static {@code toEnumerable(BeamRelNode)} method of this class, with the input rel
     * node stashed in the implementor and passed as the argument.
     *
     * @param implementor code-generation context
     * @param prefer preferred physical row representation
     * @return the generated block paired with the physical type of this node's rows
     */
    @Override
    public EnumerableRel.Result implement(EnumerableRelImplementor implementor, EnumerableRel.Prefer prefer) {
        PhysType physicalType =
            PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), prefer.preferArray());

        // Make the Beam input node available to the generated code as an expression.
        Expression stashedInput = implementor.stash((BeamRelNode) getInput(), BeamRelNode.class);

        BlockBuilder body = new BlockBuilder();
        body.add(Expressions.call(BeamEnumerableConverter.class, "toEnumerable", stashedInput));
        return implementor.result(physicalType, body.toBlock());
    }

    /**
     * Runs {@code node} with pipeline options built from the node's own option map and returns
     * the result as an enumerable.
     *
     * <p>For the duration of the call the thread's context class loader is switched to the
     * class loader of this class; the previous loader is always restored afterwards.
     *
     * @param node root of the Beam relational tree to execute
     * @return the enumerable produced by {@code toEnumerable(PipelineOptions, BeamRelNode)}
     */
    public static Enumerable<Object> toEnumerable(BeamRelNode node) {
        final Thread caller = Thread.currentThread();
        final ClassLoader previousLoader = caller.getContextClassLoader();
        try {
            caller.setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
            return toEnumerable(createPipelineOptions(node.getPipelineOptions()), node);
        } finally {
            caller.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Builds validated {@link PipelineOptions} from a map of option names to values.
     *
     * <p>Each entry is turned into a {@code --key=value} command-line argument. The application
     * name of the resulting options is set to {@code "BeamSql"}.
     *
     * @param map option names mapped to their string values
     * @return the validated pipeline options
     */
    public static PipelineOptions createPipelineOptions(Map<String, String> map) {
        String[] args = map.entrySet().stream()
            .map(option -> "--" + option.getKey() + "=" + option.getValue())
            .toArray(String[]::new);
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(ApplicationNameOptions.class).setAppName("BeamSql");
        return options;
    }

    /**
     * Chooses the execution strategy for {@code node}: sink nodes are run and counted, limit
     * queries are collected with early cancellation, everything else is collected in full.
     *
     * @param options pipeline options to run with
     * @param node root of the Beam relational tree
     * @return the enumerable produced by the selected strategy
     */
    static Enumerable<Object> toEnumerable(PipelineOptions options, BeamRelNode node) {
        if (node instanceof BeamIOSinkRel) {
            return count(options, node);
        }
        return isLimitQuery(node) ? limitCollect(options, node) : collect(options, node);
    }

    /**
     * Runs {@code node} without blocking and cancels the pipeline once {@code values} holds at
     * least {@code limitCount} elements.
     *
     * <p>The pipeline state is polled in one-second steps. The loop ends when the pipeline
     * reaches a terminal state, when the limit is reached and cancellation has been requested,
     * or when the cancellation attempt fails.
     *
     * @param options pipeline options; {@code blockOnRun} is switched off on their
     *     {@link DirectOptions} view
     * @param node root of the Beam relational tree
     * @param doFn function applied to every result row
     * @param values queue whose size is compared against the limit
     * @param limitCount number of collected elements after which the pipeline is cancelled
     * @return the result handle of the started pipeline
     */
    private static PipelineResult limitRun(PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn, Queue<Object> values, int limitCount) {
        // run() must return immediately so the loop below can watch the queue fill up.
        options.as(DirectOptions.class).setBlockOnRun(false);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
        resultCollection.apply(ParDo.of(doFn));
        PipelineResult result = pipeline.run();

        while (true) {
            PipelineResult.State state = result.waitUntilFinish(Duration.standardSeconds(1L));
            if (state != null && state.isTerminal()) {
                break;
            }
            if (values.size() < limitCount) {
                continue;
            }
            try {
                result.cancel();
            } catch (IOException e) {
                // Pass the exception itself so the stack trace is not lost.
                LOG.warn("Failed to cancel the pipeline after the row limit was reached", e);
            }
            break;
        }
        return result;
    }

    /**
     * Runs {@code node} to completion and returns every produced row as an enumerable.
     *
     * <p>Rows are gathered through a queue registered in {@code Collector.globalValues} under
     * the options id. The registration is removed in a {@code finally} block so a failing
     * pipeline does not leave the queue behind in the static map.
     *
     * @param options pipeline options; the runner must be the DirectRunner
     * @param node root of the Beam relational tree
     * @return enumerable over the collected values
     * @throws IllegalArgumentException if the configured runner is not the DirectRunner
     */
    private static Enumerable<Object> collect(PipelineOptions options, BeamRelNode node) {
        long id = options.getOptionsId();
        Queue<Object> values = new ConcurrentLinkedQueue<>();
        Preconditions.checkArgument(options.getRunner().getCanonicalName().equals("org.apache.beam.runners.direct.DirectRunner"), "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
        Collector.globalValues.put(id, values);
        try {
            Pipeline pipeline = Pipeline.create(options);
            PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
            resultCollection.apply(ParDo.of(new Collector()));
            pipeline.run().waitUntilFinish();
        } finally {
            Collector.globalValues.remove(id);
        }
        return Linq4j.asEnumerable(values);
    }

    /**
     * Runs a limit query and returns at most the requested number of rows.
     *
     * <p>Rows are gathered through a queue registered in {@code Collector.globalValues} under
     * the options id while {@code limitRun} executes. The registration is removed in a
     * {@code finally} block so a failing pipeline does not leave the queue behind in the
     * static map. Surplus elements are then dropped from the head of the queue until no more
     * than the limit remain.
     *
     * @param options pipeline options; the runner must be the DirectRunner
     * @param node root of the Beam relational tree; must carry a limit count
     * @return enumerable over the collected values
     * @throws IllegalArgumentException if the configured runner is not the DirectRunner
     */
    private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
        long id = options.getOptionsId();
        ConcurrentLinkedQueue<Object> values = new ConcurrentLinkedQueue<>();
        Preconditions.checkArgument(options.getRunner().getCanonicalName().equals("org.apache.beam.runners.direct.DirectRunner"), "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
        int limitCount = getLimitCount(node);
        Collector.globalValues.put(id, values);
        try {
            limitRun(options, node, new Collector(), values, limitCount);
        } finally {
            Collector.globalValues.remove(id);
        }
        // The size is re-read on every pass because elements may have been added after the
        // limit was first observed.
        while (values.size() > limitCount) {
            values.remove();
        }
        return Linq4j.asEnumerable(values);
    }

    /**
     * Converts every column of {@code row} with {@code fieldToAvatica}, in schema field order.
     *
     * @param row the Beam row to convert
     * @return one converted value per schema field
     */
    private static Object[] rowToAvatica(Row row) {
        List<Schema.Field> fields = row.getSchema().getFields();
        Object[] converted = new Object[fields.size()];
        for (int column = 0; column < converted.length; column++) {
            Schema.FieldType columnType = fields.get(column).getType();
            converted[column] = fieldToAvatica(columnType, row.getValue(column));
        }
        return converted;
    }

    /**
     * Converts a single Beam field value into the representation handed to the enumerable.
     *
     * <ul>
     *   <li>{@code null} values are passed through as {@code null} for every type.
     *   <li>DATETIME values become their epoch-millisecond {@code long}.
     *   <li>Primitive-like types, strings, decimals and rows are returned unchanged.
     *   <li>ARRAY elements are converted recursively using the collection element type.
     *   <li>MAP values are converted recursively using the map value type; keys are kept as-is.
     * </ul>
     *
     * @param type Beam field type describing {@code beamValue}
     * @param beamValue the value to convert, may be {@code null}
     * @return the converted value, or {@code null} if {@code beamValue} is {@code null}
     * @throws IllegalStateException if the type name is not handled by this method
     */
    private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
        // Without this guard a null DATETIME/ARRAY/MAP value fails with a NullPointerException.
        if (beamValue == null) {
            return null;
        }
        switch (type.getTypeName()) {
            case DATETIME: {
                return ((ReadableInstant) beamValue).getMillis();
            }
            case BYTE:
            case INT16:
            case INT32:
            case INT64:
            case DECIMAL:
            case FLOAT:
            case DOUBLE:
            case STRING:
            case BOOLEAN: {
                return beamValue;
            }
            case ARRAY: {
                return ((List<?>) beamValue).stream()
                    .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem))
                    .collect(Collectors.toList());
            }
            case MAP: {
                // Map values are described by the map value type, not the collection element
                // type (which belongs to ARRAY fields).
                return ((Map<?, ?>) beamValue).entrySet().stream()
                    .collect(Collectors.toMap(
                        entry -> entry.getKey(),
                        entry -> fieldToAvatica(type.getMapValueType(), entry.getValue())));
            }
            case ROW: {
                return beamValue;
            }
        }
        throw new IllegalStateException(String.format("Unreachable case for Beam typename %s", type.getTypeName()));
    }

    /**
     * Runs {@code node} with a row-counting step attached and returns the number of rows as a
     * single-element enumerable.
     *
     * <p>If the pipeline contains an unbounded collection the method does not wait for it and
     * reports {@code 0}. Otherwise it waits for completion and reads the attempted value of the
     * {@code "rows"} counter; when no such counter was reported the count stays {@code 0}.
     *
     * @param options pipeline options to run with
     * @param node root of the Beam relational tree
     * @return enumerable holding exactly one element, the row count
     */
    private static Enumerable<Object> count(PipelineOptions options, BeamRelNode node) {
        Pipeline pipeline = Pipeline.create(options);
        BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(new RowCounter()));
        PipelineResult result = pipeline.run();
        long count = 0L;
        if (!containsUnboundedPCollection(pipeline)) {
            result.waitUntilFinish();
            MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named(BeamEnumerableConverter.class, "rows")).build());
            // Only the first matching counter is read. The query result can be empty when the
            // counter was never incremented, so do not call next() unconditionally.
            for (MetricResult<Long> rowsCounter : metrics.getCounters()) {
                count = rowsCounter.getAttempted();
                break;
            }
        }
        return Linq4j.singletonEnumerable(count);
    }

    /**
     * Tells whether {@code node} is a pure limit query: either a sort node that only limits, or
     * a calc node whose input is such a sort node.
     *
     * @param node root of the Beam relational tree
     * @return {@code true} if the limit-aware collection path applies
     */
    private static boolean isLimitQuery(BeamRelNode node) {
        if (node instanceof BeamSortRel && ((BeamSortRel) node).isLimitOnly()) {
            return true;
        }
        return node instanceof BeamCalcRel && ((BeamCalcRel) node).isInputSortRelAndLimitOnly();
    }

    /**
     * Reads the limit count from a sort node, or from the sort node beneath a calc node.
     *
     * @param node root of the Beam relational tree
     * @return the limit count
     * @throws RuntimeException if {@code node} is neither a sort nor a calc node
     */
    private static int getLimitCount(BeamRelNode node) {
        if (node instanceof BeamSortRel) {
            BeamSortRel sortNode = (BeamSortRel) node;
            return sortNode.getCount();
        }
        if (node instanceof BeamCalcRel) {
            BeamCalcRel calcNode = (BeamCalcRel) node;
            return calcNode.getLimitCountOfSortRel();
        }
        throw new RuntimeException("Cannot get limit count from RelNode tree with root " + node.getRelTypeName());
    }

    /**
     * Walks the pipeline and reports whether any visited {@link PCollection} is unbounded.
     *
     * @param p the pipeline to inspect
     * @return {@code true} if the combined boundedness of all visited collections is UNBOUNDED
     */
    private static boolean containsUnboundedPCollection(Pipeline p) {
        // Visitor that folds the boundedness of every visited PCollection into one value.
        class UnboundedDetector extends Pipeline.PipelineVisitor.Defaults {
            PCollection.IsBounded combined = PCollection.IsBounded.BOUNDED;

            @Override
            public void visitValue(PValue value, TransformHierarchy.Node producer) {
                if (!(value instanceof PCollection)) {
                    return;
                }
                PCollection<?> collection = (PCollection<?>) value;
                combined = combined.and(collection.isBounded());
            }
        }

        UnboundedDetector detector = new UnboundedDetector();
        p.traverseTopologically(detector);
        return detector.combined == PCollection.IsBounded.UNBOUNDED;
    }

    /**
     * {@link DoFn} that emits nothing and increments the {@code "rows"} counter, registered
     * under {@link BeamEnumerableConverter}, once per input row.
     */
    private static class RowCounter
    extends DoFn<Row, Void> {
        final Counter rows = Metrics.counter(BeamEnumerableConverter.class, "rows");

        private RowCounter() {
        }

        /**
         * Counts one row.
         *
         * <p>The parameter is declared through the inherited, parameterized
         * {@code ProcessContext} name rather than the raw {@code DoFn.ProcessContext}, so the
         * declared type carries this DoFn's input and output types.
         *
         * @param context process context of the current element (not read)
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            this.rows.inc();
        }
    }

    /**
     * {@link DoFn} that converts every input row with {@code rowToAvatica} and appends it to a
     * queue looked up in {@link #globalValues} by the pipeline options id.
     *
     * <p>Single-column rows are added as the bare column value; all other rows are added as an
     * {@code Object[]}.
     */
    private static class Collector
    extends DoFn<Row, Void> {
        /** Result queues of running collections, keyed by pipeline options id. */
        private static final Map<Long, Queue<Object>> globalValues = new ConcurrentHashMap<>();
        /** Queue resolved in {@link #startBundle}; null until then or if none is registered. */
        @Nullable
        private volatile Queue<Object> values;

        private Collector() {
        }

        /**
         * Resolves the target queue for this bundle from the options id.
         *
         * <p>The inherited, parameterized {@code StartBundleContext} name is used instead of
         * the raw {@code DoFn.StartBundleContext} so the declared type carries this DoFn's
         * type arguments.
         *
         * @param context bundle context giving access to the pipeline options
         */
        @StartBundle
        public void startBundle(StartBundleContext context) {
            long id = context.getPipelineOptions().getOptionsId();
            this.values = globalValues.get(id);
        }

        /**
         * Converts the current row and appends it to the queue.
         *
         * @param context process context supplying the current row
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            Object[] avaticaRow = rowToAvatica(context.element());
            if (avaticaRow.length == 1) {
                this.values.add(avaticaRow[0]);
            } else {
                this.values.add(avaticaRow);
            }
        }
    }
}

