/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.indexing.pcj.fluo.api;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.client.CreatePCJ;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.query.BatchRyaQuery;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.calrissian.mango.collect.CloseableIterable;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.StatementPattern;

@DefaultAnnotation(value={NonNull.class})
public class CreateFluoPcj {
    private static final Logger log = Logger.getLogger(CreateFluoPcj.class);
    private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
    private static final int DEFAULT_JOIN_BATCH_SIZE = 5000;
    private final int spInsertBatchSize;
    private final int joinBatchSize;

    public CreateFluoPcj() {
        this(1000, 5000);
    }

    public CreateFluoPcj(int spInsertBatchSize, int joinBatchSize) {
        Preconditions.checkArgument((spInsertBatchSize > 0 ? 1 : 0) != 0, (Object)("The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0."));
        Preconditions.checkArgument((joinBatchSize > 0 ? 1 : 0) != 0, (Object)("The Join batch size '" + joinBatchSize + "' must be greater than 0."));
        this.spInsertBatchSize = spInsertBatchSize;
        this.joinBatchSize = joinBatchSize;
    }

    public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
        Preconditions.checkNotNull((Object)sparql);
        Preconditions.checkNotNull((Object)fluo);
        String pcjId = FluoQueryUtils.createNewPcjId();
        return this.createPcj(pcjId, sparql, Sets.newHashSet((Object[])new CreatePCJ.ExportStrategy[]{CreatePCJ.ExportStrategy.KAFKA}), fluo);
    }

    public FluoQuery createPcj(String pcjId, String sparql, Set<CreatePCJ.ExportStrategy> strategies, FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
        Objects.requireNonNull(pcjId);
        Objects.requireNonNull(sparql);
        Objects.requireNonNull(strategies);
        Objects.requireNonNull(fluo);
        FluoQuery fluoQuery = this.makeFluoQuery(sparql, pcjId, strategies);
        this.writeFluoQuery(fluo, fluoQuery, pcjId);
        return fluoQuery;
    }

    public FluoQuery createPcj(String pcjId, PrecomputedJoinStorage pcjStorage, FluoClient fluo) throws MalformedQueryException, PcjException, UnsupportedQueryException {
        Objects.requireNonNull(pcjId);
        Objects.requireNonNull(pcjStorage);
        Objects.requireNonNull(fluo);
        PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
        String sparql = pcjMetadata.getSparql();
        return this.createPcj(pcjId, sparql, Sets.newHashSet((Object[])new CreatePCJ.ExportStrategy[]{CreatePCJ.ExportStrategy.RYA}), fluo);
    }

    private FluoQuery makeFluoQuery(String sparql, String pcjId, Set<CreatePCJ.ExportStrategy> strategies) throws MalformedQueryException, UnsupportedQueryException {
        String queryId = NodeType.generateNewIdForType((NodeType)NodeType.QUERY, (String)pcjId);
        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder().setExportStrategies(strategies).setFluoQueryId(queryId).setSparql(sparql).setJoinBatchSize(this.joinBatchSize);
        FluoQuery query = builder.build();
        if (query.getQueryType() == CreatePCJ.QueryType.PERIODIC && !Sets.newHashSet((Object[])new CreatePCJ.ExportStrategy[]{CreatePCJ.ExportStrategy.PERIODIC}).containsAll(strategies)) {
            throw new UnsupportedQueryException("Periodic Queries must only utilize the PeriodicExport or the NoOpExport ExportStrategy.");
        }
        if (query.getQueryType() != CreatePCJ.QueryType.PERIODIC && strategies.contains(CreatePCJ.ExportStrategy.PERIODIC)) {
            throw new UnsupportedQueryException("Only Periodic Queries can utilize the PeriodicExport ExportStrategy.");
        }
        return query;
    }

    private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
        try (Transaction tx = fluo.newTransaction();){
            new FluoQueryMetadataDAO().write((TransactionBase)tx, fluoQuery);
            tx.commit();
        }
    }

    public String withRyaIntegration(String sparql, FluoClient fluo, Connector accumulo, String ryaInstance) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
        Objects.requireNonNull(sparql);
        Objects.requireNonNull(fluo);
        Objects.requireNonNull(accumulo);
        Objects.requireNonNull(ryaInstance);
        FluoQuery fluoQuery = this.createPcj(sparql, fluo);
        this.importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
        return fluoQuery.getQueryMetadata().getNodeId();
    }

    public FluoQuery withRyaIntegration(String pcjId, String sparql, Set<CreatePCJ.ExportStrategy> strategies, FluoClient fluo, Connector accumulo, String ryaInstance) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
        Objects.requireNonNull(pcjId);
        Objects.requireNonNull(sparql);
        Objects.requireNonNull(fluo);
        Objects.requireNonNull(accumulo);
        Objects.requireNonNull(ryaInstance);
        FluoQuery fluoQuery = this.createPcj(pcjId, sparql, strategies, fluo);
        this.importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
        return fluoQuery;
    }

    public FluoQuery withRyaIntegration(String pcjId, PrecomputedJoinStorage pcjStorage, FluoClient fluo, Connector accumulo, String ryaInstance) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
        Objects.requireNonNull(pcjId);
        Objects.requireNonNull(pcjStorage);
        Objects.requireNonNull(fluo);
        Objects.requireNonNull(accumulo);
        Objects.requireNonNull(ryaInstance);
        PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
        String sparql = pcjMetadata.getSparql();
        return this.withRyaIntegration(pcjId, sparql, Sets.newHashSet((Object[])new CreatePCJ.ExportStrategy[]{CreatePCJ.ExportStrategy.RYA}), fluo, accumulo, ryaInstance);
    }

    private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance) throws RyaDAOException {
        HashSet<RyaStatement> queryBatch = new HashSet<RyaStatement>();
        for (StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
            StatementPattern pattern = FluoStringConverter.toStatementPattern((String)patternMetadata.getStatementPattern());
            queryBatch.add(CreateFluoPcj.spToRyaStatement(pattern));
        }
        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
        conf.setTablePrefix(ryaInstance);
        conf.setAuths(this.getAuths(accumulo));
        try (AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf);
             CloseableIterable queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch));){
            HashSet<RyaStatement> triplesBatch = new HashSet<RyaStatement>();
            for (RyaStatement ryaStatement : queryIterable) {
                if (triplesBatch.size() == this.spInsertBatchSize) {
                    CreateFluoPcj.writeBatch(fluo, triplesBatch);
                    triplesBatch.clear();
                }
                triplesBatch.add(ryaStatement);
            }
            if (!triplesBatch.isEmpty()) {
                CreateFluoPcj.writeBatch(fluo, triplesBatch);
                triplesBatch.clear();
            }
        }
        catch (IOException e) {
            log.warn((Object)"Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", (Throwable)e);
        }
    }

    private static void writeBatch(FluoClient fluo, Set<RyaStatement> batch) {
        Preconditions.checkNotNull((Object)fluo);
        Preconditions.checkNotNull(batch);
        new InsertTriples().insert(fluo, batch);
    }

    private static RyaStatement spToRyaStatement(StatementPattern sp) {
        Value subjVal = sp.getSubjectVar().getValue();
        Value predVal = sp.getPredicateVar().getValue();
        Value objVal = sp.getObjectVar().getValue();
        RyaIRI subjIRI = null;
        RyaIRI predIRI = null;
        RyaType objType = null;
        if (subjVal != null) {
            if (!(subjVal instanceof Resource)) {
                throw new AssertionError((Object)"Subject must be a Resource.");
            }
            subjIRI = RdfToRyaConversions.convertResource((Resource)((Resource)subjVal));
        }
        if (predVal != null) {
            if (!(predVal instanceof IRI)) {
                throw new AssertionError((Object)"Predicate must be a IRI.");
            }
            predIRI = RdfToRyaConversions.convertIRI((IRI)((IRI)predVal));
        }
        if (objVal != null) {
            objType = RdfToRyaConversions.convertValue((Value)objVal);
        }
        return new RyaStatement(subjIRI, predIRI, objType);
    }

    private String[] getAuths(Connector accumulo) {
        try {
            Authorizations auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
            List authList = auths.getAuthorizations();
            String[] authArray = new String[authList.size()];
            for (int i = 0; i < authList.size(); ++i) {
                authArray[i] = new String((byte[])authList.get(i), "UTF-8");
            }
            return authArray;
        }
        catch (UnsupportedEncodingException | AccumuloException | AccumuloSecurityException e) {
            throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami());
        }
    }
}

