package org.apache.rya.kafka.connect.client.command;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.kafka.connect.api.StatementsSerializer;
import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
import org.elasticsearch.discovery.DiscoverySettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.class */
public class WriteStatementsCommand implements RyaKafkaClientCommand {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WriteStatementsCommand.class);

    /* loaded from: input_file:org/apache/rya/kafka/connect/client/command/WriteStatementsCommand$WriteParameters.class */
    public static class WriteParameters extends RyaKafkaClientCommand.KafkaParameters {

        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
        public String statementsFile;
    }

    @Override // org.apache.rya.kafka.connect.client.RyaKafkaClientCommand
    public String getCommand() {
        return DiscoverySettings.DEFAULT_NO_MASTER_BLOCK;
    }

    @Override // org.apache.rya.kafka.connect.client.RyaKafkaClientCommand
    public String getDescription() {
        return "Writes Statements to the specified Kafka topic.";
    }

    @Override // org.apache.rya.kafka.connect.client.RyaKafkaClientCommand
    public boolean validArguments(String[] strArr) {
        boolean z = true;
        try {
            new JCommander(new WriteParameters(), strArr);
        } catch (ParameterException e) {
            z = false;
        }
        return z;
    }

    @Override // org.apache.rya.kafka.connect.client.RyaKafkaClientCommand
    public String getUsage() {
        JCommander jCommander = new JCommander(new WriteParameters());
        StringBuilder sb = new StringBuilder();
        jCommander.usage(sb);
        return sb.toString();
    }

    @Override // org.apache.rya.kafka.connect.client.RyaKafkaClientCommand
    public void execute(String[] strArr) throws RyaKafkaClientCommand.ArgumentsException, RyaKafkaClientCommand.ExecutionException {
        Objects.requireNonNull(strArr);
        final WriteParameters writeParameters = new WriteParameters();
        try {
            new JCommander(writeParameters, strArr);
            Path path = Paths.get(writeParameters.statementsFile, new String[0]);
            if (!path.toFile().exists()) {
                throw new RyaKafkaClientCommand.ArgumentsException("Could not load statements at path '" + path + "' because that file does not exist. Make sure you've entered the correct path.");
            }
            String path2 = path.getFileName().toString();
            RDFFormat forFileName = RdfFormatUtils.forFileName(path2);
            if (forFileName == null) {
                throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + path2);
            }
            RDFParser createParser = Rio.createParser(forFileName);
            final Producer<String, Set<Statement>> makeProducer = makeProducer(writeParameters);
            Throwable th = null;
            try {
                try {
                    createParser.setRDFHandler(new AbstractRDFHandler() { // from class: org.apache.rya.kafka.connect.client.command.WriteStatementsCommand.1
                        private Set<Statement> batch = new HashSet(5);

                        @Override // org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler, org.eclipse.rdf4j.rio.RDFHandler
                        public void startRDF() throws RDFHandlerException {
                            WriteStatementsCommand.log.trace("Starting loading statements.");
                        }

                        @Override // org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler, org.eclipse.rdf4j.rio.RDFHandler
                        public void handleStatement(Statement statement) throws RDFHandlerException {
                            WriteStatementsCommand.log.trace("Adding statement.");
                            this.batch.add(statement);
                            if (this.batch.size() == 5) {
                                flushBatch();
                            }
                        }

                        @Override // org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler, org.eclipse.rdf4j.rio.RDFHandler
                        public void endRDF() throws RDFHandlerException {
                            if (!this.batch.isEmpty()) {
                                flushBatch();
                            }
                            WriteStatementsCommand.log.trace("Done.");
                        }

                        private void flushBatch() {
                            WriteStatementsCommand.log.trace("Flushing batch of size " + this.batch.size());
                            makeProducer.send(new ProducerRecord(writeParameters.topic, null, this.batch));
                            this.batch = new HashSet(5);
                            makeProducer.flush();
                        }
                    });
                    try {
                        createParser.parse(Files.newInputStream(path, new OpenOption[0]), "");
                        if (makeProducer != null) {
                            if (0 == 0) {
                                makeProducer.close();
                                return;
                            }
                            try {
                                makeProducer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                    } catch (IOException | RDFHandlerException | RDFParseException e) {
                        throw new RyaKafkaClientCommand.ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (makeProducer != null) {
                    if (th != null) {
                        try {
                            makeProducer.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        makeProducer.close();
                    }
                }
                throw th4;
            }
        } catch (ParameterException e2) {
            throw new RyaKafkaClientCommand.ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e2);
        }
    }

    private static Producer<String, Set<Statement>> makeProducer(RyaKafkaClientCommand.KafkaParameters kafkaParameters) {
        Objects.requireNonNull(kafkaParameters);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaParameters.bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
        return new KafkaProducer(properties);
    }
}
