/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.columnar.client.java.internal;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.jetbrains.annotations.ApiStatus;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@ApiStatus.Internal
public class ReactorHelper {
    private static final Object STREAM_END_SENTINEL = new Object();

    private ReactorHelper() {
        throw new AssertionError((Object)"not instantiable");
    }

    public static CancellationException propagateAsCancellation(InterruptedException e) {
        Thread.currentThread().interrupt();
        CancellationException t = new CancellationException("Thread was interrupted.");
        t.addSuppressed(e);
        return t;
    }

    private static <T> void putUninterruptibly(BlockingQueue<T> queue, T value) {
        boolean wasInterrupted = Thread.interrupted();
        while (true) {
            try {
                queue.put(value);
                if (wasInterrupted) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
            catch (InterruptedException e) {
                wasInterrupted = true;
                continue;
            }
            break;
        }
    }

    public static <T> void forEachBlocking(Flux<T> flux, int buffer, Consumer<T> callback) {
        Semaphore semaphore = new Semaphore(1);
        LinkedBlockingQueue queue = new LinkedBlockingQueue();
        Disposable subscription = flux.buffer(buffer).delayUntil(item -> Mono.fromRunnable(() -> {
            try {
                semaphore.acquire();
                queue.put(item);
            }
            catch (InterruptedException weGotCancelled) {
                semaphore.release();
                Thread.currentThread().interrupt();
            }
        }).subscribeOn(Schedulers.boundedElastic())).doOnCancel(() -> ReactorHelper.putUninterruptibly(queue, new CancellationException())).subscribe(item -> {}, error -> ReactorHelper.putUninterruptibly(queue, error), () -> ReactorHelper.putUninterruptibly(queue, STREAM_END_SENTINEL));
        try {
            Object item2;
            while (true) {
                try {
                    item2 = queue.take();
                    semaphore.release();
                }
                catch (InterruptedException e) {
                    throw ReactorHelper.propagateAsCancellation(e);
                }
                if (!(item2 instanceof List)) break;
                List list = (List)item2;
                list.forEach(callback);
            }
            if (item2 == STREAM_END_SENTINEL) {
                return;
            }
            if (item2 instanceof Throwable) {
                throw Exceptions.propagate((Throwable)((Throwable)item2));
            }
            throw new RuntimeException("Please report this bug in the SDK; Unexpected item in queue: " + String.valueOf(item2));
        }
        finally {
            subscription.dispose();
            semaphore.release(0x3FFFFFFF);
        }
    }
}

