/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.gridfs;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

class AsyncInputStreamAdapter
implements AsyncInputStream {
    private static final AtomicLongFieldUpdater<AsyncInputStreamAdapter> DEMAND = AtomicLongFieldUpdater.newUpdater(AsyncInputStreamAdapter.class, "demand");
    private static final AtomicIntegerFieldUpdater<AsyncInputStreamAdapter> SUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamAdapter.class, "subscribed");
    private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0;
    private static final int SUBSCRIPTION_SUBSCRIBED = 1;
    private final Publisher<? extends DataBuffer> buffers;
    private final Context subscriberContext;
    private volatile Subscription subscription;
    private volatile boolean cancelled;
    private volatile boolean allDataBuffersReceived;
    private volatile Throwable error;
    private final Queue<ReadRequest> readRequests = (Queue)Queues.small().get();
    private final Queue<DataBuffer> bufferQueue = (Queue)Queues.small().get();
    volatile long demand;
    volatile int subscribed = 0;

    public Publisher<Integer> read(ByteBuffer dst) {
        return Flux.create(sink -> {
            this.readRequests.offer(new ReadRequest((FluxSink<Integer>)sink, dst));
            sink.onCancel(this::terminatePendingReads);
            sink.onDispose(this::terminatePendingReads);
            sink.onRequest(this::request);
        });
    }

    void onError(FluxSink<Integer> sink, Throwable e) {
        this.readRequests.poll();
        sink.error(e);
    }

    void onComplete(FluxSink<Integer> sink, int writtenBytes) {
        this.readRequests.poll();
        DEMAND.decrementAndGet(this);
        sink.next((Object)writtenBytes);
        sink.complete();
    }

    public Publisher<Long> skip(long bytesToSkip) {
        throw new UnsupportedOperationException("Skip is currently not implemented");
    }

    public Publisher<Success> close() {
        return Mono.create(sink -> {
            this.cancelled = true;
            if (this.error != null) {
                this.terminatePendingReads();
                sink.error(this.error);
                return;
            }
            this.terminatePendingReads();
            sink.success((Object)Success.SUCCESS);
        });
    }

    protected void request(long n) {
        if (this.allDataBuffersReceived && this.bufferQueue.isEmpty()) {
            this.terminatePendingReads();
            return;
        }
        Operators.addCap(DEMAND, (Object)this, (long)n);
        if (SUBSCRIBED.get(this) == 0) {
            if (SUBSCRIBED.compareAndSet(this, 0, 1)) {
                this.buffers.subscribe((Subscriber)new DataBufferCoreSubscriber());
            }
        } else {
            Subscription subscription = this.subscription;
            if (subscription != null) {
                this.requestFromSubscription(subscription);
            }
        }
    }

    void requestFromSubscription(Subscription subscription) {
        if (this.cancelled) {
            subscription.cancel();
        }
        this.drainLoop();
    }

    void drainLoop() {
        DataBuffer wip;
        while (DEMAND.get(this) > 0L && (wip = this.bufferQueue.peek()) != null) {
            if (wip.readableByteCount() == 0) {
                this.bufferQueue.poll();
                continue;
            }
            ReadRequest consumer = this.readRequests.peek();
            if (consumer == null) break;
            consumer.transferBytes(wip, wip.readableByteCount());
        }
        if (this.bufferQueue.isEmpty()) {
            if (this.allDataBuffersReceived) {
                this.terminatePendingReads();
                return;
            }
            if (this.demand > 0L) {
                this.subscription.request(1L);
            }
        }
    }

    void terminatePendingReads() {
        ReadRequest readers;
        while ((readers = this.readRequests.poll()) != null) {
            readers.onComplete();
        }
    }

    public AsyncInputStreamAdapter(Publisher<? extends DataBuffer> buffers, Context subscriberContext) {
        this.buffers = buffers;
        this.subscriberContext = subscriberContext;
    }

    class ReadRequest {
        private final FluxSink<Integer> sink;
        private final ByteBuffer dst;
        private int writtenBytes;

        ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
            this.sink = sink;
            this.dst = dst;
            this.writtenBytes = -1;
        }

        public void onComplete() {
            if (AsyncInputStreamAdapter.this.error != null) {
                AsyncInputStreamAdapter.this.onError(this.sink, AsyncInputStreamAdapter.this.error);
                return;
            }
            AsyncInputStreamAdapter.this.onComplete(this.sink, this.writtenBytes);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void transferBytes(DataBuffer db, int bytes) {
            try {
                if (AsyncInputStreamAdapter.this.error != null) {
                    AsyncInputStreamAdapter.this.onError(this.sink, AsyncInputStreamAdapter.this.error);
                    return;
                }
                ByteBuffer byteBuffer = db.asByteBuffer();
                int remaining = byteBuffer.remaining();
                int writeCapacity = Math.min(this.dst.remaining(), remaining);
                int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
                int toWrite = limit - byteBuffer.position();
                if (toWrite == 0) {
                    AsyncInputStreamAdapter.this.onComplete(this.sink, this.writtenBytes);
                    return;
                }
                int oldPosition = byteBuffer.position();
                byteBuffer.limit(toWrite);
                this.dst.put(byteBuffer);
                byteBuffer.limit(byteBuffer.capacity());
                byteBuffer.position(oldPosition);
                db.readPosition(db.readPosition() + toWrite);
                this.writtenBytes = this.writtenBytes == -1 ? bytes : (this.writtenBytes += bytes);
            }
            catch (Exception e) {
                AsyncInputStreamAdapter.this.onError(this.sink, e);
            }
            finally {
                if (db.readableByteCount() == 0) {
                    DataBufferUtils.release((DataBuffer)db);
                }
            }
        }
    }

    private class DataBufferCoreSubscriber
    implements CoreSubscriber<DataBuffer> {
        private DataBufferCoreSubscriber() {
        }

        public Context currentContext() {
            return AsyncInputStreamAdapter.this.subscriberContext;
        }

        public void onSubscribe(Subscription s) {
            AsyncInputStreamAdapter.this.subscription = s;
            s.request(1L);
        }

        public void onNext(DataBuffer dataBuffer) {
            if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
                DataBufferUtils.release((DataBuffer)dataBuffer);
                Operators.onNextDropped((Object)dataBuffer, (Context)AsyncInputStreamAdapter.this.subscriberContext);
                return;
            }
            ReadRequest readRequest = (ReadRequest)AsyncInputStreamAdapter.this.readRequests.peek();
            if (readRequest == null) {
                DataBufferUtils.release((DataBuffer)dataBuffer);
                Operators.onNextDropped((Object)dataBuffer, (Context)AsyncInputStreamAdapter.this.subscriberContext);
                AsyncInputStreamAdapter.this.subscription.cancel();
                return;
            }
            AsyncInputStreamAdapter.this.bufferQueue.offer(dataBuffer);
            AsyncInputStreamAdapter.this.drainLoop();
        }

        public void onError(Throwable t) {
            if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
                Operators.onErrorDropped((Throwable)t, (Context)AsyncInputStreamAdapter.this.subscriberContext);
                return;
            }
            AsyncInputStreamAdapter.this.error = t;
            AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
            AsyncInputStreamAdapter.this.terminatePendingReads();
        }

        public void onComplete() {
            AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
            if (AsyncInputStreamAdapter.this.bufferQueue.isEmpty()) {
                AsyncInputStreamAdapter.this.terminatePendingReads();
            }
        }
    }
}

