package org.camunda.bpm.extension.reactor.projectreactor;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.camunda.bpm.extension.reactor.projectreactor.Event;
import org.camunda.bpm.extension.reactor.projectreactor.dispatch.SynchronousDispatcher;
import org.camunda.bpm.extension.reactor.projectreactor.filter.PassThroughFilter;
import org.camunda.bpm.extension.reactor.projectreactor.publisher.BusPublisher;
import org.camunda.bpm.extension.reactor.projectreactor.registry.Registration;
import org.camunda.bpm.extension.reactor.projectreactor.registry.Registries;
import org.camunda.bpm.extension.reactor.projectreactor.registry.Registry;
import org.camunda.bpm.extension.reactor.projectreactor.routing.ConsumerFilteringRouter;
import org.camunda.bpm.extension.reactor.projectreactor.routing.Router;
import org.camunda.bpm.extension.reactor.projectreactor.selector.ClassSelector;
import org.camunda.bpm.extension.reactor.projectreactor.selector.Selector;
import org.camunda.bpm.extension.reactor.projectreactor.selector.Selectors;
import org.camunda.bpm.extension.reactor.projectreactor.spec.EventBusSpec;
import org.camunda.bpm.extension.reactor.projectreactor.support.UUIDUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/camunda/bpm/extension/reactor/projectreactor/EventBus.class */
public class EventBus implements Bus<Event<?>>, Consumer<Event<?>> {
    /** Router used when a constructor receives a {@code null} router: a {@link ConsumerFilteringRouter} over a {@link PassThroughFilter}. */
    private static final Router DEFAULT_EVENT_ROUTER = new ConsumerFilteringRouter(new PassThroughFilter());
    /** Logger; used by the Throwable consumer registered in the constructor when no uncaught-error handler was supplied. */
    private final Logger log;
    /** Dispatcher that events are handed to in {@code notify}, {@code prepare} and {@code schedule}. */
    private final Dispatcher dispatcher;
    /** Registry holding the consumers registered through {@code on(...)}; queried by key when routing. */
    private final Registry<Object, Consumer<? extends Event<?>>> consumerRegistry;
    /** Router invoked from {@code accept} and from the default dispatch-error handler. */
    private final Router router;
    /** Error callback passed to the dispatcher and router; never {@code null} after construction. */
    private final Consumer<Throwable> dispatchErrorHandler;
    /** Optional callback for Throwable events; may be {@code null}, in which case such events are logged. */
    private final Consumer<Throwable> uncaughtErrorHandler;
    /** Lazily created identifier; assigned on the first call to {@code getId()}. */
    private volatile UUID id;

    /* loaded from: input_file:org/camunda/bpm/extension/reactor/projectreactor/EventBus$ReplyToConsumer.class */
    public class ReplyToConsumer<E extends Event<?>, V> implements Consumer<E> {
        private final Function<E, V> fn;

        private ReplyToConsumer(Function<E, V> function) {
            this.fn = function;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v22, types: [org.camunda.bpm.extension.reactor.projectreactor.Bus] */
        @Override // java.util.function.Consumer
        public void accept(E e) {
            Event wrap;
            ?? replyToObservable;
            EventBus eventBus = EventBus.this;
            if (ReplyToEvent.class.isAssignableFrom(e.getClass()) && 0 != (replyToObservable = ((ReplyToEvent) e).getReplyToObservable())) {
                eventBus = replyToObservable;
            }
            try {
                V apply = this.fn.apply(e);
                if (null == apply) {
                    wrap = new Event(Void.class);
                } else {
                    wrap = Event.class.isAssignableFrom(apply.getClass()) ? (Event) apply : Event.wrap(apply);
                }
                eventBus.notify(e.getReplyTo(), (Object) wrap);
            } catch (Throwable th) {
                eventBus.notify(th.getClass(), (Class<?>) Event.wrap(th));
            }
        }

        public Function<E, V> getDelegate() {
            return this.fn;
        }
    }

    /* loaded from: input_file:org/camunda/bpm/extension/reactor/projectreactor/EventBus$ReplyToEvent.class */
    /**
     * Event that additionally carries the {@link Bus} on which a reply should be published.
     *
     * @param <T> type of the event data
     */
    public static class ReplyToEvent<T> extends Event<T> {
        private static final long serialVersionUID = 1937884784799135647L;
        /** Bus the reply should be sent to; may be {@code null} (consumers check for that). */
        private final Bus replyToObservable;

