/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.util.internal;

import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import ratpack.exec.Downstream;
import ratpack.exec.Promise;
import ratpack.exec.Upstream;
import ratpack.exec.internal.Continuation;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.util.ReadWriteAccess;

public class DefaultReadWriteAccess
implements ReadWriteAccess {
    private static final AtomicIntegerFieldUpdater<DefaultReadWriteAccess> DRAINING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultReadWriteAccess.class, "draining");
    private static final AtomicIntegerFieldUpdater<DefaultReadWriteAccess> ACTIVE_READERS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultReadWriteAccess.class, "activeReaders");
    private static final AtomicReferenceFieldUpdater<DefaultReadWriteAccess, Access> PENDING_WRITE_REF_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultReadWriteAccess.class, Access.class, "pendingWriteRef");
    private static final AtomicReferenceFieldUpdater<DefaultReadWriteAccess, Queue> QUEUE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultReadWriteAccess.class, Queue.class, "queue");
    private volatile Queue<Access<?>> queue;
    private volatile int draining;
    private volatile int activeReaders;
    private final Duration defaultTimeout;
    private volatile Access<?> pendingWriteRef;

    public DefaultReadWriteAccess(Duration defaultTimeout) {
        if (defaultTimeout.isNegative()) {
            throw new IllegalArgumentException("defaultTimeout must not be negative");
        }
        this.defaultTimeout = defaultTimeout;
    }

    @Override
    public Duration getDefaultTimeout() {
        return this.defaultTimeout;
    }

    @Override
    public <T> Promise<T> read(Promise<T> promise) {
        return promise.transform(up -> down -> new Access(true, (Upstream)up, this.defaultTimeout, down));
    }

    @Override
    public <T> Promise<T> read(Promise<T> promise, Duration timeout) {
        return promise.transform(up -> down -> new Access(true, (Upstream)up, timeout, down));
    }

    @Override
    public <T> Promise<T> write(Promise<T> promise) {
        return promise.transform(up -> down -> new Access(false, (Upstream)up, this.defaultTimeout, down));
    }

    @Override
    public <T> Promise<T> write(Promise<T> promise, Duration timeout) {
        return promise.transform(up -> down -> new Access(false, (Upstream)up, timeout, down));
    }

    private boolean casQueue(Queue<Access<?>> expected, Queue<Access<?>> queue) {
        return QUEUE_UPDATER.compareAndSet(this, expected, queue);
    }

    private void drain() {
        if (this.startDraining()) {
            Queue<Access<?>> queue = this.getQueue();
            if (queue != null) {
                if (this.drainQueue(queue)) {
                    return;
                }
                this.resetQueue();
                if (!queue.isEmpty() && this.drainQueue(queue)) {
                    return;
                }
            }
            this.notDraining();
            if (this.getQueue() != null) {
                this.drain();
            }
        }
    }

    private void resetQueue() {
        QUEUE_UPDATER.set(this, null);
    }

    private boolean drainQueue(Queue<Access<?>> queue) {
        Access access = queue.poll();
        while (access != null) {
            if (access.read) {
                access.access();
                access = queue.poll();
                continue;
            }
            if (this.activeReaders() == 0.0) {
                access.access();
            } else {
                PENDING_WRITE_REF_UPDATER.set(this, access);
                if (this.activeReaders() == 0.0 && (access = (Access)PENDING_WRITE_REF_UPDATER.getAndSet(this, null)) != null) {
                    access.access();
                }
            }
            return true;
        }
        return false;
    }

    private void addToQueue(Access<?> access) {
        Queue<Access<?>> queue = this.getQueue();
        if (queue == null) {
            queue = new ConcurrentLinkedQueue();
            queue.add(access);
            if (!this.casQueue(null, queue)) {
                this.addToQueue(access);
            }
        } else {
            queue.add(access);
            if (!this.casQueue(queue, queue)) {
                this.addToQueue(access);
            }
        }
    }

    private Queue<Access<?>> getQueue() {
        return QUEUE_UPDATER.get(this);
    }

    private void notDraining() {
        DRAINING_UPDATER.set(this, 0);
    }

    private int decActiveReaders() {
        return ACTIVE_READERS_UPDATER.decrementAndGet(this);
    }

    private void incActiveReaders() {
        ACTIVE_READERS_UPDATER.incrementAndGet(this);
    }

    private double activeReaders() {
        return ACTIVE_READERS_UPDATER.get(this);
    }

    private boolean startDraining() {
        return DRAINING_UPDATER.compareAndSet(this, 0, 1);
    }

    private class Access<T> {
        private final boolean read;
        private final Upstream<? extends T> upstream;
        private final Duration timeout;
        private final Downstream<? super T> downstream;
        private final DefaultExecution execution;
        private boolean fired;
        private Continuation continuation;
        private ScheduledFuture<?> timeoutFuture;

        private Access(boolean read, Upstream<? extends T> upstream, Duration timeout, Downstream<? super T> downstream) {
            if (timeout.isNegative()) {
                throw new IllegalArgumentException("Timeout value must not be negative");
            }
            this.read = read;
            this.upstream = upstream;
            this.timeout = timeout;
            this.downstream = downstream;
            this.execution = DefaultExecution.get();
            this.execution.delimit(e -> {
                this.relinquish();
                if (this.fire()) {
                    downstream.error((Throwable)e);
                }
            }, continuation -> {
                if (!timeout.isZero()) {
                    this.timeoutFuture = this.execution.getEventLoop().schedule(this::timeout, timeout.toMillis(), TimeUnit.MILLISECONDS);
                }
                this.continuation = continuation;
                DefaultReadWriteAccess.this.addToQueue(this);
                DefaultReadWriteAccess.this.drain();
            });
        }

        private boolean fire() {
            if (this.fired) {
                return false;
            }
            this.fired = true;
            return true;
        }

        private void timeout() {
            if (this.fire()) {
                this.continuation.resume(() -> {
                    DefaultReadWriteAccess.this.drain();
                    this.downstream.error(new ReadWriteAccess.TimeoutException("Could not acquire " + (this.read ? "read" : "write") + " access within " + this.timeout));
                });
            }
        }

        private void access() {
            if (this.read) {
                DefaultReadWriteAccess.this.incActiveReaders();
            }
            if (this.fire()) {
                if (this.timeoutFuture != null) {
                    this.timeoutFuture.cancel(false);
                }
                this.continuation.resume(() -> this.upstream.connect(new Downstream<T>(){

                    @Override
                    public void success(T value) {
                        Access.this.relinquish();
                        Access.this.downstream.success(value);
                    }

                    @Override
                    public void error(Throwable throwable) {
                        Access.this.relinquish();
                        Access.this.downstream.error(throwable);
                    }

                    @Override
                    public void complete() {
                        Access.this.relinquish();
                        Access.this.downstream.complete();
                    }
                }));
            } else {
                this.relinquish();
            }
        }

        private void relinquish() {
            if (this.read) {
                Access pendingWrite;
                if (DefaultReadWriteAccess.this.decActiveReaders() == 0 && (pendingWrite = (Access)PENDING_WRITE_REF_UPDATER.getAndSet(DefaultReadWriteAccess.this, null)) != null) {
                    pendingWrite.access();
                    return;
                }
            } else {
                DefaultReadWriteAccess.this.notDraining();
            }
            DefaultReadWriteAccess.this.drain();
        }
    }
}

