/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;

/**
 * In-memory {@link ResumableFramesStore} that keeps outgoing resumable frames in an
 * {@link ArrayList} so they can be replayed to a new subscriber of {@link #resumeStream()}.
 *
 * <p>The store plays three roles at once:
 * <ul>
 *   <li>as a {@link CoreSubscriber} it receives the frames handed to {@link #saveFrames(Flux)};</li>
 *   <li>as a {@link Flux} it replays the cached frames and then forwards live frames to the
 *       single subscriber stored in {@link #actual};</li>
 *   <li>as a {@link Subscription} it is handed to that subscriber so that it can cancel.</li>
 * </ul>
 *
 * <p>Access to {@link #cachedFrames} is guarded by {@code synchronized (this)}; the counters are
 * updated through atomic field updaters.
 */
public class InMemoryResumableFramesStore
extends Flux<ByteBuf>
implements CoreSubscriber<ByteBuf>,
ResumableFramesStore,
Subscription {
    private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class);

    /** No subscriber is currently attached to the resume stream (initial value, or after cancel). */
    private static final int STATE_DETACHED = 0;
    /** A subscriber is attached and receives live frames from {@link #onNext(ByteBuf)}. */
    private static final int STATE_ATTACHED = 1;
    /** {@link #dispose()} has been called; cached frames were released. */
    private static final int STATE_DISPOSED = 2;

    /** Completed once by {@link #dispose()}; exposed through {@link #onClose()}. */
    final MonoProcessor<Void> disposed = MonoProcessor.create();
    /** Cached resumable frames, oldest first. Guarded by {@code synchronized (this)}. */
    final ArrayList<ByteBuf> cachedFrames;
    /** Label used as a prefix in log messages. */
    final String tag;
    /** Maximum cache size in bytes; {@code Integer.MAX_VALUE} turns size accounting off. */
    final int cacheLimit;
    /**
     * Number of bytes of resumable frames received from the remote side. The sign bit is used as
     * a "paused" flag: while it is set, {@link #resumableFrameReceived(ByteBuf)} rejects frames.
     */
    volatile long impliedPosition;
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> IMPLIED_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "impliedPosition");
    /** Total size in bytes of the frames that have been dropped from the head of the cache. */
    volatile long position;
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "position");
    /** Bytes currently accounted to the cache; only maintained when a cache limit is set. */
    volatile int cacheSize;
    static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> CACHE_SIZE = AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "cacheSize");
    /** Downstream of {@link #saveFrames(Flux)}; only receives the terminal signals. */
    CoreSubscriber<? super Void> saveFramesSubscriber;
    /** Current subscriber of the resume stream; published by the state CAS in {@code subscribe}. */
    CoreSubscriber<? super ByteBuf> actual;
    /** One of {@code STATE_DETACHED}, {@code STATE_ATTACHED}, {@code STATE_DISPOSED}. */
    volatile int state;
    static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> STATE = AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");

    /**
     * Creates an empty store.
     *
     * @param tag label used in log messages
     * @param cacheSizeBytes cache limit in bytes; pass {@code Integer.MAX_VALUE} for no limit
     */
    public InMemoryResumableFramesStore(String tag, int cacheSizeBytes) {
        this.tag = tag;
        this.cacheLimit = cacheSizeBytes;
        this.cachedFrames = new ArrayList<>();
    }

    /**
     * Routes {@code frames} through this store (see {@link #onNext(ByteBuf)}).
     *
     * @param frames the outgoing frames
     * @return a {@link Mono} that terminates when {@code frames} terminates
     */
    @Override
    public Mono<Void> saveFrames(Flux<ByteBuf> frames) {
        return frames
            .transform(Operators.<ByteBuf, Void>lift((__, downstream) -> {
                // Remember the downstream so terminal signals can be forwarded to it,
                // and install this store as the subscriber of the frames.
                this.saveFramesSubscriber = downstream;
                return this;
            }))
            .then();
    }

    /**
     * Drops cached frames from the head of the cache until the local position reaches
     * {@code remoteImpliedPos}.
     *
     * <p>{@link #position} and {@link #cacheSize} are always adjusted by the number of bytes that
     * were actually dropped, even when the method ends up throwing, so the counters keep
     * describing the cache content.
     *
     * @param remoteImpliedPos the implied position reported by the remote side
     * @throws IllegalStateException if the cache holds fewer bytes than must be removed, or if
     *     {@code remoteImpliedPos} does not fall on a frame boundary
     */
    @Override
    public void releaseFrames(long remoteImpliedPos) {
        final long pos = this.position;
        logger.debug("{} Removing frames for local: {}, remote implied: {}", this.tag, pos, remoteImpliedPos);
        final long toRemoveBytes = Math.max(0L, remoteImpliedPos - pos);
        int removedBytes = 0;
        final ArrayList<ByteBuf> frames = this.cachedFrames;
        synchronized (this) {
            while (toRemoveBytes > removedBytes && !frames.isEmpty()) {
                final ByteBuf cachedFrame = frames.remove(0);
                removedBytes += cachedFrame.readableBytes();
                cachedFrame.release();
            }
        }

        // Account for what was really dropped before validating, otherwise a failure below
        // would leave position/cacheSize out of sync with the cache content.
        POSITION.addAndGet(this, removedBytes);
        if (this.cacheLimit != Integer.MAX_VALUE) {
            CACHE_SIZE.addAndGet(this, -removedBytes);
            logger.debug("{} Removed frames. Current cache size: {}", this.tag, this.cacheSize);
        }

        if (toRemoveBytes > removedBytes) {
            // Report the shortfall, not the total amount that was requested.
            throw new IllegalStateException(String.format("Local and remote state disagreement: need to remove additional %d bytes, but cache is empty", toRemoveBytes - removedBytes));
        }
        if (toRemoveBytes < removedBytes) {
            throw new IllegalStateException("Local and remote state disagreement: local and remote frame sizes are not equal");
        }
    }

    /** Returns this store; subscribing replays the cached frames, then forwards live ones. */
    @Override
    public Flux<ByteBuf> resumeStream() {
        return this;
    }

    /** Returns the total size in bytes of the frames dropped from the head of the cache. */
    @Override
    public long framePosition() {
        return this.position;
    }

    /** Returns the implied position with the "paused" sign bit masked out. */
    @Override
    public long frameImpliedPosition() {
        return this.impliedPosition & Long.MAX_VALUE;
    }

    /**
     * Advances the implied position by the size of {@code frame}.
     *
     * @param frame the received frame; only its readable byte count is used
     * @return {@code false} if the implied position is paused (nothing is changed),
     *     {@code true} once the position has been advanced
     */
    @Override
    public boolean resumableFrameReceived(ByteBuf frame) {
        final int frameSize = frame.readableBytes();
        for (;;) {
            final long current = this.impliedPosition;
            if (current < 0L) {
                // Sign bit set: paused by pauseImplied().
                return false;
            }
            if (IMPLIED_POSITION.compareAndSet(this, current, current + frameSize)) {
                return true;
            }
        }
    }

    /** Sets the "paused" sign bit on the implied position. */
    void pauseImplied() {
        long current;
        do {
            current = this.impliedPosition;
        } while (!IMPLIED_POSITION.compareAndSet(this, current, current | Long.MIN_VALUE));
        logger.debug("Tag {}. Paused at position[{}]", this.tag, current);
    }

    /** Clears the "paused" sign bit on the implied position. */
    void resumeImplied() {
        long current;
        long restored;
        do {
            current = this.impliedPosition;
            restored = current & Long.MAX_VALUE;
        } while (!IMPLIED_POSITION.compareAndSet(this, current, restored));
        logger.debug("Tag {}. Resumed at position[{}]", this.tag, restored);
    }

    /** Returns a {@link Mono} that completes when the store is disposed. */
    @Override
    public Mono<Void> onClose() {
        return this.disposed;
    }

    /**
     * Marks the store as disposed, releases and clears all cached frames and completes
     * {@link #onClose()}. Only the first call has an effect.
     */
    public void dispose() {
        if (STATE.getAndSet(this, STATE_DISPOSED) == STATE_DISPOSED) {
            return;
        }
        this.cacheSize = 0;
        synchronized (this) {
            for (ByteBuf frame : this.cachedFrames) {
                if (frame != null) {
                    frame.release();
                }
            }
            this.cachedFrames.clear();
        }
        this.disposed.onComplete();
    }

    /** Returns {@code true} once {@link #dispose()} has completed the close signal. */
    public boolean isDisposed() {
        return this.disposed.isTerminated();
    }

    /**
     * Called when the saved frames source subscribes this store: hands an empty subscription to
     * the {@code saveFrames} downstream and requests an unbounded amount from the source.
     */
    @Override
    public void onSubscribe(Subscription s) {
        this.saveFramesSubscriber.onSubscribe(Operators.emptySubscription());
        s.request(Long.MAX_VALUE);
    }

    /** Forwards the error of the saved frames source to the {@code saveFrames} downstream. */
    @Override
    public void onError(Throwable t) {
        this.saveFramesSubscriber.onError(t);
    }

    /** Forwards the completion of the saved frames source to the {@code saveFrames} downstream. */
    @Override
    public void onComplete() {
        this.saveFramesSubscriber.onComplete();
    }

    /**
     * Handles one outgoing frame.
     *
     * <p>A frame that {@link ResumableDuplexConnection#isResumableFrame(ByteBuf)} reports as
     * resumable is appended to the cache; when a cache limit is set, the oldest frames are evicted
     * first to make room and {@link #position} is advanced by the evicted bytes. If a subscriber is
     * attached the frame is also passed on to it, retained once more. A frame that is neither
     * cached nor passed on is released, which includes resumable frames arriving after
     * {@link #dispose()}.
     *
     * @param frame the outgoing frame
     */
    @Override
    public void onNext(ByteBuf frame) {
        final boolean isResumable = ResumableDuplexConnection.isResumableFrame(frame);
        final int state;
        if (isResumable) {
            final ArrayList<ByteBuf> frames = this.cachedFrames;
            final int incomingFrameSize = frame.readableBytes();
            final int cacheLimit = this.cacheLimit;
            final boolean limited = cacheLimit != Integer.MAX_VALUE;

            if (limited) {
                long availableSize = cacheLimit - this.cacheSize;
                if (availableSize < incomingFrameSize) {
                    // Evict from the head until the new frame fits or the cache is empty.
                    int removedBytes = 0;
                    synchronized (this) {
                        while (availableSize < incomingFrameSize && !frames.isEmpty()) {
                            final ByteBuf evicted = frames.remove(0);
                            final int evictedSize = evicted.readableBytes();
                            availableSize += evictedSize;
                            removedBytes += evictedSize;
                            evicted.release();
                        }
                    }
                    CACHE_SIZE.addAndGet(this, -removedBytes);
                    POSITION.addAndGet(this, removedBytes);
                }
            }

            final boolean cached;
            synchronized (this) {
                // Read the state under the same lock dispose() uses to clear the cache, so a
                // frame is never added to a cache that has already been released.
                state = this.state;
                cached = state != STATE_DISPOSED;
                if (cached) {
                    frames.add(frame);
                }
            }
            if (cached && limited) {
                CACHE_SIZE.addAndGet(this, incomingFrameSize);
            }
        } else {
            state = this.state;
        }

        final CoreSubscriber<? super ByteBuf> subscriber = this.actual;
        if (state == STATE_ATTACHED) {
            subscriber.onNext(frame.retain());
        } else if (!isResumable || state == STATE_DISPOSED) {
            // Nobody took ownership of the frame: not cached and not delivered.
            frame.release();
        }
    }

    /** No-op: demand from the resume stream subscriber is not tracked. */
    @Override
    public void request(long n) {
    }

    /**
     * Detaches the resume stream subscriber: pauses the implied position and moves the state from
     * attached back to detached. A disposed store stays disposed.
     */
    @Override
    public void cancel() {
        this.pauseImplied();
        // CAS instead of a plain write so that a late cancel cannot overwrite STATE_DISPOSED.
        STATE.compareAndSet(this, STATE_ATTACHED, STATE_DETACHED);
    }

    /**
     * Attaches {@code actual} to the resume stream: replays every cached frame (each retained
     * once), un-pauses the implied position and switches the state to attached so that
     * {@link #onNext(ByteBuf)} starts forwarding live frames. If the store is already disposed the
     * subscriber is completed immediately.
     *
     * @param actual the subscriber of the resume stream
     */
    @Override
    public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
        final int state = this.state;
        logger.debug("Tag: {}. Subscribed State[{}]", this.tag, state);
        if (state == STATE_DISPOSED) {
            // Nothing will ever be emitted; terminate instead of leaving the subscriber hanging.
            Operators.complete(actual);
            return;
        }
        actual.onSubscribe(this);
        synchronized (this) {
            for (ByteBuf frame : this.cachedFrames) {
                actual.onNext(frame.retain());
            }
        }
        // Plain write; made visible to onNext() by the volatile state update below.
        this.actual = actual;
        this.resumeImplied();
        STATE.compareAndSet(this, STATE_DETACHED, STATE_ATTACHED);
    }
}

