/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

class UpstreamFramesSubscriber
implements Subscriber<ByteBuf>,
Disposable {
    private static final Logger logger = LoggerFactory.getLogger(UpstreamFramesSubscriber.class);
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final Consumer<ByteBuf> itemConsumer;
    private final Disposable downstreamRequestDisposable;
    private final Disposable resumeSaveStreamDisposable;
    private volatile Subscription subs;
    private volatile boolean resumeStarted;
    private final Queue<ByteBuf> framesCache;
    private long request;
    private long downStreamRequestN;
    private long resumeSaveStreamRequestN;

    UpstreamFramesSubscriber(int estimatedDownstreamRequest, Flux<Long> downstreamRequests, Flux<Long> resumeSaveStreamRequests, Consumer<ByteBuf> itemConsumer) {
        this.itemConsumer = itemConsumer;
        this.framesCache = (Queue)Queues.unbounded((int)estimatedDownstreamRequest).get();
        this.downstreamRequestDisposable = downstreamRequests.subscribe(requestN -> this.requestN(0L, (long)requestN));
        this.resumeSaveStreamDisposable = resumeSaveStreamRequests.subscribe(requestN -> this.requestN((long)requestN, 0L));
    }

    public void onSubscribe(Subscription s) {
        this.subs = s;
        if (!this.isDisposed()) {
            this.doRequest();
        } else {
            s.cancel();
        }
    }

    public void onNext(ByteBuf item) {
        this.processFrame(item);
    }

    public void onError(Throwable t) {
        this.dispose();
    }

    public void onComplete() {
        this.dispose();
    }

    public void resumeStart() {
        this.resumeStarted = true;
    }

    public void resumeComplete() {
        ByteBuf frame = this.framesCache.poll();
        while (frame != null) {
            this.itemConsumer.accept(frame);
            frame = this.framesCache.poll();
        }
        this.resumeStarted = false;
        this.doRequest();
    }

    public void dispose() {
        if (this.disposed.compareAndSet(false, true)) {
            this.releaseCache();
            if (this.subs != null) {
                this.subs.cancel();
            }
            this.resumeSaveStreamDisposable.dispose();
            this.downstreamRequestDisposable.dispose();
        }
    }

    public boolean isDisposed() {
        return this.disposed.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestN(long resumeStreamRequest, long downStreamRequest) {
        UpstreamFramesSubscriber upstreamFramesSubscriber = this;
        synchronized (upstreamFramesSubscriber) {
            this.downStreamRequestN = Operators.addCap((long)this.downStreamRequestN, (long)downStreamRequest);
            this.resumeSaveStreamRequestN = Operators.addCap((long)this.resumeSaveStreamRequestN, (long)resumeStreamRequest);
            long requests = Math.min(this.downStreamRequestN, this.resumeSaveStreamRequestN);
            if (requests > 0L) {
                this.downStreamRequestN -= requests;
                this.resumeSaveStreamRequestN -= requests;
                logger.debug("Upstream subscriber requestN: {}", (Object)requests);
                this.request = Operators.addCap((long)this.request, (long)requests);
            }
        }
        this.doRequest();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doRequest() {
        if (this.subs != null && !this.resumeStarted) {
            UpstreamFramesSubscriber upstreamFramesSubscriber = this;
            synchronized (upstreamFramesSubscriber) {
                long r = this.request;
                if (r > 0L) {
                    this.subs.request(r);
                    this.request = 0L;
                }
            }
        }
    }

    private void releaseCache() {
        ByteBuf frame = this.framesCache.poll();
        while (frame != null && frame.refCnt() > 0) {
            frame.release();
        }
    }

    private void processFrame(ByteBuf item) {
        if (this.resumeStarted) {
            this.framesCache.offer(item);
        } else {
            this.itemConsumer.accept(item);
        }
    }
}

