/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.centraldogma.server.internal.api;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.centraldogma.common.Entry;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;
import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
import com.linecorp.centraldogma.server.storage.repository.Repository;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts repository and file watches on behalf of the current request, arms a jittered
 * timeout for each pending watch and publishes watch-related metrics.
 *
 * <p>Metrics registered by the constructor:
 * <ul>
 *   <li>{@code watches.active} - gauge over the number of watches that are still pending</li>
 *   <li>{@code watches.processed} - counters tagged with {@code result=wakeup|timeout|failure}</li>
 * </ul>
 */
public final class WatchService {
    private static final Logger logger = LoggerFactory.getLogger(WatchService.class);

    /**
     * Shared, stack-trace-free exception used to complete a watch whose timer has fired.
     */
    private static final CancellationException CANCELLATION_EXCEPTION =
            Exceptions.clearTrace(new CancellationException("watch timed out"));

    /**
     * Maximum fraction by which a watch timeout may be shortened; see {@link #applyJitter(long)}.
     */
    private static final double JITTER_RATE = 0.2;

    /** Watches that have been handed to {@link #scheduleTimeout} and have not completed yet. */
    private final Set<CompletableFuture<?>> pendingFutures =
            Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Counter wakeupCounter;
    private final Counter timeoutCounter;
    private final Counter failureCounter;

    /**
     * Creates a new instance and registers its gauge and counters with {@code meterRegistry}.
     *
     * @param meterRegistry the registry that receives the watch metrics
     * @throws NullPointerException if {@code meterRegistry} is {@code null}
     */
    public WatchService(MeterRegistry meterRegistry) {
        Objects.requireNonNull(meterRegistry, "meterRegistry");
        // Pass 'this' without a cast so that the lambda parameter is typed as WatchService.
        Gauge.builder("watches.active", this, self -> self.pendingFutures.size())
             .register(meterRegistry);
        wakeupCounter = Counter.builder("watches.processed")
                               .tag("result", "wakeup")
                               .register(meterRegistry);
        timeoutCounter = Counter.builder("watches.processed")
                                .tag("result", "timeout")
                                .register(meterRegistry);
        failureCounter = Counter.builder("watches.processed")
                                .tag("result", "failure")
                                .register(meterRegistry);
    }

    /**
     * Watches {@code repo} for changes matching {@code pathPattern} made after
     * {@code lastKnownRevision}. Must be called while a {@link ServiceRequestContext} is current,
     * because the request timeout of that context is extended before the watch is started.
     *
     * @param repo the repository to watch
     * @param lastKnownRevision the revision passed to {@code repo.watch(...)}
     * @param pathPattern the path pattern passed to {@code repo.watch(...)}
     * @param timeoutMillis the requested watch timeout in milliseconds; when it is not positive
     *                      no watch timer is scheduled
     * @return the future returned by {@code repo.watch(...)}; if it was still pending, it is
     *         completed exceptionally with a {@link CancellationException} when the timer fires
     */
    public CompletableFuture<Revision> watchRepository(Repository repo, Revision lastKnownRevision,
                                                       String pathPattern, long timeoutMillis) {
        final ServiceRequestContext ctx = (ServiceRequestContext) RequestContext.current();
        updateRequestTimeout(ctx, timeoutMillis);
        final CompletableFuture<Revision> result = repo.watch(lastKnownRevision, pathPattern);
        if (!result.isDone()) {
            // Only pending watches need a timer and need to be tracked.
            scheduleTimeout(ctx, result, timeoutMillis);
        }
        return result;
    }

    /**
     * Extends the request timeout of {@code ctx} by the value {@code WatchTimeout.availableTimeout}
     * computes from the requested watch timeout and the current request timeout.
     */
    private static void updateRequestTimeout(ServiceRequestContext ctx, long timeoutMillis) {
        final long adjustmentMillis =
                WatchTimeout.availableTimeout(timeoutMillis, ctx.requestTimeoutMillis());
        ctx.setRequestTimeoutMillis(TimeoutMode.EXTEND, adjustmentMillis);
    }

