/*
 * Decompiled with CFR 0.152.
 */
package org.cloudgraph.rdb.graph;

import commonj.sdo.DataGraph;
import commonj.sdo.Property;
import commonj.sdo.Type;
import java.sql.Connection;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cloudgraph.common.concurrent.ConfigProps;
import org.cloudgraph.common.concurrent.GraphMetricVisitor;
import org.cloudgraph.common.concurrent.SubgraphTask;
import org.cloudgraph.common.concurrent.Traversal;
import org.cloudgraph.rdb.filter.RDBStatementExecutor;
import org.cloudgraph.rdb.filter.RDBStatementFactory;
import org.cloudgraph.rdb.graph.ParallelSubgraphTask;
import org.cloudgraph.store.lang.DefaultAssembler;
import org.cloudgraph.store.lang.LangStoreGraphAssembler;
import org.cloudgraph.store.lang.StatementExecutor;
import org.cloudgraph.store.lang.StatementFactory;
import org.plasma.query.collector.SelectionCollector;
import org.plasma.sdo.PlasmaDataGraphVisitor;
import org.plasma.sdo.PlasmaDataObject;
import org.plasma.sdo.PlasmaProperty;
import org.plasma.sdo.PlasmaType;
import org.plasma.sdo.access.provider.common.PropertyPair;
import org.plasma.sdo.core.CoreNode;

/**
 * Graph assembler for RDB query results that hands the traversals starting at
 * the graph root to {@link ParallelSubgraphTask}s.
 *
 * <p>{@link #assemble(List)} starts as many tasks concurrently as
 * {@link #numThreadsAvailable()} reports and runs the remaining traversals one
 * after another on the calling thread. {@link #link} and
 * {@link #createDataObject} are wrapped in {@code synchronized} blocks because
 * they may be reached from several tasks at once.
 *
 * <p>NOTE(review): nothing in this class shuts the thread pool down; confirm
 * that the owner of the assembler calls {@code getExecutorService().shutdown()}.
 */
public class ParallelGraphAssembler extends DefaultAssembler implements LangStoreGraphAssembler {
    private static final Log log = LogFactory.getLog(ParallelGraphAssembler.class);

    /** Keys under which assembly metrics are stored on the root node's value object. */
    private static final String METRIC_ASSEMBLY_TIME = "GraphAssemblyTime";
    private static final String METRIC_NODE_COUNT = "GraphNodeCount";
    private static final String METRIC_DEPTH = "GraphDepth";
    private static final String METRIC_THREAD_COUNT = "GraphThreadCount";

    private final ThreadPoolExecutor executorService;
    private final ConfigProps config;