        /**
         * Builds the event from its parts and records the reply-to key on it.
         *
         * @param headers  headers passed to the {@link Event} constructor
         * @param t        event data
         * @param obj      reply-to key, stored via {@code setReplyTo}
         * @param bus      bus on which the reply should be published
         * @param consumer error consumer passed to the {@link Event} constructor
         */
        private ReplyToEvent(Event.Headers headers, T t, Object obj, Bus bus, Consumer<Throwable> consumer) {
            super(headers, t, consumer);
            setReplyTo(obj);
            this.replyToObservable = bus;
        }

        /**
         * Builds a reply-to event from the headers, data, reply-to key and error consumer of
         * {@code event}.
         *
         * @param event event whose parts are reused
         * @param bus   bus on which the reply should be published
         */
        private ReplyToEvent(Event<T> event, Bus bus) {
            this(event.getHeaders(), event.getData(), event.getReplyTo(), bus, event.getErrorConsumer());
        }

        /**
         * Creates a new {@code ReplyToEvent} with the given data and this event's headers,
         * reply-to key, reply bus and error consumer.
         *
         * @param x   data of the new event
         * @param <X> type of the new data
         * @return the new event
         */
        @Override
        public <X> Event<X> copy(X x) {
            return new ReplyToEvent<>(getHeaders(), x, getReplyTo(), this.replyToObservable, getErrorConsumer());
        }

        /**
         * @return the bus on which a reply should be published
         */
        public Bus getReplyToObservable() {
            return this.replyToObservable;
        }
    }

    /**
     * Creates a new, empty {@link EventBusSpec}.
     *
     * @return a fresh spec instance
     */
    public static EventBusSpec config() {
        return new EventBusSpec();
    }

    /**
     * Creates an {@link EventBus} that uses {@link SynchronousDispatcher#INSTANCE}.
     *
     * @return a new event bus
     */
    public static EventBus create() {
        return new EventBus(SynchronousDispatcher.INSTANCE);
    }

    /**
     * Builds an {@link EventBus} through an {@link EventBusSpec} configured with the given
     * environment and that environment's default dispatcher.
     *
     * @param environment environment handed to the spec and asked for its default dispatcher
     * @return the bus produced by the spec
     */
    public static EventBus create(Environment environment) {
        EventBusSpec withEnvironment = (EventBusSpec) new EventBusSpec().env(environment);
        EventBusSpec withDispatcher = (EventBusSpec) withEnvironment.dispatcher(environment.getDefaultDispatcher());
        return withDispatcher.get();
    }

    /**
     * Builds an {@link EventBus} through an {@link EventBusSpec} configured with the given
     * dispatcher.
     *
     * @param dispatcher dispatcher handed to the spec
     * @return the bus produced by the spec
     */
    public static EventBus create(Dispatcher dispatcher) {
        EventBusSpec configured = (EventBusSpec) new EventBusSpec().dispatcher(dispatcher);
        return configured.get();
    }

    /**
     * Builds an {@link EventBus} through an {@link EventBusSpec} configured with the given
     * environment and a dispatcher identified by a string.
     *
     * @param environment environment handed to the spec
     * @param str         dispatcher identifier handed to the spec's {@code dispatcher(String)}
     * @return the bus produced by the spec
     */
    public static EventBus create(Environment environment, String str) {
        EventBusSpec withEnvironment = (EventBusSpec) new EventBusSpec().env(environment);
        EventBusSpec withDispatcher = (EventBusSpec) withEnvironment.dispatcher(str);
        return withDispatcher.get();
    }

    /**
     * Builds an {@link EventBus} through an {@link EventBusSpec} configured with the given
     * environment and dispatcher.
     *
     * @param environment environment handed to the spec
     * @param dispatcher  dispatcher handed to the spec
     * @return the bus produced by the spec
     */
    public static EventBus create(Environment environment, Dispatcher dispatcher) {
        EventBusSpec withEnvironment = (EventBusSpec) new EventBusSpec().env(environment);
        EventBusSpec withDispatcher = (EventBusSpec) withEnvironment.dispatcher(dispatcher);
        return withDispatcher.get();
    }

    /**
     * Creates a bus with the given dispatcher and no explicit router.
     *
     * @param dispatcher dispatcher to use; {@code null} is passed on to the two-argument constructor
     */
    public EventBus(@Nullable Dispatcher dispatcher) {
        this(dispatcher, null);
    }

