/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.api.client.accumulo;

import com.google.common.base.Optional;
import java.util.Objects;
import java.util.Properties;
import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.client.CreatePeriodicPCJ;
import org.apache.rya.api.client.GetInstanceDetails;
import org.apache.rya.api.client.InstanceDoesNotExistException;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.client.accumulo.AccumuloCommand;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloGetInstanceDetails;
import org.apache.rya.api.client.accumulo.FluoClientFactory;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.sail.SailException;

/**
 * An Accumulo implementation of the {@link CreatePeriodicPCJ} command.
 * <p>
 * The command looks up the details of the named Rya instance, verifies that PCJ
 * indexing is enabled and that Fluo details are recorded for it, and then creates
 * the periodic query through {@link CreatePeriodicQuery} using a Kafka backed
 * {@link PeriodicNotificationClient}.
 */
public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ {
    /** Used to look up the details of the Rya instance the query is created for. */
    private final GetInstanceDetails getInstanceDetails;

    /**
     * Constructs an instance of {@link AccumuloCreatePeriodicPCJ}.
     *
     * @param connectionDetails the connection details handed to the parent command and to
     *        the instance-details lookup
     * @param connector the Accumulo connector handed to the parent command and to the
     *        instance-details lookup
     */
    public AccumuloCreatePeriodicPCJ(AccumuloConnectionDetails connectionDetails, Connector connector) {
        super(connectionDetails, connector);
        this.getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    }

    /**
     * Creates a periodic PCJ for the named Rya instance.
     *
     * @param instanceName the name of the Rya instance; must not be {@code null}
     * @param sparql the SPARQL of the periodic query; must not be {@code null}
     * @param periodicTopic the Kafka topic passed to the notification registration client;
     *        must not be {@code null}
     * @param bootStrapServers the value used for the Kafka producer's
     *        {@code bootstrap.servers} property; must not be {@code null}
     * @return the query id reported for the newly created periodic query
     * @throws InstanceDoesNotExistException if no details exist for {@code instanceName}
     * @throws RyaClientException if PCJ indexing is not enabled, no Fluo details are
     *         recorded for the instance, or the query could not be created
     */
    @Override
    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException {
        Objects.requireNonNull(instanceName, "instanceName");
        Objects.requireNonNull(sparql, "sparql");

        Optional<RyaDetails> ryaDetailsHolder = this.getInstanceDetails.getDetails(instanceName);
        if (!ryaDetailsHolder.isPresent()) {
            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
        }

        RyaDetails.PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails();
        if (!pcjIndexDetails.isEnabled()) {
            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
        }

        Optional<RyaDetails.PCJIndexDetails.FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
        if (!fluoDetailsHolder.isPresent()) {
            // PCJ indexing IS enabled at this point; what is missing is the Fluo application,
            // so say that instead of repeating the "not enabled" message.
            throw new RyaClientException(String.format("The '%s' instance of Rya does not have a Fluo application configured for PCJ Indexing.", instanceName));
        }

        String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
        try {
            return this.updateFluoAppAndRegisterWithKafka(instanceName, fluoAppName, sparql, periodicTopic, bootStrapServers);
        }
        catch (RyaDAOException | CreatePeriodicQuery.PeriodicQueryCreationException | PcjException | MalformedQueryException | QueryEvaluationException | RepositoryException | SailException e) {
            throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
        }
        catch (UnsupportedQueryException e) {
            // Keep the original exception as the cause so the offending query node is not lost.
            throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
                    + "or an invalid ExportStrategy for the given QueryType.  Projection queries can be exported to either Rya or Kafka, "
                    + "unless they contain an aggregation, in which case they can only be exported to Kafka.  Construct queries can be "
                    + "exported to Rya and Kafka, and Periodic queries can only be exported to Rya.", e);
        }
    }

    /**
     * Connects to the Fluo application, creates the periodic query in it and hands
     * {@link CreatePeriodicQuery} a Kafka backed notification client.
     * <p>
     * Both the Fluo client and the Kafka producer are closed before this method returns,
     * whether it completes normally or with an exception.
     *
     * @param ryaInstance the name of the Rya instance
     * @param fluoAppName the name of the Fluo application to connect to
     * @param sparql the SPARQL of the periodic query; must not be {@code null}
     * @param periodicTopic the Kafka topic for the notification client; must not be {@code null}
     * @param bootStrapServers the Kafka bootstrap servers; must not be {@code null}
     * @return the query id of the created periodic query
     */
    private String updateFluoAppAndRegisterWithKafka(String ryaInstance, String fluoAppName, String sparql, String periodicTopic, String bootStrapServers) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, CreatePeriodicQuery.PeriodicQueryCreationException {
        Objects.requireNonNull(sparql, "sparql");
        Objects.requireNonNull(periodicTopic, "periodicTopic");
        Objects.requireNonNull(bootStrapServers, "bootStrapServers");

        PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(this.getConnector(), ryaInstance);
        AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();

        // The producer is a resource of this call only; managing it here prevents leaking
        // its network resources every time a periodic PCJ is created.
        try (FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getUserPass()), cd.getInstanceName(), cd.getZookeepers(), fluoAppName);
                KafkaProducer<String, CommandNotification> producer = AccumuloCreatePeriodicPCJ.createProducer(bootStrapServers)) {
            CreatePeriodicQuery periodicPcj = new CreatePeriodicQuery(fluoClient, periodicStorage);
            PeriodicNotificationClient periodicClient = new KafkaNotificationRegistrationClient(periodicTopic, producer);
            return periodicPcj.withRyaIntegration(sparql, periodicClient, this.getConnector(), ryaInstance).getQueryId();
        }
    }

    /**
     * Builds a Kafka producer that uses {@link StringSerializer} for keys and
     * {@link CommandNotificationSerializer} for values.
     *
     * @param bootStrapServers the value of the producer's {@code bootstrap.servers} property
     * @return a new producer; the caller is responsible for closing it
     */
    private static KafkaProducer<String, CommandNotification> createProducer(String bootStrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootStrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", CommandNotificationSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}

