/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.shell.jline.tui.component.view.event;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.shell.jline.tui.component.message.StaticShellMessageHeaderAccessor;
import org.springframework.shell.jline.tui.component.view.control.View;
import org.springframework.shell.jline.tui.component.view.control.ViewEvent;
import org.springframework.shell.jline.tui.component.view.event.EventLoop;
import org.springframework.shell.jline.tui.component.view.event.KeyEvent;
import org.springframework.shell.jline.tui.component.view.event.MouseEvent;
import org.springframework.shell.jline.tui.component.view.event.processor.AnimationEventLoopProcessor;
import org.springframework.shell.jline.tui.component.view.event.processor.TaskEventLoopProcessor;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Default {@link EventLoop} implementation built on a Reactor unicast sink.
 *
 * <p>Dispatched messages are buffered in a {@link PriorityQueue} ordered by
 * the message priority header, passed through the first registered
 * {@link EventLoop.EventLoopProcessor} that accepts them, and published to
 * subscribers through a shared {@link Flux}.
 */
public class DefaultEventLoop implements EventLoop {

    private static final Log log = LogFactory.getLog(DefaultEventLoop.class);

    /** Time budget, in milliseconds, that {@link #dispatch(Message)} grants a single emit. */
    private static final long DISPATCH_TIMEOUT_MS = 1000L;

    /** Pause, in milliseconds, between two emit attempts in {@link #doSend(Message, long)}. */
    private static final long PARK_TIMEOUT_MS = 10L;

    private final Queue<Message<?>> messageQueue = new PriorityQueue<>(MessageComparator.comparingPriority());
    private final Sinks.Many<Message<?>> many = Sinks.many().unicast().onBackpressureBuffer(this.messageQueue);
    private final Flux<Message<?>> sink;
    private final Disposable.Composite disposables = Disposables.composite();
    private final Scheduler scheduler = Schedulers.boundedElastic();
    // Cleared by destroy(); read by doSend() to stop accepting and retrying messages.
    private volatile boolean active = true;
    private final List<EventLoop.EventLoopProcessor> processors = new ArrayList<>();

    /**
     * Creates an event loop with only the built-in animation and task processors.
     */
    public DefaultEventLoop() {
        this(null);
    }

    /**
     * Creates an event loop with the given processors followed by the built-in
     * animation and task processors.
     *
     * @param processors additional processors consulted before the built-in
     *                   ones, or {@code null} for none
     */
    public DefaultEventLoop(@Nullable List<EventLoop.EventLoopProcessor> processors) {
        if (processors != null) {
            this.processors.addAll(processors);
        }
        this.processors.add(new AnimationEventLoopProcessor());
        this.processors.add(new TaskEventLoopProcessor());
        this.sink = this.many.asFlux()
                // Explicit type witness keeps the element type at Message<?> for share().
                .<Message<?>>flatMap(this::applyProcessors)
                .share();
    }

    /**
     * Routes a message through the first processor that accepts it.
     *
     * @param message the message taken from the sink
     * @return the processor's output, or the message itself when no processor
     *         accepts it or the accepting processor returns {@code null}
     */
    private Publisher<? extends Message<?>> applyProcessors(Message<?> message) {
        for (EventLoop.EventLoopProcessor processor : this.processors) {
            if (processor.canProcess(message)) {
                Flux<? extends Message<?>> processed = processor.process(message);
                if (processed != null) {
                    return processed;
                }
                // Only the first accepting processor is consulted.
                break;
            }
        }
        return Mono.just(message);
    }

    /**
     * Emits a single message into the loop, retrying for a bounded time. A
     * message that could not be emitted is reported with a warning log entry.
     *
     * @param message the message to emit
     * @throws IllegalStateException if the underlying sink reports that it has
     *         no subscriber, or that it is cancelled or terminated
     */
    @Override
    public void dispatch(Message<?> message) {
        // Guarded so the message is not stringified when debug logging is off.
        if (log.isDebugEnabled()) {
            log.debug("dispatch " + message);
        }
        if (!doSend(message, DISPATCH_TIMEOUT_MS)) {
            log.warn("Failed to send message: " + message);
        }
    }

