/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl;

import io.opentracing.Span;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.eclipse.hono.client.command.CommandAlreadyProcessedException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandToBeReprocessedException;
import org.eclipse.hono.commandrouter.impl.CommandProcessingQueue;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCommandProcessingQueue<T extends CommandContext, K>
implements CommandProcessingQueue<T> {
    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
    private final Map<K, CommandQueue> commandQueues = new HashMap<K, CommandQueue>();
    private final Vertx vertx;

    public AbstractCommandProcessingQueue(Vertx vertx) {
        this.vertx = Objects.requireNonNull(vertx);
    }

    @Override
    public final void add(T commandContext) {
        Objects.requireNonNull(commandContext);
        K key = this.getQueueKey(commandContext);
        CommandQueue commandQueue = this.commandQueues.computeIfAbsent(key, k -> new CommandQueue(key));
        commandQueue.add(commandContext);
    }

    protected abstract K getQueueKey(T var1);

    protected abstract String getCommandSourceForLog(K var1);

    @Override
    public final boolean remove(T commandContext) {
        Objects.requireNonNull(commandContext);
        K key = this.getQueueKey(commandContext);
        return Optional.ofNullable(this.commandQueues.get(key)).map(commandQueue -> commandQueue.remove(commandContext)).orElse(false);
    }

    @Override
    public final Future<Void> applySendCommandAction(T commandContext, Supplier<Future<Void>> sendActionSupplier) {
        K key = this.getQueueKey(commandContext);
        CommandQueue commandQueue = this.commandQueues.get(key);
        if (commandQueue == null) {
            String commandSourceForLog = this.getCommandSourceForLog(key);
            this.LOG.info("command won't be sent - commands from {} aren't handled by this consumer anymore [{}]", (Object)commandSourceForLog, (Object)commandContext.getCommand());
            TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"command won't be sent - commands from %s aren't handled by this consumer anymore".formatted(commandSourceForLog));
            CommandToBeReprocessedException error = new CommandToBeReprocessedException();
            commandContext.release((Throwable)error);
            return Future.failedFuture((Throwable)error);
        }
        return commandQueue.applySendCommandAction(commandContext, sendActionSupplier);
    }

    @Override
    public final void clear() {
        this.removeCommandQueueEntries(commandQueueEntry -> true);
    }

    protected final void removeCommandQueueEntries(Predicate<K> filter) {
        Objects.requireNonNull(filter);
        Iterator<Map.Entry<K, CommandQueue>> commandQueuesIterator = this.commandQueues.entrySet().iterator();
        while (commandQueuesIterator.hasNext()) {
            Map.Entry<K, CommandQueue> commandQueueEntry = commandQueuesIterator.next();
            if (!filter.test(commandQueueEntry.getKey())) continue;
            if (commandQueueEntry.getValue().isEmpty()) {
                this.LOG.debug("commands from {} aren't handled here anymore; command queue is empty", (Object)this.getCommandSourceForLog(commandQueueEntry.getKey()));
            } else {
                this.LOG.info("commands from {} aren't handled here anymore but the corresponding command queue isn't empty! [queue size: {}]", (Object)this.getCommandSourceForLog(commandQueueEntry.getKey()), (Object)commandQueueEntry.getValue().getSize());
                commandQueueEntry.getValue().markAsUnusedAndClear();
            }
            commandQueuesIterator.remove();
        }
    }

    public final class CommandQueue {
        private static final String KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE = "commandSendActionSupplierAndResultPromise";
        private final Deque<T> queue = new ArrayDeque();
        private final K queueKey;

        CommandQueue(K queueKey) {
            this.queueKey = queueKey;
        }

        public void add(T commandContext) {
            Objects.requireNonNull(commandContext);
            this.queue.addLast(commandContext);
        }

        public boolean remove(T commandContext) {
            Objects.requireNonNull(commandContext);
            if (this.queue.remove(commandContext)) {
                this.sendNextCommandInQueueIfPossible();
                return true;
            }
            return false;
        }

        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        public int getSize() {
            return this.queue.size();
        }

        public void markAsUnusedAndClear() {
            ArrayList<CommandContext> queueCopy = new ArrayList<CommandContext>(this.queue);
            this.queue.clear();
            queueCopy.forEach(commandContext -> {
                Pair<Supplier<Future<Void>>, Promise<Void>> actionAppliedPair = this.getSendActionSupplierAndResultPromise(commandContext);
                if (actionAppliedPair != null) {
                    String commandSourceForLog = AbstractCommandProcessingQueue.this.getCommandSourceForLog(this.queueKey);
                    AbstractCommandProcessingQueue.this.LOG.info("command won't be sent - commands from {} aren't handled by this consumer anymore [{}]", (Object)commandSourceForLog, (Object)commandContext.getCommand());
                    TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"command won't be sent - commands from %s aren't handled by this consumer anymore".formatted(commandSourceForLog));
                    CommandToBeReprocessedException error = new CommandToBeReprocessedException();
                    commandContext.release((Throwable)error);
                    ((Promise)actionAppliedPair.two()).fail((Throwable)error);
                }
            });
        }

        public Future<Void> applySendCommandAction(T commandContext, Supplier<Future<Void>> sendActionSupplier) {
            Objects.requireNonNull(commandContext);
            Objects.requireNonNull(sendActionSupplier);
            Promise resultPromise = Promise.promise();
            if (commandContext.equals(this.queue.peek())) {
                this.sendGivenCommandAndNextInQueueIfPossible((CommandContext)this.queue.remove(), sendActionSupplier, (Promise<Void>)resultPromise, true);
            } else if (!this.queue.contains(commandContext)) {
                CommandAlreadyProcessedException error;
                if (commandContext.isCompleted()) {
                    AbstractCommandProcessingQueue.this.LOG.debug("command won't be sent - already processed and not in queue anymore [{}]", (Object)commandContext.getCommand());
                    error = new CommandAlreadyProcessedException();
                } else {
                    AbstractCommandProcessingQueue.this.LOG.info("command won't be sent - not in queue [{}]", (Object)commandContext.getCommand());
                    TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"command won't be sent - not in queue");
                    error = new CommandToBeReprocessedException();
                    commandContext.release((Throwable)error);
                }
                resultPromise.fail((Throwable)error);
            } else {
                CommandContext next = (CommandContext)this.queue.peek();
                AbstractCommandProcessingQueue.this.LOG.debug("sending of command gets delayed; waiting for processing of previous command [queue size: {}; delayed {}; waiting for {}]", new Object[]{this.queue.size(), commandContext.getCommand(), next.getCommand()});
                commandContext.getTracingSpan().log("waiting for an earlier command to be processed first [queue size: %d; waiting for %s]".formatted(this.queue.size(), next.getCommand()));
                commandContext.getTracingSpan().setTag("processing_delayed", true);
                commandContext.put(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE, (Object)Pair.of(sendActionSupplier, (Object)resultPromise));
            }
            return resultPromise.future();
        }

        private void sendGivenCommandAndNextInQueueIfPossible(T commandContext, Supplier<Future<Void>> sendActionSupplier, Promise<Void> sendActionCompletedPromise, boolean completedPromiseJustCreated) {
            AbstractCommandProcessingQueue.this.LOG.trace("apply send action on [{}]", (Object)commandContext.getCommand());
            Future<Void> sendActionFuture = sendActionSupplier.get();
            sendActionFuture.onComplete(sendActionCompletedPromise);
            if (!this.queue.isEmpty()) {
                if (sendActionFuture.isComplete() && completedPromiseJustCreated) {
                    AbstractCommandProcessingQueue.this.vertx.getOrCreateContext().runOnContext(v -> this.sendNextCommandInQueueIfPossible());
                } else {
                    this.sendNextCommandInQueueIfPossible();
                }
            }
        }

        private void sendNextCommandInQueueIfPossible() {
            Optional.ofNullable((CommandContext)this.queue.peek()).map(this::getSendActionSupplierAndResultPromise).ifPresent(pair -> this.sendGivenCommandAndNextInQueueIfPossible((CommandContext)this.queue.remove(), (Supplier)pair.one(), (Promise<Void>)((Promise)pair.two()), false));
        }

        private Pair<Supplier<Future<Void>>, Promise<Void>> getSendActionSupplierAndResultPromise(T context) {
            return (Pair)context.get(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE);
        }
    }
}

