/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.error.transaction.AttemptExpiredException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.transaction.AccessorUtil;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class ReactiveLock {
    private final CoreTransactionAttemptContext ctx;
    private final ArrayList<Waiter> waiting = new ArrayList();
    private final boolean debugMode;
    @Nullable
    private Waiter lockedBy = null;
    private final boolean debugAsSingleThreaded = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.lockDebugAsSingleThreaded", "false"));

    public ReactiveLock(CoreTransactionAttemptContext ctx, boolean debugMode) {
        this.debugMode = debugMode;
        this.ctx = Objects.requireNonNull(ctx);
    }

    public Mono<Waiter> lock(String dbg, Duration timeout) {
        return Mono.defer(() -> {
            Waiter waiter = new Waiter(dbg);
            ReactiveLock reactiveLock = this;
            synchronized (reactiveLock) {
                if (this.debugAsSingleThreaded && this.isLocked()) {
                    String msg = "LOCK: Internal bug: [" + dbg + "] needs to lock mutex, which is already locked";
                    this.ctx.logger().info(this.ctx.attemptId(), msg);
                    throw new IllegalStateException(msg);
                }
                if (this.lockedBy == null) {
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), "LOCK: [{}] is locking, {} waiting", waiter.dbg, this.waiting.size());
                    }
                    this.lockedBy = waiter;
                    return Mono.just((Object)waiter);
                }
                if (this.lockedBy == waiter) {
                    String msg = "LOCK: internal bug [" + dbg + "] wants a lock currently held by itself";
                    this.ctx.logger().info(this.ctx.attemptId(), msg);
                    throw new IllegalStateException(msg);
                }
                if (this.debugMode) {
                    this.ctx.logger().info(this.ctx.attemptId(), "LOCK: [{}] will wait for lock currently held by [{}], {} other waiters", dbg, this.lockedBy.dbg, this.waiting.size());
                }
                this.waiting.add(waiter);
            }
            return waiter.notifier.asMono().publishOn(this.ctx.scheduler()).timeout(timeout).publishOn(this.ctx.scheduler()).onErrorResume(err -> {
                if (err instanceof TimeoutException) {
                    String msg = String.format("Attempt expired while [%s] waiting for lock on timeout of %sms, lock currently held by [%s], %d other waiters", dbg, timeout.toMillis(), this.lockedBy == null ? "none" : this.lockedBy.dbg, this.waiting.size());
                    if (this.ctx != null) {
                        this.ctx.logger().info(this.ctx.attemptId(), msg);
                        return Mono.error((Throwable)AccessorUtil.operationFailed(this.ctx, TransactionOperationFailedException.Builder.createError().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().cause(new AttemptExpiredException(msg, (Throwable)err)).build()));
                    }
                    return Mono.error((Throwable)new AttemptExpiredException("Expired " + dbg, (Throwable)err));
                }
                return Mono.error((Throwable)err);
            }).doFinally(v -> {
                if (v == SignalType.CANCEL) {
                    this.ctx.logger().info(this.ctx.attemptId(), "cancel signal while waiter {} is waiting for lock", dbg);
                    this.unlock(waiter, "onCancel", true).block();
                }
            });
        });
    }

    public Mono<Void> unlock(Waiter waiter) {
        return this.unlock(waiter, null);
    }

    public Mono<Void> unlock(Waiter waiter, @Nullable String extraDbg) {
        return this.unlock(waiter, extraDbg, false);
    }

    public Mono<Void> unlock(Waiter waiter, @Nullable String extraDbg, boolean removeFromWaiters) {
        return Mono.defer(() -> {
            Waiter next = null;
            ReactiveLock reactiveLock = this;
            synchronized (reactiveLock) {
                if (waiter == null) {
                    this.ctx.logger().info(this.ctx.attemptId(), "LOCK: internal bug, waiter is null {}", extraDbg);
                }
                if (this.lockedBy != waiter) {
                    if (removeFromWaiters) {
                        this.waiting.remove(waiter);
                        if (this.debugMode) {
                            this.ctx.logger().info(this.ctx.attemptId(), "LOCK: [{}: {}] is unlocking, but does not have the lock - removing from waiters, leaving {} others", waiter == null ? "-" : waiter.dbg, extraDbg == null ? "-" : extraDbg, this.waiting.size());
                        }
                    } else if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), "LOCK: [{}: {}] is unlocking, but does not have the lock", waiter == null ? "-" : waiter.dbg, extraDbg == null ? "-" : extraDbg);
                    }
                    return Mono.empty();
                }
                if (!this.waiting.isEmpty()) {
                    next = this.waiting.remove(0);
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), "LOCK: [{}: {}] is unlocking, [{}] now has lock, {} left waiting", waiter == null ? "-" : waiter.dbg, extraDbg == null ? "-" : extraDbg, next.dbg, this.waiting.size());
                    }
                    this.lockedBy = next;
                } else {
                    this.lockedBy = null;
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), "LOCK: [{}: {}] is unlocking, nothing waiting", waiter == null ? "-" : waiter.dbg, extraDbg == null ? "-" : extraDbg);
                    }
                }
            }
            if (next != null) {
                next.notifier.tryEmitValue((Object)next).orThrow();
            }
            return Mono.empty();
        });
    }

    public boolean debugAsSingleThreaded() {
        return this.debugAsSingleThreaded;
    }

    public synchronized boolean isLocked() {
        return this.lockedBy != null;
    }

    public static class Waiter {
        private final Sinks.One<Waiter> notifier = Sinks.one();
        public final String dbg;

        public Waiter(String dbg) {
            this.dbg = Objects.requireNonNull(dbg);
        }
    }
}

