/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CoreKeyspace;
import com.couchbase.client.core.CoreLimiter;
import com.couchbase.client.core.Timer;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.CoreCouchbaseOps;
import com.couchbase.client.core.api.kv.CoreKvBinaryOps;
import com.couchbase.client.core.api.kv.CoreKvOps;
import com.couchbase.client.core.api.manager.CoreBucketAndScope;
import com.couchbase.client.core.api.manager.search.ClassicCoreClusterSearchIndexManager;
import com.couchbase.client.core.api.manager.search.ClassicCoreScopeSearchIndexManager;
import com.couchbase.client.core.api.manager.search.CoreSearchIndexManager;
import com.couchbase.client.core.api.query.CoreQueryOps;
import com.couchbase.client.core.api.search.ClassicCoreSearchOps;
import com.couchbase.client.core.api.search.CoreSearchOps;
import com.couchbase.client.core.callbacks.BeforeSendRequestCallback;
import com.couchbase.client.core.classic.kv.ClassicCoreKvBinaryOps;
import com.couchbase.client.core.classic.kv.ClassicCoreKvOps;
import com.couchbase.client.core.classic.manager.ClassicCoreBucketManager;
import com.couchbase.client.core.classic.manager.ClassicCoreCollectionManagerOps;
import com.couchbase.client.core.classic.query.ClassicCoreQueryOps;
import com.couchbase.client.core.cnc.CbTracing;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.ValueRecorder;
import com.couchbase.client.core.cnc.events.core.BucketClosedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenFailedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenInitiatedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenedEvent;
import com.couchbase.client.core.cnc.events.core.CoreCreatedEvent;
import com.couchbase.client.core.cnc.events.core.InitGlobalConfigFailedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationErrorDetectedEvent;
import com.couchbase.client.core.cnc.events.core.ServiceReconfigurationFailedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownInitiatedEvent;
import com.couchbase.client.core.cnc.events.core.WatchdogInvalidStateIdentifiedEvent;
import com.couchbase.client.core.cnc.events.core.WatchdogRunFailedEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionsStartedEvent;
import com.couchbase.client.core.cnc.metrics.LoggingMeter;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.DefaultConfigurationProvider;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.InternalEndpointDiagnostics;
import com.couchbase.client.core.diagnostics.WaitUntilReadyHelper;
import com.couchbase.client.core.endpoint.http.CoreHttpClient;
import com.couchbase.client.core.env.Authenticator;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.error.AlreadyShutdownException;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.error.GlobalConfigNotFoundException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.UnsupportedConfigMechanismException;
import com.couchbase.client.core.manager.CoreBucketManagerOps;
import com.couchbase.client.core.manager.CoreCollectionManager;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.RequestContext;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.msg.kv.KeyValueRequest;
import com.couchbase.client.core.msg.query.QueryRequest;
import com.couchbase.client.core.msg.search.ServerSearchRequest;
import com.couchbase.client.core.node.AnalyticsLocator;
import com.couchbase.client.core.node.KeyValueLocator;
import com.couchbase.client.core.node.Locator;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.RoundRobinLocator;
import com.couchbase.client.core.node.ViewLocator;
import com.couchbase.client.core.service.ServiceScope;
import com.couchbase.client.core.service.ServiceState;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup;
import com.couchbase.client.core.transaction.components.CoreTransactionRequest;
import com.couchbase.client.core.transaction.context.CoreTransactionsContext;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.ConnectionStringUtil;
import com.couchbase.client.core.util.CoreIdGenerator;
import com.couchbase.client.core.util.LatestStateSubscription;
import com.couchbase.client.core.util.NanoTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

