/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.kafka.connect.client.command;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.kafka.connect.api.StatementsSerializer;
import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultAnnotation(value={NonNull.class})
public class WriteStatementsCommand
implements RyaKafkaClientCommand {
    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);

    @Override
    public String getCommand() {
        return "write";
    }

    @Override
    public String getDescription() {
        return "Writes Statements to the specified Kafka topic.";
    }

    @Override
    public boolean validArguments(String[] args) {
        boolean valid = true;
        try {
            new JCommander((Object)new WriteParameters(), args);
        }
        catch (ParameterException e) {
            valid = false;
        }
        return valid;
    }

    @Override
    public String getUsage() {
        JCommander parser = new JCommander((Object)new WriteParameters());
        StringBuilder usage = new StringBuilder();
        parser.usage(usage);
        return usage.toString();
    }

    @Override
    public void execute(String[] args) throws RyaKafkaClientCommand.ArgumentsException, RyaKafkaClientCommand.ExecutionException {
        Objects.requireNonNull(args);
        final WriteParameters params = new WriteParameters();
        try {
            new JCommander((Object)params, args);
        }
        catch (ParameterException e) {
            throw new RyaKafkaClientCommand.ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
        }
        Path statementsPath = Paths.get(params.statementsFile, new String[0]);
        if (!statementsPath.toFile().exists()) {
            throw new RyaKafkaClientCommand.ArgumentsException("Could not load statements at path '" + statementsPath + "' because that file does not exist. Make sure you've entered the correct path.");
        }
        String filename = statementsPath.getFileName().toString();
        RDFFormat format = RdfFormatUtils.forFileName((String)filename);
        if (format == null) {
            throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
        }
        RDFParser parser = Rio.createParser((RDFFormat)format);
        try (final Producer<String, Set<Statement>> producer = WriteStatementsCommand.makeProducer(params);){
            parser.setRDFHandler((RDFHandler)new AbstractRDFHandler(){
                private Set<Statement> batch = new HashSet<Statement>(5);

                public void startRDF() throws RDFHandlerException {
                    log.trace("Starting loading statements.");
                }

                public void handleStatement(Statement stmnt) throws RDFHandlerException {
                    log.trace("Adding statement.");
                    this.batch.add(stmnt);
                    if (this.batch.size() == 5) {
                        this.flushBatch();
                    }
                }

                public void endRDF() throws RDFHandlerException {
                    if (!this.batch.isEmpty()) {
                        this.flushBatch();
                    }
                    log.trace("Done.");
                }

                private void flushBatch() {
                    log.trace("Flushing batch of size " + this.batch.size());
                    producer.send(new ProducerRecord(params.topic, null, this.batch));
                    this.batch = new HashSet<Statement>(5);
                    producer.flush();
                }
            });
            try {
                parser.parse(Files.newInputStream(statementsPath, new OpenOption[0]), "");
            }
            catch (IOException | RDFHandlerException | RDFParseException e) {
                throw new RyaKafkaClientCommand.ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
            }
        }
    }

    private static Producer<String, Set<Statement>> makeProducer(RyaKafkaClientCommand.KafkaParameters params) {
        Objects.requireNonNull(params);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", params.bootstrapServers);
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StatementsSerializer.class.getName());
        return new KafkaProducer(props);
    }

    public static class WriteParameters
    extends RyaKafkaClientCommand.KafkaParameters {
        @Parameter(names={"--statementsFile", "-f"}, required=true, description="The file of RDF statements to load into Rya Streams.")
        public String statementsFile;
    }
}

