/*
 * Decompiled with CFR 0.152.
 */
package ai.vespa.feed.client;

import ai.vespa.feed.client.ApacheCluster;
import ai.vespa.feed.client.BenchmarkingCluster;
import ai.vespa.feed.client.Cluster;
import ai.vespa.feed.client.DocumentId;
import ai.vespa.feed.client.FeedClient;
import ai.vespa.feed.client.FeedClientBuilder;
import ai.vespa.feed.client.HttpRequest;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
import ai.vespa.feed.client.RequestStrategy;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link RequestStrategy} which queues HTTP requests, hands them to a {@link Cluster} from a
 * dedicated dispatcher thread, and retries failed attempts according to the configured
 * {@link FeedClient.RetryStrategy} and {@link FeedClient.CircuitBreaker}.
 *
 * <p>The number of concurrently held slots is throttled towards a dynamic target which grows
 * slowly on successful responses and is cut back when the server answers 429 or 503.
 * Operations on the same {@link DocumentId} are chained, so that a later operation is only
 * queued for dispatch once the previous one for that id has completed.</p>
 */
class HttpRequestStrategy implements RequestStrategy {

    private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());

    private final Cluster cluster;

    /** The most recently enqueued aggregate result for each document id with an operation in flight. */
    private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>();
    private final FeedClient.RetryStrategy strategy;
    private final FeedClient.CircuitBreaker breaker;

    /** Dispatch tasks waiting to be run by the dispatcher thread. */
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final long maxInflight;
    private final long minInflight;

    /** Ten times the current target; lets the target grow by one tenth per successful response. */
    private final AtomicLong targetInflightX10;

    /** Number of slots currently held by enqueued operations. */
    private final AtomicLong inflight = new AtomicLong(0L);
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    /** Number of tasks currently sitting in {@link #queue}. */
    private final AtomicLong delayedCount = new AtomicLong(0L);

    /** Number of retry attempts scheduled so far. */
    private final AtomicLong retries = new AtomicLong(0L);

    /** Single daemon thread on which the outcome of each dispatch attempt is evaluated. */
    private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "feed-client-result-executor");
        thread.setDaemon(true);
        return thread;
    });

    /**
     * Creates a strategy backed by a {@link BenchmarkingCluster} wrapping an {@link ApacheCluster}.
     *
     * @param builder the builder holding the client configuration
     * @throws IOException if the underlying cluster cannot be created
     */
    HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
        this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
    }

    /**
     * Creates a strategy dispatching to the given cluster, and starts the daemon dispatcher thread.
     *
     * @param builder the builder holding retry strategy, circuit breaker and connection settings
     * @param cluster the cluster that requests are dispatched to
     */
    HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
        this.cluster = cluster;
        this.strategy = builder.retryStrategy;
        this.breaker = builder.circuitBreaker;
        this.maxInflight = (long)builder.connectionsPerEndpoint * (long)builder.maxStreamsPerConnection;
        this.minInflight = (long)builder.connectionsPerEndpoint * (long)Math.min(16, builder.maxStreamsPerConnection);
        // Start out at the geometric mean of the lower and upper bounds.
        this.targetInflightX10 = new AtomicLong(10L * (long)(Math.sqrt(this.minInflight) * Math.sqrt(this.maxInflight)));
        Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /** Returns the statistics reported by the underlying cluster. */
    @Override
    public OperationStats stats() {
        return this.cluster.stats();
    }

    /**
     * Body of the dispatcher thread: drains the queue until the circuit breaker opens, this is
     * destroyed, or the thread is interrupted; then destroys this strategy.
     */
    private void dispatch() {
        try {
            while (this.breaker.state() != FeedClient.CircuitBreaker.State.OPEN && !this.destroyed.get()) {
                while (!this.isInExcess() && this.poll() && this.breaker.state() == FeedClient.CircuitBreaker.State.CLOSED) {
                    // Intentionally empty: poll() in the loop condition runs the queued task.
                }
                // Back off for longer while the breaker is half-open.
                Thread.sleep(this.breaker.state() == FeedClient.CircuitBreaker.State.HALF_OPEN ? 1000L : 10L);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.log(Level.WARNING, "Dispatch thread interrupted; shutting down");
        }
        this.destroy();
    }

    /** Adds the task to the dispatch queue, counting it as delayed until it is polled. */
    private void offer(Runnable task) {
        this.delayedCount.incrementAndGet();
        this.queue.offer(task);
    }

    /**
     * Runs the next queued task, if any.
     *
     * @return {@code true} if a task was run, {@code false} if the queue was empty
     */
    private boolean poll() {
        Runnable task = this.queue.poll();
        if (task == null) {
            return false;
        }
        this.delayedCount.decrementAndGet();
        task.run();
        return true;
    }

    /** Returns whether the number of held slots not waiting in the queue exceeds the current target. */
    private boolean isInExcess() {
        return this.inflight.get() - this.delayedCount.get() > this.targetInflight();
    }

    /**
     * Asks the retry strategy whether the request may be retried.
     *
     * @param request the request which failed
     * @param attempt the number of the attempt which just failed
     * @return {@code false} if {@code attempt} exceeds the configured number of retries,
     *         otherwise the retry strategy's verdict for the operation type mapped from the HTTP method
     * @throws IllegalStateException if the HTTP method is not POST, PUT or DELETE
     */
    private boolean retry(HttpRequest request, int attempt) {
        if (attempt > this.strategy.retries()) {
            return false;
        }
        switch (request.method().toUpperCase()) {
            case "POST": {
                return this.strategy.retry(FeedClient.OperationType.PUT);
            }
            case "PUT": {
                return this.strategy.retry(FeedClient.OperationType.UPDATE);
            }
            case "DELETE": {
                return this.strategy.retry(FeedClient.OperationType.REMOVE);
            }
        }
        throw new IllegalStateException("Unexpected HTTP method: " + request.method());
    }

    /**
     * Handles an attempt which ended with an exception: reports a failure to the circuit breaker,
     * and retries only {@link IOException}s, subject to {@link #retry(HttpRequest, int)}.
     */
    private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
        this.breaker.failure();
        log.log(Level.FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
        if (!(thrown instanceof IOException)) {
            return false;
        }
        return this.retry(request, attempt);
    }

    /** Raises the target by one tenth of a slot. */
    private void incrementTargetInflight() {
        this.targetInflightX10.incrementAndGet();
    }

    /** Cuts the target to nine tenths of the slots held but not queued, though never below the minimum. */
    private void decreaseTargetInflight() {
        this.targetInflightX10.set(Math.max((this.inflight.get() - this.delayedCount.get()) * 9L, this.minInflight * 10L));
    }

    /** Returns the current target number of held slots, capped at the maximum. */
    private long targetInflight() {
        return Math.min(this.targetInflightX10.get() / 10L, this.maxInflight);
    }

    /**
     * Handles an attempt which ended with an HTTP response.
     *
     * <ul>
     *   <li>2xx: reports success to the breaker, raises the target, and does not retry.</li>
     *   <li>429 or 503: lowers the target and always retries, without consulting the retry strategy
     *       and without reporting a failure to the breaker.</li>
     *   <li>500, 502 or 504: reports a failure and retries subject to {@link #retry(HttpRequest, int)}.</li>
     *   <li>Anything else: reports a failure and does not retry.</li>
     * </ul>
     */
    private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
        if (response.code() / 100 == 2) {
            this.breaker.success();
            this.incrementTargetInflight();
            return false;
        }
        log.log(Level.FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), StandardCharsets.UTF_8) + ") on attempt " + attempt + " at " + request);
        if (response.code() == 429 || response.code() == 503) {
            this.decreaseTargetInflight();
            return true;
        }
        this.breaker.failure();
        if (response.code() == 500 || response.code() == 502 || response.code() == 504) {
            return this.retry(request, attempt);
        }
        return false;
    }

    /**
     * Blocks the calling thread until the number of held slots is below the target, then takes one.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
     *         while waiting; the interrupt flag is restored before throwing
     */
    private void acquireSlot() {
        try {
            while (this.inflight.get() >= this.targetInflight()) {
                Thread.sleep(1L);
            }
            this.inflight.incrementAndGet();
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Gives back a slot taken by {@link #acquireSlot()}. */
    private void releaseSlot() {
        this.inflight.decrementAndGet();
    }

    /** Returns whether the circuit breaker is open. */
    @Override
    public boolean hasFailed() {
        return this.breaker.state() == FeedClient.CircuitBreaker.State.OPEN;
    }

    /** Blocks until no slots are held, or the calling thread is interrupted (the flag is then restored). */
    @Override
    public void await() {
        try {
            while (this.inflight.get() > 0L) {
                Thread.sleep(10L);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues the request for dispatch and returns a future for its final outcome, after any retries.
     *
     * <p>If no operation is registered for the document id, a slot is acquired (possibly blocking)
     * and the request is queued right away. Otherwise the request is queued once the previously
     * registered operation for that id completes, without acquiring a new slot here.</p>
     *
     * @param documentId the id of the document the request concerns
     * @param request the request to dispatch
     * @return a future completed with the final response or failure; already cancelled if this
     *         strategy has been destroyed
     */
    @Override
    public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
        CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Aggregate outcome, including retries.
        CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Outcome of the first dispatch attempt.
        // Register before checking the destroyed flag, so either destroy() sees this result, or we see the flag.
        CompletableFuture<?> previous = this.inflightById.put(documentId, result);
        if (this.destroyed.get()) {
            result.cancel(true);
            return result;
        }
        if (previous == null) {
            this.acquireSlot();
            this.offer(() -> this.cluster.dispatch(request, vessel));
        } else {
            previous.whenComplete((ignoredValue, ignoredError) -> this.offer(() -> this.cluster.dispatch(request, vessel)));
        }
        this.handleAttempt(vessel, request, result, 1);
        result.whenComplete((ignoredResponse, ignoredFailure) -> {
            // Release the slot only if no later operation for this id has replaced this one in the map.
            if (this.inflightById.compute(documentId, (ignoredId, current) -> current == result ? null : current) == null) {
                this.releaseSlot();
            }
        });
        return result;
    }

    /**
     * Arranges for the outcome of one dispatch attempt to be evaluated on the result executor:
     * either a retry is queued and handled recursively, or {@code result} is completed.
     *
     * @param vessel the future completed by the cluster for this attempt
     * @param request the request being dispatched
     * @param result the aggregate future to complete once no more retries are made
     * @param attempt the number of this attempt
     */
    private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) {
        vessel.whenCompleteAsync((response, thrown) -> {
            boolean shouldRetry = thrown != null ? this.retry(request, thrown, attempt)
                                                 : this.retry(request, response, attempt);
            if (shouldRetry) {
                this.retries.incrementAndGet();
                FeedClient.CircuitBreaker.State state = this.breaker.state();
                CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
                this.offer(() -> this.cluster.dispatch(request, retry));
                // Attempts made while the breaker is half-open are not counted.
                this.handleAttempt(retry, request, result, attempt + (state == FeedClient.CircuitBreaker.State.HALF_OPEN ? 0 : 1));
            } else if (thrown == null) {
                result.complete(response);
            } else {
                result.completeExceptionally(thrown);
            }
        }, this.resultExecutor);
    }

    /**
     * Cancels all registered operations, closes the cluster and shuts down the result executor.
     * Only the first invocation has any effect; this is called both by the dispatcher thread when
     * it exits and by external callers, and the cluster should be closed exactly once.
     */
    @Override
    public void destroy() {
        if (!this.destroyed.getAndSet(true)) {
            this.inflightById.values().forEach(result -> result.cancel(true));
            this.cluster.close();
            this.resultExecutor.shutdown();
        }
    }
}