@Stability.Volatile
public class Core
implements CoreCouchbaseOps,
AutoCloseable {
    // Locators, one per service type; locator(ServiceType) picks the matching one for dispatching.
    private static final KeyValueLocator KEY_VALUE_LOCATOR = new KeyValueLocator();
    private static final RoundRobinLocator MANAGER_LOCATOR = new RoundRobinLocator(ServiceType.MANAGER);
    private static final RoundRobinLocator QUERY_LOCATOR = new RoundRobinLocator(ServiceType.QUERY);
    private static final RoundRobinLocator ANALYTICS_LOCATOR = new AnalyticsLocator();
    private static final RoundRobinLocator SEARCH_LOCATOR = new RoundRobinLocator(ServiceType.SEARCH);
    private static final RoundRobinLocator VIEWS_LOCATOR = new ViewLocator();
    private static final RoundRobinLocator EVENTING_LOCATOR = new RoundRobinLocator(ServiceType.EVENTING);
    private static final RoundRobinLocator BACKUP_LOCATOR = new RoundRobinLocator(ServiceType.BACKUP);
    // Initial delay and period (converted to whole seconds) used to schedule the InvalidStateWatchdog.
    private static final Duration INVALID_STATE_WATCHDOG_INTERVAL = Duration.ofSeconds(5L);
    // Context created in the constructor and returned by context().
    private final CoreContext coreContext;
    // Obtained from createConfigurationProvider() during construction.
    private final ConfigurationProvider configurationProvider;
    // Seeded from configurationProvider.config() and overwritten by the configurationProcessor callback.
    private volatile ClusterConfig currentConfig;
    // Nodes currently managed by this core; entries are added in ensureServiceAt and removed during reconfiguration.
    private final CopyOnWriteArrayList<Node> nodes;
    // Subscription over the provider's config stream; each delivered config is stored and then reconfigure() runs.
    private final LatestStateSubscription<ClusterConfig> configurationProcessor;
    // Flipped to true exactly once by shutdown(Duration); send() cancels requests once it is set.
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    // Taken from the environment in the constructor.
    private final EventBus eventBus;
    // Taken from the environment; send() registers requests with it when asked to.
    private final Timer timer;
    // The environment's request callbacks that are BeforeSendRequestCallback instances.
    private final List<BeforeSendRequestCallback> beforeSendRequestCallbacks;
    // Handle of the periodically scheduled InvalidStateWatchdog; disposed on shutdown.
    private final Disposable invalidStateWatchdog;
    // Cache of value recorders used by responseMetric(), keyed by the identifier derived from request and outcome.
    private final Map<ResponseMetricIdentifier, ValueRecorder> responseMetrics = new ConcurrentHashMap<ResponseMetricIdentifier, ValueRecorder>();
    // Created in the constructor; shut down first in shutdown(Duration).
    private final CoreTransactionsCleanup transactionsCleanup;
    // Created in the constructor from the environment's meter.
    private final CoreTransactionsContext transactionsContext;
    // Connection string this core was created with; passed to the default configuration provider.
    private final ConnectionString connectionString;

    /**
     * Creates a new {@link Core} from a set of seed nodes.
     * <p>
     * The seed nodes are converted with {@link ConnectionStringUtil#asConnectionString} and the
     * result is handed to {@link #create(CoreEnvironment, Authenticator, ConnectionString)}.
     *
     * @param environment the environment for the new core.
     * @param authenticator the authenticator for the new core.
     * @param seedNodes the seed nodes to convert into a connection string.
     * @return the created core.
     * @deprecated use {@link #create(CoreEnvironment, Authenticator, ConnectionString)} instead.
     */
    @Deprecated
    public static Core create(CoreEnvironment environment, Authenticator authenticator, Set<SeedNode> seedNodes) {
        ConnectionString fromSeedNodes = ConnectionStringUtil.asConnectionString(seedNodes);
        return create(environment, authenticator, fromSeedNodes);
    }

    /**
     * Creates a new {@link Core} by invoking the protected constructor with the given arguments.
     *
     * @param environment the environment for the new core.
     * @param authenticator the authenticator for the new core.
     * @param connectionString the connection string for the new core.
     * @return the created core.
     */
    public static Core create(CoreEnvironment environment, Authenticator authenticator, ConnectionString connectionString) {
        Core created = new Core(environment, authenticator, connectionString);
        return created;
    }

    /**
     * Creates a new core.
     * <p>
     * Validates that the authenticator matches the TLS setting, checks the connection string
     * (scheme and ports), registers the instance with {@link CoreLimiter}, wires up the
     * configuration provider and its processing subscription, schedules the invalid state
     * watchdog and finally creates the transactions cleanup and context.
     *
     * @param environment the environment providing event bus, timer, scheduler and configuration.
     * @param authenticator the authenticator; must support the configured TLS mode.
     * @param connectionString the connection string; must not be null.
     * @throws InvalidArgumentException if the authenticator does not support the configured TLS
     *         mode, or if the watchdog interval is smaller than one second.
     */
    protected Core(CoreEnvironment environment, Authenticator authenticator, ConnectionString connectionString) {
        boolean tlsEnabled = environment.securityConfig().tlsEnabled();
        if (tlsEnabled && !authenticator.supportsTls()) {
            throw new InvalidArgumentException("TLS enabled but the Authenticator does not support TLS!", null, null);
        }
        if (!tlsEnabled && !authenticator.supportsNonTls()) {
            throw new InvalidArgumentException("TLS not enabled but the Authenticator does only support TLS!", null, null);
        }
        CoreCouchbaseOps.checkConnectionStringScheme(connectionString, ConnectionString.Scheme.COUCHBASE, ConnectionString.Scheme.COUCHBASES);
        ConnectionStringUtil.sanityCheckPorts(connectionString);
        CoreLimiter.incrementAndVerifyNumInstances(environment.eventBus());

        // The connection string has to be assigned before the configuration provider is created,
        // because createConfigurationProvider() reads the field.
        this.connectionString = Objects.requireNonNull(connectionString);
        this.coreContext = new CoreContext(this, CoreIdGenerator.nextId(), environment, authenticator);
        this.configurationProvider = createConfigurationProvider();
        this.nodes = new CopyOnWriteArrayList<>();
        this.eventBus = environment.eventBus();
        this.timer = environment.timer();
        this.currentConfig = this.configurationProvider.config();

        // Once the provider's stream completes, one empty ClusterConfig is appended; reconfigure()
        // treats a config without bucket and global configs as "disconnect all nodes".
        Flux<ClusterConfig> configs = this.configurationProvider.configs().concatWith(Mono.just(new ClusterConfig()));
        this.configurationProcessor = new LatestStateSubscription<ClusterConfig>(configs, environment.scheduler(), (config, doFinally) -> {
            this.currentConfig = config;
            reconfigure(doFinally);
        });

        this.beforeSendRequestCallbacks = environment.requestCallbacks().stream()
            .filter(c -> c instanceof BeforeSendRequestCallback)
            .map(c -> (BeforeSendRequestCallback) c)
            .collect(Collectors.toList());

        this.eventBus.publish(new CoreCreatedEvent(this.coreContext, environment, Collections.emptySet(), CoreLimiter.numInstances(), connectionString));

        long watchdogInterval = INVALID_STATE_WATCHDOG_INTERVAL.getSeconds();
        // Exactly one second is a valid interval; only values below it are rejected, as the message says.
        if (watchdogInterval < 1L) {
            throw InvalidArgumentException.fromMessage("The Watchdog Interval cannot be smaller than 1 second!");
        }
        this.invalidStateWatchdog = environment.scheduler().schedulePeriodically(new InvalidStateWatchdog(), watchdogInterval, watchdogInterval, TimeUnit.SECONDS);

        this.transactionsCleanup = new CoreTransactionsCleanup(this, environment.transactionsConfig());
        this.transactionsContext = new CoreTransactionsContext(environment.meter());
        context().environment().eventBus().publish(new TransactionsStartedEvent(
            environment.transactionsConfig().cleanupConfig().runLostAttemptsCleanupThread(),
            environment.transactionsConfig().cleanupConfig().runRegularAttemptsCleanupThread()));
    }

    /**
     * Builds the configuration provider used by this core.
     * <p>
     * Called from the constructor after the connection string field has been assigned.
     *
     * @return a {@link DefaultConfigurationProvider} bound to this core and its connection string.
     */
    ConfigurationProvider createConfigurationProvider() {
        ConfigurationProvider provider = new DefaultConfigurationProvider(this, connectionString);
        return provider;
    }

    /**
     * Returns the configuration provider created for this core.
     *
     * @return the configuration provider.
     */
    @Stability.Internal
    public ConfigurationProvider configurationProvider() {
        return configurationProvider;
    }

    /**
     * Sends a request and registers it for timeout handling.
     * <p>
     * Equivalent to {@code send(request, true)}.
     *
     * @param request the request to dispatch.
     * @param <R> the response type of the request.
     */
    public <R extends Response> void send(Request<R> request) {
        final boolean registerForTimeout = true;
        send(request, registerForTimeout);
    }

    /**
     * Sends a request to the locator responsible for its service type.
     * <p>
     * If this core has already been shut down, the request is cancelled with
     * {@link CancellationReason#SHUTDOWN} and nothing is dispatched.
     *
     * @param request the request to dispatch.
     * @param registerForTimeout if true, the request is registered with the timer and every
     *        {@link BeforeSendRequestCallback} is invoked before dispatching.
     * @param <R> the response type of the request.
     */
    @Stability.Internal
    public <R extends Response> void send(Request<R> request, boolean registerForTimeout) {
        if (shutdown.get()) {
            request.cancel(CancellationReason.SHUTDOWN);
            return;
        }

        if (registerForTimeout) {
            timer.register(request);
            // The callbacks only run for requests that are also registered for timeout.
            beforeSendRequestCallbacks.forEach(callback -> callback.beforeSend(request));
        }

        Locator responsible = locator(request.serviceType());
        responsible.dispatch(request, nodes, currentConfig, context());
    }

    /**
     * Returns the context that was created together with this core.
     *
     * @return the core context.
     */
    public CoreContext context() {
        return coreContext;
    }

    /**
     * Creates a new HTTP client bound to this core and the given target.
     *
     * @param target the request target the client should talk to.
     * @return a new {@link CoreHttpClient}; a fresh instance is created on every call.
     */
    @Stability.Internal
    public CoreHttpClient httpClient(RequestTarget target) {
        CoreHttpClient client = new CoreHttpClient(this, target);
        return client;
    }

    /**
     * Streams the endpoint diagnostics of all currently managed nodes.
     *
     * @return the concatenated {@link Node#diagnostics()} streams of every managed node.
     */
    @Stability.Internal
    public Stream<EndpointDiagnostics> diagnostics() {
        Stream<Node> managedNodes = nodes.stream();
        return managedNodes.flatMap(node -> node.diagnostics());
    }

    /**
     * Streams the internal endpoint diagnostics of all currently managed nodes.
     *
     * @return the concatenated {@link Node#internalDiagnostics()} streams of every managed node.
     */
    @Stability.Internal
    public Stream<InternalEndpointDiagnostics> internalDiagnostics() {
        Stream<Node> managedNodes = nodes.stream();
        return managedNodes.flatMap(node -> node.internalDiagnostics());
    }

    /**
     * Looks up the service state stream on the first managed node with the given identifier.
     *
     * @param nodeIdentifier the identifier of the node to look for.
     * @param type the service type to query on that node.
     * @param bucket the bucket passed through to {@link Node#serviceState}.
     * @return whatever the matching node returns, or an empty optional if no managed node matches.
     */
    @Stability.Internal
    public Optional<Flux<ServiceState>> serviceState(NodeIdentifier nodeIdentifier, ServiceType type, Optional<String> bucket) {
        return nodes.stream()
            .filter(candidate -> candidate.identifier().equals(nodeIdentifier))
            .findFirst()
            .flatMap(match -> match.serviceState(type, bucket));
    }

    /**
     * Asks the configuration provider to load and refresh the global config.
     * <p>
     * The call is fire-and-forget: success is ignored, while a failure is translated into an
     * {@link InitGlobalConfigFailedEvent} that is published on the event bus.
     */
    @Stability.Internal
    public void initGlobalConfig() {
        NanoTimestamp start = NanoTimestamp.now();
        configurationProvider.loadAndRefreshGlobalConfig().subscribe(
            v -> { },
            throwable -> {
                InitGlobalConfigFailedEvent.Reason reason = globalConfigFailureReason(throwable);
                eventBus.publish(new InitGlobalConfigFailedEvent(reason.severity(), start.elapsed(), context(), reason, throwable));
            }
        );
    }

    /**
     * Maps a global config loading failure to the reason reported in the failure event.
     *
     * @param throwable the failure signalled by the configuration provider.
     * @return the matching reason, or {@code UNKNOWN} if the failure is not recognized.
     */
    private static InitGlobalConfigFailedEvent.Reason globalConfigFailureReason(Throwable throwable) {
        if (throwable instanceof UnsupportedConfigMechanismException) {
            return InitGlobalConfigFailedEvent.Reason.UNSUPPORTED;
        }
        if (throwable instanceof GlobalConfigNotFoundException) {
            return InitGlobalConfigFailedEvent.Reason.NO_CONFIG_FOUND;
        }
        if (throwable instanceof ConfigException) {
            Throwable cause = throwable.getCause();
            if (cause instanceof RequestCanceledException) {
                RequestContext ctx = ((RequestCanceledException) cause).context().requestContext();
                // A cancelled request only counts as a shutdown if it was cancelled for that reason.
                return ctx.request().cancellationReason() == CancellationReason.SHUTDOWN
                    ? InitGlobalConfigFailedEvent.Reason.SHUTDOWN
                    : InitGlobalConfigFailedEvent.Reason.UNKNOWN;
            }
            // getMessage() may be null; treat a missing message as "no NO_ACCESS marker" instead
            // of letting a NullPointerException escape from the error handler.
            String message = throwable.getMessage();
            if (message != null && message.contains("NO_ACCESS")) {
                return InitGlobalConfigFailedEvent.Reason.NO_ACCESS;
            }
            return InitGlobalConfigFailedEvent.Reason.UNKNOWN;
        }
        if (throwable instanceof AlreadyShutdownException) {
            return InitGlobalConfigFailedEvent.Reason.SHUTDOWN;
        }
        return InitGlobalConfigFailedEvent.Reason.UNKNOWN;
    }

    /**
     * Asks the configuration provider to open the bucket with the given name.
     * <p>
     * Publishes a {@link BucketOpenInitiatedEvent} up front, then either a
     * {@link BucketOpenedEvent} on completion or a {@link BucketOpenFailedEvent} on failure. The
     * failure event has DEBUG severity when the cause is an {@link AlreadyShutdownException} and
     * WARN severity otherwise.
     *
     * @param name the name of the bucket to open.
     */
    @Stability.Internal
    public void openBucket(String name) {
        eventBus.publish(new BucketOpenInitiatedEvent(coreContext, name));
        NanoTimestamp start = NanoTimestamp.now();

        configurationProvider.openBucket(name).subscribe(
            ignored -> { },
            failure -> {
                Event.Severity severity = Event.Severity.WARN;
                if (failure instanceof AlreadyShutdownException) {
                    severity = Event.Severity.DEBUG;
                }
                eventBus.publish(new BucketOpenFailedEvent(name, severity, start.elapsed(), coreContext, failure));
            },
            () -> eventBus.publish(new BucketOpenedEvent(start.elapsed(), coreContext, name))
        );
    }

    /**
     * Returns the cluster config as currently reported by the configuration provider.
     * <p>
     * This asks the provider directly rather than returning the locally cached config.
     *
     * @return the provider's current cluster config.
     */
    @Stability.Internal
    public ClusterConfig clusterConfig() {
        ClusterConfig fromProvider = configurationProvider.config();
        return fromProvider;
    }

    /**
     * Closes the bucket with the given name through the configuration provider.
     * <p>
     * The work is deferred until subscription. On success a {@link BucketClosedEvent} carrying
     * the elapsed time is published.
     *
     * @param name the name of the bucket to close.
     * @return a mono completing once the provider has closed the bucket.
     */
    private Mono<Void> closeBucket(String name) {
        return Mono.defer(() -> {
            NanoTimestamp start = NanoTimestamp.now();
            // The provider receives "true" only while this core has not been shut down.
            boolean notShutdown = !shutdown.get();
            return configurationProvider
                .closeBucket(name, notShutdown)
                .doOnSuccess(ignored -> eventBus.publish(new BucketClosedEvent(start.elapsed(), coreContext, name)));
        });
    }

    /**
     * Makes sure the given service is added on the node with the given identifier.
     * <p>
     * If no managed node has that identifier, a new node is created through
     * {@link #createNode(NodeIdentifier)} and added to the list of managed nodes first. When this
     * core is already shut down nothing happens and an empty mono is returned.
     *
     * @param identifier the identifier of the target node.
     * @param serviceType the service type to add.
     * @param port the port of the service.
     * @param bucket the bucket passed through to {@link Node#addService}.
     * @return a mono completing once the service has been added on every matching node.
     */
    @Stability.Internal
    public Mono<Void> ensureServiceAt(NodeIdentifier identifier, ServiceType serviceType, int port, Optional<String> bucket) {
        if (shutdown.get()) {
            return Mono.empty();
        }

        // No raw Publisher/Object casts here: they would erase the element type of the Flux and
        // the flatMap below could no longer see the Node API.
        return Flux.fromIterable(nodes)
            .filter(n -> n.identifier().equals(identifier))
            .switchIfEmpty(Mono.defer(() -> {
                Node node = createNode(identifier);
                nodes.add(node);
                return Mono.just(node);
            }))
            .flatMap(node -> node.addService(serviceType, port, bucket))
            .then();
    }

    /**
     * Returns the value recorder for the given request and outcome, creating and caching it on
     * first use.
     * <p>
     * When the environment's meter is a {@link LoggingMeter}, only the service and operation tags
     * are set and the error is ignored. Otherwise bucket, scope, collection and outcome tags are
     * added as well; the outcome is the simple class name of the error with "Exception" removed,
     * or "Success" if there is no error.
     *
     * @param request the request the metric belongs to.
     * @param err the error the request failed with, or null on success. A
     *        {@link CompletionException} is unwrapped to its cause if it has one.
     * @return the cached or newly created value recorder.
     */
    @Stability.Internal
    public ValueRecorder responseMetric(Request<?> request, @Nullable Throwable err) {
        boolean isDefaultLoggingMeter = coreContext.environment().meter() instanceof LoggingMeter;

        String exceptionSimpleName = null;
        if (!isDefaultLoggingMeter && err != null) {
            Throwable effective = err;
            // A CompletionException may have been created without a cause; in that case fall back
            // to the exception itself rather than failing with a NullPointerException.
            if (err instanceof CompletionException && err.getCause() != null) {
                effective = err.getCause();
            }
            exceptionSimpleName = effective.getClass().getSimpleName().replace("Exception", "");
        }

        String finalExceptionSimpleName = exceptionSimpleName;
        return responseMetrics.computeIfAbsent(new ResponseMetricIdentifier(request, exceptionSimpleName), key -> {
            Map<String, String> tags = new HashMap<>(7);
            if (key.serviceType == null) {
                if (request instanceof CoreTransactionRequest) {
                    tags.put("db.couchbase.service", "transactions");
                }
            } else {
                tags.put("db.couchbase.service", key.serviceType);
            }
            tags.put("db.operation", key.requestName);
            if (!isDefaultLoggingMeter) {
                // These tags are written even when the value is null, exactly as before.
                tags.put("db.name", key.bucketName);
                tags.put("db.couchbase.scope", key.scopeName);
                tags.put("db.couchbase.collection", key.collectionName);
                tags.put("outcome", finalExceptionSimpleName != null ? finalExceptionSimpleName : "Success");
            }
            return coreContext.environment().meter().valueRecorder("db.couchbase.operations", tags);
        });
    }

    /**
     * Creates the node for the given identifier.
     * <p>
     * Protected so that subclasses can supply their own node instances.
     *
     * @param identifier the identifier of the node to create.
     * @return the node produced by {@link Node#create}.
     */
    protected Node createNode(NodeIdentifier identifier) {
        Node created = Node.create(coreContext, identifier);
        return created;
    }

    /**
     * Disconnects and removes a node if the given config no longer needs it.
     * <p>
     * A node is kept only if it is referenced by at least one bucket config or by the global
     * config AND {@link Node#hasServicesEnabled()} returns true. Otherwise it is disconnected
     * and, once the disconnect terminates, removed from the list of managed nodes. The decision
     * is deferred until subscription.
     *
     * @param node the node to check.
     * @param config the config to check the node against.
     * @return a mono completing once the node has been handled.
     */
    private Mono<Void> maybeRemoveNode(Node node, ClusterConfig config) {
        return Mono.defer(() -> {
            boolean inAnyBucketConfig = config.bucketConfigs().values().stream()
                .anyMatch(bc -> bc.nodes().stream().anyMatch(ni -> ni.id().equals(node.identifier())));
            boolean inGlobalConfig = config.globalConfig() != null
                && config.globalConfig().portInfos().stream().anyMatch(ni -> ni.id().equals(node.identifier()));

            boolean referenced = inAnyBucketConfig || inGlobalConfig;
            if (referenced && node.hasServicesEnabled()) {
                return Mono.empty();
            }
            return node.disconnect().doOnTerminate(() -> nodes.remove(node));
        });
    }

    /**
     * Removes a service from every managed node that has the given identifier and currently has
     * that service type enabled.
     * <p>
     * Works on a snapshot copy of the node list.
     *
     * @param identifier the identifier of the target node.
     * @param serviceType the service type to remove.
     * @param bucket the bucket passed through to {@link Node#removeService}.
     * @return a mono completing once all removals have completed.
     */
    private Mono<Void> removeServiceFrom(NodeIdentifier identifier, ServiceType serviceType, Optional<String> bucket) {
        List<Node> snapshot = new ArrayList<Node>(nodes);
        return Flux.fromIterable(snapshot)
            .filter(candidate -> candidate.identifier().equals(identifier) && candidate.serviceEnabled(serviceType))
            .flatMap(match -> match.removeService(serviceType, bucket))
            .then();
    }

    /**
     * Shuts down this core using the disconnect timeout from the environment's timeout config.
     *
     * @return the mono returned by {@link #shutdown(Duration)}.
     */
    @Stability.Internal
    public Mono<Void> shutdown() {
        Duration disconnectTimeout = coreContext.environment().timeoutConfig().disconnectTimeout();
        return shutdown(disconnectTimeout);
    }

    /**
     * Shuts down this core.
     * <p>
     * The transactions cleanup is shut down first. After that, the first subscriber that flips
     * the shutdown flag publishes a {@link ShutdownInitiatedEvent}, disposes the watchdog, closes
     * every bucket of the current config, shuts down the configuration provider and waits for the
     * configuration processor to terminate. When that chain terminates, the instance count is
     * decremented and a {@link ShutdownCompletedEvent} is published. If the flag was already set,
     * that second step completes empty. The whole sequence is bounded by the given timeout.
     *
     * @param timeout passed to the transactions cleanup shutdown and applied as the overall timeout.
     * @return a mono completing once shutdown has finished.
     */
    @Override
    @Stability.Internal
    public Mono<Void> shutdown(Duration timeout) {
        Mono<Void> coreShutdown = Mono.defer(() -> {
            NanoTimestamp start = NanoTimestamp.now();
            if (!shutdown.compareAndSet(false, true)) {
                // Somebody else already initiated the shutdown.
                return Mono.empty();
            }

            eventBus.publish(new ShutdownInitiatedEvent(coreContext));
            invalidStateWatchdog.dispose();

            return Flux.fromIterable(currentConfig.bucketConfigs().keySet())
                .flatMap(this::closeBucket)
                .then(configurationProvider.shutdown())
                .then(configurationProcessor.awaitTermination())
                .doOnTerminate(() -> {
                    CoreLimiter.decrement();
                    eventBus.publish(new ShutdownCompletedEvent(start.elapsed(), coreContext));
                })
                .then();
        });

        return transactionsCleanup.shutdown(timeout)
            .then(coreShutdown)
            .timeout(timeout, coreContext.environment().scheduler());
    }

    /**
     * Applies the current config to the managed nodes.
     * <p>
     * If the config has neither bucket configs nor a global config, all nodes are disconnected
     * via {@link #reconfigureDisconnectAll(Runnable)}. Otherwise services are reconfigured from
     * the bucket configs, then from the global config, and finally every managed node is offered
     * to {@link #maybeRemoveNode(Node, ClusterConfig)}.
     *
     * @param doFinally callback that is run once the attempt has either completed or failed,
     *        right before the corresponding event is published.
     */
    private void reconfigure(Runnable doFinally) {
        // Pin the config for this attempt; the volatile field may change while we are running.
        ClusterConfig configForThisAttempt = currentConfig;
        if (configForThisAttempt.bucketConfigs().isEmpty() && configForThisAttempt.globalConfig() == null) {
            reconfigureDisconnectAll(doFinally);
            return;
        }

        NanoTimestamp start = NanoTimestamp.now();
        // Keep the element type: an Object-typed Flux would hide bucketConfigs() from the lambda.
        Flux<BucketConfig> bucketConfigFlux = Flux
            .just(configForThisAttempt)
            .flatMap(cc -> Flux.fromIterable(cc.bucketConfigs().values()));

        reconfigureBuckets(bucketConfigFlux)
            .then(reconfigureGlobal(configForThisAttempt.globalConfig()))
            .then(Mono.defer(() -> Flux
                // Iterate over a snapshot because maybeRemoveNode may remove entries from the list.
                .fromIterable(new ArrayList<Node>(nodes))
                .flatMap(n -> maybeRemoveNode(n, configForThisAttempt))
                .then()))
            .subscribe(
                v -> { },
                e -> {
                    doFinally.run();
                    eventBus.publish(new ReconfigurationErrorDetectedEvent(context(), e));
                },
                () -> {
                    doFinally.run();
                    eventBus.publish(new ReconfigurationCompletedEvent(start.elapsed(), coreContext));
                }
            );
    }

    /**
     * Disconnects every managed node and clears the node list once all disconnects completed.
     * <p>
     * Works on a snapshot copy of the node list.
     *
     * @param doFinally callback that is run once the attempt has either completed or failed,
     *        right before the corresponding event is published.
     */
    private void reconfigureDisconnectAll(Runnable doFinally) {
        NanoTimestamp start = NanoTimestamp.now();
        List<Node> snapshot = new ArrayList<Node>(nodes);

        Flux.fromIterable(snapshot)
            .flatMap(node -> node.disconnect())
            .doOnComplete(() -> nodes.clear())
            .subscribe(
                ignored -> { },
                failure -> {
                    doFinally.run();
                    eventBus.publish(new ReconfigurationErrorDetectedEvent(context(), failure));
                },
                () -> {
                    doFinally.run();
                    eventBus.publish(new ReconfigurationCompletedEvent(start.elapsed(), coreContext));
                }
            );
    }

    /**
     * Reconfigures services from the global config.
     * <p>
     * For every port info entry, each service type missing from the entry's port map (TLS or
     * plain ports, depending on the security config) is removed from the node and each listed
     * service is ensured on it. Failures of individual services are published as
     * {@link ServiceReconfigurationFailedEvent} and otherwise swallowed. No bucket is associated
     * with these services.
     *
     * @param config the global config, may be null in which case nothing happens.
     * @return a mono completing once all service changes have been attempted.
     */
    private Mono<Void> reconfigureGlobal(GlobalConfig config) {
        return Mono.defer(() -> {
            if (config == null) {
                return Mono.empty();
            }

            return Flux.fromIterable(config.portInfos()).flatMap(ni -> {
                boolean tls = coreContext.environment().securityConfig().tlsEnabled();
                Map<ServiceType, Integer> services = tls ? ni.sslPorts() : ni.ports();

                Flux<Void> serviceRemoveFlux = Flux.fromArray(ServiceType.values())
                    .filter(type -> !services.containsKey(type))
                    .flatMap(type -> removeServiceFrom(ni.id(), type, Optional.empty())
                        .onErrorResume(throwable -> {
                            eventBus.publish(new ServiceReconfigurationFailedEvent(coreContext, ni.hostname(), type, throwable));
                            return Mono.empty();
                        }));

                Flux<Void> serviceAddFlux = Flux.fromIterable(services.entrySet())
                    .flatMap(entry -> ensureServiceAt(ni.id(), entry.getKey(), entry.getValue(), Optional.empty())
                        .onErrorResume(throwable -> {
                            eventBus.publish(new ServiceReconfigurationFailedEvent(coreContext, ni.hostname(), entry.getKey(), throwable));
                            return Mono.empty();
                        }));

                return Flux.merge(serviceAddFlux, serviceRemoveFlux);
            }).then();
        });
    }

    /**
     * Reconfigures services from the given bucket configs.
     * <p>
     * For every node of every bucket config, each service type missing from the node's service
     * map (TLS or plain services, depending on the security config) is removed from the node and
     * each listed service is ensured on it. Services with {@link ServiceScope#BUCKET} scope are
     * associated with the bucket's name, all others with no bucket. Failures of individual
     * services are published as {@link ServiceReconfigurationFailedEvent} and otherwise swallowed.
     *
     * @param bucketConfigs the bucket configs to apply.
     * @return a mono completing once all service changes have been attempted.
     */
    private Mono<Void> reconfigureBuckets(Flux<BucketConfig> bucketConfigs) {
        return bucketConfigs.flatMap(bc -> Flux.fromIterable(bc.nodes()).flatMap(ni -> {
            boolean tls = coreContext.environment().securityConfig().tlsEnabled();
            Map<ServiceType, Integer> services = tls ? ni.sslServices() : ni.services();

            // The stream has to stay typed as ServiceType so that scope() is reachable below.
            Flux<Void> serviceRemoveFlux = Flux.fromArray(ServiceType.values())
                .filter(s -> !services.containsKey(s))
                .flatMap(s -> removeServiceFrom(
                        ni.id(),
                        s,
                        s.scope() == ServiceScope.BUCKET ? Optional.of(bc.name()) : Optional.empty())
                    .onErrorResume(throwable -> {
                        eventBus.publish(new ServiceReconfigurationFailedEvent(coreContext, ni.hostname(), s, throwable));
                        return Mono.empty();
                    }));

            Flux<Void> serviceAddFlux = Flux.fromIterable(services.entrySet())
                .flatMap(s -> ensureServiceAt(
                        ni.id(),
                        s.getKey(),
                        s.getValue(),
                        s.getKey().scope() == ServiceScope.BUCKET ? Optional.of(bc.name()) : Optional.empty())
                    .onErrorResume(throwable -> {
                        eventBus.publish(new ServiceReconfigurationFailedEvent(coreContext, ni.hostname(), s.getKey(), throwable));
                        return Mono.empty();
                    }));

            return Flux.merge(serviceAddFlux, serviceRemoveFlux);
        })).then();
    }

    /**
     * Picks the locator responsible for the given service type.
     *
     * @param serviceType the service type of the request to dispatch.
     * @return the shared locator instance for that service type.
     * @throws IllegalStateException if there is no locator for the service type.
     */
    private static Locator locator(ServiceType serviceType) {
        switch (serviceType) {
            case KV:
                return KEY_VALUE_LOCATOR;
            case MANAGER:
                return MANAGER_LOCATOR;
            case QUERY:
                return QUERY_LOCATOR;
            case ANALYTICS:
                return ANALYTICS_LOCATOR;
            case SEARCH:
                return SEARCH_LOCATOR;
            case VIEWS:
                return VIEWS_LOCATOR;
            case EVENTING:
                return EVENTING_LOCATOR;
            case BACKUP:
                return BACKUP_LOCATOR;
            default:
                throw new IllegalStateException("Unsupported ServiceType: " + serviceType);
        }
    }

    /**
     * Returns the transactions cleanup created together with this core.
     *
     * @return the transactions cleanup.
     */
    @Stability.Internal
    public CoreTransactionsCleanup transactionsCleanup() {
        return transactionsCleanup;
    }

    /**
     * Returns the transactions context created together with this core.
     *
     * @return the transactions context.
     */
    @Stability.Internal
    public CoreTransactionsContext transactionsContext() {
        return transactionsContext;
    }

    /**
     * Shuts down this core and blocks the calling thread until {@link #shutdown()} has finished.
     */
    @Override
    public void close() {
        Mono<Void> termination = shutdown();
        termination.block();
    }

    /**
     * Creates the key-value operations for the given keyspace.
     *
     * @param keyspace the keyspace the operations act on.
     * @return a new {@link ClassicCoreKvOps} backed by this core.
     */
    @Override
    @Stability.Internal
    public CoreKvOps kvOps(CoreKeyspace keyspace) {
        CoreKvOps ops = new ClassicCoreKvOps(this, keyspace);
        return ops;
    }

    /**
     * Creates the query operations.
     *
     * @return a new {@link ClassicCoreQueryOps} backed by this core.
     */
    @Override
    @Stability.Internal
    public CoreQueryOps queryOps() {
        CoreQueryOps ops = new ClassicCoreQueryOps(this);
        return ops;
    }

    /**
     * Creates the search operations for the given scope.
     *
     * @param scope the bucket and scope to search in, may be null.
     * @return a new {@link ClassicCoreSearchOps} backed by this core.
     */
    @Override
    @Stability.Internal
    public CoreSearchOps searchOps(@Nullable CoreBucketAndScope scope) {
        CoreSearchOps ops = new ClassicCoreSearchOps(this, scope);
        return ops;
    }

    /**
     * Creates the binary key-value operations for the given keyspace.
     *
     * @param keyspace the keyspace the operations act on.
     * @return a new {@link ClassicCoreKvBinaryOps} backed by this core.
     */
    @Override
    @Stability.Internal
    public CoreKvBinaryOps kvBinaryOps(CoreKeyspace keyspace) {
        CoreKvBinaryOps ops = new ClassicCoreKvBinaryOps(this, keyspace);
        return ops;
    }

    /**
     * Creates the bucket manager operations.
     *
     * @return a new {@link ClassicCoreBucketManager} backed by this core.
     */
    @Override
    @Stability.Internal
    public CoreBucketManagerOps bucketManager() {
        CoreBucketManagerOps manager = new ClassicCoreBucketManager(this);
        return manager;
    }

    /**
     * Creates the collection manager for the given bucket.
     *
     * @param bucketName the name of the bucket whose collections are managed.
     * @return a new {@link ClassicCoreCollectionManagerOps} backed by this core.
     */
    @Override
    @Stability.Internal
    public CoreCollectionManager collectionManager(String bucketName) {
        CoreCollectionManager manager = new ClassicCoreCollectionManagerOps(this, bucketName);
        return manager;
    }

    /**
     * Creates the cluster-level search index manager.
     *
     * @return a new {@link ClassicCoreClusterSearchIndexManager} backed by this core.
     */
    @Override
    public CoreSearchIndexManager clusterSearchIndexManager() {
        CoreSearchIndexManager manager = new ClassicCoreClusterSearchIndexManager(this);
        return manager;
    }

    /**
     * Creates the scope-level search index manager.
     *
     * @param scope the bucket and scope whose search indexes are managed.
     * @return a new {@link ClassicCoreScopeSearchIndexManager} backed by this core.
     */
    @Override
    public CoreSearchIndexManager scopeSearchIndexManager(CoreBucketAndScope scope) {
        CoreSearchIndexManager manager = new ClassicCoreScopeSearchIndexManager(this, scope);
        return manager;
    }

    /**
     * Returns the environment of this core's context.
     *
     * @return the environment obtained through {@link #context()}.
     */
    @Override
    public CoreEnvironment environment() {
        CoreContext ctx = context();
        return ctx.environment();
    }

    /**
     * Waits until the given services have reached the desired state.
     * <p>
     * Delegates to {@link WaitUntilReadyHelper#waitUntilReady}; the bucket name is wrapped into
     * an {@link Optional} that is empty when the name is null.
     *
     * @param serviceTypes the service types to wait for.
     * @param timeout the timeout handed to the helper.
     * @param desiredState the cluster state to wait for.
     * @param bucketName the bucket name, may be null.
     * @return the future returned by the helper.
     */
    @Override
    public CompletableFuture<Void> waitUntilReady(Set<ServiceType> serviceTypes, Duration timeout, ClusterState desiredState, @Nullable String bucketName) {
        Optional<String> bucket = Optional.ofNullable(bucketName);
        return WaitUntilReadyHelper.waitUntilReady(this, serviceTypes, timeout, desiredState, bucket);
    }

    /**
     * Periodic task that compares the number of managed nodes with the number of node addresses
     * in the current config.
     * <p>
     * When a config with cluster or bucket information is present and the two numbers differ, a
     * {@link WatchdogInvalidStateIdentifiedEvent} is published and the configuration provider is
     * asked to republish its current config. Anything thrown during a run is caught and
     * published as a {@link WatchdogRunFailedEvent}, so it never propagates to the scheduler.
     */
    class InvalidStateWatchdog
    implements Runnable {
        @Override
        public void run() {
            try {
                if (currentConfig == null || !currentConfig.hasClusterOrBucketConfig()) {
                    return;
                }

                int numNodes = nodes.size();
                int numConfigNodes = currentConfig.allNodeAddresses().size();
                if (numNodes == numConfigNodes) {
                    return;
                }

                String message = "Number of managed nodes (" + numNodes + ") differs from the current config (" + numConfigNodes + "), triggering reconfiguration.";
                eventBus.publish(new WatchdogInvalidStateIdentifiedEvent(context(), message));
                configurationProvider.republishCurrentConfig();
            }
            catch (Throwable ex) {
                eventBus.publish(new WatchdogRunFailedEvent(context(), ex));
            }
        }
    }

    /**
     * Key under which value recorders for response metrics are cached.
     * <p>
     * Two identifiers are equal when service type, request name, bucket, scope, collection and
     * exception name are all equal; any of them may be null.
     */
    @Stability.Internal
    public static class ResponseMetricIdentifier {
        private final String serviceType;
        private final String requestName;
        @Nullable
        private final String bucketName;
        @Nullable
        private final String scopeName;
        @Nullable
        private final String collectionName;
        @Nullable
        private final String exceptionSimpleName;

        /**
         * Derives the identifier from a request and the outcome name.
         *
         * @param request the request to derive service type, name and keyspace from.
         * @param exceptionSimpleName the outcome name, or null if there is none.
         */
        ResponseMetricIdentifier(Request<?> request, @Nullable String exceptionSimpleName) {
            this.exceptionSimpleName = exceptionSimpleName;

            if (request.serviceType() != null) {
                this.serviceType = CbTracing.getTracingId(request.serviceType());
            } else if (request instanceof CoreTransactionRequest) {
                this.serviceType = "transactions";
            } else {
                this.serviceType = "unknown";
            }
            this.requestName = request.name();

            if (request instanceof KeyValueRequest) {
                KeyValueRequest kv = (KeyValueRequest) request;
                this.bucketName = request.bucket();
                // Absent scope or collection names fall back to "_default".
                this.scopeName = kv.collectionIdentifier().scope().orElse("_default");
                this.collectionName = kv.collectionIdentifier().collection().orElse("_default");
            } else if (request instanceof QueryRequest) {
                QueryRequest query = (QueryRequest) request;
                this.bucketName = request.bucket();
                this.scopeName = query.scope();
                this.collectionName = null;
            } else if (request instanceof ServerSearchRequest) {
                ServerSearchRequest search = (ServerSearchRequest) request;
                if (search.scope() != null) {
                    this.bucketName = search.scope().bucketName();
                    this.scopeName = search.scope().scopeName();
                } else {
                    this.bucketName = null;
                    this.scopeName = null;
                }
                this.collectionName = null;
            } else {
                this.bucketName = null;
                this.scopeName = null;
                this.collectionName = null;
            }
        }

        /**
         * Creates an identifier that carries only a service type and a request name; bucket,
         * scope, collection and exception name are null.
         *
         * @param serviceType the service type.
         * @param requestName the request name.
         */
        public ResponseMetricIdentifier(String serviceType, String requestName) {
            this.serviceType = serviceType;
            this.requestName = requestName;
            this.bucketName = null;
            this.scopeName = null;
            this.collectionName = null;
            this.exceptionSimpleName = null;
        }

        /**
         * @return the service type this identifier was created with.
         */
        public String serviceType() {
            return this.serviceType;
        }

        /**
         * @return the request name this identifier was created with.
         */
        public String requestName() {
            return this.requestName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ResponseMetricIdentifier that = (ResponseMetricIdentifier) o;
            // All fields are compared null-safely, matching hashCode(); the public constructor
            // does not reject a null service type.
            return Objects.equals(this.serviceType, that.serviceType)
                && Objects.equals(this.requestName, that.requestName)
                && Objects.equals(this.bucketName, that.bucketName)
                && Objects.equals(this.scopeName, that.scopeName)
                && Objects.equals(this.collectionName, that.collectionName)
                && Objects.equals(this.exceptionSimpleName, that.exceptionSimpleName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.serviceType, this.requestName, this.bucketName, this.scopeName, this.collectionName, this.exceptionSimpleName);
        }
    }
}

