/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.util.concurrent;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import org.infinispan.commons.executors.BlockingResource;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Scope(value=Scopes.GLOBAL)
public class BlockingManagerImpl
implements BlockingManager {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final AtomicInteger id = log.isTraceEnabled() ? new AtomicInteger() : null;
    @Inject
    @ComponentName(value="org.infinispan.executors.non-blocking")
    Executor nonBlockingExecutor;
    @Inject
    @ComponentName(value="org.infinispan.executors.blocking")
    Executor blockingExecutor;
    private Scheduler blockingScheduler;
    private Scheduler nonBlockingScheduler;

    /**
     * Creates the RxJava schedulers used by the publisher-based operations of this manager.
     * <p>
     * The blocking scheduler is backed by a {@link ReentrantBlockingExecutor}; the non-blocking
     * scheduler wraps the injected non-blocking executor directly.
     */
    @Start
    void start() {
        Executor reentrantBlocking = new ReentrantBlockingExecutor();
        blockingScheduler = Schedulers.from(reentrantBlocking);
        nonBlockingScheduler = Schedulers.from(nonBlockingExecutor);
    }

    /**
     * Produces a trace identifier for work that this manager schedules internally.
     *
     * @return {@code "-BlockingManagerImpl-"} followed by the next counter value, or {@code null}
     *         when the counter was never created (trace logging was disabled when this instance
     *         was constructed)
     */
    private String nextTraceId() {
        if (id == null) {
            return null;
        }
        return "-BlockingManagerImpl-" + id.getAndIncrement();
    }

    /**
     * Runs the given task through the shared blocking executor.
     *
     * @param runnable the blocking task to run
     * @param traceId  identifier included in trace log messages
     * @return the stage produced by {@code runBlockingOperation} for this task
     */
    @Override
    public CompletionStage<Void> runBlocking(Runnable runnable, Object traceId) {
        Executor target = blockingExecutor;
        return runBlockingOperation(runnable, traceId, target);
    }

    /**
     * Subscribes to {@code publisher} and hands every emitted value to {@code consumer}, with
     * values observed on the blocking scheduler.
     *
     * @param publisher source of values
     * @param consumer  callback invoked for each value
     * @param traceId   identifier included in trace log messages
     * @param <E>       element type of the publisher
     * @return a stage that completes (with {@code null}) once the publisher terminates, passed
     *         through {@link #continueOnNonBlockingThread(CompletionStage, Object)}
     */
    @Override
    public <E> CompletionStage<Void> subscribeBlockingConsumer(Publisher<E> publisher, Consumer<E> consumer, Object traceId) {
        // Keep the element type: with a raw Flowable the consumer::accept reference is unchecked.
        Flowable<E> valuePublisher = Flowable.fromPublisher(publisher).observeOn(blockingScheduler);
        if (log.isTraceEnabled()) {
            valuePublisher = valuePublisher.doOnNext(value -> log.tracef("Invoking blocking consumer for %s with value %s", traceId, value));
        }
        CompletionStage<Void> consumed = valuePublisher.doOnNext(consumer::accept).ignoreElements().toCompletionStage(null);
        return continueOnNonBlockingThread(consumed, traceId);
    }

    /**
     * Subscribes to {@code publisher} and reduces its values with {@code collector}, with values
     * observed on the blocking scheduler.
     *
     * @param publisher source of values
     * @param collector collector applied to the emitted values
     * @param traceId   identifier included in trace log messages
     * @param <T>       element type of the publisher
     * @param <A>       intermediate accumulation type of the collector
     * @param <R>       result type of the collector
     * @return a stage holding the collected result, passed through
     *         {@link #continueOnNonBlockingThread(CompletionStage, Object)}
     */
    @Override
    public <T, A, R> CompletionStage<R> subscribeBlockingCollector(Publisher<T> publisher, Collector<? super T, A, R> collector, Object traceId) {
        Flowable<T> valuePublisher = Flowable.fromPublisher(publisher).observeOn(blockingScheduler);
        if (log.isTraceEnabled()) {
            valuePublisher = valuePublisher.doOnNext(value -> log.tracef("Invoking blocking collector for %s with value %s", traceId, value));
        }
        // valuePublisher is already a Flowable, so it is collected directly instead of being
        // re-wrapped through a raw Publisher cast.
        CompletionStage<R> collected = valuePublisher.collect(collector).toCompletionStage();
        return continueOnNonBlockingThread(collected, traceId);
    }

    /**
     * Convenience overload of
     * {@link #runBlockingOperation(Runnable, Object, Executor, boolean)} that always requests the
     * returned stage to continue on a non-blocking thread.
     *
     * @param runnable the blocking task to run
     * @param traceId  identifier included in trace log messages
     * @param executor executor the task is submitted to when the caller is not a blocking thread
     * @return the stage produced by the four-argument overload
     */
    private CompletionStage<Void> runBlockingOperation(Runnable runnable, Object traceId, Executor executor) {
        final boolean resumeOnNonBlockingThread = true;
        return runBlockingOperation(runnable, traceId, executor, resumeOnNonBlockingThread);
    }

    /**
     * Runs {@code runnable} as a blocking operation.
     * <p>
     * If the calling thread is already a blocking thread the task runs inline and an already
     * completed stage is returned (exceptionally completed if the task throws). Otherwise the task
     * is submitted to {@code executor}.
     *
     * @param runnable                         the blocking task to run
     * @param traceId                          identifier included in trace log messages
     * @param executor                         executor used when the caller is not a blocking thread
     * @param requireReturnOnNonBlockingThread when {@code true} the submitted stage is passed through
     *                                         {@link #continueOnNonBlockingThread(CompletionStage, Object)};
     *                                         when {@code false} the submitted stage itself is returned
     * @return a stage that reflects the completion (or failure) of the task
     */
    private CompletionStage<Void> runBlockingOperation(Runnable runnable, Object traceId, Executor executor, boolean requireReturnOnNonBlockingThread) {
        if (isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                log.tracef("Invoked run on a blocking thread, running %s in same blocking thread", traceId);
            }
            try {
                runnable.run();
                return CompletableFutures.completedNull();
            }
            catch (Throwable t) {
                return CompletableFutures.completedExceptionFuture(t);
            }
        }
        CompletableFuture<Void> stage;
        if (log.isTraceEnabled()) {
            log.tracef("Submitting blocking run operation %s to blocking thread", traceId);
            stage = CompletableFuture.runAsync(() -> {
                log.tracef("Running blocking run operation %s", traceId);
                runnable.run();
            }, executor);
        } else {
            stage = CompletableFuture.runAsync(runnable, executor);
        }
        if (requireReturnOnNonBlockingThread) {
            return continueOnNonBlockingThread(stage, traceId);
        }
        // Return the real stage, not an already completed placeholder, so the caller can still
        // observe when the task finishes and whether it failed.
        return stage;
    }

    /**
     * Evaluates the given supplier through the shared blocking executor.
     *
     * @param supplier the blocking computation
     * @param traceId  identifier included in trace log messages
     * @param <V>      type of the supplied value
     * @return the stage produced by {@code supplyBlockingOperation} for this supplier
     */
    @Override
    public <V> CompletionStage<V> supplyBlocking(Supplier<V> supplier, Object traceId) {
        Executor target = blockingExecutor;
        return supplyBlockingOperation(supplier, traceId, target);
    }

    /**
     * Evaluates {@code supplier} as a blocking operation.
     * <p>
     * If the calling thread is already a blocking thread the supplier is invoked inline and an
     * already completed stage is returned (exceptionally completed if the supplier throws).
     * Otherwise the supplier is submitted to {@code executor} and the resulting stage is passed
     * through {@link #continueOnNonBlockingThread(CompletionStage, Object)}.
     *
     * @param supplier the blocking computation
     * @param traceId  identifier included in trace log messages
     * @param executor executor used when the caller is not a blocking thread
     * @param <V>      type of the supplied value
     * @return a stage holding the supplied value or the failure
     */
    private <V> CompletionStage<V> supplyBlockingOperation(Supplier<V> supplier, Object traceId, Executor executor) {
        if (isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                log.tracef("Invoked supply on a blocking thread, running %s in same blocking thread", traceId);
            }
            try {
                return CompletableFuture.completedFuture(supplier.get());
            }
            catch (Throwable t) {
                return CompletableFutures.completedExceptionFuture(t);
            }
        }
        // Typed as V (not Object) so the result flows to the CompletionStage<V> return unchecked-free.
        CompletableFuture<V> stage;
        if (log.isTraceEnabled()) {
            log.tracef("Submitting blocking supply operation %s to blocking thread", traceId);
            stage = CompletableFuture.supplyAsync(() -> {
                log.tracef("Running blocking supply operation %s", traceId);
                return supplier.get();
            }, executor);
        } else {
            stage = CompletableFuture.supplyAsync(supplier, executor);
        }
        return continueOnNonBlockingThread(stage, traceId);
    }

    /**
     * Applies a blocking {@code function} to the outcome (value or failure) of {@code stage}.
     * <p>
     * On a blocking thread the stage is joined in the calling thread and the function is applied
     * inline. Otherwise the function is chained with {@code handleAsync} on the blocking executor
     * and the result is passed through
     * {@link #continueOnNonBlockingThread(CompletionStage, Object)}.
     *
     * @param stage    stage whose outcome is handled
     * @param function receives the value (or {@code null}) and the failure (or {@code null})
     * @param traceId  identifier included in trace log messages
     * @param <I>      input value type
     * @param <O>      output value type
     * @return a stage holding the function result, or completed exceptionally if the function throws
     */
    @Override
    public <I, O> CompletionStage<O> handleBlocking(CompletionStage<? extends I> stage, BiFunction<? super I, Throwable, ? extends O> function, Object traceId) {
        if (isCurrentThreadBlocking()) {
            I value = null;
            Throwable throwable = null;
            try {
                if (log.isTraceEnabled()) {
                    log.tracef("Invoked handle on a blocking thread, joining %s in same blocking thread", traceId);
                }
                value = CompletionStages.join(stage);
            }
            catch (Throwable t) {
                throwable = t;
            }
            try {
                return CompletableFuture.completedFuture(function.apply(value, throwable));
            }
            catch (Throwable t) {
                // Report a throwing function through the stage, as the asynchronous path does,
                // instead of letting it escape synchronously to the caller.
                return CompletableFutures.completedExceptionFuture(t);
            }
        }
        return continueOnNonBlockingThread(stage.handleAsync(function, blockingExecutor), traceId);
    }

    /**
     * Applies a blocking {@code function} to the value of {@code stage}.
     * <p>
     * Off a blocking thread the function is chained with {@code thenApplyAsync} on the blocking
     * executor and the result is passed through
     * {@link #continueOnNonBlockingThread(CompletionStage, Object)}. On a blocking thread the
     * stage is joined and the function applied inline; any throwable from either step yields an
     * exceptionally completed stage.
     *
     * @param stage    stage supplying the input value
     * @param function blocking transformation
     * @param traceId  identifier included in trace log messages
     * @param <I>      input value type
     * @param <O>      output value type
     * @return a stage holding the transformed value or the failure
     */
    @Override
    public <I, O> CompletionStage<O> thenApplyBlocking(CompletionStage<? extends I> stage, Function<? super I, ? extends O> function, Object traceId) {
        if (!isCurrentThreadBlocking()) {
            CompletionStage<O> applied = stage.thenApplyAsync(function, blockingExecutor);
            return continueOnNonBlockingThread(applied, traceId);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invoked thenApply on a blocking thread, joining %s in same blocking thread", traceId);
        }
        try {
            I joined = CompletionStages.join(stage);
            O result = function.apply(joined);
            return CompletableFuture.completedFuture(result);
        }
        catch (Throwable failure) {
            return CompletableFutures.completedExceptionFuture(failure);
        }
    }

    /**
     * Runs a blocking {@code runnable} after {@code stage} has completed.
     * <p>
     * On a blocking thread the stage is joined in the calling thread and the runnable is then
     * executed inline. Otherwise the runnable is chained with {@code thenRunAsync} on the blocking
     * executor and the result is passed through
     * {@link #continueOnNonBlockingThread(CompletionStage, Object)}.
     *
     * @param stage    stage to wait for
     * @param runnable blocking task to run once the stage has completed
     * @param traceId  identifier included in trace log messages
     * @param <I>      value type of the stage (the value itself is ignored)
     * @return a stage completed with {@code null}, or completed exceptionally if joining the stage
     *         or running the task throws
     */
    @Override
    public <I> CompletionStage<Void> thenRunBlocking(CompletionStage<? extends I> stage, Runnable runnable, Object traceId) {
        if (isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                log.tracef("Invoked thenRun on a blocking thread, joining %s in same blocking thread", traceId);
            }
            try {
                // Join first so the runnable executes on this blocking thread. Chaining with
                // thenRun would let an incomplete stage run it on whichever thread completes it.
                CompletionStages.join(stage);
                runnable.run();
                return CompletableFutures.completedNull();
            }
            catch (Throwable t) {
                return CompletableFutures.completedExceptionFuture(t);
            }
        }
        return continueOnNonBlockingThread(stage.thenRunAsync(runnable, blockingExecutor), traceId);
    }

    /**
     * Invokes a blocking {@code biConsumer} with the outcome (value or failure) of {@code stage}.
     * <p>
     * On a blocking thread the stage is joined in the calling thread and the callback is invoked
     * inline exactly once. Otherwise the callback is chained with {@code whenCompleteAsync} on the
     * blocking executor and the result is passed through
     * {@link #continueOnNonBlockingThread(CompletionStage, Object)}.
     *
     * @param stage      stage whose outcome is observed
     * @param biConsumer receives the value (or {@code null}) and the failure (or {@code null})
     * @param traceId    identifier included in trace log messages
     * @param <V>        value type of the stage
     * @return on the blocking path, {@code stage} itself when the callback returns normally, or an
     *         exceptionally completed stage when the callback throws (the callback's throwable is
     *         added as suppressed to the stage's failure if there was one)
     */
    @Override
    public <V> CompletionStage<V> whenCompleteBlocking(CompletionStage<V> stage, BiConsumer<? super V, ? super Throwable> biConsumer, Object traceId) {
        if (isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                log.tracef("Invoked whenComplete on a blocking thread, joining %s in same blocking thread", traceId);
            }
            V value = null;
            Throwable throwable = null;
            try {
                value = CompletionStages.join(stage);
            }
            catch (Throwable t) {
                throwable = t;
            }
            try {
                biConsumer.accept(value, throwable);
            }
            catch (Throwable t) {
                if (throwable == null) {
                    return CompletableFutures.completedExceptionFuture(t);
                }
                throwable.addSuppressed(t);
                return CompletableFutures.completedExceptionFuture(throwable);
            }
            // The callback has already run above. Return the original stage rather than
            // registering the callback on it again, which would invoke it a second time.
            return stage;
        }
        return continueOnNonBlockingThread(stage.whenCompleteAsync(biConsumer, blockingExecutor), traceId);
    }

    /**
     * Returns a stage whose continuation is handed to the non-blocking executor.
     * <p>
     * A stage that {@code CompletionStages.isCompletedSuccessfully} reports as already completed
     * is returned unchanged. Any other stage (pending or failed) gets a logging-only
     * {@code whenCompleteAsync} action that is executed on the non-blocking executor, and the
     * resulting dependent stage is returned.
     *
     * @param delay   stage to continue from
     * @param traceId identifier included in trace log messages
     * @param <V>     value type of the stage
     * @return {@code delay} itself or a dependent stage as described above
     */
    @Override
    public <V> CompletionStage<V> continueOnNonBlockingThread(CompletionStage<V> delay, Object traceId) {
        boolean alreadyDone = CompletionStages.isCompletedSuccessfully(delay);
        if (!alreadyDone) {
            return delay.whenCompleteAsync((value, failure) -> {
                // The action only traces; its purpose is the hop onto the non-blocking executor.
                if (!log.isTraceEnabled()) {
                    return;
                }
                if (failure == null) {
                    log.tracef("Continuing execution of id %s", traceId);
                } else {
                    log.tracef("Continuing execution of id %s with exception %s", traceId, failure.getMessage());
                }
            }, nonBlockingExecutor);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Stage for %s was already completed, returning in same thread", traceId);
        }
        return delay;
    }

    /**
     * Wraps {@code publisher} so that subscribing to it happens on the blocking scheduler and its
     * values are observed on the non-blocking scheduler.
     * <p>
     * The decision is deferred until subscription time: if the subscribing thread is already a
     * blocking thread, the original publisher is used as is.
     *
     * @param publisher publisher whose subscription may block
     * @param <V>       element type of the publisher
     * @return a deferred publisher as described above
     */
    @Override
    public <V> Publisher<V> blockingPublisher(Publisher<V> publisher) {
        return Flowable.defer(() -> {
            if (isCurrentThreadBlocking()) {
                return publisher;
            }
            // No raw Publisher cast: keeping V avoids unchecked conversions in the chain.
            return Flowable.fromPublisher(publisher).subscribeOn(blockingScheduler).observeOn(nonBlockingScheduler);
        });
    }

    /**
     * Subscribes to {@code publisher}, discards its values, and exposes its termination as a stage.
     * <p>
     * Unless the calling thread is already a blocking thread, the subscription is moved to the
     * blocking scheduler. The resulting stage is passed through
     * {@link #continueOnNonBlockingThread(CompletionStage, Object)}.
     *
     * @param publisher publisher whose subscription may block
     * @param traceId   identifier included in trace log messages
     * @param <V>       element type of the publisher
     * @return a stage completed with {@code null} when the publisher completes, or exceptionally
     *         when it signals an error
     */
    @Override
    public <V> CompletionStage<Void> blockingPublisherToVoidStage(Publisher<V> publisher, Object traceId) {
        Flowable<V> flowable = Flowable.fromPublisher(publisher);
        if (!isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                flowable = flowable.doOnSubscribe(subscription -> log.tracef("Subscribing to %s on blocking thread", traceId));
            }
            flowable = flowable.subscribeOn(blockingScheduler);
            if (log.isTraceEnabled()) {
                flowable = flowable.doOnSubscribe(subscription -> log.tracef("Publisher %s subscribing thread is %s", traceId, Thread.currentThread()));
            }
        } else if (log.isTraceEnabled()) {
            log.tracef("Invoked on a blocking thread, subscribing %s in same blocking thread", traceId);
        }
        CompletionStage<Void> stage = flowable.ignoreElements().toCompletionStage(null);
        return continueOnNonBlockingThread(stage, traceId);
    }

    /**
     * Creates a blocking executor that delegates to the shared blocking executor through a
     * {@link LimitedExecutor} built from the given name and concurrency limit.
     *
     * @param name                 name passed to the {@link LimitedExecutor}
     * @param concurrentExecutions concurrency limit passed to the {@link LimitedExecutor}
     * @return a new {@link BlockingManager.BlockingExecutor} backed by that limited executor
     */
    @Override
    public BlockingManager.BlockingExecutor limitedBlockingExecutor(String name, int concurrentExecutions) {
        return new LimitedBlockingExecutor(new LimitedExecutor(name, blockingExecutor, concurrentExecutions));
    }

    /**
     * Tells whether the calling thread is allowed to block.
     *
     * @return {@code true} when the current thread's {@link ThreadGroup} implements
     *         {@link BlockingResource}
     */
    protected boolean isCurrentThreadBlocking() {
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        return group instanceof BlockingResource;
    }

    /**
     * {@link BlockingManager.BlockingExecutor} that routes work through a {@link LimitedExecutor}
     * while reusing the enclosing manager's run/supply logic.
     */
    private class LimitedBlockingExecutor
    implements BlockingManager.BlockingExecutor {
        /** Executor that submitted tasks are handed to. */
        private final LimitedExecutor limitedExecutor;

        private LimitedBlockingExecutor(LimitedExecutor executor) {
            limitedExecutor = executor;
        }

        /** Runs {@code runnable} via the enclosing manager, using the limited executor. */
        @Override
        public CompletionStage<Void> execute(Runnable runnable, Object traceId) {
            return runBlockingOperation(runnable, traceId, limitedExecutor);
        }

        /** Evaluates {@code supplier} via the enclosing manager, using the limited executor. */
        @Override
        public <V> CompletionStage<V> supply(Supplier<V> supplier, Object traceId) {
            return supplyBlockingOperation(supplier, traceId, limitedExecutor);
        }
    }

    /**
     * {@link Executor} that funnels commands through the enclosing manager's
     * {@code runBlockingOperation}, so a command submitted from a thread that is already blocking
     * runs inline and any other command is submitted to the blocking executor.
     */
    private class ReentrantBlockingExecutor
    implements Executor {
        private ReentrantBlockingExecutor() {
            // Nothing to initialise; all state lives in the enclosing manager.
        }

        /**
         * Runs {@code command} as a blocking operation, without requesting a return to a
         * non-blocking thread. The resulting stage is discarded.
         */
        @Override
        public void execute(Runnable command) {
            String traceId = nextTraceId();
            runBlockingOperation(command, traceId, blockingExecutor, false);
        }
    }
}

