/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.diagnostics;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ArrayNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.DiagnosticsResult;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.HealthPinger;
import com.couchbase.client.core.diagnostics.PingResult;
import com.couchbase.client.core.diagnostics.WaitUntilReadyContext;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.manager.GenericManagerRequest;
import com.couchbase.client.core.service.ServiceType;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * Static helper that implements the "wait until ready" logic on top of a {@link Core}.
 *
 * <p>The helper polls the configuration state of the core, optionally verifies the health of the
 * nodes of a single bucket, and then combines a ping with repeated diagnostics snapshots until the
 * requested {@link ClusterState} is observed or the timeout elapses.
 */
@Stability.Internal
public class WaitUntilReadyHelper {
    /**
     * Waits until the given services have reached the desired cluster state.
     *
     * <p>The returned future completes normally once the ping has finished and the aggregated
     * diagnostics state equals {@code desiredState}. It completes exceptionally with an
     * {@link IllegalStateException} right away if no config is present and no config load is in
     * progress, and with an {@link UnambiguousTimeoutException} if {@code timeout} elapses first.
     *
     * @param core the core to inspect and to dispatch requests on.
     * @param serviceTypes the services to check; if {@code null} or empty, the services are derived
     *                     from the ping targets of the current cluster config.
     * @param timeout the maximum time to wait for the whole sequence.
     * @param desiredState the cluster state that must be observed by the diagnostics.
     * @param bucketName if present, additional bucket-level readiness checks are performed.
     * @return a future completing when the desired state has been reached.
     */
    @Stability.Internal
    public static CompletableFuture<Void> waitUntilReady(Core core, Set<ServiceType> serviceTypes, Duration timeout, ClusterState desiredState, Optional<String> bucketName) {
        boolean hasChance = core.clusterConfig().hasClusterOrBucketConfig()
            || core.configurationProvider().globalConfigLoadInProgress()
            || core.configurationProvider().bucketConfigLoadInProgress();
        if (!hasChance) {
            // Neither a config nor a pending config load: waiting could never succeed.
            CompletableFuture<Void> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("Against pre 6.5 clusters at least a bucket needs to be opened!"));
            return f;
        }