    /**
     * Creates the assembler together with its own thread pool.
     *
     * <p>The pool is sized from {@code config} (minimum and maximum pool size),
     * uses a zero keep-alive, an unbounded {@link LinkedBlockingQueue} and the
     * caller-runs rejection policy.
     *
     * <p>NOTE(review): per the {@link ThreadPoolExecutor} documentation an
     * unbounded work queue means no more than the core number of threads is ever
     * created, so the configured maximum has no effect on the pool itself (it is
     * still what {@link #numThreadsAvailable()} reports against).
     *
     * @param rootType     type of the graph root, passed to the superclass
     * @param collector    selection collector, passed to the superclass
     * @param snapshotDate snapshot date, passed to the superclass
     * @param config       supplies the thread pool sizes; must not be {@code null}
     * @param con          connection handed to the {@link RDBStatementExecutor}
     */
    public ParallelGraphAssembler(PlasmaType rootType, SelectionCollector collector, Timestamp snapshotDate, ConfigProps config, Connection con) {
        super(rootType, collector, (StatementFactory)new RDBStatementFactory(), (StatementExecutor)new RDBStatementExecutor(con), new ConcurrentHashMap(), snapshotDate);
        this.executorService = new ThreadPoolExecutor(
                config.getMinThreadPoolSize(),
                config.getMaxThreadPoolSize(),
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        this.config = config;
    }

    /**
     * @return the thread pool created by this assembler
     */
    public ThreadPoolExecutor getExecutorService() {
        return this.executorService;
    }

    /**
     * @return the configuration this assembler was constructed with
     */
    public ConfigProps getConfig() {
        return this.config;
    }

    /**
     * Delegates to the superclass {@code link} while holding the monitors of both
     * data objects.
     *
     * <p>NOTE(review): the monitors are always taken source first, then target.
     * If two threads can link the same pair of objects in opposite roles at the
     * same time this ordering can deadlock; confirm against the callers.
     */
    protected void link(PlasmaDataObject target, PlasmaDataObject source, PlasmaProperty sourceProperty) {
        synchronized (source) {
            synchronized (target) {
                super.link(target, source, sourceProperty);
            }
        }
    }

    /**
     * Delegates to the superclass {@code createDataObject} while holding this
     * assembler's monitor, so only one thread creates a data object at a time.
     */
    protected PlasmaDataObject createDataObject(List<PropertyPair> row, PlasmaDataObject source, PlasmaProperty sourceProperty) {
        synchronized (this) {
            return super.createDataObject(row, source, sourceProperty);
        }
    }

    /**
     * Initializes the graph root from {@code results}, assembles every subgraph
     * reachable from the root and finally records assembly metrics on the root
     * node's value object.
     *
     * @param results property pairs of the root row
     */
    public void assemble(List<PropertyPair> results) {
        long before = System.currentTimeMillis();
        DataGraph dataGraph = this.initRoot(results);
        CoreNode rootNode = (CoreNode)dataGraph.getRootObject();

        List<Traversal> traversals = this.collectRootTraversals(results);

        // Only as many traversals as the pool reports free threads run concurrently.
        this.logPoolStatistics();
        int available = this.numThreadsAvailable();
        if (available > traversals.size()) {
            available = traversals.size();
        }

        List<SubgraphTask> concurrentTasks = new ArrayList<SubgraphTask>();
        for (int i = 0; i < available; ++i) {
            concurrentTasks.add(this.newSubgraphTask(traversals.get(i), i));
        }
        // Start every task before waiting on any of them.
        for (SubgraphTask task : concurrentTasks) {
            task.start();
        }
        for (SubgraphTask task : concurrentTasks) {
            task.join();
        }

        // The remaining traversals are assembled one by one on the calling thread.
        for (int i = available; i < traversals.size(); ++i) {
            this.newSubgraphTask(traversals.get(i), traversals.size()).assemble();
        }

        if (log.isDebugEnabled()) {
            log.debug("completed root " + this.root);
        }
        long after = System.currentTimeMillis();
        this.recordMetrics(rootNode, after - before);
    }

    /**
     * Collects one traversal per reference property of the root: first the
     * singular, non-datatype properties present in {@code results}, then the
     * multi-valued, non-datatype properties the collector selected for the root type.
     */
    private List<Traversal> collectRootTraversals(List<PropertyPair> results) {
        List<Traversal> traversals = new ArrayList<Traversal>();
        for (PropertyPair pair : results) {
            if (pair.getProp().isMany() || pair.getProp().getType().isDataType()) {
                continue;
            }
            List<PropertyPair> childKeyPairs = this.getChildKeyPairs(pair);
            // The trailing 1 is presumably the traversal level (it is read back via getLevel()).
            traversals.add(new Traversal((PlasmaType)pair.getProp().getType(), this.root, pair.getProp(), childKeyPairs, 1));
        }
        Set<? extends Property> selected = this.collector.getProperties((Type)this.rootType);
        for (Property candidate : selected) {
            PlasmaProperty prop = (PlasmaProperty)candidate;
            if (!prop.isMany() || prop.getType().isDataType()) {
                continue;
            }
            List<PropertyPair> childKeyPairs = this.getChildKeyPairs(this.root, prop);
            traversals.add(new Traversal((PlasmaType)prop.getType(), this.root, prop, childKeyPairs, 1));
        }
        return traversals;
    }

    /**
     * Builds the subgraph task for one traversal.
     *
     * @param sequence value passed as the task's second-to-last constructor
     *                 argument (presumably a task sequence number - TODO confirm)
     */
    private ParallelSubgraphTask newSubgraphTask(Traversal traversal, int sequence) {
        return new ParallelSubgraphTask(
                traversal.getSubrootType(),
                traversal.getSource(),
                this.collector,
                this.getStatementFactory(),
                this.getStatementExecutor(),
                traversal.getSourceProperty(),
                traversal.getChildKeyPairs(),
                traversal.getLevel(),
                sequence,
                this);
    }

    /**
     * Stores the elapsed assembly time and the node count, depth and thread count
     * gathered by a {@link GraphMetricVisitor} on the root node's value object.
     */
    private void recordMetrics(CoreNode rootNode, long elapsedMillis) {
        Object assemblyTime = Long.valueOf(elapsedMillis);
        rootNode.getValueObject().put(METRIC_ASSEMBLY_TIME, assemblyTime);
        GraphMetricVisitor visitor = new GraphMetricVisitor();
        this.root.accept((PlasmaDataGraphVisitor)visitor);
        Object nodeCount = visitor.getCount();
        rootNode.getValueObject().put(METRIC_NODE_COUNT, nodeCount);
        Object depth = visitor.getDepth();
        rootNode.getValueObject().put(METRIC_DEPTH, depth);
        Object threadCount = visitor.getThreadCount();
        rootNode.getValueObject().put(METRIC_THREAD_COUNT, threadCount);
    }

    /**
     * Does nothing in this implementation.
     *
     * <p>NOTE(review): presumably the per-subgraph work is done by
     * {@link ParallelSubgraphTask} instead - confirm against the superclass.
     */
    protected void assemble(PlasmaType targetType, PlasmaDataObject source, PlasmaProperty sourceProperty, List<PropertyPair> childKeyPairs, int level) {
    }

    /**
     * Logs the pool's active thread count and current size at debug level.
     */
    public void logPoolStatistics() {
        if (log.isDebugEnabled()) {
            log.debug("active: " + this.executorService.getActiveCount() + ", size: " + this.executorService.getPoolSize());
        }
    }

    /**
     * @return {@code true} if the pool's active thread count is below its maximum pool size
     */
    public boolean threadsAvailable() {
        return this.executorService.getActiveCount() < this.executorService.getMaximumPoolSize();
    }

    /**
     * @return the pool's maximum size minus its active thread count, never negative
     */
    public int numThreadsAvailable() {
        int result = this.executorService.getMaximumPoolSize() - this.executorService.getActiveCount();
        if (result < 0) {
            result = 0;
        }
        return result;
    }
}

