/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.vertx;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.vertx.JStreamVertxParallelStreamProcessor;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Example application that wires a Kafka {@link Consumer} and {@link Producer} into a
 * {@link JStreamVertxParallelStreamProcessor} and, for every consumed record, builds a
 * {@link VertxParallelEoSStreamProcessor.RequestInfo} pointing at {@code localhost:<port>/api}.
 *
 * <p>The client factories, {@link #getPort()} and {@link #postSetup()} are overridable so that
 * subclasses can substitute their own clients, port and setup step.
 */
public class VertxApp {
    private static final Logger log = LoggerFactory.getLogger(VertxApp.class);

    /** Topic subscribed to by {@link #run()}; suffixed with a random int at class initialisation. */
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();

    /** Processor created by {@link #run()}; stays {@code null} until {@code run()} has assigned it. */
    JStreamVertxParallelStreamProcessor<String, String> parallelConsumer;

    /**
     * Creates the consumer handed to the parallel processor.
     *
     * <p>NOTE(review): this default builds the client from empty {@link Properties}; presumably
     * callers are expected to override it with a configured client - confirm.
     *
     * @return a new Kafka consumer for {@code String} keys and values
     */
    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<>(new Properties());
    }

    /**
     * Creates the producer handed to the parallel processor.
     *
     * <p>NOTE(review): this default builds the client from empty {@link Properties}; presumably
     * callers are expected to override it with a configured client - confirm.
     *
     * @return a new Kafka producer for {@code String} keys and values
     */
    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer<>(new Properties());
    }

    /**
     * Builds the parallel processor with {@code KEY} ordering, subscribes it to
     * {@link #inputTopic}, invokes {@link #postSetup()}, and then maps each polled record to a
     * {@code RequestInfo} whose parameters carry the record's key and value. Every element of the
     * resulting stream is logged.
     */
    void run() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        Producer<String, String> kafkaProducer = getKafkaProducer();

        // Explicit type witness keeps the options (and therefore the processor) fully typed
        // instead of falling back to raw types and unchecked conversions.
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

        this.parallelConsumer = JStreamVertxParallelStreamProcessor.createEosStreamProcessor(options);
        this.parallelConsumer.subscribe(UniLists.of(inputTopic));

        postSetup();

        // Read once up front so the lambda captures an effectively final value.
        int port = getPort();

        // The stream's element type is a processor result type that is only logged here,
        // so a wildcard is sufficient.
        Stream<?> resultStream = this.parallelConsumer.vertxHttpReqInfoStream(context -> {
            ConsumerRecord<String, String> consumerRecord = context.getSingleConsumerRecord();
            log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
            Map<String, String> params = UniMaps.of(
                    "recordKey", consumerRecord.key(),
                    "payload", consumerRecord.value());
            return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", port, "/api", params);
        });

        resultStream.forEach(result -> log.info("From result stream: {}", result));
    }

    /**
     * Port used for the {@code RequestInfo} objects built in {@link #run()}.
     *
     * @return {@code 8080} unless overridden
     */
    protected int getPort() {
        return 8080;
    }

    /**
     * Shuts the processor down via {@code closeDrainFirst()}.
     *
     * <p>Does nothing when {@link #run()} never assigned the processor (not called, or failed
     * early), so that cleanup code does not throw a {@link NullPointerException} that would mask
     * the original failure.
     */
    void close() {
        if (this.parallelConsumer != null) {
            this.parallelConsumer.closeDrainFirst();
        }
    }

    /**
     * Hook invoked by {@link #run()} after subscribing and before the request mapping is
     * registered. The default implementation is empty.
     */
    protected void postSetup() {
    }
}

