/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import java.util.concurrent.Phaser;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/**
 * A {@link Phaser} that additionally publishes phase changes to a Reactor sink, allowing callers
 * to wait for a phase advance (or termination) through a {@link Mono} instead of blocking a
 * thread.
 *
 * <p>The sink replays only the most recently emitted value, so a subscriber that arrives after an
 * advance has already happened still observes the latest emitted phase.
 */
public final class ReactivePhaser extends Phaser {

    /** Carries the phase numbers emitted on advance/termination; late subscribers get the latest one. */
    private final Sinks.Many<Integer> sink = Sinks.many().replay().latest();

    /**
     * Creates a phaser with the given number of registered, unarrived parties.
     *
     * @param parties the number of parties required to advance to the next phase
     */
    public ReactivePhaser(int parties) {
        super(parties);
    }

    /**
     * Forces termination of this phaser and then emits the resulting phase so that reactive
     * waiters are notified. Per the {@link Phaser} contract the phase is negative once terminated,
     * which satisfies every waiter's filter in {@link #awaitAdvanceReactively(int)}.
     */
    @Override
    public void forceTermination() {
        super.forceTermination();
        // The emit result is not inspected.
        sink.tryEmitNext(getPhase());
    }

    /**
     * Arrives at this phaser and waits, without blocking, for the arrival phase to advance.
     *
     * <p>{@link #arrive()} is invoked lazily on subscription. The result is cached, so subscribing
     * to the returned Mono more than once does not arrive again.
     *
     * @return a Mono that emits the arrival phase number once that phase has advanced (or the
     *         phaser has terminated); if the arrival phase is negative it is emitted immediately
     */
    public Mono<Integer> arriveAndAwaitAdvanceReactively() {
        return Mono.fromSupplier(this::arrive)
            .flatMap(arrivalPhase -> awaitAdvanceReactively(arrivalPhase).thenReturn(arrivalPhase))
            .cache();
    }

    /**
     * Waits, without blocking, until this phaser has emitted a phase greater than the given one or
     * a negative (terminated) phase.
     *
     * <p>Signals are delivered on {@link Schedulers#parallel()} rather than on the thread that
     * triggered the advance. Note that the comparison is by ordering ({@code >}), so an emitted
     * phase that has wrapped around to a smaller non-negative number does not release a waiter on
     * a larger phase.
     *
     * @param phase the phase to wait on; a negative value is returned immediately
     * @return a Mono emitting the given phase if it is negative, otherwise the first emitted phase
     *         that is negative or greater than {@code phase}
     */
    public Mono<Integer> awaitAdvanceReactively(int phase) {
        if (phase < 0) {
            return Mono.just(phase);
        }
        return sink.asFlux()
            .publishOn(Schedulers.parallel())
            .filter(phaseAdvancedTo -> phaseAdvancedTo < 0 || phaseAdvancedTo > phase)
            .next();
    }

    /**
     * Delegates the termination decision to {@link Phaser#onAdvance(int, int)} and emits the phase
     * being advanced to.
     *
     * @param phase             the phase number on entry to this method, before it is advanced
     * @param registeredParties the current number of registered parties
     * @return {@code true} if this phaser should terminate
     */
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        boolean shouldTerminate = super.onAdvance(phase, registeredParties);
        // On termination emit a negative value so every waiter is released; otherwise emit the
        // next phase, masked to stay non-negative when phase + 1 overflows.
        int phaseAdvancedTo = shouldTerminate
            ? Integer.MIN_VALUE + phase
            : (phase + 1) & Integer.MAX_VALUE;
        // The emit result is not inspected.
        sink.tryEmitNext(phaseAdvancedTo);
        return shouldTerminate;
    }
}

