/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.diskstorage.keycolumnvalue.scan;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.scan.MultiThreadsRowsCollector;
import org.janusgraph.diskstorage.keycolumnvalue.scan.RowsCollector;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJob;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics;
import org.janusgraph.diskstorage.keycolumnvalue.scan.SingleThreadRowsCollector;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScanMetrics;
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.util.system.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes a single {@link ScanJob} and publishes the resulting {@link ScanMetrics} through this future.
 *
 * <p>{@link #run()} builds a {@link RowsCollector} that is handed a bounded queue of {@link Row}s
 * (presumably to fill it with the scanned rows), starts {@code numProcessors} {@link Processor}
 * threads that drain that queue and pass every row to a clone of the job, and finally completes
 * the future with either the collected metrics or the failure that aborted the scan.
 *
 * <p>The instance is single-use: cleanup is guarded by a flag and only ever performed once.
 */
class StandardScannerExecutor extends AbstractFuture<ScanMetrics> implements ScanJobFuture, Runnable {
    private static final Logger log = LoggerFactory.getLogger(StandardScannerExecutor.class);

    /** Maximum time, in milliseconds, to wait for the processor threads once the collector is done. */
    private static final int TIMEOUT_MS = 180000;
    /** Time, in milliseconds, a processor waits on the queue per poll attempt. */
    static final int TIME_PER_TRY = 10;

    private final ScanJob job;
    private final Consumer<ScanMetrics> finishJob;
    private final StoreFeatures storeFeatures;
    private final StoreTransaction storeTx;
    private final KeyColumnValueStore store;
    private final int numProcessors;
    private final int workBlockSize;
    private final Configuration jobConfiguration;
    private final Configuration graphConfiguration;
    private final ScanMetrics metrics;
    /** Set once {@link #cleanup()} has run; only touched by the thread executing {@link #run()}. */
    private boolean hasCompleted = false;
    /** Written by {@link #interruptTask()} on the interrupting thread, read by {@link #run()}. */
    private volatile boolean interrupted = false;
    /** Published by {@link #run()} and read by {@link #interruptTask()} from another thread. */
    private volatile RowsCollector rowsCollector;

    /**
     * @param job                the job to execute; every processor works on its own clone
     * @param finishJob          callback receiving the metrics after a successful, uninterrupted run
     * @param store              store handed to the rows collector
     * @param storeTx            transaction handed to the rows collector; rolled back during cleanup
     * @param storeFeatures      consulted to pick the rows collector implementation
     * @param numProcessors      number of processor threads to start
     * @param workBlockSize      number of rows a processor handles before it renews its job clone
     * @param jobConfiguration   configuration passed to {@link ScanJob#workerIterationStart}
     * @param graphConfiguration graph configuration; also supplies the page size used to size the queue
     */
    StandardScannerExecutor(ScanJob job, Consumer<ScanMetrics> finishJob, KeyColumnValueStore store,
                            StoreTransaction storeTx, StoreFeatures storeFeatures, int numProcessors,
                            int workBlockSize, Configuration jobConfiguration,
                            Configuration graphConfiguration) {
        this.job = job;
        this.finishJob = finishJob;
        this.store = store;
        this.storeTx = storeTx;
        this.storeFeatures = storeFeatures;
        this.numProcessors = numProcessors;
        this.workBlockSize = workBlockSize;
        this.jobConfiguration = jobConfiguration;
        this.graphConfiguration = graphConfiguration;
        this.metrics = new StandardScanMetrics();
    }

    /**
     * Runs the scan and completes this future.
     *
     * <p>Any failure during setup or execution is logged and reported through
     * {@link #setException(Throwable)}; nothing is thrown from this method on those paths.
     */
    @Override
    public void run() {
        BlockingQueue<Row> processorQueue;
        try {
            processorQueue = setUp();
        } catch (Throwable e) {
            log.error("Exception trying to setup the job:", e);
            cleanupSilent();
            // Must not prevent the future from being completed with the setup failure.
            endWorkerIterationSilent();
            setException(e);
            return;
        }

        Processor[] processors = new Processor[numProcessors];
        for (int i = 0; i < processors.length; ++i) {
            processors[i] = new Processor(job.clone(), processorQueue);
            processors[i].start();
        }

        try {
            RowsCollector collector = rowsCollector;
            collector.run();
            collector.join();

            // The collector is done: let the processors exit once the queue is drained.
            for (Processor processor : processors) {
                processor.finish();
            }
            if (!Threads.waitForCompletion(processors, TIMEOUT_MS)) {
                log.error("Processor did not terminate in time");
            }

            cleanup();
            try {
                job.workerIterationEnd(metrics);
            } catch (IllegalArgumentException e) {
                log.warn("Exception occurred processing worker iteration end. See PR 891.", e);
            }

            if (interrupted) {
                setException(new InterruptedException("Scanner got interrupted"));
            } else {
                finishJob.accept(metrics);
                set(metrics);
            }
        } catch (Throwable e) {
            log.error("Exception occurred during job execution:", e);
            // Must not prevent the future from being completed with the original failure.
            endWorkerIterationSilent();
            setException(e);
        } finally {
            Threads.terminate(processors);
            cleanupSilent();
        }
    }

    /**
     * Starts the job's worker iteration, validates its queries and creates the rows collector.
     *
     * @return the queue shared between the rows collector and the processors
     * @throws IllegalArgumentException if the job has no query, or if it has several queries and
     *                                  the first one does not span from a single 0 byte to all 1s
     * @throws BackendException         if the rows collector cannot be built
     */
    private BlockingQueue<Row> setUp() throws BackendException {
        job.workerIterationStart(jobConfiguration, graphConfiguration, metrics);

        List<SliceQuery> queries = job.getQueries();
        int numQueries = queries.size();
        // Validate before sizing the queue: a capacity of 0 would make the queue constructor fail
        // first and hide this more descriptive message.
        Preconditions.checkArgument(numQueries > 0, "Must at least specify one query for job: %s", job);
        if (numQueries > 1) {
            SliceQuery ground = queries.get(0);
            StaticBuffer start = ground.getSliceStart();
            Preconditions.checkArgument(start.equals(BufferUtil.zeroBuffer(1)),
                "Expected start of first query to be a single 0s: %s", start);
            StaticBuffer end = ground.getSliceEnd();
            Preconditions.checkArgument(end.equals(BufferUtil.oneBuffer(end.length())),
                "Expected end of first query to be all 1s: %s", end);
        }

        int capacity = graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE) * numProcessors * numQueries;
        BlockingQueue<Row> processorQueue = new LinkedBlockingQueue<>(capacity);
        rowsCollector = buildScanner(processorQueue, queries);
        return processorQueue;
    }

    /**
     * Chooses the rows collector: multi-threaded when the store reports a consistent scan,
     * single-threaded otherwise.
     */
    private RowsCollector buildScanner(BlockingQueue<Row> processorQueue, List<SliceQuery> queries) throws BackendException {
        if (storeFeatures.hasConsistentScan()) {
            return new MultiThreadsRowsCollector(store, storeFeatures, storeTx, queries, job.getKeyFilter(),
                processorQueue, graphConfiguration);
        }
        return new SingleThreadRowsCollector(store, storeTx, queries, job.getKeyFilter(), processorQueue);
    }

    /**
     * Interruption hook of {@link AbstractFuture}: records the interruption and forwards it to the
     * rows collector if one exists. When no collector has been built yet (or setup failed), only
     * the flag is set.
     */
    protected void interruptTask() {
        interrupted = true;
        RowsCollector collector = rowsCollector;
        if (collector != null) {
            collector.interrupt();
        }
    }

    /**
     * Cleans up the rows collector (if any) and rolls back the store transaction. Subsequent
     * calls are no-ops.
     */
    private void cleanup() throws BackendException {
        if (hasCompleted) {
            return;
        }
        hasCompleted = true;
        RowsCollector collector = rowsCollector;
        if (collector != null) {
            collector.cleanup();
        }
        storeTx.rollback();
    }

    /** Same as {@link #cleanup()}, but logs a {@link BackendException} instead of propagating it. */
    private void cleanupSilent() {
        try {
            cleanup();
        } catch (BackendException ex) {
            log.error("Encountered exception when trying to clean up after failure", ex);
        }
    }

    /**
     * Ends the job's worker iteration on a failure path, logging anything it throws so that the
     * caller can still complete the future with the original failure.
     */
    private void endWorkerIterationSilent() {
        try {
            job.workerIterationEnd(metrics);
        } catch (Throwable t) {
            log.error("Exception occurred while ending the worker iteration after a failure", t);
        }
    }

    /** Returns the live metrics object, which may still be updated while the scan is running. */
    @Override
    public ScanMetrics getIntermediateResult() {
        return metrics;
    }

    /**
     * Worker thread that takes rows off the shared queue and feeds them to its own job clone.
     * After {@code workBlockSize} rows the current clone's iteration is ended and a fresh clone
     * is started.
     */
    private class Processor extends Thread {
        private ScanJob job;
        private final BlockingQueue<Row> processorQueue;
        private volatile boolean finished;
        private int numProcessed;

        private Processor(ScanJob job, BlockingQueue<Row> processorQueue) {
            this.job = job;
            this.processorQueue = processorQueue;
            this.finished = false;
            this.numProcessed = 0;
        }

        @Override
        public void run() {
            try {
                job.workerIterationStart(jobConfiguration, graphConfiguration, metrics);
                // Keep going until finish() was called AND the queue has been drained.
                while (!finished || !processorQueue.isEmpty()) {
                    Row row;
                    while ((row = processorQueue.poll(TIME_PER_TRY, TimeUnit.MILLISECONDS)) != null) {
                        if (numProcessed >= workBlockSize) {
                            // Work block exhausted: close this iteration and continue on a new clone.
                            job.workerIterationEnd(metrics);
                            job = job.clone();
                            job.workerIterationStart(jobConfiguration, graphConfiguration, metrics);
                            numProcessed = 0;
                        }
                        try {
                            job.process(row.key, row.entries, metrics);
                            metrics.increment(ScanMetrics.Metric.SUCCESS);
                        } catch (Throwable ex) {
                            // A failing row is counted and logged; processing continues with the next row.
                            log.error("Exception processing row [" + row.key + "]: ", ex);
                            metrics.increment(ScanMetrics.Metric.FAILURE);
                        }
                        ++numProcessed;
                    }
                }
            } catch (InterruptedException e) {
                // The thread exits right after the finally block, so the interrupt status is not restored.
                log.error("Processing thread interrupted while waiting on queue or processing data", e);
            } catch (Throwable e) {
                log.error("Unexpected error processing data", e);
            } finally {
                job.workerIterationEnd(metrics);
            }
        }

        /** Signals that no more rows will be produced; the thread stops once the queue is empty. */
        public void finish() {
            finished = true;
        }
    }

    /** A scanned row: its key together with the entries retrieved for each slice query. */
    static class Row {
        final StaticBuffer key;
        final Map<SliceQuery, EntryList> entries;

        Row(StaticBuffer key, Map<SliceQuery, EntryList> entries) {
            this.key = key;
            this.entries = entries;
        }
    }
}

