/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.support.channel;

import java.util.HashMap;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import reactor.core.Reactor;
import reactor.event.Event;
import reactor.event.registry.Registration;
import reactor.event.selector.ObjectSelector;
import reactor.event.selector.Selector;
import reactor.function.Consumer;

public class ReactorSubscribableChannel
extends AbstractSubscribableChannel {
    private final Reactor reactor;
    private final Object key = new Object();
    private final Map<MessageHandler, Registration<?>> registrations = new HashMap();

    public ReactorSubscribableChannel(Reactor reactor) {
        this.reactor = reactor;
    }

    @Override
    protected boolean hasSubscription(MessageHandler handler) {
        return this.registrations.containsKey(handler);
    }

    @Override
    public boolean sendInternal(Message<?> message, long timeout) {
        this.reactor.notify(this.key, Event.wrap(message));
        return true;
    }

    @Override
    public boolean subscribeInternal(MessageHandler handler) {
        Selector selector = ObjectSelector.objectSelector((Object)this.key);
        MessageHandlerConsumer consumer = new MessageHandlerConsumer(handler);
        Registration registration = this.reactor.on(selector, (Consumer)consumer);
        this.registrations.put(handler, registration);
        return true;
    }

    @Override
    public boolean unsubscribeInternal(MessageHandler handler) {
        Registration<?> registration = this.registrations.remove(handler);
        if (registration != null) {
            registration.cancel();
            return true;
        }
        return false;
    }

    private final class MessageHandlerConsumer
    implements Consumer<Event<Message<?>>> {
        private final MessageHandler handler;

        private MessageHandlerConsumer(MessageHandler handler) {
            this.handler = handler;
        }

        public void accept(Event<Message<?>> event) {
            Message message = (Message)event.getData();
            try {
                this.handler.handleMessage(message);
            }
            catch (Throwable t) {
                ReactorSubscribableChannel.this.logger.error((Object)("Failed to process message " + message), t);
            }
        }
    }
}