    /**
     * Watches the result of {@code query} in {@code repo} for changes made after
     * {@code lastKnownRevision}. Must be called while a {@link ServiceRequestContext} is current,
     * because the request timeout of that context is extended before the watch is started.
     *
     * @param repo the repository to watch
     * @param lastKnownRevision the revision passed to {@code repo.watch(...)}
     * @param query the query passed to {@code repo.watch(...)}
     * @param timeoutMillis the requested watch timeout in milliseconds; when it is not positive
     *                      no watch timer is scheduled
     * @return the future returned by {@code repo.watch(...)}; if it was still pending, it is
     *         completed exceptionally with a {@link CancellationException} when the timer fires
     */
    public <T> CompletableFuture<Entry<T>> watchFile(Repository repo, Revision lastKnownRevision,
                                                     Query<T> query, long timeoutMillis) {
        final ServiceRequestContext ctx = (ServiceRequestContext) RequestContext.current();
        updateRequestTimeout(ctx, timeoutMillis);
        final CompletableFuture<Entry<T>> result = repo.watch(lastKnownRevision, query);
        if (!result.isDone()) {
            // Only pending watches need a timer and need to be tracked.
            scheduleTimeout(ctx, result, timeoutMillis);
        }
        return result;
    }

    /**
     * Tracks {@code result} as a pending watch, optionally schedules a timer on the event loop of
     * {@code ctx} that fails it with {@link #CANCELLATION_EXCEPTION}, and updates the counters
     * once {@code result} completes.
     */
    private <T> void scheduleTimeout(ServiceRequestContext ctx, CompletableFuture<T> result,
                                     long timeoutMillis) {
        pendingFutures.add(result);

        final ScheduledFuture<?> timeoutFuture;
        final long watchTimeoutMillis;
        if (timeoutMillis > 0) {
            watchTimeoutMillis = applyJitter(WatchTimeout.availableTimeout(timeoutMillis));
            timeoutFuture = ctx.eventLoop().schedule(
                    () -> result.completeExceptionally(CANCELLATION_EXCEPTION),
                    watchTimeoutMillis, TimeUnit.MILLISECONDS);
        } else {
            // No timer: the watch stays pending until something else completes it.
            watchTimeoutMillis = 0;
            timeoutFuture = null;
        }

        result.whenComplete((unused, cause) -> {
            if (timeoutFuture != null) {
                // A successful cancel() means the watch completed before the timer did.
                if (timeoutFuture.cancel(true)) {
                    // NOTE(review): this branch counts a wakeup even when 'cause' is non-null,
                    // so a timed watch that fails early never reaches failureCounter.
                    wakeupCounter.increment();
                    if (cause instanceof RequestAlreadyTimedOutException) {
                        logger.warn("Request has timed out before watch timeout: watchTimeoutMillis={}, log={}",
                                    watchTimeoutMillis, ctx.log());
                    }
                } else {
                    timeoutCounter.increment();
                }
            } else if (cause == null) {
                wakeupCounter.increment();
            } else {
                failureCounter.increment();
            }
            pendingFutures.remove(result);
        });
    }

    /**
     * Multiplies {@code timeoutMillis} by a random rate drawn from
     * {@code [1 - JITTER_RATE, 1.001)}; rates of 1 or more leave the timeout unchanged, so the
     * result never exceeds the input for a non-negative timeout.
     */
    private static long applyJitter(long timeoutMillis) {
        // The upper bound is exclusive, hence a bound slightly above 1.0 to make 'no jitter' reachable.
        final double rate = ThreadLocalRandom.current().nextDouble(1.0 - JITTER_RATE, 1.001);
        if (rate < 1.0) {
            return (long) (timeoutMillis * rate);
        }
        return timeoutMillis;
    }
}

