/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.accumulo.index;

import com.facebook.presto.accumulo.AccumuloClient;
import com.facebook.presto.accumulo.AccumuloErrorCode;
import com.facebook.presto.accumulo.conf.AccumuloSessionProperties;
import com.facebook.presto.accumulo.index.ColumnCardinalityCache;
import com.facebook.presto.accumulo.index.Indexer;
import com.facebook.presto.accumulo.model.AccumuloColumnConstraint;
import com.facebook.presto.accumulo.model.TabletSplitMetadata;
import com.facebook.presto.accumulo.serializers.AccumuloRowSerializer;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

public class IndexLookup {
    /** Logger for this class. */
    private static final Logger LOG = Logger.get(IndexLookup.class);
    /** Range built from {@code Indexer.METRICS_TABLE_ROWID_AS_TEXT}; applied to the metrics-table scanner when reading the table's row count. */
    private static final Range METRICS_TABLE_ROWID_RANGE = new Range(Indexer.METRICS_TABLE_ROWID_AS_TEXT);
    /** Source of the per-column cardinalities consulted when index metrics are enabled. */
    private final ColumnCardinalityCache cardinalityCache;
    /** Accumulo connector used to create the scanners against the metrics and index tables. */
    private final Connector connector;
    /** Backing thread pool created in the constructor and stopped by {@code shutdown()}. */
    private final ExecutorService coreExecutor;
    /** Bounded executor wrapping {@code coreExecutor}; index-table scans are submitted through it. */
    private final BoundedExecutor executorService;