        return Flux
            .interval(Duration.ofMillis(10L), core.context().environment().scheduler())
            // Gate 1: skip ticks while any config load / refresh is in flight or, when a bucket
            // was requested, while its config is not available yet.
            .filter(i -> !(core.configurationProvider().bucketConfigLoadInProgress()
                || core.configurationProvider().globalConfigLoadInProgress()
                || (bucketName.isPresent() && core.configurationProvider().collectionMapRefreshInProgress())
                || (bucketName.isPresent() && core.clusterConfig().bucketConfig(bucketName.get()) == null)))
            // Gate 2: for a bucket, the number of KV entries in the port infos must be non-zero and
            // must match the number of nodes that expose the KV service.
            .filter(i -> {
                if (!bucketName.isPresent()) {
                    return true;
                }
                BucketConfig bucketConfig = core.clusterConfig().bucketConfig(bucketName.get());
                if (bucketConfig == null) {
                    // The config is re-read here, so it may be gone again even though the previous
                    // gate saw it. Treat as "not ready yet" and retry on the next tick.
                    return false;
                }
                long extNodes = bucketConfig.portInfos().stream()
                    .filter(p -> p.ports().containsKey(ServiceType.KV))
                    .count();
                long visibleNodes = bucketConfig.nodes().stream()
                    .filter(n -> n.services().containsKey(ServiceType.KV))
                    .count();
                return extNodes > 0L && extNodes == visibleNodes;
            })
            // Gate 3: for a bucket, fetch "/pools/default/buckets/<name>" through the manager service
            // and only let the tick pass if every entry of "nodes" reports the status "healthy".
            .flatMap(i -> {
                if (!bucketName.isPresent()) {
                    return Flux.just(i);
                }
                GenericManagerRequest request = new GenericManagerRequest(
                    core.context(),
                    () -> new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/pools/default/buckets/" + bucketName.get()),
                    true,
                    null
                );
                core.send(request);
                return Reactor.wrap(request, request.response(), true)
                    .filter(response -> {
                        if (response.status() != ResponseStatus.SUCCESS) {
                            return false;
                        }
                        ObjectNode root = (ObjectNode) Mapper.decodeIntoTree(response.content());
                        ArrayNode nodes = (ArrayNode) root.get("nodes");
                        if (nodes == null) {
                            // No node list in the response: cannot confirm health, retry later.
                            return false;
                        }
                        // path() never returns null, so a node without "status" simply counts
                        // as not healthy instead of raising a NullPointerException.
                        long healthy = StreamSupport.stream(nodes.spliterator(), false)
                            .filter(node -> "healthy".equals(node.path("status").asText()))
                            .count();
                        return (long) nodes.size() == healthy;
                    })
                    .map(ignored -> i);
            })
            // Only the first tick that passed all gates triggers the ping + diagnostics phase.
            .take(1L)
            .flatMap(aLong -> {
                // Poll the aggregated diagnostics state until it equals the desired state.
                Flux<ClusterState> diagnostics = Flux
                    .interval(Duration.ofMillis(10L), core.context().environment().scheduler())
                    .map(i -> WaitUntilReadyHelper.diagnosticsCurrentState(core))
                    .takeUntil(s -> s == desiredState);
                // concat: the ping runs to completion first, then the diagnostics polling starts.
                return Flux.concat(
                    WaitUntilReadyHelper.ping(core, WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName), timeout, bucketName),
                    diagnostics
                );
            })
            .then()
            .timeout(timeout, Mono.defer(() -> {
                // Built lazily so the diagnostics snapshot in the context is taken at timeout time.
                WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(
                    WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName),
                    timeout,
                    desiredState,
                    bucketName,
                    core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type))
                );
                CancellationErrorContext errorContext = new CancellationErrorContext(waitUntilReadyContext);
                return Mono.error(new UnambiguousTimeoutException("WaitUntilReady timed out", errorContext));
            }), core.context().environment().scheduler())
            .toFuture();
    }

    /**
     * Returns the services that should be checked.
     *
     * @param core the core providing the current cluster config.
     * @param serviceTypes the explicitly requested services, may be {@code null} or empty.
     * @param bucketName the optional bucket name passed on to the ping target extraction.
     * @return {@code serviceTypes} if it is non-empty, otherwise the service types of all ping
     *         targets extracted from the cluster config.
     */
    private static Set<ServiceType> servicesToCheck(Core core, Set<ServiceType> serviceTypes, Optional<String> bucketName) {
        if (serviceTypes != null && !serviceTypes.isEmpty()) {
            return serviceTypes;
        }
        return HealthPinger.extractPingTargets(core.clusterConfig(), bucketName)
            .stream()
            .map(HealthPinger.PingTarget::serviceType)
            .collect(Collectors.toSet());
    }

    /**
     * Aggregates the current endpoint diagnostics of the core, grouped by their type, into a
     * single {@link ClusterState}.
     *
     * @param core the core to collect the endpoint diagnostics from.
     * @return the aggregated cluster state.
     */
    private static ClusterState diagnosticsCurrentState(Core core) {
        return DiagnosticsResult.aggregateClusterState(
            core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)).values()
        );
    }

    /**
     * Pings the given services through the {@link HealthPinger}, using the retry strategy of the
     * environment and no report id.
     *
     * @param core the core to send the pings on.
     * @param serviceTypes the services to ping.
     * @param timeout the timeout handed to the pinger.
     * @param bucketName the optional bucket name handed to the pinger.
     * @return the ping result wrapped into a flux.
     */
    private static Flux<PingResult> ping(Core core, Set<ServiceType> serviceTypes, Duration timeout, Optional<String> bucketName) {
        return HealthPinger.ping(
            core,
            Optional.of(timeout),
            core.context().environment().retryStrategy(),
            serviceTypes,
            Optional.empty(),
            bucketName
        ).flux();
    }
}