    /**
     * Subscribes to the publisher and dispatches every message it produces.
     *
     * @param messages the source of messages
     */
    @Override
    public void dispatch(Publisher<? extends Message<?>> messages) {
        subscribeTo(messages);
    }

    /**
     * Returns the shared stream of all messages after processor handling.
     *
     * @return the shared message flux
     */
    public Flux<Message<?>> events() {
        return this.sink;
    }

    /**
     * Returns payloads of messages with the given event type that are
     * instances of {@code clazz}.
     *
     * @param type  the event type to match
     * @param clazz the required payload class
     * @param <T>   the payload type
     * @return the filtered payload stream
     */
    @Override
    public <T> Flux<T> events(EventLoop.Type type, Class<T> clazz) {
        return payloads(type, clazz);
    }

    /**
     * Returns payloads of messages with the given event type whose class
     * matches the raw class of {@code typeRef}.
     *
     * @param type    the event type to match
     * @param typeRef reference describing the payload type; only its raw
     *                class is used for filtering
     * @param <T>     the payload type
     * @return the filtered payload stream
     * @throws IllegalStateException if no raw class can be resolved
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Flux<T> events(EventLoop.Type type, ParameterizedTypeReference<T> typeRef) {
        Class<?> rawClass = ResolvableType.forType(typeRef).getRawClass();
        Assert.state(rawClass != null, "'rawClass' must not be null");
        // Unchecked: payloads are filtered by raw class only, type arguments are not verified.
        return (Flux<T>) payloads(type, rawClass);
    }

    /** Returns the payloads of {@code KEY} messages that are {@link KeyEvent}s. */
    @Override
    public Flux<KeyEvent> keyEvents() {
        return payloads(EventLoop.Type.KEY, KeyEvent.class);
    }

    /** Returns the payloads of {@code MOUSE} messages that are {@link MouseEvent}s. */
    @Override
    public Flux<MouseEvent> mouseEvents() {
        return payloads(EventLoop.Type.MOUSE, MouseEvent.class);
    }

    /** Returns the {@link String} payloads of {@code SYSTEM} messages. */
    @Override
    public Flux<String> systemEvents() {
        return payloads(EventLoop.Type.SYSTEM, String.class);
    }

    /** Returns the {@link String} payloads of {@code SIGNAL} messages. */
    @Override
    public Flux<String> signalEvents() {
        return payloads(EventLoop.Type.SIGNAL, String.class);
    }

    /** Returns the payloads of {@code VIEW} messages that are instances of {@code clazz}. */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(Class<T> clazz) {
        return events(EventLoop.Type.VIEW, clazz);
    }

    /** Returns the payloads of {@code VIEW} messages matching the raw class of {@code typeRef}. */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(ParameterizedTypeReference<T> typeRef) {
        return events(EventLoop.Type.VIEW, typeRef);
    }

    /**
     * Returns view events of the given class that originate from one view.
     *
     * @param clazz    the required event class
     * @param filterBy the view the events must reference (compared by identity)
     */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(Class<T> clazz, View filterBy) {
        return events(EventLoop.Type.VIEW, clazz).filter(event -> event.view() == filterBy);
    }

    /**
     * Returns view events matching the type reference that originate from one view.
     *
     * @param typeRef  reference describing the event type
     * @param filterBy the view the events must reference (compared by identity)
     */
    @Override
    public <T extends ViewEvent> Flux<T> viewEvents(ParameterizedTypeReference<T> typeRef, View filterBy) {
        return events(EventLoop.Type.VIEW, typeRef).filter(event -> event.view() == filterBy);
    }

    /**
     * Registers a resource that is disposed when this loop is destroyed.
     *
     * @param disposable the resource to dispose in {@link #destroy()}
     */
    @Override
    public void onDestroy(Disposable disposable) {
        this.disposables.add(disposable);
    }

    /**
     * Shared pipeline behind all typed accessors: keeps messages whose event
     * type equals {@code type} and emits those payloads that are instances of
     * {@code payloadType}.
     */
    private <T> Flux<T> payloads(EventLoop.Type type, Class<T> payloadType) {
        return events()
                .filter(m -> type.equals(StaticShellMessageHeaderAccessor.getEventType(m)))
                .map(m -> m.getPayload())
                .ofType(payloadType);
    }

