/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.diagnostics;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.AbstractEvent;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.core.WaitUntilReadyCompletedEvent;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ArrayNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.DiagnosticsResult;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.HealthPinger;
import com.couchbase.client.core.diagnostics.PingResult;
import com.couchbase.client.core.diagnostics.WaitUntilReadyContext;
import com.couchbase.client.core.endpoint.EndpointState;
import com.couchbase.client.core.endpoint.http.CoreHttpPath;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.manager.GenericManagerRequest;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Stability.Internal
public class WaitUntilReadyHelper {
    @Stability.Internal
    public static CompletableFuture<Void> waitUntilReady(Core core, Set<ServiceType> serviceTypes, Duration timeout, ClusterState desiredState, Optional<String> bucketName) {
        if (core.isProtostellar()) {
            return core.protostellar().waitUntilReady(serviceTypes, timeout, desiredState, bucketName.orElse(null));
        }
        WaitUntilReadyLogger log = WaitUntilReadyLogger.create(core.environment().eventBus());
        log.message("Starting WaitUntilReady. serviceTypes=" + serviceTypes + ", timeout=" + timeout + ", desiredState=" + (Object)((Object)desiredState) + ", bucketName=" + bucketName);
        WaitUntilReadyState state = new WaitUntilReadyState(log);
        state.transition(WaitUntilReadyStage.CONFIG_LOAD);
        return Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)core.context().environment().scheduler()).onBackpressureDrop().filter(i -> {
            try {
                if (core.configurationProvider().bucketConfigLoadInProgress()) {
                    log.waitingBecause("bucket config load is in progress");
                    return false;
                }
                if (core.configurationProvider().globalConfigLoadInProgress()) {
                    log.waitingBecause("global config load is in progress");
                    return false;
                }
                if (bucketName.isPresent() && core.configurationProvider().collectionRefreshInProgress()) {
                    log.waitingBecause("collection refresh is in progress for bucket " + (String)bucketName.get());
                    return false;
                }
                if (bucketName.isPresent() && core.clusterConfig().bucketConfig((String)bucketName.get()) == null) {
                    log.waitingBecause("cluster config does not yet have config for bucket " + (String)bucketName.get());
                    return false;
                }
                return true;
            }
            catch (Throwable t) {
                log.message("Unexpected exception while waiting for config load: " + CbThrowables.getStackTraceAsString(t));
                throw t;
            }
        }).flatMap(i -> {
            if (!bucketName.isPresent()) {
                log.message("Skipping node health check because no bucket name was specified.");
                return Flux.just((Object)i);
            }
            state.transition(WaitUntilReadyStage.BUCKET_NODES_HEALTHY);
            String httpPath = CoreHttpPath.formatPath("/pools/default/buckets/{}", (String)bucketName.get());
            GenericManagerRequest request = new GenericManagerRequest(core.context(), () -> new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, httpPath), true, null);
            log.message("Sending manager request to check bucket health; httpPath=" + httpPath);
            core.send(request);
            return Reactor.wrap(request, request.response(), true).filter(response -> {
                try {
                    if (response.status() != ResponseStatus.SUCCESS) {
                        log.waitingBecause("Manager request to check bucket health failed with response status " + (Object)((Object)response.status()) + "; httpStatusCode=" + response.httpStatus() + ", responseBody=" + new String(response.content(), StandardCharsets.UTF_8) + ", requestContext=" + request.context());
                        return false;
                    }
                    ObjectNode root = (ObjectNode)Mapper.decodeIntoTree(response.content());
                    ArrayNode nodes = (ArrayNode)root.get("nodes");
                    long healthy = StreamSupport.stream(nodes.spliterator(), false).filter(node -> node.get("status").asText().equals("healthy")).count();
                    if ((long)nodes.size() == healthy) {
                        log.message("All " + healthy + " nodes are healthy");
                        return true;
                    }
                    log.waitingBecause(healthy + " of " + nodes.size() + " nodes are healthy");
                    return false;
                }
                catch (Throwable t) {
                    log.message("Unexpected error while checking bucket health: " + CbThrowables.getStackTraceAsString(t));
                    throw t;
                }
            }).map(ignored -> i);
        }).take(1L).flatMap(aLong -> {
            if (!bucketName.isPresent() && !core.clusterConfig().hasClusterOrBucketConfig()) {
                log.message("No bucket name is present, and no cluster or bucket config is present. This usually indicates cluster.waitUntilReady() was called against a pre-6.5 cluster. There's nothing to wait for, so consider this operation complete!");
                state.transition(WaitUntilReadyStage.COMPLETE);
                WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName, log), timeout, desiredState, bucketName, core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)), state);
                core.context().environment().eventBus().publish(new WaitUntilReadyCompletedEvent(waitUntilReadyContext, WaitUntilReadyCompletedEvent.Reason.CLUSTER_LEVEL_NOT_SUPPORTED));
                return Flux.empty();
            }
            state.transition(WaitUntilReadyStage.PING);
            Flux diagnostics = Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)core.context().environment().scheduler()).onBackpressureDrop().map(i -> WaitUntilReadyHelper.diagnosticsCurrentState(core, log)).takeUntil(s -> {
                boolean done;
                boolean bl = done = s == desiredState;
                if (!done) {
                    log.waitingBecause("current cluster state " + (Object)s + " does not satisfy desired state " + (Object)((Object)desiredState));
                } else {
                    log.message("Done waiting for diagnostics! Current cluster state " + (Object)s + " satisfies desired state " + (Object)((Object)desiredState));
                }
                return done;
            });
            return Flux.concat((Publisher[])new Publisher[]{WaitUntilReadyHelper.ping(core, WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName, log), timeout, bucketName, log), diagnostics});
        }).then().timeout(timeout, Mono.defer(() -> {
            log.message("WaitUntilReady timed out :-(");
            WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName, log), timeout, desiredState, bucketName, core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)), state);
            CancellationErrorContext errorContext = new CancellationErrorContext(waitUntilReadyContext);
            return Mono.error((Throwable)new UnambiguousTimeoutException("WaitUntilReady timed out", errorContext));
        }), core.context().environment().scheduler()).doOnSuccess(unused -> {
            log.message("WaitUntilReady succeeded :-)");
            state.transition(WaitUntilReadyStage.COMPLETE);
            WaitUntilReadyContext waitUntilReadyContext = new WaitUntilReadyContext(WaitUntilReadyHelper.servicesToCheck(core, serviceTypes, bucketName, log), timeout, desiredState, bucketName, core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)), state);
            core.context().environment().eventBus().publish(new WaitUntilReadyCompletedEvent(waitUntilReadyContext, WaitUntilReadyCompletedEvent.Reason.SUCCESS));
        }).toFuture();
    }

    private static Set<ServiceType> servicesToCheck(Core core, Set<ServiceType> serviceTypes, Optional<String> bucketName, WaitUntilReadyLogger log) {
        return !CbCollections.isNullOrEmpty(serviceTypes) ? serviceTypes : HealthPinger.extractPingTargets(core.clusterConfig(), bucketName, log).stream().map(RequestTarget::serviceType).collect(Collectors.toSet());
    }

    private static ClusterState diagnosticsCurrentState(Core core, WaitUntilReadyLogger log) {
        List diagnosticsResult = core.diagnostics().collect(Collectors.toList());
        Map<ServiceType, List<EndpointDiagnostics>> groupedByService = diagnosticsResult.stream().collect(Collectors.groupingBy(EndpointDiagnostics::type));
        log.message("diagnostics: Raw results: " + diagnosticsResult);
        log.message("diagnostics: Raw results, grouped by service: " + groupedByService);
        log.message("diagnostics: Endpoints not in CONNECTED state: " + CbCollections.filter(diagnosticsResult, it -> it.state() != EndpointState.CONNECTED));
        return DiagnosticsResult.aggregateClusterState(groupedByService.values());
    }

    private static Flux<PingResult> ping(Core core, Set<ServiceType> serviceTypes, Duration timeout, Optional<String> bucketName, WaitUntilReadyLogger log) {
        return HealthPinger.ping(core, Optional.of(timeout), core.context().environment().retryStrategy(), serviceTypes, Optional.empty(), bucketName, log).doOnNext(it -> log.message("ping: PingResult = " + it)).flux();
    }

    @Stability.Internal
    public static interface WaitUntilReadyLogger {
        public static final WaitUntilReadyLogger dummy = new WaitUntilReadyLogger(){};

        public static WaitUntilReadyLogger create(final EventBus eventBus) {
            final String id = UUID.randomUUID().toString();
            return new WaitUntilReadyLogger(){

                @Override
                public void message(String message) {
                    message = id + " " + message;
                    if (EventBus.PublishResult.SUCCESS != eventBus.publish(new WaitUntilReadyDiagnostic(message))) {
                        System.err.println("[WaitUntilReadyDiagnostic] " + message);
                    }
                }
            };
        }

        default public void message(String message) {
        }

        default public void waitingBecause(String message) {
            this.message("Waiting because " + message);
        }
    }

    @Stability.Internal
    public static class WaitUntilReadyState {
        private final Map<WaitUntilReadyStage, Long> timings = new ConcurrentHashMap<WaitUntilReadyStage, Long>();
        private final AtomicLong totalDuration = new AtomicLong();
        private volatile WaitUntilReadyStage currentStage = WaitUntilReadyStage.INITIAL;
        private volatile NanoTimestamp currentStart = NanoTimestamp.now();
        private final WaitUntilReadyLogger log;

        public WaitUntilReadyState(WaitUntilReadyLogger log) {
            this.log = Objects.requireNonNull(log);
        }

        void transition(WaitUntilReadyStage next) {
            long timing = this.currentStart.elapsed().toMillis();
            if (this.currentStage != WaitUntilReadyStage.INITIAL) {
                this.timings.put(this.currentStage, timing);
                this.log.message("Stage '" + (Object)((Object)this.currentStage) + "' took " + this.currentStart.elapsed());
            }
            this.totalDuration.addAndGet(timing);
            this.log.message("Transitioning from stage " + (Object)((Object)this.currentStage) + " to stage " + (Object)((Object)next) + ". Total elapsed time since waiting started: " + Duration.ofMillis(this.totalDuration.get()));
            this.currentStage = next;
            this.currentStart = NanoTimestamp.now();
        }

        public Map<String, Object> export() {
            TreeMap<String, Object> toExport = new TreeMap<String, Object>();
            toExport.put("current_stage", (Object)this.currentStage);
            if (this.currentStage != WaitUntilReadyStage.COMPLETE) {
                long currentMs = this.currentStart.elapsed().toMillis();
                toExport.put("current_stage_since_ms", currentMs);
                toExport.put("total_ms", this.totalDuration.get() + currentMs);
            } else {
                toExport.put("total_ms", this.totalDuration.get());
            }
            toExport.put("timings_ms", this.timings);
            return toExport;
        }

        public long totalDuration() {
            return this.totalDuration.get();
        }
    }

    private static enum WaitUntilReadyStage {
        INITIAL,
        CONFIG_LOAD,
        BUCKET_NODES_HEALTHY,
        PING,
        COMPLETE;

    }

    private static class WaitUntilReadyDiagnostic
    extends AbstractEvent {
        private final String message;

        protected WaitUntilReadyDiagnostic(String message) {
            super(Event.Severity.DEBUG, Event.Category.CORE.path() + ".WaitUntilReady", Duration.ZERO, null);
            this.message = Objects.requireNonNull(message);
        }

        @Override
        public String description() {
            return this.message;
        }
    }
}

