/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.kafka.connect.client.command;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.kafka.connect.api.StatementsDeserializer;
import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
import org.eclipse.rdf4j.model.Statement;

@DefaultAnnotation(value={NonNull.class})
public class ReadStatementsCommand
implements RyaKafkaClientCommand {
    @Override
    public String getCommand() {
        return "read";
    }

    @Override
    public String getDescription() {
        return "Reads Statements from the specified Kafka topic.";
    }

    @Override
    public boolean validArguments(String[] args) {
        boolean valid = true;
        try {
            new JCommander((Object)new RyaKafkaClientCommand.KafkaParameters(), args);
        }
        catch (ParameterException e) {
            valid = false;
        }
        return valid;
    }

    @Override
    public void execute(String[] args) throws RyaKafkaClientCommand.ArgumentsException, RyaKafkaClientCommand.ExecutionException {
        Objects.requireNonNull(args);
        RyaKafkaClientCommand.KafkaParameters params = new RyaKafkaClientCommand.KafkaParameters();
        try {
            new JCommander((Object)params, args);
        }
        catch (ParameterException e) {
            throw new RyaKafkaClientCommand.ArgumentsException("Could not read the Statements from the topic because of invalid command line parameters.", e);
        }
        try (KafkaConsumer<String, Set<Statement>> consumer = this.makeConsumer(params);){
            consumer.subscribe(Collections.singleton(params.topic));
            for (ConsumerRecord record : consumer.poll(500L)) {
                for (Statement stmt : (Set)record.value()) {
                    System.out.println(stmt);
                }
            }
        }
    }

    private KafkaConsumer<String, Set<Statement>> makeConsumer(RyaKafkaClientCommand.KafkaParameters params) {
        Objects.requireNonNull(params);
        Properties props = new Properties();
        props.put("bootstrap.servers", params.bootstrapServers);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StatementsDeserializer.class);
        String groupId = UUID.randomUUID().toString();
        props.put("group.id", groupId);
        props.put("client.id", "Kafka-Connect-Client-" + groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", (Object)false);
        return new KafkaConsumer(props);
    }
}

