/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.core;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.RecordContext;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.CircuitBreakingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

public class CoreApp {
    private static final Logger log = LoggerFactory.getLogger(CoreApp.class);
    String inputTopic = "input-topic-" + RandomUtils.nextInt();
    String outputTopic = "output-topic-" + RandomUtils.nextInt();
    ParallelStreamProcessor<String, String> parallelConsumer;

    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer(new Properties());
    }

    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer(new Properties());
    }

    void run() {
        this.parallelConsumer = this.setupParallelConsumer();
        this.postSetup();
        this.parallelConsumer.poll(record -> log.info("Concurrently processing a record: {}", record));
    }

    protected void postSetup() {
    }

    ParallelStreamProcessor<String, String> setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = this.getKafkaConsumer();
        Producer<String, String> kafkaProducer = this.getKafkaProducer();
        ParallelConsumerOptions options = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).maxConcurrency(1000).consumer(kafkaConsumer).producer(kafkaProducer).build();
        ParallelStreamProcessor eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor((ParallelConsumerOptions)options);
        eosStreamProcessor.subscribe((Collection)UniLists.of((Object)this.inputTopic));
        return eosStreamProcessor;
    }

    void close() {
        this.parallelConsumer.close();
    }

    void runPollAndProduce() {
        this.parallelConsumer = this.setupParallelConsumer();
        this.postSetup();
        this.parallelConsumer.pollAndProduce(context -> {
            ConsumerRecord consumerRecord = context.getSingleRecord().getConsumerRecord();
            Result result = this.processBrokerRecord((ConsumerRecord<String, String>)consumerRecord);
            return new ProducerRecord(this.outputTopic, (Object)((String)consumerRecord.key()), (Object)result.payload);
        }, consumeProduceResult -> log.debug("Message {} saved to broker at offset {}", (Object)consumeProduceResult.getOut(), (Object)consumeProduceResult.getMeta().offset()));
    }

    private Result processBrokerRecord(ConsumerRecord<String, String> consumerRecord) {
        return new Result("Some payload from " + (String)consumerRecord.value());
    }

    void customRetryDelay() {
        double multiplier = 0.5;
        boolean baseDelaySecond = true;
        ParallelConsumerOptions.builder().retryDelayProvider(recordContext -> {
            int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts();
            long delayMillis = (long)(1.0 * Math.pow(0.5, numberOfFailedAttempts) * 1000.0);
            return Duration.ofMillis(delayMillis);
        });
    }

    void maxRetries() {
        ParallelStreamProcessor pc = ParallelStreamProcessor.createEosStreamProcessor(null);
        int maxRetries = 10;
        ConcurrentHashMap retriesCount = new ConcurrentHashMap();
        pc.poll(context -> {
            ConsumerRecord consumerRecord = context.getSingleRecord().getConsumerRecord();
            Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
            if (retryCount < 10L) {
                this.processRecord((ConsumerRecord<String, String>)consumerRecord);
                retriesCount.remove(consumerRecord);
            } else {
                log.warn("Retry count {} exceeded max of {} for record {}", new Object[]{retryCount, 10, consumerRecord});
                retriesCount.remove(consumerRecord);
            }
        });
    }

    private void processRecord(ConsumerRecord<String, String> record) {
    }

    void circuitBreaker() {
        ParallelStreamProcessor pc = ParallelStreamProcessor.createEosStreamProcessor(null);
        ConcurrentHashMap upMap = new ConcurrentHashMap();
        pc.poll(context -> {
            ConsumerRecord consumerRecord = context.getSingleRecord().getConsumerRecord();
            String serverId = this.extractServerId((ConsumerRecord<String, String>)consumerRecord);
            boolean up = upMap.computeIfAbsent(serverId, ignore -> true);
            if (!up) {
                up = this.updateStatusOfSever(serverId);
            }
            if (up) {
                try {
                    this.processRecord((ConsumerRecord<String, String>)consumerRecord);
                }
                catch (CircuitBreakingException e) {
                    log.warn("Server {} is circuitBroken, will retry message when server is up. Record: {}", (Object)serverId, (Object)consumerRecord);
                    upMap.put(serverId, false);
                }
            } else {
                throw new RuntimeException(StringUtils.msg((String)"Server {} currently down, will retry record latter {}", (Object[])new Object[]{up, consumerRecord}));
            }
            upMap.put(serverId, true);
        });
    }

    private boolean updateStatusOfSever(String serverId) {
        return false;
    }

    private String extractServerId(ConsumerRecord<String, String> consumerRecord) {
        return null;
    }

    void batching() {
        ParallelStreamProcessor.createEosStreamProcessor((ParallelConsumerOptions)ParallelConsumerOptions.builder().consumer(this.getKafkaConsumer()).producer(this.getKafkaProducer()).maxConcurrency(100).batchSize(Integer.valueOf(5)).build());
        this.parallelConsumer.poll(context -> {
            List<String> payload = context.stream().map(this::preparePayload).collect(Collectors.toList());
            this.processBatchPayload(payload);
        });
    }

    private void processBatchPayload(List<String> batchPayload) {
    }

    private String preparePayload(RecordContext<String, String> rc) {
        ConsumerRecord consumerRecords = rc.getConsumerRecord();
        int failureCount = rc.getNumberOfFailedAttempts();
        return StringUtils.msg((String)"{}, {}", (Object[])new Object[]{consumerRecords, failureCount});
    }

    static final class Result {
        private final String payload;

        public Result(String payload) {
            this.payload = payload;
        }

        public String getPayload() {
            return this.payload;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Result)) {
                return false;
            }
            Result other = (Result)o;
            String this$payload = this.getPayload();
            String other$payload = other.getPayload();
            return !(this$payload == null ? other$payload != null : !this$payload.equals(other$payload));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $payload = this.getPayload();
            result = result * 59 + ($payload == null ? 43 : $payload.hashCode());
            return result;
        }

        public String toString() {
            return "CoreApp.Result(payload=" + this.getPayload() + ")";
        }
    }
}