    /**
     * Creates a bus with the given dispatcher and router and no explicit error handlers.
     *
     * @param dispatcher dispatcher to use, may be {@code null}
     * @param router     router to use, may be {@code null}
     */
    public EventBus(@Nullable Dispatcher dispatcher, @Nullable Router router) {
        this(dispatcher, router, null, null);
    }

    /**
     * Creates a bus backed by a registry obtained from {@link Registries#create()}.
     *
     * @param dispatcher dispatcher to use, may be {@code null}
     * @param router     router to use, may be {@code null}
     * @param consumer   dispatch-error handler, may be {@code null}
     * @param consumer2  uncaught-error handler, may be {@code null}
     */
    public EventBus(@Nullable Dispatcher dispatcher, @Nullable Router router, @Nullable Consumer<Throwable> consumer, @Nullable Consumer<Throwable> consumer2) {
        this(Registries.create(), dispatcher, router, consumer, consumer2);
    }

    /**
     * Creates a bus from all of its collaborators.
     *
     * <p>Defaults applied for {@code null} arguments:
     * <ul>
     *   <li>{@code dispatcher}: {@link SynchronousDispatcher#INSTANCE};</li>
     *   <li>{@code router}: the shared default router;</li>
     *   <li>{@code consumer} (dispatch-error handler): a handler that wraps the throwable in an
     *       event and routes it to the consumers registered for the throwable's class.</li>
     * </ul>
     *
     * <p>A consumer is also registered for {@code Throwable} events. It forwards the throwable
     * to {@code consumer2} when that is non-null and logs it as an error otherwise.
     *
     * @param registry   registry that stores consumer registrations; must not be {@code null}
     * @param dispatcher dispatcher to use, may be {@code null}
     * @param router     router to use, may be {@code null}
     * @param consumer   dispatch-error handler, may be {@code null}
     * @param consumer2  uncaught-error handler, may be {@code null}
     * @throws NullPointerException if {@code registry} is {@code null}
     */
    public EventBus(@Nonnull Registry<Object, Consumer<? extends Event<?>>> registry, @Nullable Dispatcher dispatcher, @Nullable Router router, @Nullable Consumer<Throwable> consumer, @Nullable Consumer<Throwable> consumer2) {
        this.log = LoggerFactory.getLogger(EventBus.class);
        this.consumerRegistry = Objects.requireNonNull(registry, "Consumer Registry cannot be null.");
        this.dispatcher = null == dispatcher ? SynchronousDispatcher.INSTANCE : dispatcher;
        this.router = null == router ? DEFAULT_EVENT_ROUTER : router;
        if (null == consumer) {
            this.dispatchErrorHandler = th -> {
                Class<?> cls = th.getClass();
                this.router.route(cls, Event.wrap(th), this.consumerRegistry.select(cls), null, null);
            };
        } else {
            this.dispatchErrorHandler = consumer;
        }
        this.uncaughtErrorHandler = consumer2;
        // The explicit parameter type fixes T = Event<Throwable>, so getData() is a Throwable
        // without casts.
        on(new ClassSelector(Throwable.class), (Event<Throwable> event) -> {
            Throwable failure = event.getData();
            if (consumer2 == null) {
                this.log.error(failure.getMessage(), failure);
            } else {
                consumer2.accept(failure);
            }
        });
    }

    /**
     * Returns this bus's identifier, creating it with {@link UUIDUtils#create()} on first use.
     *
     * @return the identifier; the same value on every call once created
     */
    public synchronized UUID getId() {
        UUID current = this.id;
        if (current == null) {
            current = UUIDUtils.create();
            this.id = current;
        }
        return current;
    }

    /**
     * @return the registry that holds this bus's consumer registrations
     */
    public Registry<Object, Consumer<? extends Event<?>>> getConsumerRegistry() {
        return this.consumerRegistry;
    }

    /**
     * @return the dispatcher this bus hands events to
     */
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * @return the router this bus uses to deliver events to consumers
     */
    public Router getRouter() {
        return this.router;
    }

    /**
     * @return the error handler passed to the dispatcher and router; never {@code null}
     */
    public Consumer<Throwable> getDispatchErrorHandler() {
        return this.dispatchErrorHandler;
    }

    /**
     * @return the uncaught-error handler given at construction, or {@code null} if none was given
     */
    public Consumer<Throwable> getUncaughtErrorHandler() {
        return this.uncaughtErrorHandler;
    }

