/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

/**
 * Serializes outbound deliveries per address for a single context.
 *
 * <p>Each address gets its own FIFO queue of tasks. Only the task at the head of a
 * queue is in flight at any time; the next one is started when the head's selection
 * promise completes. One instance is kept per context, stored in the context data
 * under the {@code Serializer.class} key (see {@link #get(ContextInternal)}).
 */
public class Serializer {
    /** Context that owns this serializer; all queue manipulation is dispatched onto it. */
    private final ContextInternal context;
    /** Live queues keyed by message address; an entry is removed once its queue drains. */
    private final Map<String, SerializerQueue> queues;

    /**
     * Creates the serializer and, for deployment contexts, registers a close hook
     * that fails every pending task when the context is closed.
     *
     * @param context the owning context
     */
    private Serializer(ContextInternal context) {
        this.context = context;
        this.queues = new HashMap<>();
        if (context.isDeployment()) {
            context.addCloseHook(this::close);
        }
    }

    /**
     * Returns the serializer associated with {@code context}, creating it on first use.
     *
     * @param context the context whose serializer is requested
     * @return the single serializer stored in the context data
     */
    public static Serializer get(ContextInternal context) {
        ConcurrentMap<Object, Object> contextData = context.contextData();
        Serializer serializer = (Serializer) contextData.get(Serializer.class);
        if (serializer == null) {
            Serializer candidate = new Serializer(context);
            // putIfAbsent arbitrates concurrent creators: whoever got there first wins.
            Serializer previous = (Serializer) contextData.putIfAbsent(Serializer.class, candidate);
            serializer = previous == null ? candidate : previous;
        }
        return serializer;
    }

    /**
     * Enqueues a delivery behind any earlier deliveries to the same address.
     *
     * <p>If the caller is not running on this serializer's context, the call is
     * re-dispatched onto that context and returns immediately.
     *
     * @param sendContext    the outbound delivery; its message address selects the queue
     * @param selectHandler  invoked with the message and a promise when the task reaches
     *                       the head of its queue; completing the promise finishes the task
     * @param successHandler invoked with the delivery and the selection result on success
     * @param failureHandler invoked with the delivery and the cause on failure
     * @param <T>            type of the selection result
     */
    public <T> void queue(OutboundDeliveryContext<?> sendContext, BiConsumer<Message<?>, Promise<T>> selectHandler, BiConsumer<OutboundDeliveryContext<?>, T> successHandler, BiConsumer<OutboundDeliveryContext<?>, Throwable> failureHandler) {
        ContextInternal ctx = (ContextInternal) Vertx.currentContext();
        if (ctx != this.context) {
            this.context.runOnContext(v -> this.queue(sendContext, selectHandler, successHandler, failureHandler));
            return;
        }
        MessageImpl<?, ?> message = sendContext.message;
        String address = message.address();
        // The outcome is reported through a promise created on the sender's own context.
        PromiseInternal<T> promise = sendContext.ctx.promise();
        promise.future().onComplete(ar -> {
            if (ar.succeeded()) {
                successHandler.accept(sendContext, ar.result());
            } else {
                failureHandler.accept(sendContext, ar.cause());
            }
        });
        SerializerQueue queue = this.queues.computeIfAbsent(address, SerializerQueue::new);
        queue.add(new SerializedTask<>(sendContext, selectHandler, promise));
    }

    /**
     * Close hook: fails every pending task and completes {@code promise}.
     *
     * @param promise completed once all queues have been closed
     */
    private void close(Promise<Void> promise) {
        for (SerializerQueue queue : this.queues.values()) {
            queue.close();
        }
        // Drop the closed queues. A closed queue never advances past its head, so if it
        // stayed registered, tasks queued later for the same address would never complete.
        this.queues.clear();
        promise.complete();
    }

    /**
     * One queued delivery. Bridges the internal selection promise to the promise
     * observed by the sender and signals the queue when the selection is done.
     *
     * @param <U> type of the selection result
     */
    private class SerializedTask<U>
    implements Handler<AsyncResult<U>> {
        final OutboundDeliveryContext<?> sendContext;
        final BiConsumer<Message<?>, Promise<U>> selectHandler;
        /** Promise whose outcome is reported to the sender's success/failure handlers. */
        final Promise<U> promise;
        /** Promise handed to the select handler; its completion triggers {@link #handle}. */
        final Promise<U> internalPromise;
        /** Set by {@link #process}; completed to tell the queue this task is finished. */
        Promise<Void> completion;

        SerializedTask(OutboundDeliveryContext<?> sendContext, BiConsumer<Message<?>, Promise<U>> selectHandler, Promise<U> promise) {
            this.sendContext = sendContext;
            this.selectHandler = selectHandler;
            this.promise = promise;
            this.internalPromise = Serializer.this.context.promise();
            this.internalPromise.future().onComplete(this);
        }

        /**
         * Starts the task by invoking the select handler.
         *
         * @param completion completed once the select handler's promise has completed
         */
        void process(Promise<Void> completion) {
            this.completion = completion;
            this.selectHandler.accept(this.sendContext.message, this.internalPromise);
        }

        @Override
        public void handle(AsyncResult<U> ar) {
            // try* variants: the promise may already have been failed by SerializerQueue.close().
            if (ar.succeeded()) {
                this.promise.tryComplete(ar.result());
            } else {
                this.promise.tryFail(ar.cause());
            }
            this.completion.complete();
        }
    }

    /** FIFO queue of tasks for one address; only the head task is in flight. */
    private class SerializerQueue {
        final Queue<SerializedTask<?>> tasks;
        final String address;
        boolean closed;

        SerializerQueue(String address) {
            this.address = address;
            this.tasks = new LinkedList<>();
        }

        /**
         * Appends a task and starts it immediately when it is the only one queued.
         *
         * @param serializedTask the task to enqueue
         */
        void add(SerializedTask<?> serializedTask) {
            this.tasks.add(serializedTask);
            if (this.tasks.size() == 1) {
                this.process(serializedTask);
            }
        }

        /**
         * Runs a task and arranges for {@link #processed()} to be called when it finishes.
         *
         * @param serializedTask the task at the head of the queue
         */
        void process(SerializedTask<?> serializedTask) {
            PromiseInternal<Void> completion = Serializer.this.context.promise();
            serializedTask.process(completion);
            completion.future().onComplete(v -> this.processed());
        }

        /**
         * Removes the finished head task, then starts the next one or, when the queue
         * is empty, unregisters the queue. Does nothing once the queue is closed.
         */
        void processed() {
            if (!this.closed) {
                this.tasks.remove();
                SerializedTask<?> next = this.tasks.peek();
                if (next != null) {
                    this.process(next);
                } else {
                    Serializer.this.queues.remove(this.address);
                }
            }
        }

        /** Marks the queue closed and fails every queued task, including the one in flight. */
        void close() {
            this.closed = true;
            while (!this.tasks.isEmpty()) {
                this.tasks.remove().promise.tryFail("Context is closing");
            }
        }
    }
}