    @Inject
    public IndexLookup(Connector connector, ColumnCardinalityCache cardinalityCache) {
        this.connector = Objects.requireNonNull(connector, "connector is null");
        this.cardinalityCache = Objects.requireNonNull(cardinalityCache, "cardinalityCache is null");
        this.coreExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"cardinality-lookup-%s"));
        this.executorService = new BoundedExecutor((Executor)this.coreExecutor, 4 * Runtime.getRuntime().availableProcessors());
    }

    /**
     * Lifecycle hook: stops the backing thread pool by calling {@code shutdownNow()} on it.
     */
    @PreDestroy
    public void shutdown() {
        coreExecutor.shutdownNow();
    }

    /**
     * Attempts to use the secondary index to turn the query's column constraints into
     * row ID ranges, binned into splits that are appended to {@code tabletSplits}.
     *
     * @param schema schema name
     * @param table table name
     * @param session session whose properties toggle the index, the index metrics and the split size
     * @param constraints column constraints of the query
     * @param rowIdRanges row ID ranges; index entries outside all of them are discarded
     * @param tabletSplits output list that receives the generated splits
     * @param serializer serializer used to convert constraint domains into ranges
     * @param auths scan authorizations
     * @return {@code false} if the index is disabled, no constraint is on an indexed column, or
     *         the metrics-based path declines the index; {@code true} if the index was applied
     *         (including when it shows the query has no results, in which case no split is added)
     * @throws Exception propagated from the metrics-based lookup
     */
    public boolean applyIndex(String schema, String table, ConnectorSession session, Collection<AccumuloColumnConstraint> constraints, Collection<Range> rowIdRanges, List<TabletSplitMetadata> tabletSplits, AccumuloRowSerializer serializer, Authorizations auths) throws Exception {
        if (!AccumuloSessionProperties.isOptimizeIndexEnabled(session)) {
            LOG.debug("Secondary index is disabled");
            return false;
        }
        LOG.debug("Secondary index is enabled");

        // Only constraints on indexed columns contribute ranges.
        Multimap<AccumuloColumnConstraint, Range> indexedRanges = getIndexedConstraintRanges(constraints, serializer);
        if (indexedRanges.isEmpty()) {
            LOG.debug("Query contains no constraints on indexed columns, skipping secondary index");
            return false;
        }

        if (AccumuloSessionProperties.isIndexMetricsEnabled(session)) {
            LOG.debug("Use of index metrics is enabled");
            return getRangesWithMetrics(session, schema, table, indexedRanges, rowIdRanges, tabletSplits, auths);
        }

        // Metrics are off: always intersect every indexed column, no cardinality checks.
        LOG.debug("Use of index metrics is disabled");
        String indexTable = Indexer.getIndexTableName(schema, table);
        List<Range> ranges = getIndexRanges(indexTable, indexedRanges, rowIdRanges, auths);
        if (ranges.isEmpty()) {
            LOG.debug("Query would return no results, returning empty list of splits");
            return true;
        }

        int rangesPerSplit = AccumuloSessionProperties.getNumIndexRowsPerSplit(session);
        binRanges(rangesPerSplit, ranges, tabletSplits);
        LOG.debug("Number of splits for %s.%s is %d with %d ranges", schema, table, tabletSplits.size(), ranges.size());
        return true;
    }

    /**
     * Collects, for every constraint on an indexed column, the ranges derived from its domain.
     * Constraints on non-indexed columns contribute nothing and only produce a warning.
     *
     * @param constraints column constraints to inspect
     * @param serializer serializer passed to {@code AccumuloClient.getRangesFromDomain}
     * @return immutable multimap from each indexed constraint to its ranges
     */
    private static Multimap<AccumuloColumnConstraint, Range> getIndexedConstraintRanges(Collection<AccumuloColumnConstraint> constraints, AccumuloRowSerializer serializer) {
        ImmutableListMultimap.Builder<AccumuloColumnConstraint, Range> rangesByConstraint = ImmutableListMultimap.builder();
        for (AccumuloColumnConstraint constraint : constraints) {
            if (!constraint.isIndexed()) {
                LOG.warn("Query containts constraint on non-indexed column %s. Is it worth indexing?", constraint.getName());
                continue;
            }
            for (Range range : AccumuloClient.getRangesFromDomain(constraint.getDomain(), serializer)) {
                rangesByConstraint.put(constraint, range);
            }
        }
        return rangesByConstraint.build();
    }

    /**
     * Decides, using the row count from the metrics table and the column cardinalities, whether
     * the secondary index is worth using and, if so, bins the resulting row ID ranges into
     * {@code tabletSplits}.
     *
     * <p>The first entry of the cardinality multimap is treated as the lowest cardinality.
     * NOTE(review): that ordering is provided by {@code ColumnCardinalityCache}, which is not
     * visible here - confirm it iterates in ascending cardinality order.
     *
     * @param session session supplying the thresholds and split size
     * @param schema schema name
     * @param table table name
     * @param constraintRanges ranges per indexed column constraint
     * @param rowIdRanges row ID ranges used to filter index entries
     * @param tabletSplits output list that receives the generated splits
     * @param auths scan authorizations
     * @return {@code true} if the index was used (including the case where it proves the query
     *         returns nothing), {@code false} if there are no cardinalities or the ratio of
     *         rows to scan is not below the index threshold
     * @throws Exception propagated from the metrics-table scan or the cardinality cache
     */
    private boolean getRangesWithMetrics(ConnectorSession session, String schema, String table, Multimap<AccumuloColumnConstraint, Range> constraintRanges, Collection<Range> rowIdRanges, List<TabletSplitMetadata> tabletSplits, Authorizations auths) throws Exception {
        String metricsTable = Indexer.getMetricsTableName(schema, table);
        long numRows = getNumRowsInTable(metricsTable, auths);

        // With short-circuiting enabled the cache is given an early-return threshold and a
        // polling duration; otherwise both are zero.
        Multimap<Long, AccumuloColumnConstraint> cardinalities;
        if (AccumuloSessionProperties.isIndexShortCircuitEnabled(session)) {
            long earlyReturnThreshold = (long) ((double) numRows * AccumuloSessionProperties.getIndexSmallCardThreshold(session));
            Duration pollingDuration = AccumuloSessionProperties.getIndexCardinalityCachePollingDuration(session);
            cardinalities = cardinalityCache.getCardinalities(schema, table, auths, constraintRanges, earlyReturnThreshold, pollingDuration);
        } else {
            cardinalities = cardinalityCache.getCardinalities(schema, table, auths, constraintRanges, 0L, new Duration(0.0, TimeUnit.MILLISECONDS));
        }

        Optional<Map.Entry<Long, AccumuloColumnConstraint>> firstEntry = cardinalities.entries().stream().findFirst();
        if (!firstEntry.isPresent()) {
            return false;
        }
        Map.Entry<Long, AccumuloColumnConstraint> lowestCardinality = firstEntry.get();

        String indexTable = Indexer.getIndexTableName(schema, table);
        double threshold = AccumuloSessionProperties.getIndexThreshold(session);

        List<Range> indexRanges;
        if (smallestCardAboveThreshold(session, numRows, lowestCardinality.getKey())) {
            if (cardinalities.size() == 1) {
                // Single column: its cardinality is exactly what the index would scan, so
                // the decision can be made without touching the index table.
                long lowestCount = lowestCardinality.getKey();
                double lowestRatio = (double) lowestCount / (double) numRows;
                LOG.debug("Use of index would scan %s of %s rows, ratio %s. Threshold %2f, Using for index table? %s", lowestCount, numRows, lowestRatio, threshold, lowestRatio < threshold);
                if (lowestRatio >= threshold) {
                    return false;
                }
            }
            // Multimap.size() counts (column, range) pairs; the message reports columns,
            // so count the distinct keys instead.
            LOG.debug("%d indexed columns, intersecting ranges", constraintRanges.keySet().size());
            indexRanges = getIndexRanges(indexTable, constraintRanges, rowIdRanges, auths);
            LOG.debug("Intersection results in %d ranges from secondary index", indexRanges.size());
        } else {
            LOG.debug("Not intersecting columns, using column with lowest cardinality ");
            AccumuloColumnConstraint lowestColumn = lowestCardinality.getValue();
            ImmutableMultimap.Builder<AccumuloColumnConstraint, Range> lowestOnly = ImmutableMultimap.builder();
            lowestOnly.putAll(lowestColumn, constraintRanges.get(lowestColumn));
            indexRanges = getIndexRanges(indexTable, lowestOnly.build(), rowIdRanges, auths);
        }

        if (indexRanges.isEmpty()) {
            LOG.debug("Query would return no results, returning empty list of splits");
            return true;
        }

        long numEntries = indexRanges.size();
        double ratio = (double) numEntries / (double) numRows;
        // The format string has five specifiers; the stray sixth argument (table) was dropped.
        LOG.debug("Use of index would scan %d of %d rows, ratio %s. Threshold %2f, Using for table? %b", numEntries, numRows, ratio, threshold, ratio < threshold);
        if (ratio < threshold) {
            binRanges(AccumuloSessionProperties.getNumIndexRowsPerSplit(session), indexRanges, tabletSplits);
            LOG.debug("Number of splits for %s.%s is %d with %d ranges", schema, table, tabletSplits.size(), indexRanges.size());
            return true;
        }
        return false;
    }

    /**
     * Tells whether the smallest column cardinality, as a fraction of the table's row count,
     * is strictly greater than the session's small-cardinality threshold.
     *
     * @param session session supplying the small-cardinality threshold
     * @param numRows number of rows in the table
     * @param smallestCardinality lowest cardinality among the constrained columns
     * @return {@code true} if {@code smallestCardinality / numRows} exceeds the threshold
     */
    private static boolean smallestCardAboveThreshold(ConnectorSession session, long numRows, long smallestCardinality) {
        double smallCardThreshold = AccumuloSessionProperties.getIndexSmallCardThreshold(session);
        double fraction = (double) smallestCardinality / (double) numRows;
        LOG.debug("Smallest cardinality is %d, num rows is %d, ratio is %2f with threshold of %f", smallestCardinality, numRows, fraction, smallCardThreshold);
        return fraction > smallCardThreshold;
    }

    /**
     * Reads the table's row count from the metrics table.
     *
     * <p>The scan is restricted to {@code METRICS_TABLE_ROWID_RANGE} and to the
     * rows column family / cardinality qualifier defined by {@code Indexer}.
     *
     * @param metricsTable name of the metrics table to scan
     * @param auths scan authorizations
     * @return the parsed row count, or {@code -1} if the scan returned no entry
     * @throws TableNotFoundException if the metrics table does not exist
     * @throws PrestoException if a further entry is seen after a positive count was read
     */
    private long getNumRowsInTable(String metricsTable, Authorizations auths) throws TableNotFoundException {
        long numRows = -1L;
        Scanner scanner = connector.createScanner(metricsTable, auths);
        try {
            scanner.setRange(METRICS_TABLE_ROWID_RANGE);
            scanner.fetchColumn(Indexer.METRICS_TABLE_ROWS_CF_AS_TEXT, Indexer.CARDINALITY_CQ_AS_TEXT);
            for (Map.Entry<Key, Value> entry : scanner) {
                // NOTE(review): this only fires when the previously read count was positive;
                // a first entry of 0 followed by another entry is silently accepted.
                if (numRows > 0L) {
                    throw new PrestoException(StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR, "Should have received only one entry when scanning for number of rows in metrics table");
                }
                numRows = Long.parseLong(entry.getValue().toString());
            }
        } finally {
            // Release the scanner even when iteration, parsing or the check above throws.
            scanner.close();
        }
        LOG.debug("Number of rows in table is %d", numRows);
        return numRows;
    }

    /**
     * Scans the index table once per constrained column (in parallel on the bounded executor)
     * and returns the intersection of the row ID ranges found for every column.
     *
     * <p>For each column, every index entry's column qualifier is taken as a row ID; it is kept
     * only if it falls inside at least one of {@code rowIDRanges}.
     *
     * @param indexTable name of the index table
     * @param constraintRanges index-table ranges to scan, grouped by column constraint
     * @param rowIDRanges row ID ranges used to filter the entries read from the index
     * @param auths scan authorizations
     * @return immutable list of single-row ranges present for all columns; empty if any column
     *         yields no row IDs or the columns have no row ID in common
     * @throws PrestoException if a scan task fails or the calling thread is interrupted
     */
    private List<Range> getIndexRanges(String indexTable, Multimap<AccumuloColumnConstraint, Range> constraintRanges, Collection<Range> rowIDRanges, Authorizations auths) {
        ExecutorCompletionService<Set<Range>> executor = new ExecutorCompletionService<>(executorService);
        List<Future<Set<Range>>> tasks = new ArrayList<>();
        for (Map.Entry<AccumuloColumnConstraint, Collection<Range>> constraintEntry : constraintRanges.asMap().entrySet()) {
            tasks.add(executor.submit(() -> {
                AccumuloColumnConstraint column = constraintEntry.getKey();
                BatchScanner scan = connector.createBatchScanner(indexTable, auths, 10);
                try {
                    scan.setRanges(constraintEntry.getValue());
                    scan.fetchColumnFamily(new Text(Indexer.getIndexColumnFamily(column.getFamily().getBytes(), column.getQualifier().getBytes()).array()));

                    // Reused buffer: getColumnQualifier copies the qualifier into it on each entry.
                    Text tmpQualifier = new Text();
                    Set<Range> columnRanges = new HashSet<>();
                    for (Map.Entry<Key, Value> entry : scan) {
                        entry.getKey().getColumnQualifier(tmpQualifier);
                        if (inRange(tmpQualifier, rowIDRanges)) {
                            columnRanges.add(new Range(tmpQualifier));
                        }
                    }
                    LOG.debug("Retrieved %d ranges for index column %s", columnRanges.size(), column.getName());
                    return columnRanges;
                } finally {
                    // Close the scanner even when the scan fails part-way through.
                    scan.close();
                }
            }));
        }

        Set<Range> finalRanges = new HashSet<>();
        // Track the first column explicitly: an empty running result must stay empty rather
        // than be mistaken for "nothing added yet" and refilled by the next column.
        boolean firstColumn = true;
        for (Future<Set<Range>> task : tasks) {
            Set<Range> columnRanges;
            try {
                columnRanges = task.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // InterruptedException carries no cause of its own, so attach the exception itself.
                throw new PrestoException(AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Exception when getting index ranges", e);
            } catch (ExecutionException e) {
                throw new PrestoException(AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Exception when getting index ranges", e.getCause());
            }

            if (firstColumn) {
                finalRanges.addAll(columnRanges);
                firstColumn = false;
            } else {
                finalRanges.retainAll(columnRanges);
            }
        }
        return ImmutableList.copyOf(finalRanges);
    }

    /**
     * Splits {@code splitRanges} into consecutive chunks of at most {@code numRangesPerBin}
     * ranges and appends one {@code TabletSplitMetadata} (with an empty location) per chunk to
     * {@code prestoSplits}.
     *
     * <p>Because of the do-while shape, an empty {@code splitRanges} still appends a single
     * split holding an empty range list; the callers in this class only call with a
     * non-empty list.
     *
     * @param numRangesPerBin maximum number of ranges per split; must be positive
     * @param splitRanges ranges to distribute; each split holds a {@code subList} view of it
     * @param prestoSplits output list that receives the splits
     * @throws IllegalArgumentException if {@code numRangesPerBin} is not positive
     */
    private static void binRanges(int numRangesPerBin, List<Range> splitRanges, List<TabletSplitMetadata> prestoSplits) {
        Preconditions.checkArgument(numRangesPerBin > 0, "number of ranges per bin must be greater than zero");
        int toAdd = splitRanges.size();
        int fromIndex = 0;
        int toIndex = Math.min(toAdd, numRangesPerBin);
        do {
            prestoSplits.add(new TabletSplitMetadata(Optional.empty(), splitRanges.subList(fromIndex, toIndex)));
            // Subtract the chunk just emitted BEFORE advancing fromIndex; doing it afterwards
            // subtracts zero, so the loop never terminates normally and runs past the list end.
            toAdd -= toIndex - fromIndex;
            fromIndex = toIndex;
            toIndex += Math.min(toAdd, numRangesPerBin);
        } while (toAdd > 0);
    }

    /**
     * Checks whether the given text, taken as a row key, lies within at least one range.
     *
     * @param text value to test; wrapped in a {@code Key}
     * @param ranges candidate ranges
     * @return {@code true} if some range has the key neither before its start nor after its end;
     *         {@code false} otherwise, including when {@code ranges} is empty
     */
    private static boolean inRange(Text text, Collection<Range> ranges) {
        Key qualifierKey = new Key(text);
        for (Range candidate : ranges) {
            if (candidate.beforeStartKey(qualifierKey) || candidate.afterEndKey(qualifierKey)) {
                continue;
            }
            return true;
        }
        return false;
    }
}