    /**
     * Tells whether at least one non-cancelled registration is selected for the given key.
     *
     * @param obj key to look up in the consumer registry
     * @return {@code true} if a selected registration is not cancelled, {@code false} otherwise
     */
    @Override
    public boolean respondsToKey(Object obj) {
        for (Registration<Object, ? extends Consumer<? extends Event<?>>> candidate : this.consumerRegistry.select(obj)) {
            if (!candidate.isCancelled()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers a consumer for events whose key matches the given selector.
     *
     * <p>The consumer is wrapped before registration. For each event the wrapper:
     * <ol>
     *   <li>merges the headers resolved from the event key into the event, if the selector has a
     *       header resolver;</li>
     *   <li>invokes the consumer only when no data type could be extracted from the consumer's
     *       generic signature, the event data is {@code null}, or the data is an instance of the
     *       extracted type.</li>
     * </ol>
     *
     * @param selector selector deciding which keys the consumer receives; must not be {@code null}
     * @param consumer consumer to invoke; must not be {@code null}
     * @param <T>      event type the consumer accepts
     * @return the registration created by the consumer registry
     * @throws NullPointerException if {@code selector} or {@code consumer} is {@code null}
     */
    @Override
    public <T extends Event<?>> Registration<Object, Consumer<? extends Event<?>>> on(Selector selector, Consumer<T> consumer) {
        Objects.requireNonNull(selector, "Selector cannot be null.");
        Objects.requireNonNull(consumer, "Consumer cannot be null.");
        Class<?> expectedDataType = extractGeneric(consumer);
        return this.consumerRegistry.register(selector, event -> {
            if (null != selector.getHeaderResolver()) {
                event.getHeaders().setAll(selector.getHeaderResolver().resolve(event.getKey()));
            }
            Object data = event.getData();
            if (expectedDataType == null || data == null || expectedDataType.isAssignableFrom(data.getClass())) {
                // The registry only knows Event<?>; the selector plus the data-type check above
                // are what make handing the event to a Consumer<T> acceptable.
                @SuppressWarnings("unchecked")
                T typedEvent = (T) event;
                consumer.accept(typedEvent);
            }
        });
    }

    /**
     * Tries to find the event data type a consumer declares, e.g. {@code String} for a class
     * declared as {@code implements Consumer<Event<String>>}.
     *
     * <p>Only the first generic interface of the consumer's class is inspected. The result is
     * {@code null} whenever that interface, or its first type argument, is not parameterized, or
     * the innermost type argument is neither a class nor a parameterized type.
     *
     * @param consumer consumer whose class is inspected
     * @return the raw data type, or {@code null} if it cannot be determined
     */
    private Class<?> extractGeneric(Consumer<? extends Event<?>> consumer) {
        Type[] interfaces = consumer.getClass().getGenericInterfaces();
        if (interfaces.length == 0 || !(interfaces[0] instanceof ParameterizedType)) {
            return null;
        }
        Type[] consumerArguments = ((ParameterizedType) interfaces[0]).getActualTypeArguments();
        if (consumerArguments.length == 0 || !(consumerArguments[0] instanceof ParameterizedType)) {
            return null;
        }
        Type[] eventArguments = ((ParameterizedType) consumerArguments[0]).getActualTypeArguments();
        if (eventArguments.length == 0) {
            return null;
        }
        Type dataType = eventArguments[0];
        if (dataType instanceof ParameterizedType) {
            // Generic data type: only its raw class is relevant for the instance check.
            return (Class<?>) ((ParameterizedType) dataType).getRawType();
        }
        return dataType instanceof Class ? (Class<?>) dataType : null;
    }

    /**
     * Creates a {@link BusPublisher} over this bus and the given selector.
     *
     * @param selector selector handed to the publisher
     * @return the new publisher
     */
    public Publisher<? extends Event<?>> on(Selector selector) {
        return new BusPublisher(this, selector);
    }

    /**
     * Publishes an event under the given key: stores the key on the event and hands the event to
     * the dispatcher, with this bus as the consumer and the dispatch-error handler for failures.
     *
     * @param obj   key to publish under; must not be {@code null}
     * @param event event to publish; must not be {@code null}
     * @return this bus
     * @throws NullPointerException if {@code obj} or {@code event} is {@code null}
     */
    @Override
    public EventBus notify(Object obj, Event<?> event) {
        final Object key = Objects.requireNonNull(obj, "Key cannot be null.");
        final Event<?> payload = Objects.requireNonNull(event, "Event cannot be null.");
        payload.setKey(key);
        this.dispatcher.dispatch(payload, this, this.dispatchErrorHandler);
        return this;
    }

    /**
     * Publishes every element emitted by the publisher under one fixed key.
     *
     * @param publisher source of the elements to publish
     * @param obj       key used for every element
     * @return this bus
     */
    public final EventBus notify(@Nonnull Publisher<?> publisher, @Nonnull Object obj) {
        // Constant key mapper: the emitted element is ignored.
        return notify((Publisher) publisher, ignored -> obj);
    }

    /**
     * Subscribes to the publisher and publishes every emitted element on this bus, wrapped in an
     * event, under the key computed by {@code function} for that element.
     *
     * <p>The subscription requests {@code Long.MAX_VALUE} elements up front. A failure signalled
     * by the publisher is passed to this bus's dispatch-error handler instead of being dropped.
     * The terminal callbacks do not touch the subscription, as Reactive Streams rule 2.3 requires.
     *
     * @param publisher source of the elements to publish; must not be {@code null}
     * @param function  maps each element to the key it is published under; must not be {@code null}
     * @param <T>       element type
     * @return this bus
     * @throws NullPointerException if {@code publisher} or {@code function} is {@code null}
     */
    public final <T> EventBus notify(@Nonnull Publisher<? extends T> publisher, @Nonnull final Function<? super T, ?> function) {
        Objects.requireNonNull(publisher, "Publisher cannot be null.");
        Objects.requireNonNull(function, "Key function cannot be null.");
        publisher.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T t) {
                EventBus.this.notify(function.apply(t), Event.wrap(t));
            }

            @Override
            public void onError(Throwable th) {
                // The subscription is already terminated by this signal; just report the failure.
                EventBus.this.dispatchErrorHandler.accept(th);
            }

            @Override
            public void onComplete() {
                // Nothing to release: the subscription is already terminated by this signal.
            }
        });
        return this;
    }

