/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.TimeoutException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerCommitException;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerPollException;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractAtLeastOnceKafkaConsumer<T>
implements Lifecycle {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAtLeastOnceKafkaConsumer.class);
    boolean stopped = false;
    private final KafkaConsumer<String, Buffer> kafkaConsumer;
    private final Set<String> topics;
    private final Pattern topicPattern;
    private final String topicsLogString;
    private final Handler<T> messageHandler;
    private final Handler<Throwable> closeHandler;
    private final Duration pollTimeout;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToBeCommitted = new HashMap<TopicPartition, OffsetAndMetadata>();
    private boolean respectTtl = true;

    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, String topic, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, Set.of(Objects.requireNonNull(topic)), messageHandler, closeHandler, pollTimeout);
    }

    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Set<String> topics, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, Objects.requireNonNull(topics), null, messageHandler, closeHandler, pollTimeout);
    }

    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Pattern topicPattern, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, null, Objects.requireNonNull(topicPattern), messageHandler, closeHandler, pollTimeout);
    }

    private AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Set<String> topics, Pattern topicPattern, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        Objects.requireNonNull(kafkaConsumer);
        Objects.requireNonNull(messageHandler);
        Objects.requireNonNull(closeHandler);
        this.kafkaConsumer = kafkaConsumer;
        this.messageHandler = messageHandler;
        this.closeHandler = closeHandler;
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.topicsLogString = topics != null ? "[" + topics.stream().limit(3L).collect(Collectors.joining(", ")) + (topics.size() > 3 ? ", ...]" : "]") : topicPattern.toString();
        this.pollTimeout = Duration.ofMillis(pollTimeout);
    }

    protected abstract T createMessage(KafkaConsumerRecord<String, Buffer> var1);

    public Future<Void> start() {
        if (this.stopped) {
            return Future.failedFuture((String)"consumer already stopped");
        }
        this.kafkaConsumer.partitionsAssignedHandler(this::onPartitionsAssigned);
        this.kafkaConsumer.partitionsRevokedHandler(this::onPartitionsRevoked);
        Promise promise = Promise.promise();
        if (this.topics != null) {
            this.kafkaConsumer.subscribe(this.topics, (Handler)promise);
        } else {
            this.kafkaConsumer.subscribe(this.topicPattern, (Handler)promise);
        }
        return promise.future().compose(v -> {
            Promise pollPromise = Promise.promise();
            this.kafkaConsumer.poll(this.pollTimeout, (Handler)pollPromise);
            return pollPromise.future().onSuccess(this::handleBatch).recover(cause -> Future.failedFuture((Throwable)new KafkaConsumerPollException((Throwable)cause))).mapEmpty();
        });
    }

    private void onPartitionsAssigned(Set<TopicPartition> partitionsSet) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions assigned: [{}]", (Object)this.getPartitionsDebugString(partitionsSet));
        }
    }

    private void onPartitionsRevoked(Set<TopicPartition> partitionsSet) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions revoked: [{}]", (Object)this.getPartitionsDebugString(partitionsSet));
        }
    }

    private String getPartitionsDebugString(Set<TopicPartition> partitionsSet) {
        return partitionsSet.size() <= 20 ? partitionsSet.stream().collect(Collectors.groupingBy(TopicPartition::getTopic, Collectors.mapping(TopicPartition::getPartition, Collectors.toCollection(TreeSet::new)))).toString() : partitionsSet.size() + " topic partitions";
    }

    public Future<Void> stop() {
        return this.tryCommitAndClose();
    }

    public final void setRespectTtl(boolean respectTtl) {
        this.respectTtl = respectTtl;
    }

    private void handleBatch(KafkaConsumerRecords<String, Buffer> records) {
        try {
            if (!records.isEmpty()) {
                LOG.debug("polled {} records on {}", (Object)records.size(), (Object)this.topicsLogString);
            }
            for (int i = 0; i < records.size(); ++i) {
                if (this.stopped) {
                    return;
                }
                KafkaConsumerRecord record = records.recordAt(i);
                if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(record.headers())) {
                    this.addToCurrentOffsets((KafkaConsumerRecord<String, Buffer>)record);
                    continue;
                }
                T message = this.createMessage((KafkaConsumerRecord<String, Buffer>)record);
                try {
                    this.messageHandler.handle(message);
                    this.addToCurrentOffsets((KafkaConsumerRecord<String, Buffer>)record);
                    continue;
                }
                catch (RuntimeException messageHandlingError) {
                    LOG.debug("Message handler failed", (Throwable)messageHandlingError);
                    break;
                }
            }
            this.commit(true).compose(ok -> this.poll()).onSuccess(this::handleBatch);
        }
        catch (Exception ex) {
            LOG.error("Consumer failed, closing", (Throwable)ex);
            this.tryCommitAndClose().onComplete(v -> this.closeHandler.handle((Object)ex));
        }
    }

    private Future<KafkaConsumerRecords<String, Buffer>> poll() {
        if (this.stopped) {
            return Future.failedFuture((String)"consumer already stopped");
        }
        Promise pollPromise = Promise.promise();
        this.kafkaConsumer.poll(this.pollTimeout, (Handler)pollPromise);
        return pollPromise.future().recover(cause -> {
            LOG.error("Error polling messages: " + cause);
            KafkaConsumerPollException exception = new KafkaConsumerPollException((Throwable)cause);
            this.closeAndCallHandler(exception);
            return Future.failedFuture((Throwable)exception);
        });
    }

    private void addToCurrentOffsets(KafkaConsumerRecord<String, Buffer> record) {
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1L, "");
        this.offsetsToBeCommitted.put(topicPartition, offsetAndMetadata);
    }

    private Future<Void> commit(boolean retry) {
        if (this.offsetsToBeCommitted.isEmpty()) {
            return Future.succeededFuture();
        }
        if (this.stopped) {
            return Future.failedFuture((String)"consumer already stopped");
        }
        return this.commitCurrentOffsets().recover(cause -> {
            LOG.error("Error committing offsets: " + cause);
            if (cause instanceof TimeoutException && retry) {
                LOG.debug("Committing offsets timed out. Maybe increase 'default.api.timeout.ms'?");
                return this.commit(false);
            }
            KafkaConsumerCommitException exception = new KafkaConsumerCommitException((Throwable)cause);
            this.closeAndCallHandler(exception);
            return Future.failedFuture((Throwable)exception);
        });
    }

    private Future<Void> commitCurrentOffsets() {
        if (this.offsetsToBeCommitted.isEmpty()) {
            LOG.debug("no offsets to commit");
            return Future.succeededFuture();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("committing offsets: {}", this.offsetsToBeCommitted);
        } else {
            LOG.debug("committing current offsets");
        }
        Promise completionHandler = Promise.promise();
        this.kafkaConsumer.commit(this.offsetsToBeCommitted, (Handler)completionHandler);
        return completionHandler.future().map(committedOffsets -> {
            LOG.debug("successfully committed offsets");
            this.offsetsToBeCommitted.clear();
            return null;
        });
    }

    private void closeAndCallHandler(Throwable exception) {
        LOG.error("Closing consumer with cause", exception);
        this.closeConsumer().onComplete(v -> this.closeHandler.handle((Object)exception));
    }

    private Future<Void> tryCommitAndClose() {
        this.stopped = true;
        Promise returnFuture = Promise.promise();
        this.commitCurrentOffsets().onComplete(commitResult -> this.closeConsumer().onComplete(v -> {
            if (commitResult.succeeded()) {
                returnFuture.complete();
            } else {
                returnFuture.fail(commitResult.cause());
            }
        }));
        return returnFuture.future();
    }

    private Future<Void> closeConsumer() {
        this.stopped = true;
        Promise promise = Promise.promise();
        this.kafkaConsumer.close((Handler)promise);
        this.offsetsToBeCommitted.clear();
        return promise.future();
    }
}

