/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.core;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.Collection;
import java.util.Properties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Example application that wires a Kafka consumer and producer into a
 * {@link ParallelStreamProcessor} and demonstrates two flows: plain polling
 * ({@link #run()}) and poll-then-produce ({@link #runPollAndProduce()}).
 *
 * <p>The client factory methods and {@link #postSetup()} are overridable so that
 * subclasses can substitute their own clients or add extra setup steps.
 */
public class CoreApp {
    private static final Logger log = LoggerFactory.getLogger(CoreApp.class);

    /** Topic the processor subscribes to; gets a random numeric suffix per instance. */
    String inputTopic = "input-topic-" + RandomUtils.nextInt();

    /** Topic that {@link #runPollAndProduce()} writes to; gets a random numeric suffix per instance. */
    String outputTopic = "output-topic-" + RandomUtils.nextInt();

    /** Assigned by {@link #run()} or {@link #runPollAndProduce()}; {@code null} until then. */
    ParallelStreamProcessor<String, String> parallelConsumer;

    /**
     * Creates the consumer handed to the parallel processor.
     *
     * @return a new {@link KafkaConsumer} built from empty {@link Properties}
     */
    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<>(new Properties());
    }

    /**
     * Creates the producer handed to the parallel processor.
     *
     * @return a new {@link KafkaProducer} built from empty {@link Properties}
     */
    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer<>(new Properties());
    }

    /**
     * Sets up the parallel processor, invokes {@link #postSetup()}, and registers a
     * poll callback that logs every record it receives.
     */
    void run() {
        parallelConsumer = setupParallelConsumer();
        postSetup();
        parallelConsumer.poll(record -> log.info("Concurrently processing a record: {}", record));
    }

    /**
     * Hook invoked after the processor has been created and before polling is
     * registered. This implementation does nothing.
     */
    protected void postSetup() {
    }

    /**
     * Builds a processor from {@link #getKafkaConsumer()} and {@link #getKafkaProducer()},
     * configured with {@code KEY} processing order and both message limits set to 1000,
     * and subscribes it to {@link #inputTopic}.
     *
     * @return the processor, already subscribed to the input topic
     */
    ParallelStreamProcessor<String, String> setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        Producer<String, String> kafkaProducer = getKafkaProducer();

        // The explicit type witness lets the builder chain accept the typed clients
        // without falling back to raw types.
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxMessagesToQueue(1000)
                .maxNumberMessagesBeyondBaseCommitOffset(1000)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

        ParallelStreamProcessor<String, String> eosStreamProcessor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        eosStreamProcessor.subscribe(UniLists.of(inputTopic));
        return eosStreamProcessor;
    }

    /**
     * Closes the parallel processor. Does nothing when neither {@link #run()} nor
     * {@link #runPollAndProduce()} has created one yet.
     */
    void close() {
        // Guard against close() being called before any processor was set up.
        if (parallelConsumer != null) {
            parallelConsumer.close();
        }
    }

    /**
     * Sets up the parallel processor, invokes {@link #postSetup()}, and registers a
     * poll-and-produce flow: each consumed record is turned into a {@link Result},
     * whose payload is sent to {@link #outputTopic} under the consumed record's key.
     * A debug line is logged from the produce callback.
     */
    void runPollAndProduce() {
        parallelConsumer = setupParallelConsumer();
        postSetup();
        parallelConsumer.pollAndProduce(record -> {
            Result result = processBrokerRecord(record);
            return new ProducerRecord<>(outputTopic, record.key(), result.getPayload());
        }, consumeProduceResult -> log.debug("Message {} saved to broker at offset {}",
                consumeProduceResult.getOut(),
                consumeProduceResult.getMeta().offset()));
    }

    /**
     * Derives the outgoing payload for a consumed record.
     *
     * @param record the consumed record
     * @return a {@link Result} whose payload is {@code "Some payload from "} followed by the record value
     */
    private Result processBrokerRecord(ConsumerRecord<String, String> record) {
        return new Result("Some payload from " + record.value());
    }

    /** Immutable value holder for the payload produced from a consumed record. */
    static final class Result {
        private final String payload;

        /**
         * @param payload the payload text; may be {@code null}
         */
        public Result(String payload) {
            this.payload = payload;
        }

        /**
         * @return the payload text, possibly {@code null}
         */
        public String getPayload() {
            return payload;
        }

        /** Two results are equal when their payloads are equal (or both {@code null}). */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Result)) {
                return false;
            }
            Result other = (Result) o;
            return payload == null ? other.payload == null : payload.equals(other.payload);
        }

        /** Hash derived from the payload; a {@code null} payload contributes the constant 43. */
        @Override
        public int hashCode() {
            // Same value as the previous 1 * 59 + h computation.
            return 59 + (payload == null ? 43 : payload.hashCode());
        }

        @Override
        public String toString() {
            return "CoreApp.Result(payload=" + payload + ")";
        }
    }
}

