/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.kafka;

import io.opentracing.Span;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.hono.client.command.CommandAlreadyProcessedException;
import org.eclipse.hono.client.command.CommandToBeReprocessedException;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of commands per Kafka topic partition and makes sure that the <em>send actions</em>
 * of the commands of one partition get invoked in the order in which the commands were added.
 * <p>
 * Usage: a command context is first registered via {@link #add(KafkaBasedCommandContext)}. Once
 * the command is ready to be sent, {@link #applySendCommandAction(KafkaBasedCommandContext, Supplier)}
 * is invoked. The given send action is executed right away if the command is at the head of its
 * partition queue; otherwise the action is stored in the command context and executed as soon as
 * all commands in front of it have been sent or removed.
 * <p>
 * NOTE(review): the internal collections are not synchronized; presumably all methods are meant
 * to be invoked on the vert.x context given in the constructor - confirm against the callers.
 */
public class KafkaCommandProcessingQueue {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommandProcessingQueue.class);

    /** The per-partition command queues, keyed by the partition the commands were received from. */
    private final Map<TopicPartition, TopicPartitionCommandQueue> commandQueues = new HashMap<>();
    private final Context vertxContext;

    /**
     * Creates a new KafkaCommandProcessingQueue.
     *
     * @param vertxContext The vert.x context used for deferring the sending of a subsequent command.
     * @throws NullPointerException if vertxContext is {@code null}.
     */
    public KafkaCommandProcessingQueue(final Context vertxContext) {
        this.vertxContext = Objects.requireNonNull(vertxContext);
    }

    /**
     * Appends the given command to the queue of the topic partition its record was received from.
     * The partition queue gets created if it doesn't exist yet.
     *
     * @param commandContext The context of the command to add.
     * @throws NullPointerException if commandContext is {@code null}.
     */
    public void add(final KafkaBasedCommandContext commandContext) {
        Objects.requireNonNull(commandContext);
        commandQueues
                .computeIfAbsent(getTopicPartition(commandContext), k -> new TopicPartitionCommandQueue())
                .add(commandContext);
    }

    /**
     * Removes the given command from the queue of its topic partition.
     * <p>
     * If the command was removed, the send action of the command that is now at the head of the
     * partition queue gets invoked, provided such an action has already been registered for it.
     *
     * @param commandContext The context of the command to remove.
     * @return {@code true} if the command was contained in its partition queue and has been removed,
     *         {@code false} if there is no queue for the partition or the command wasn't in it.
     * @throws NullPointerException if commandContext is {@code null}.
     */
    public boolean remove(final KafkaBasedCommandContext commandContext) {
        Objects.requireNonNull(commandContext);
        final TopicPartitionCommandQueue commandQueue = commandQueues.get(getTopicPartition(commandContext));
        return commandQueue != null && commandQueue.remove(commandContext);
    }

    /**
     * Either invokes the given send action directly, if the command is at the head of its partition
     * queue, or stores the action so that it gets invoked once the command has reached the head.
     * <p>
     * If there is no queue for the partition of the command (anymore), the command context is released
     * with a {@link CommandToBeReprocessedException} and a future failed with that exception is returned.
     * The same happens if the queue exists but neither contains the command nor has the command been
     * completed. If the command isn't in the queue and is already completed, the returned future is
     * failed with a {@link CommandAlreadyProcessedException}.
     *
     * @param commandContext The context of the command to apply the send action for.
     * @param sendActionSupplier Supplies the future of the send action; only invoked when the command
     *                           actually gets sent.
     * @return A future that is completed with the outcome of the send action, or failed as described above.
     * @throws NullPointerException if commandContext is {@code null}, or if sendActionSupplier is
     *                              {@code null} and a queue exists for the partition of the command.
     */
    public Future<Void> applySendCommandAction(final KafkaBasedCommandContext commandContext,
            final Supplier<Future<Void>> sendActionSupplier) {
        Objects.requireNonNull(commandContext);
        final TopicPartition topicPartition = getTopicPartition(commandContext);
        final TopicPartitionCommandQueue commandQueue = commandQueues.get(topicPartition);
        if (commandQueue == null) {
            LOG.info("command won't be sent - commands received from partition [{}] aren't handled by this consumer anymore [{}]",
                    topicPartition, commandContext.getCommand());
            TracingHelper.logError(commandContext.getTracingSpan(), String.format(
                    "command won't be sent - commands received from partition [%s] aren't handled by this consumer anymore",
                    topicPartition));
            final CommandToBeReprocessedException error = new CommandToBeReprocessedException();
            commandContext.release(error);
            return Future.failedFuture(error);
        }
        return commandQueue.applySendCommandAction(commandContext, sendActionSupplier);
    }

    /**
     * Sets the partitions that are currently being handled. The queues of all <em>other</em>
     * partitions get removed; commands still waiting in such a queue with a registered send action
     * get released and their result future failed with a {@link CommandToBeReprocessedException}.
     *
     * @param partitions The partitions that are being handled.
     * @throws NullPointerException if partitions is {@code null}.
     */
    public void setCurrentlyHandledPartitions(final Collection<TopicPartition> partitions) {
        Objects.requireNonNull(partitions);
        removeCommandQueueEntries(commandQueueEntry -> !partitions.contains(commandQueueEntry.getKey()));
    }

    /**
     * Sets the partitions that aren't being handled anymore. The queues of these partitions get
     * removed; commands still waiting in such a queue with a registered send action get released
     * and their result future failed with a {@link CommandToBeReprocessedException}.
     *
     * @param partitions The partitions that have been revoked.
     * @throws NullPointerException if partitions is {@code null}.
     */
    public void setRevokedPartitions(final Collection<TopicPartition> partitions) {
        Objects.requireNonNull(partitions);
        removeCommandQueueEntries(commandQueueEntry -> partitions.contains(commandQueueEntry.getKey()));
    }

    /**
     * Removes all partition queues whose map entry matches the given filter, clearing non-empty
     * queues before they get dropped.
     *
     * @param filter Selects the entries to remove.
     */
    private void removeCommandQueueEntries(final Predicate<Map.Entry<TopicPartition, TopicPartitionCommandQueue>> filter) {
        // an explicit iterator is used so that entries can be removed while iterating
        final Iterator<Map.Entry<TopicPartition, TopicPartitionCommandQueue>> commandQueuesIterator =
                commandQueues.entrySet().iterator();
        while (commandQueuesIterator.hasNext()) {
            final Map.Entry<TopicPartition, TopicPartitionCommandQueue> commandQueueEntry = commandQueuesIterator.next();
            if (!filter.test(commandQueueEntry)) {
                continue;
            }
            final TopicPartitionCommandQueue commandQueue = commandQueueEntry.getValue();
            if (commandQueue.isEmpty()) {
                LOG.debug("partition [{}] isn't being handled anymore; command queue is empty", commandQueueEntry.getKey());
            } else {
                LOG.info("partition [{}] isn't being handled anymore but its command queue isn't empty! [queue size: {}]",
                        commandQueueEntry.getKey(), commandQueue.getSize());
                commandQueue.markAsUnusedAndClear();
            }
            commandQueuesIterator.remove();
        }
    }

    /**
     * Gets the topic partition that the record of the given command was received from.
     *
     * @param commandContext The context of the command.
     * @return The topic partition.
     */
    private static TopicPartition getTopicPartition(final KafkaBasedCommandContext commandContext) {
        final KafkaConsumerRecord<?, ?> consumerRecord = commandContext.getCommand().getRecord();
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }

    /**
     * The queue of the commands of a single topic partition.
     * <p>
     * A command whose send action can't be invoked yet, because it isn't at the head of the queue,
     * carries the action and the corresponding result promise as a property of its command context.
     */
    class TopicPartitionCommandQueue {

        /** Command context property key under which the pending send action and result promise are stored. */
        private static final String KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE = "commandSendActionSupplierAndResultPromise";

        private final Deque<KafkaBasedCommandContext> queue = new LinkedList<>();

        /**
         * Appends the given command to the end of this queue.
         *
         * @param commandContext The context of the command to add.
         * @throws NullPointerException if commandContext is {@code null}.
         */
        public void add(final KafkaBasedCommandContext commandContext) {
            Objects.requireNonNull(commandContext);
            queue.add(commandContext);
        }

        /**
         * Removes the given command from this queue and, if it was removed, tries to send the
         * command that is now at the head of the queue.
         *
         * @param commandContext The context of the command to remove.
         * @return {@code true} if the command was contained in this queue.
         * @throws NullPointerException if commandContext is {@code null}.
         */
        public boolean remove(final KafkaBasedCommandContext commandContext) {
            Objects.requireNonNull(commandContext);
            final boolean removed = queue.remove(commandContext);
            if (removed) {
                // the removed command may have been blocking a command that is already waiting to be sent
                sendNextCommandInQueueIfPossible();
            }
            return removed;
        }

        /**
         * Checks whether this queue contains no commands.
         *
         * @return {@code true} if this queue is empty.
         */
        public boolean isEmpty() {
            return queue.isEmpty();
        }

        /**
         * Gets the number of commands in this queue.
         *
         * @return The queue size.
         */
        public int getSize() {
            return queue.size();
        }

        /**
         * Empties this queue. Each removed command for which a send action has already been registered
         * gets released with a {@link CommandToBeReprocessedException} and its result promise is failed
         * with the same exception. Commands without a registered send action are dropped silently.
         */
        public void markAsUnusedAndClear() {
            // the queue is emptied first, the commands are then processed from a snapshot; that way
            // this queue is already empty when the commands get released and their promises failed
            final Deque<KafkaBasedCommandContext> queueCopy = new LinkedList<>(queue);
            queue.clear();
            for (final KafkaBasedCommandContext commandContext : queueCopy) {
                final Pair<Supplier<Future<Void>>, Promise<Void>> actionAppliedPair =
                        getSendActionSupplierAndResultPromise(commandContext);
                if (actionAppliedPair != null) {
                    LOG.info("command won't be sent - its partition isn't being handled anymore [{}]", commandContext.getCommand());
                    TracingHelper.logError(commandContext.getTracingSpan(),
                            "command won't be sent - its partition isn't being handled anymore");
                    final CommandToBeReprocessedException error = new CommandToBeReprocessedException();
                    commandContext.release(error);
                    actionAppliedPair.two().fail(error);
                }
            }
        }

        /**
         * Invokes the given send action if the command is at the head of this queue; otherwise stores
         * the action in the command context so that it gets invoked when the command reaches the head.
         * <p>
         * If the command isn't contained in this queue at all, the returned future is failed with a
         * {@link CommandAlreadyProcessedException} if the command context is already completed, or else
         * the command context is released with a {@link CommandToBeReprocessedException} and the
         * returned future is failed with that exception.
         *
         * @param commandContext The context of the command to apply the send action for.
         * @param sendActionSupplier Supplies the future of the send action.
         * @return A future indicating the outcome of the send action.
         * @throws NullPointerException if any of the parameters is {@code null}.
         */
        public Future<Void> applySendCommandAction(final KafkaBasedCommandContext commandContext,
                final Supplier<Future<Void>> sendActionSupplier) {
            Objects.requireNonNull(commandContext);
            Objects.requireNonNull(sendActionSupplier);
            final Promise<Void> resultPromise = Promise.promise();
            if (commandContext.equals(queue.peek())) {
                // the command is next in line - send it right away
                sendGivenCommandAndNextInQueueIfPossible(queue.remove(), sendActionSupplier, resultPromise, true);
            } else if (!queue.contains(commandContext)) {
                // declared as Throwable since either of the two exception types may get assigned
                final Throwable error;
                if (commandContext.isCompleted()) {
                    LOG.debug("command won't be sent - already processed and not in queue anymore [{}]", commandContext.getCommand());
                    error = new CommandAlreadyProcessedException();
                } else {
                    LOG.info("command won't be sent - not in queue [{}]", commandContext.getCommand());
                    TracingHelper.logError(commandContext.getTracingSpan(), "command won't be sent - not in queue");
                    error = new CommandToBeReprocessedException();
                    commandContext.release(error);
                }
                resultPromise.fail(error);
            } else {
                // there are earlier commands in the queue - remember the send action for later
                LOG.debug("sending of command with offset {} gets delayed; waiting for processing of offset {} [queue size: {}; delayed {}]",
                        getRecordOffset(commandContext), getRecordOffset(queue.peek()), queue.size(), commandContext.getCommand());
                commandContext.getTracingSpan().log(String.format(
                        "waiting for an earlier command with offset %d to be processed first [queue size: %d]",
                        getRecordOffset(queue.peek()), queue.size()));
                commandContext.getTracingSpan().setTag("processing_delayed", true);
                commandContext.put(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE, Pair.of(sendActionSupplier, resultPromise));
            }
            return resultPromise.future();
        }

        /**
         * Invokes the given send action, lets its outcome complete the given promise and then tries
         * to send the command that is now at the head of the queue.
         *
         * @param commandContext The context of the command being sent (already removed from the queue).
         * @param sendActionSupplier Supplies the future of the send action.
         * @param sendActionCompletedPromise The promise to complete with the outcome of the send action.
         * @param completedPromiseJustCreated {@code true} if the promise was created in the
         *                                    {@code applySendCommandAction} invocation currently in progress.
         */
        private void sendGivenCommandAndNextInQueueIfPossible(final KafkaBasedCommandContext commandContext,
                final Supplier<Future<Void>> sendActionSupplier, final Promise<Void> sendActionCompletedPromise,
                final boolean completedPromiseJustCreated) {
            LOG.trace("apply send action on [{}]", commandContext.getCommand());
            final Future<Void> sendActionFuture = sendActionSupplier.get();
            sendActionFuture.onComplete(sendActionCompletedPromise);
            if (!queue.isEmpty()) {
                if (sendActionFuture.isComplete() && completedPromiseJustCreated) {
                    // The promise has been completed before its future was returned to the caller.
                    // The next command is sent asynchronously - presumably so that completion handlers
                    // the caller registers on the returned future run first (TODO confirm).
                    vertxContext.runOnContext(v -> sendNextCommandInQueueIfPossible());
                } else {
                    sendNextCommandInQueueIfPossible();
                }
            }
        }

        /**
         * Sends the command at the head of the queue if a send action has already been registered for it.
         * Does nothing if the queue is empty or the head command has no registered send action yet.
         */
        private void sendNextCommandInQueueIfPossible() {
            final KafkaBasedCommandContext nextCommandContext = queue.peek();
            if (nextCommandContext == null) {
                return;
            }
            final Pair<Supplier<Future<Void>>, Promise<Void>> pendingAction =
                    getSendActionSupplierAndResultPromise(nextCommandContext);
            if (pendingAction != null) {
                sendGivenCommandAndNextInQueueIfPossible(queue.remove(), pendingAction.one(), pendingAction.two(), false);
            }
        }

        /**
         * Gets the send action and result promise stored in the given command context.
         *
         * @param context The command context.
         * @return The stored pair or {@code null} if no send action has been registered yet.
         */
        @SuppressWarnings("unchecked")
        private Pair<Supplier<Future<Void>>, Promise<Void>> getSendActionSupplierAndResultPromise(
                final KafkaBasedCommandContext context) {
            // safe: the only value ever stored under this key is the pair put by applySendCommandAction
            return (Pair<Supplier<Future<Void>>, Promise<Void>>) context.get(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE);
        }

        /**
         * Gets the offset of the record of the given command.
         *
         * @param context The command context, may be {@code null}.
         * @return The record offset or -1 if the context is {@code null}.
         */
        private long getRecordOffset(final KafkaBasedCommandContext context) {
            return context == null ? -1L : context.getCommand().getRecord().offset();
        }
    }
}

