/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.common.semaphore;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import org.apache.pulsar.common.semaphore.AsyncSemaphore;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSemaphoreImpl
implements AsyncSemaphore,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(AsyncSemaphoreImpl.class);
    private final AtomicLong availablePermits;
    private final Queue<PendingRequest> queue;
    private final long maxPermits;
    private final long timeoutMillis;
    private final ScheduledExecutorService executor;
    private final boolean shutdownExecutor;
    private final LongConsumer queueLatencyRecorder;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Runnable processQueueRunnable = Runnables.catchingAndLoggingThrowables(this::internalProcessQueue);
    private final ScheduledFuture<?> processQueueScheduledFuture;

    public AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis) {
        this(maxPermits, maxQueueSize, timeoutMillis, maxPermits > 0L ? AsyncSemaphoreImpl.createExecutor() : null, maxPermits > 0L, null);
    }

    public AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis, ScheduledExecutorService executor, LongConsumer queueLatencyRecorder) {
        this(maxPermits, maxQueueSize, timeoutMillis, executor, false, queueLatencyRecorder);
    }

    AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis, ScheduledExecutorService executor, boolean shutdownExecutor, LongConsumer queueLatencyRecorder) {
        this.availablePermits = new AtomicLong(maxPermits);
        this.maxPermits = maxPermits;
        this.queue = maxQueueSize > 0 ? new ArrayBlockingQueue(maxQueueSize) : new LinkedBlockingQueue();
        this.timeoutMillis = timeoutMillis;
        this.executor = executor;
        this.shutdownExecutor = shutdownExecutor;
        this.queueLatencyRecorder = queueLatencyRecorder;
        this.processQueueScheduledFuture = executor != null ? executor.schedule(this.processQueueRunnable, timeoutMillis / 2L, TimeUnit.MILLISECONDS) : null;
    }

    private static ScheduledExecutorService createExecutor() {
        return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("async-semaphore-executor"));
    }

    @Override
    public CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit> acquire(long permits, BooleanSupplier isCancelled) {
        return this.internalAcquire(permits, null, isCancelled);
    }

    private CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit> internalAcquire(long permits, SemaphorePermit previousPermit, BooleanSupplier isCancelled) {
        this.validatePermits(permits);
        if (this.isUnbounded()) {
            return CompletableFuture.completedFuture(new SemaphorePermit(permits));
        }
        CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit> future = new CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit>();
        if (this.closed.get()) {
            future.completeExceptionally(new AsyncSemaphore.PermitAcquireAlreadyClosedException("Semaphore is closed"));
            return future;
        }
        PendingRequest request = new PendingRequest(permits, previousPermit, future, isCancelled);
        if (!this.queue.offer(request)) {
            future.completeExceptionally(new AsyncSemaphore.PermitAcquireQueueFullException("Semaphore queue is full"));
            return future;
        }
        ScheduledFuture<?> timeoutTask = this.executor.schedule(() -> {
            if (!request.future.isDone() && this.queue.remove(request)) {
                this.recordQueueLatency(Long.MAX_VALUE);
                this.recordQueueLatency(request.getAgeNanos());
                future.completeExceptionally(new AsyncSemaphore.PermitAcquireTimeoutException("Permit acquisition timed out"));
                this.processQueue();
            }
        }, this.timeoutMillis, TimeUnit.MILLISECONDS);
        request.setTimeoutTask(timeoutTask);
        this.processQueue();
        return future;
    }

    private void validatePermits(long permits) {
        if (permits < 0L) {
            throw new IllegalArgumentException("Invalid negative permits value: " + permits);
        }
        if (!this.isUnbounded() && permits > this.maxPermits) {
            throw new IllegalArgumentException("Requested permits=" + permits + " is larger than maxPermits=" + this.maxPermits);
        }
    }

    private boolean isUnbounded() {
        return this.maxPermits <= 0L;
    }

    private void recordQueueLatency(long ageNanos) {
        if (this.queueLatencyRecorder != null) {
            this.queueLatencyRecorder.accept(ageNanos);
        }
    }

    @Override
    public CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit> update(AsyncSemaphore.AsyncSemaphorePermit permit, long newPermits, BooleanSupplier isCancelled) {
        this.validatePermits(newPermits);
        if (this.isUnbounded()) {
            return CompletableFuture.completedFuture(new SemaphorePermit(newPermits));
        }
        long oldPermits = permit.getPermits();
        long additionalPermits = newPermits - oldPermits;
        if (additionalPermits > 0L) {
            return this.internalAcquire(newPermits, this.castToImplementation(permit), isCancelled);
        }
        long leftoverPermits = this.castToImplementation(permit).releasePermits() - newPermits;
        if (leftoverPermits >= 0L) {
            if (leftoverPermits > 0L) {
                this.availablePermits.addAndGet(leftoverPermits);
                this.processQueue();
            }
            return CompletableFuture.completedFuture(new SemaphorePermit(newPermits));
        }
        return this.acquire(newPermits, isCancelled);
    }

    @Override
    public void release(AsyncSemaphore.AsyncSemaphorePermit permit) {
        if (this.isUnbounded()) {
            return;
        }
        long releasedPermits = this.castToImplementation(permit).releasePermits();
        if (releasedPermits > 0L) {
            this.availablePermits.addAndGet(releasedPermits);
            this.processQueue();
        }
    }

    @Override
    public long getAvailablePermits() {
        if (this.isUnbounded()) {
            return Long.MAX_VALUE;
        }
        return this.availablePermits.get();
    }

    @Override
    public long getAcquiredPermits() {
        if (this.isUnbounded()) {
            return 0L;
        }
        return this.maxPermits - this.availablePermits.get();
    }

    @Override
    public int getQueueSize() {
        return this.queue.size();
    }

    private SemaphorePermit castToImplementation(AsyncSemaphore.AsyncSemaphorePermit permit) {
        if (permit instanceof SemaphorePermit) {
            SemaphorePermit semaphorePermit = (SemaphorePermit)permit;
            return semaphorePermit;
        }
        throw new IllegalArgumentException("Invalid permit type");
    }

    private void processQueue() {
        if (this.closed.get()) {
            return;
        }
        this.executor.execute(this.processQueueRunnable);
    }

    private synchronized void internalProcessQueue() {
        PendingRequest request;
        long current;
        while (!this.closed.get() && !this.queue.isEmpty() && (current = this.availablePermits.get()) > 0L && (request = this.queue.peek()) != null) {
            if (request.isCancelled.getAsBoolean()) {
                request.cancelTimeoutTask();
                this.queue.remove(request);
                request.future.completeExceptionally(new AsyncSemaphore.PermitAcquireCancelledException("Permit acquisition was cancelled"));
                continue;
            }
            if (request.future.isDone()) {
                request.cancelTimeoutTask();
                this.queue.remove(request);
                continue;
            }
            if (request.getRequiredPermits() > current) break;
            long requiredPermitsReusingPrevious = request.getRequiredPermitsReusingPrevious();
            this.availablePermits.addAndGet(-requiredPermitsReusingPrevious);
            request.cancelTimeoutTask();
            this.queue.remove(request);
            SemaphorePermit permit = new SemaphorePermit(request.permits);
            this.recordQueueLatency(request.getAgeNanos());
            boolean futureCompleted = request.future.complete(permit);
            if (futureCompleted) continue;
            this.availablePermits.addAndGet(requiredPermitsReusingPrevious);
        }
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (this.processQueueScheduledFuture != null) {
                this.processQueueScheduledFuture.cancel(false);
            }
            while (!this.queue.isEmpty()) {
                PendingRequest request = this.queue.poll();
                request.cancelTimeoutTask();
                request.future.completeExceptionally(new AsyncSemaphore.PermitAcquireAlreadyClosedException("Semaphore is closed"));
            }
            if (this.shutdownExecutor) {
                this.executor.shutdownNow();
            }
        }
    }

    private static class SemaphorePermit
    implements AsyncSemaphore.AsyncSemaphorePermit {
        private static final AtomicLongFieldUpdater<SemaphorePermit> PERMITS_UPDATER = AtomicLongFieldUpdater.newUpdater(SemaphorePermit.class, "permits");
        private volatile long permits;

        SemaphorePermit(long permits) {
            this.permits = permits;
        }

        @Override
        public long getPermits() {
            return this.permits;
        }

        public long releasePermits() {
            return PERMITS_UPDATER.getAndSet(this, 0L);
        }

        public String toString() {
            return "SemaphorePermit@" + System.identityHashCode(this) + "[permits=" + this.permits + "]";
        }
    }

    private static class PendingRequest {
        final long permits;
        private final SemaphorePermit previousPermit;
        final CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit> future;
        private final BooleanSupplier isCancelled;
        private volatile ScheduledFuture<?> timeoutTask;
        private final long requestCreatedNanos = System.nanoTime();

        PendingRequest(long permits, SemaphorePermit previousPermit, CompletableFuture<AsyncSemaphore.AsyncSemaphorePermit> future, BooleanSupplier isCancelled) {
            this.permits = permits;
            this.previousPermit = previousPermit;
            this.future = future;
            this.isCancelled = isCancelled;
        }

        void setTimeoutTask(ScheduledFuture<?> timeoutTask) {
            this.timeoutTask = timeoutTask;
        }

        void cancelTimeoutTask() {
            if (this.timeoutTask != null) {
                this.timeoutTask.cancel(false);
                this.timeoutTask = null;
            }
        }

        long getAgeNanos() {
            return System.nanoTime() - this.requestCreatedNanos;
        }

        long getRequiredPermits() {
            return this.previousPermit == null ? this.permits : this.permits - this.previousPermit.getPermits();
        }

        long getRequiredPermitsReusingPrevious() {
            return this.previousPermit == null ? this.permits : this.permits - this.previousPermit.releasePermits();
        }
    }
}