    /**
     * Tries to emit the message, pausing {@value #PARK_TIMEOUT_MS} ms between
     * attempts while the sink reports a transient failure.
     *
     * @param message the message to emit
     * @param timeout time budget in milliseconds; {@code 0} allows a single
     *                attempt, a negative value retries until the loop is destroyed
     * @return {@code true} only if the message was actually emitted
     */
    private boolean doSend(Message<?> message, long timeout) {
        if (!this.active || this.many.currentSubscriberCount() == 0) {
            return false;
        }
        long remainingTime = Math.max(timeout, 0L);
        long parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(PARK_TIMEOUT_MS);
        while (this.active) {
            if (tryEmitMessage(message)) {
                return true;
            }
            remainingTime -= PARK_TIMEOUT_MS;
            if (timeout >= 0L && remainingTime <= 0L) {
                return false;
            }
            LockSupport.parkNanos(parkTimeoutNs);
        }
        // The loop was destroyed while retrying, so the message was never emitted.
        return false;
    }

    /**
     * Performs one emit attempt against the sink.
     *
     * @return {@code true} on success, {@code false} when the attempt may be retried
     * @throws IllegalStateException if the sink has no subscriber, or is
     *         cancelled or terminated
     */
    private boolean tryEmitMessage(Message<?> message) {
        return switch (this.many.tryEmitNext(message)) {
            case OK -> true;
            case FAIL_NON_SERIALIZED, FAIL_OVERFLOW -> false;
            case FAIL_ZERO_SUBSCRIBER -> throw new IllegalStateException(
                    "The [" + this + "] doesn't have subscribers to accept messages");
            case FAIL_TERMINATED, FAIL_CANCELLED -> throw new IllegalStateException(
                    "Cannot emit messages into the cancelled or terminated sink: " + this.many);
        };
    }

    /**
     * Subscribes to the publisher on this loop's scheduler and dispatches each
     * message. The subscription is disposed in {@link #destroy()}.
     *
     * @param publisher the source of messages
     */
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        this.disposables.add(
                Flux.from(publisher)
                        .publishOn(this.scheduler)
                        .flatMap(message -> Mono.just(message)
                                .handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle))
                                .contextWrite(StaticShellMessageHeaderAccessor.getReactorContext(message)))
                        .contextCapture()
                        .subscribe());
    }

    /**
     * Dispatches a message received from a subscribed publisher, removing the
     * {@code reactorContext} header first. Failures are logged rather than
     * propagated.
     */
    private void sendReactiveMessage(Message<?> message) {
        Message<?> messageToSend = message;
        if (messageToSend.getHeaders().containsKey("reactorContext")) {
            messageToSend = MessageBuilder.fromMessage(message).removeHeader("reactorContext").build();
        }
        try {
            dispatch(messageToSend);
        }
        catch (Exception ex) {
            // Pass the exception along so the cause and stack trace are not lost.
            log.warn("Error during processing event: " + messageToSend, ex);
        }
    }

    /**
     * Stops the loop: marks it inactive, disposes registered resources and
     * subscriptions, completes the sink and disposes the scheduler.
     */
    public void destroy() {
        this.active = false;
        this.disposables.dispose();
        this.many.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        this.scheduler.dispose();
    }

    /**
     * Orders messages by their priority header in ascending numeric order.
     *
     * <p>NOTE(review): when either message has no priority the two compare as
     * equal, so messages with and without a priority may not form a consistent
     * ordering. Confirm whether a default priority is intended.
     */
    private static class MessageComparator implements Comparator<Message<?>> {

        private MessageComparator() {
        }

        @Override
        public int compare(Message<?> left, Message<?> right) {
            Integer leftPriority = StaticShellMessageHeaderAccessor.getPriority(left);
            Integer rightPriority = StaticShellMessageHeaderAccessor.getPriority(right);
            if (leftPriority == null || rightPriority == null) {
                return 0;
            }
            return leftPriority.compareTo(rightPriority);
        }

        /** Creates a comparator that orders messages by their priority header. */
        static Comparator<Message<?>> comparingPriority() {
            return new MessageComparator();
        }
    }
}