    /**
     * Registers a function as a replying consumer: the function is wrapped in a
     * {@link ReplyToConsumer} and registered for the given selector.
     *
     * @param selector selector deciding which keys the function receives
     * @param function function whose result is sent as the reply
     * @param <T>      event type the function accepts
     * @param <V>      type of the value the function produces
     * @return the registration created for the wrapping consumer
     */
    public <T extends Event<?>, V> Registration<?, Consumer<? extends Event<?>>> receive(Selector selector, Function<T, V> function) {
        return on(selector, new ReplyToConsumer<>(function));
    }

    /**
     * Obtains an event from the supplier and publishes it under the given key.
     *
     * @param obj      key to publish under
     * @param supplier source of the event to publish
     * @return this bus
     */
    public EventBus notify(Object obj, Supplier<? extends Event<?>> supplier) {
        Event<?> supplied = supplier.get();
        return notify(obj, supplied);
    }

    /**
     * Publishes a placeholder event, built as {@code new Event<>(Void.class)}, under the given key.
     *
     * @param obj key to publish under
     * @return this bus
     */
    public EventBus notify(Object obj) {
        Event<?> placeholder = new Event<>(Void.class);
        return notify(obj, placeholder);
    }

    /**
     * Publishes the event under the given key as a {@link ReplyToEvent} that names this bus as
     * the reply target.
     *
     * @param obj   key to publish under
     * @param event event whose headers, data, reply-to key and error consumer are reused
     * @return this bus
     */
    public EventBus send(Object obj, Event<?> event) {
        Event<?> request = new ReplyToEvent<>(event, this);
        return notify(obj, request);
    }

    /**
     * Obtains an event from the supplier and publishes it under the given key as a
     * {@link ReplyToEvent} that names this bus as the reply target.
     *
     * @param obj      key to publish under
     * @param supplier source of the event to send
     * @return this bus
     */
    public EventBus send(Object obj, Supplier<? extends Event<?>> supplier) {
        Event<?> supplied = supplier.get();
        Event<?> request = new ReplyToEvent<>(supplied, this);
        return notify(obj, request);
    }

    /**
     * Publishes the event under the given key as a {@link ReplyToEvent} that names {@code bus}
     * as the reply target.
     *
     * @param obj   key to publish under
     * @param event event whose headers, data, reply-to key and error consumer are reused
     * @param bus   bus on which the reply should be published
     * @return this bus
     */
    public EventBus send(Object obj, Event<?> event, Bus bus) {
        Event<?> request = new ReplyToEvent<>(event, bus);
        return notify(obj, request);
    }

