/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.polling.reactive;

import io.atleon.polling.Pollable;
import io.atleon.polling.Polled;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.NonNull;

public class PollingEventLoop<P, O>
implements Sinks.EmitFailureHandler {
    private static final Logger log = LoggerFactory.getLogger(PollingEventLoop.class);
    private final Scheduler scheduler;
    private final Pollable<P, O> pollable;
    private final Sinks.Many<Collection<Polled<P, O>>> sink;
    private final PollingEvent pollEvent;
    private final AtomicBoolean active;
    private final Duration pollingInterval;

    public PollingEventLoop(Scheduler scheduler, Pollable<P, O> pollable, Duration pollingInterval, Sinks.Many<Collection<Polled<P, O>>> sink) {
        this.scheduler = scheduler;
        this.pollable = pollable;
        this.pollingInterval = pollingInterval;
        this.sink = sink;
        this.active = new AtomicBoolean(true);
        this.pollEvent = new PollingEvent();
    }

    void onRequest(long toAdd) {
        this.active.set(true);
        this.pollEvent.scheduleImmediate();
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            this.pollEvent.stop();
            return Mono.empty();
        }).onErrorResume(e -> Mono.empty());
    }

    public boolean onEmitFailure(@NonNull SignalType signalType, @NonNull Sinks.EmitResult emitResult) {
        if (!this.active.get()) {
            return false;
        }
        return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
    }

    class PollingEvent
    implements Runnable {
        private final AtomicBoolean scheduled = new AtomicBoolean(false);

        PollingEvent() {
        }

        public void stop() {
            PollingEventLoop.this.active.compareAndSet(true, false);
        }

        @Override
        public void run() {
            block5: {
                try {
                    if (PollingEventLoop.this.active.get()) {
                        Collection result = PollingEventLoop.this.pollable.poll();
                        if (PollingEventLoop.this.active.get()) {
                            this.scheduled.set(false);
                            this.schedule();
                        }
                        if (result.iterator().hasNext()) {
                            PollingEventLoop.this.sink.emitNext(result, (Sinks.EmitFailureHandler)PollingEventLoop.this);
                        }
                    }
                }
                catch (Exception e) {
                    if (!PollingEventLoop.this.active.get()) break block5;
                    log.error("Unexpected exception", (Throwable)e);
                    PollingEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)PollingEventLoop.this);
                }
            }
        }

        void schedule() {
            this.schedule(PollingEventLoop.this.pollingInterval.toMillis());
        }

        void scheduleImmediate() {
            this.schedule(0L);
        }

        void schedule(long interval) {
            if (!this.scheduled.getAndSet(true)) {
                PollingEventLoop.this.scheduler.schedule((Runnable)this, interval, TimeUnit.MILLISECONDS);
            }
        }
    }
}

