package org.apache.rya.kafka.connect.client.command;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.kafka.connect.api.StatementsDeserializer;
import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
import org.eclipse.rdf4j.model.Statement;

/**
 * A {@link RyaKafkaClientCommand} that reads sets of {@link Statement}s from a Kafka topic
 * and prints each statement to standard output.
 */
@DefaultAnnotation({NonNull.class})
public class ReadStatementsCommand implements RyaKafkaClientCommand {

    /** Maximum time, in milliseconds, the single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 500L;

    /**
     * @return the name used to select this command: {@code "read"}.
     */
    @Override
    public String getCommand() {
        return "read";
    }

    /**
     * @return a one-sentence, human-readable description of this command.
     */
    @Override
    public String getDescription() {
        return "Reads Statements from the specified Kafka topic.";
    }

    /**
     * Checks whether the supplied command line arguments can be parsed into
     * {@link RyaKafkaClientCommand.KafkaParameters}.
     *
     * @param strArr the command line arguments to check.
     * @return {@code true} if JCommander parses the arguments without raising a
     *         {@link ParameterException}; {@code false} otherwise.
     */
    @Override
    public boolean validArguments(String[] strArr) {
        try {
            // Parsing happens in the constructor; the instance itself is not needed.
            new JCommander(new RyaKafkaClientCommand.KafkaParameters(), strArr);
            return true;
        } catch (ParameterException e) {
            return false;
        }
    }

    /**
     * Parses the arguments, subscribes a fresh consumer to the configured topic, polls it
     * once (waiting up to {@value #POLL_TIMEOUT_MS} ms) and prints every statement of every
     * returned record to standard output. The consumer is always closed before returning.
     *
     * @param strArr the command line arguments; must not be {@code null}.
     * @throws RyaKafkaClientCommand.ArgumentsException if the arguments cannot be parsed.
     * @throws RyaKafkaClientCommand.ExecutionException declared by the interface; this
     *         implementation does not throw it directly.
     * @throws NullPointerException if {@code strArr} is {@code null}.
     */
    @Override
    public void execute(String[] strArr) throws RyaKafkaClientCommand.ArgumentsException, RyaKafkaClientCommand.ExecutionException {
        Objects.requireNonNull(strArr);

        // Parse the command line arguments. Only a parsing failure is reported as an
        // ArgumentsException; consumer failures propagate unchanged.
        RyaKafkaClientCommand.KafkaParameters kafkaParameters = new RyaKafkaClientCommand.KafkaParameters();
        try {
            new JCommander(kafkaParameters, strArr);
        } catch (ParameterException e) {
            throw new RyaKafkaClientCommand.ArgumentsException("Could not read the Statements from the topic because of invalid command line parameters.", e);
        }

        // try-with-resources guarantees the consumer is closed on both the normal and the
        // exceptional path, attaching any close() failure as a suppressed exception.
        try (KafkaConsumer<String, Set<Statement>> consumer = makeConsumer(kafkaParameters)) {
            consumer.subscribe(Collections.singleton(kafkaParameters.topic));

            for (ConsumerRecord<String, Set<Statement>> record : consumer.poll(POLL_TIMEOUT_MS)) {
                Set<Statement> statements = record.value();
                if (statements == null) {
                    // A record may carry no value (e.g. a tombstone); there is nothing to print.
                    continue;
                }
                for (Statement statement : statements) {
                    System.out.println(statement);
                }
            }
        }
    }

    /**
     * Builds a consumer for the broker(s) named in the parameters. A random UUID is used
     * as the group id (and as the client id suffix), the offset reset policy is
     * {@code "earliest"}, and auto commit is disabled.
     *
     * @param kafkaParameters supplies the bootstrap servers; must not be {@code null}.
     * @return a new, unsubscribed consumer that the caller is responsible for closing.
     * @throws NullPointerException if {@code kafkaParameters} is {@code null}.
     */
    private KafkaConsumer<String, Set<Statement>> makeConsumer(RyaKafkaClientCommand.KafkaParameters kafkaParameters) {
        Objects.requireNonNull(kafkaParameters);

        // One UUID shared by the group id and client id so the two can be correlated.
        String uuid = UUID.randomUUID().toString();

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StatementsDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, uuid);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "Kafka-Connect-Client-" + uuid);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new KafkaConsumer<>(properties);
    }
}