    /**
     * Obtains an event from the supplier and publishes it under the given key as a
     * {@link ReplyToEvent} that names {@code bus} as the reply target.
     *
     * @param obj      key to publish under
     * @param supplier source of the event to send
     * @param bus      bus on which the reply should be published
     * @return this bus
     */
    public EventBus send(Object obj, Supplier<? extends Event<?>> supplier, Bus bus) {
        Event<?> supplied = supplier.get();
        Event<?> request = new ReplyToEvent<>(supplied, bus);
        return notify(obj, request);
    }

    /**
     * Publishes an event and registers a consumer for its reply.
     *
     * <p>The consumer is registered under a fresh anonymous selector and marked to be cancelled
     * after use. The selector's object is set as the event's reply-to key before the event is
     * published under {@code obj}.
     *
     * @param obj      key to publish under
     * @param event    event to publish
     * @param consumer consumer that receives the reply
     * @param <T>      event type the consumer accepts
     * @return this bus
     */
    public <T extends Event<?>> EventBus sendAndReceive(Object obj, Event<?> event, Consumer<T> consumer) {
        Selector replySelector = Selectors.anonymous();
        Registration<Object, Consumer<? extends Event<?>>> replyRegistration = on(replySelector, consumer);
        replyRegistration.cancelAfterUse();
        Event<?> request = event.setReplyTo(replySelector.getObject());
        notify(obj, request);
        return this;
    }

    /**
     * Obtains an event from the supplier, then publishes it and registers the consumer for its
     * reply via {@code sendAndReceive(Object, Event, Consumer)}.
     *
     * @param obj      key to publish under
     * @param supplier source of the event to publish
     * @param consumer consumer that receives the reply
     * @param <T>      event type the consumer accepts
     * @return this bus
     */
    public <T extends Event<?>> EventBus sendAndReceive(Object obj, Supplier<? extends Event<?>> supplier, Consumer<T> consumer) {
        Event<?> supplied = supplier.get();
        return sendAndReceive(obj, supplied, consumer);
    }

    /**
     * Creates a consumer that publishes events under a fixed key to the registrations selected
     * for that key at the time this method is called.
     *
     * <p>The selected registrations and their count are captured once. Each accepted event has
     * its key set to {@code obj} and is dispatched to every captured registration's consumer.
     *
     * @param obj key used for the lookup and stored on every accepted event
     * @param <T> data type of the events the returned consumer accepts
     * @return the publishing consumer
     */
    public <T> Consumer<Event<T>> prepare(final Object obj) {
        return new Consumer<Event<T>>() {
            private final List<Registration<Object, ? extends Consumer<? extends Event<?>>>> targets = EventBus.this.consumerRegistry.select(obj);
            private final int targetCount = this.targets.size();

            @Override
            public void accept(Event<T> event) {
                int index = 0;
                while (index < this.targetCount) {
                    Registration<Object, ? extends Consumer<? extends Event<?>>> target = this.targets.get(index);
                    event.setKey(obj);
                    EventBus.this.dispatcher.dispatch(event, target.getObject(), EventBus.this.dispatchErrorHandler);
                    index++;
                }
            }
        };
    }

    /**
     * Runs {@code consumer.accept(t)} through this bus's dispatcher.
     *
     * <p>The dispatcher is given {@code null} as the dispatched value; the wrapped consumer
     * ignores it and passes {@code t} instead.
     *
     * @param consumer consumer to invoke
     * @param t        value handed to the consumer
     * @param <T>      type of the value
     */
    public <T> void schedule(Consumer<T> consumer, T t) {
        this.dispatcher.dispatch(null, ignored -> consumer.accept(t), this.dispatchErrorHandler);
    }

    /**
     * Routes an event to the registrations the consumer registry selects for the event's key.
     *
     * <p>This bus passes itself to the dispatcher as the consumer in {@code notify}, so this is
     * the entry point the dispatcher calls back. The router receives the event's key, the event,
     * the selected registrations, {@code null} as its fourth argument and the dispatch-error
     * handler as its fifth.
     *
     * @param event event to route
     */
    @Override // java.util.function.Consumer
    public void accept(Event<?> event) {
        this.router.route(event.getKey(), event, this.consumerRegistry.select(event.getKey()), null, this.dispatchErrorHandler);
    }
}
