/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSortRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_26_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.linq4j.Enumerable;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.linq4j.Linq4j;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.linq4j.tree.Expression;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.linq4j.tree.Expressions;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.ConventionTraitDef;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCost;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelTraitDef;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataType;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamEnumerableConverter
extends ConverterImpl
implements EnumerableRel {
    private static final Logger LOG = LoggerFactory.getLogger(BeamEnumerableConverter.class);

    public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
        super(cluster, (RelTraitDef)ConventionTraitDef.INSTANCE, traits, input);
    }

    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        return new BeamEnumerableConverter(this.getCluster(), traitSet, (RelNode)BeamEnumerableConverter.sole(inputs));
    }

    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        return planner.getCostFactory().makeHugeCost();
    }

    public EnumerableRel.Result implement(EnumerableRelImplementor implementor, EnumerableRel.Prefer prefer) {
        BlockBuilder list = new BlockBuilder();
        RelDataType rowType = this.getRowType();
        PhysType physType = PhysTypeImpl.of((JavaTypeFactory)implementor.getTypeFactory(), (RelDataType)rowType, (JavaRowFormat)prefer.preferArray());
        Expression node = implementor.stash((Object)((BeamRelNode)this.getInput()), BeamRelNode.class);
        list.add((Expression)Expressions.call(BeamEnumerableConverter.class, (String)"toEnumerable", (Expression[])new Expression[]{node}));
        return implementor.result(physType, list.toBlock());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Enumerable<Object> toEnumerable(BeamRelNode node) {
        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
            PipelineOptions options = BeamEnumerableConverter.createPipelineOptions(node.getPipelineOptions());
            Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node);
            return enumerable;
        }
        finally {
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }
    }

    public static List<Row> toRowList(BeamRelNode node) {
        return BeamEnumerableConverter.toRowList(node, Collections.emptyMap());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static List<Row> toRowList(BeamRelNode node, Map<String, String> otherOptions) {
        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
            HashMap<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.putAll(node.getPipelineOptions());
            optionsMap.putAll(otherOptions);
            List<Row> list = BeamEnumerableConverter.toRowList(BeamEnumerableConverter.createPipelineOptions(optionsMap), node);
            return list;
        }
        finally {
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }
    }

    public static PipelineOptions createPipelineOptions(Map<String, String> map) {
        String[] args = new String[map.size()];
        int i = 0;
        for (Map.Entry<String, String> entry : map.entrySet()) {
            args[i++] = "--" + entry.getKey() + "=" + entry.getValue();
        }
        PipelineOptions options = PipelineOptionsFactory.fromArgs((String[])args).withValidation().create();
        FileSystems.setDefaultPipelineOptions((PipelineOptions)options);
        ((ApplicationNameOptions)options.as(ApplicationNameOptions.class)).setAppName("BeamSql");
        return options;
    }

    static List<Row> toRowList(PipelineOptions options, BeamRelNode node) {
        if (node instanceof BeamIOSinkRel) {
            throw new UnsupportedOperationException("Does not support BeamIOSinkRel in toRowList.");
        }
        if (BeamEnumerableConverter.isLimitQuery(node)) {
            throw new UnsupportedOperationException("Does not support queries with LIMIT in toRowList.");
        }
        return BeamEnumerableConverter.collectRows(options, node).stream().collect(Collectors.toList());
    }

    static Enumerable<Object> toEnumerable(PipelineOptions options, BeamRelNode node) {
        if (node instanceof BeamIOSinkRel) {
            return BeamEnumerableConverter.count(options, node);
        }
        if (BeamEnumerableConverter.isLimitQuery(node)) {
            return BeamEnumerableConverter.limitCollect(options, node);
        }
        return Linq4j.asEnumerable(BeamEnumerableConverter.rowToAvaticaAndUnboxValues(BeamEnumerableConverter.collectRows(options, node)));
    }

    /*
     * Exception decompiling
     */
    private static PipelineResult limitRun(PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn, Queue<Row> values, int limitCount) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[DOLOOP]], but top level block is 0[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private static void runCollector(PipelineOptions options, BeamRelNode node) {
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
        resultCollection.apply((PTransform)ParDo.of((DoFn)new Collector()));
        PipelineResult result = pipeline.run();
        if (PipelineResult.State.FAILED.equals((Object)result.waitUntilFinish())) {
            throw new RuntimeException("Pipeline failed for unknown reason");
        }
    }

    private static Queue<Row> collectRows(PipelineOptions options, BeamRelNode node) {
        long id = options.getOptionsId();
        ConcurrentLinkedQueue<Row> values = new ConcurrentLinkedQueue<Row>();
        Preconditions.checkArgument((boolean)options.getRunner().getCanonicalName().equals("org.apache.beam.runners.direct.DirectRunner"), (Object)"collectRowList is only available in direct runner.");
        Collector.globalValues.put(id, values);
        BeamEnumerableConverter.runCollector(options, node);
        Collector.globalValues.remove(id);
        return values;
    }

    private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
        long id = options.getOptionsId();
        ConcurrentLinkedQueue<Row> values = new ConcurrentLinkedQueue<Row>();
        Preconditions.checkArgument((boolean)options.getRunner().getCanonicalName().equals("org.apache.beam.runners.direct.DirectRunner"), (Object)"SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
        int limitCount = BeamEnumerableConverter.getLimitCount(node);
        Collector.globalValues.put(id, values);
        BeamEnumerableConverter.limitRun(options, node, new Collector(), values, limitCount);
        Collector.globalValues.remove(id);
        while (values.size() > limitCount) {
            values.remove();
        }
        return Linq4j.asEnumerable(BeamEnumerableConverter.rowToAvaticaAndUnboxValues(values));
    }

    private static List<Object> rowToAvaticaAndUnboxValues(Queue<Row> values) {
        return values.stream().map(row -> {
            Object[] objects = BeamEnumerableConverter.rowToAvatica(row);
            if (objects.length == 1) {
                return objects[0];
            }
            return objects;
        }).collect(Collectors.toList());
    }

    private static Object[] rowToAvatica(Row row) {
        Schema schema = row.getSchema();
        Object[] convertedColumns = new Object[schema.getFields().size()];
        int i = 0;
        for (Schema.Field field : schema.getFields()) {
            convertedColumns[i] = BeamEnumerableConverter.fieldToAvatica(field.getType(), row.getBaseValue(i, Object.class));
            ++i;
        }
        return convertedColumns;
    }

    private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
        if (beamValue == null) {
            return null;
        }
        switch (type.getTypeName()) {
            case LOGICAL_TYPE: {
                String logicalId = type.getLogicalType().getIdentifier();
                if (SqlTypes.TIME.getIdentifier().equals(logicalId)) {
                    if (beamValue instanceof Long) {
                        return (Long)beamValue;
                    }
                    return ((LocalTime)beamValue).toNanoOfDay();
                }
                if (SqlTypes.DATE.getIdentifier().equals(logicalId)) {
                    if (beamValue instanceof Long) {
                        return ((Long)beamValue).intValue();
                    }
                    return (int)((LocalDate)beamValue).toEpochDay();
                }
                if ("SqlCharType".equals(logicalId)) {
                    return beamValue;
                }
                throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
            }
            case DATETIME: {
                return ((ReadableInstant)beamValue).getMillis();
            }
            case BYTE: 
            case INT16: 
            case INT32: 
            case INT64: 
            case DECIMAL: 
            case FLOAT: 
            case DOUBLE: 
            case STRING: 
            case BOOLEAN: 
            case BYTES: {
                return beamValue;
            }
            case ARRAY: {
                return ((List)beamValue).stream().map(elem -> BeamEnumerableConverter.fieldToAvatica(type.getCollectionElementType(), elem)).collect(Collectors.toList());
            }
            case ITERABLE: {
                return StreamSupport.stream(((Iterable)beamValue).spliterator(), false).map(elem -> BeamEnumerableConverter.fieldToAvatica(type.getCollectionElementType(), elem)).collect(Collectors.toList());
            }
            case MAP: {
                return ((Map)beamValue).entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> BeamEnumerableConverter.fieldToAvatica(type.getCollectionElementType(), entry.getValue())));
            }
            case ROW: {
                return beamValue;
            }
        }
        throw new IllegalStateException(String.format("Unreachable case for Beam typename %s", type.getTypeName()));
    }

    private static Enumerable<Object> count(PipelineOptions options, BeamRelNode node) {
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        BeamSqlRelUtils.toPCollection(pipeline, node).apply((PTransform)ParDo.of((DoFn)new RowCounter()));
        PipelineResult result = pipeline.run();
        long count = 0L;
        if (!BeamEnumerableConverter.containsUnboundedPCollection(pipeline)) {
            if (PipelineResult.State.FAILED.equals((Object)result.waitUntilFinish())) {
                throw new RuntimeException("Pipeline failed for unknown reason");
            }
            MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named(BeamEnumerableConverter.class, (String)"rows")).build());
            Iterator iterator = metrics.getCounters().iterator();
            if (iterator.hasNext()) {
                count = (Long)((MetricResult)iterator.next()).getAttempted();
            }
        }
        return Linq4j.singletonEnumerable((Object)count);
    }

    private static boolean isLimitQuery(BeamRelNode node) {
        return node instanceof BeamSortRel && ((BeamSortRel)node).isLimitOnly() || node instanceof AbstractBeamCalcRel && ((AbstractBeamCalcRel)node).isInputSortRelAndLimitOnly();
    }

    private static int getLimitCount(BeamRelNode node) {
        if (node instanceof BeamSortRel) {
            return ((BeamSortRel)node).getCount();
        }
        if (node instanceof AbstractBeamCalcRel) {
            return ((AbstractBeamCalcRel)node).getLimitCountOfSortRel();
        }
        throw new IllegalArgumentException("Cannot get limit count from RelNode tree with root " + node.getRelTypeName());
    }

    private static boolean containsUnboundedPCollection(Pipeline p) {
        class BoundednessVisitor
        extends Pipeline.PipelineVisitor.Defaults {
            PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED;

            BoundednessVisitor() {
            }

            public void visitValue(PValue value, TransformHierarchy.Node producer) {
                if (value instanceof PCollection) {
                    this.boundedness = this.boundedness.and(((PCollection)value).isBounded());
                }
            }
        }
        BoundednessVisitor visitor = new BoundednessVisitor();
        p.traverseTopologically((Pipeline.PipelineVisitor)visitor);
        return visitor.boundedness == PCollection.IsBounded.UNBOUNDED;
    }

    private static class RowCounter
    extends DoFn<Row, Void> {
        final Counter rows = Metrics.counter(BeamEnumerableConverter.class, (String)"rows");

        private RowCounter() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            this.rows.inc();
        }
    }

    private static class Collector
    extends DoFn<Row, Void> {
        private static final Map<Long, Queue<Row>> globalValues = new ConcurrentHashMap<Long, Queue<Row>>();
        private volatile @Nullable Queue<Row> values;

        private Collector() {
        }

        @DoFn.StartBundle
        public void startBundle(DoFn.StartBundleContext context) {
            long id = context.getPipelineOptions().getOptionsId();
            this.values = globalValues.get(id);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            this.values.add((Row)context.element());
        }
    }
}

