/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Timer;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.callbacks.BeforeSendRequestCallback;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.ValueRecorder;
import com.couchbase.client.core.cnc.events.core.BucketClosedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenFailedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenInitiatedEvent;
import com.couchbase.client.core.cnc.events.core.BucketOpenedEvent;
import com.couchbase.client.core.cnc.events.core.CoreCreatedEvent;
import com.couchbase.client.core.cnc.events.core.InitGlobalConfigFailedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationErrorDetectedEvent;
import com.couchbase.client.core.cnc.events.core.ReconfigurationIgnoredEvent;
import com.couchbase.client.core.cnc.events.core.ServiceReconfigurationFailedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownCompletedEvent;
import com.couchbase.client.core.cnc.events.core.ShutdownInitiatedEvent;
import com.couchbase.client.core.config.AlternateAddress;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.DefaultConfigurationProvider;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.endpoint.http.CoreHttpClient;
import com.couchbase.client.core.env.Authenticator;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.error.AlreadyShutdownException;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.error.GlobalConfigNotFoundException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.UnsupportedConfigMechanismException;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.RequestContext;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.node.AnalyticsLocator;
import com.couchbase.client.core.node.KeyValueLocator;
import com.couchbase.client.core.node.Locator;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.node.RoundRobinLocator;
import com.couchbase.client.core.node.ViewLocator;
import com.couchbase.client.core.service.ServiceScope;
import com.couchbase.client.core.service.ServiceState;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Stability.Volatile
public class Core {
    private static final int GLOBAL_ID = new SecureRandom().nextInt();
    private static final AtomicInteger CORE_IDS = new AtomicInteger();
    private static final KeyValueLocator KEY_VALUE_LOCATOR = new KeyValueLocator();
    private static final RoundRobinLocator MANAGER_LOCATOR = new RoundRobinLocator(ServiceType.MANAGER);
    private static final RoundRobinLocator QUERY_LOCATOR = new RoundRobinLocator(ServiceType.QUERY);
    private static final RoundRobinLocator ANALYTICS_LOCATOR = new AnalyticsLocator();
    private static final RoundRobinLocator SEARCH_LOCATOR = new RoundRobinLocator(ServiceType.SEARCH);
    private static final RoundRobinLocator VIEWS_LOCATOR = new ViewLocator();
    private static final RoundRobinLocator EVENTING_LOCATOR = new RoundRobinLocator(ServiceType.EVENTING);
    private final CoreContext coreContext;
    private final ConfigurationProvider configurationProvider;
    private volatile ClusterConfig currentConfig;
    private final CopyOnWriteArrayList<Node> nodes;
    private final AtomicBoolean reconfigureInProgress = new AtomicBoolean(false);
    private final AtomicBoolean moreConfigsPending = new AtomicBoolean(false);
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final EventBus eventBus;
    private final Timer timer;
    private final Set<SeedNode> seedNodes;
    private final List<BeforeSendRequestCallback> beforeSendRequestCallbacks;
    private final Map<ResponseMetricIdentifier, ValueRecorder> responseMetrics = new ConcurrentHashMap<ResponseMetricIdentifier, ValueRecorder>();

    public static Core create(CoreEnvironment environment, Authenticator authenticator, Set<SeedNode> seedNodes) {
        return new Core(environment, authenticator, seedNodes);
    }

    protected Core(CoreEnvironment environment, Authenticator authenticator, Set<SeedNode> seedNodes) {
        if (environment.securityConfig().tlsEnabled() && !authenticator.supportsTls()) {
            throw new InvalidArgumentException("TLS enabled but the Authenticator does not support TLS!", null, null);
        }
        if (!environment.securityConfig().tlsEnabled() && !authenticator.supportsNonTls()) {
            throw new InvalidArgumentException("TLS not enabled but the Authenticator does only support TLS!", null, null);
        }
        this.seedNodes = seedNodes;
        this.coreContext = new CoreContext(this, this.createInstanceId(), environment, authenticator);
        this.configurationProvider = this.createConfigurationProvider();
        this.nodes = new CopyOnWriteArrayList();
        this.eventBus = environment.eventBus();
        this.timer = environment.timer();
        this.currentConfig = this.configurationProvider.config();
        this.configurationProvider.configs().subscribe(c -> {
            this.currentConfig = c;
            this.reconfigure();
        });
        this.beforeSendRequestCallbacks = environment.requestCallbacks().stream().filter(c -> c instanceof BeforeSendRequestCallback).map(c -> (BeforeSendRequestCallback)c).collect(Collectors.toList());
        this.eventBus.publish(new CoreCreatedEvent(this.coreContext, environment, seedNodes));
    }

    private long createInstanceId() {
        return (long)GLOBAL_ID << 32 | (long)CORE_IDS.incrementAndGet() & 0xFFFFFFFFL;
    }

    ConfigurationProvider createConfigurationProvider() {
        return new DefaultConfigurationProvider(this, this.seedNodes);
    }

    @Stability.Internal
    public ConfigurationProvider configurationProvider() {
        return this.configurationProvider;
    }

    public <R extends Response> void send(Request<R> request) {
        this.send(request, true);
    }

    @Stability.Internal
    public <R extends Response> void send(Request<R> request, boolean registerForTimeout) {
        if (this.shutdown.get()) {
            request.cancel(CancellationReason.SHUTDOWN);
            return;
        }
        if (registerForTimeout) {
            this.timer.register(request);
            for (BeforeSendRequestCallback cb : this.beforeSendRequestCallbacks) {
                cb.beforeSend(request);
            }
        }
        Core.locator(request.serviceType()).dispatch(request, this.nodes, this.currentConfig, this.context());
    }

    public CoreContext context() {
        return this.coreContext;
    }

    @Stability.Internal
    public CoreHttpClient httpClient(RequestTarget target) {
        return new CoreHttpClient(this, target);
    }

    @Stability.Internal
    public Stream<EndpointDiagnostics> diagnostics() {
        return this.nodes.stream().flatMap(Node::diagnostics);
    }

    @Stability.Internal
    public Optional<Flux<ServiceState>> serviceState(NodeIdentifier nodeIdentifier, ServiceType type, Optional<String> bucket) {
        for (Node node : this.nodes) {
            if (!node.identifier().equals(nodeIdentifier)) continue;
            return node.serviceState(type, bucket);
        }
        return Optional.empty();
    }

    @Stability.Internal
    public void initGlobalConfig() {
        long start = System.nanoTime();
        this.configurationProvider.loadAndRefreshGlobalConfig().subscribe(v -> {}, throwable -> {
            InitGlobalConfigFailedEvent.Reason reason = InitGlobalConfigFailedEvent.Reason.UNKNOWN;
            if (throwable instanceof UnsupportedConfigMechanismException) {
                reason = InitGlobalConfigFailedEvent.Reason.UNSUPPORTED;
            } else if (throwable instanceof GlobalConfigNotFoundException) {
                reason = InitGlobalConfigFailedEvent.Reason.NO_CONFIG_FOUND;
            } else if (throwable instanceof ConfigException) {
                if (throwable.getCause() instanceof RequestCanceledException) {
                    RequestContext ctx = ((RequestCanceledException)throwable.getCause()).context().requestContext();
                    if (ctx.request().cancellationReason() == CancellationReason.SHUTDOWN) {
                        reason = InitGlobalConfigFailedEvent.Reason.SHUTDOWN;
                    }
                } else if (throwable.getMessage().contains("NO_ACCESS")) {
                    reason = InitGlobalConfigFailedEvent.Reason.NO_ACCESS;
                }
            } else if (throwable instanceof AlreadyShutdownException) {
                reason = InitGlobalConfigFailedEvent.Reason.SHUTDOWN;
            }
            this.eventBus.publish(new InitGlobalConfigFailedEvent(reason.severity(), Duration.ofNanos(System.nanoTime() - start), this.context(), reason, (Throwable)throwable));
        });
    }

    @Stability.Internal
    public void openBucket(String name) {
        this.eventBus.publish(new BucketOpenInitiatedEvent(this.coreContext, name));
        long start = System.nanoTime();
        this.configurationProvider.openBucket(name).subscribe(v -> {}, t -> {
            Event.Severity severity = t instanceof AlreadyShutdownException ? Event.Severity.DEBUG : Event.Severity.WARN;
            this.eventBus.publish(new BucketOpenFailedEvent(name, severity, Duration.ofNanos(System.nanoTime() - start), this.coreContext, (Throwable)t));
        }, () -> this.eventBus.publish(new BucketOpenedEvent(Duration.ofNanos(System.nanoTime() - start), this.coreContext, name)));
    }

    @Stability.Internal
    public ClusterConfig clusterConfig() {
        return this.configurationProvider.config();
    }

    private Mono<Void> closeBucket(String name) {
        return Mono.defer(() -> {
            long start = System.nanoTime();
            return this.configurationProvider.closeBucket(name).doOnSuccess(ignored -> this.eventBus.publish(new BucketClosedEvent(Duration.ofNanos(System.nanoTime() - start), this.coreContext, name)));
        });
    }

    @Stability.Internal
    public Mono<Void> ensureServiceAt(NodeIdentifier identifier, ServiceType serviceType, int port, Optional<String> bucket, Optional<String> alternateAddress) {
        if (this.shutdown.get()) {
            return Mono.empty();
        }
        return Flux.fromIterable(this.nodes).filter(n -> n.identifier().equals(identifier)).switchIfEmpty((Publisher)Mono.defer(() -> {
            Node node = this.createNode(identifier, alternateAddress);
            this.nodes.add(node);
            return Mono.just((Object)node);
        })).flatMap(node -> node.addService(serviceType, port, bucket)).then();
    }

    @Stability.Internal
    public ValueRecorder responseMetric(Request<?> request) {
        return this.responseMetrics.computeIfAbsent(new ResponseMetricIdentifier(request), key -> {
            HashMap<String, String> tags = new HashMap<String, String>(4);
            tags.put("db.couchbase.service", ((ResponseMetricIdentifier)key).serviceType.ident());
            tags.put("db.operation", ((ResponseMetricIdentifier)key).requestName);
            return this.coreContext.environment().meter().valueRecorder("db.couchbase.operations", tags);
        });
    }

    protected Node createNode(NodeIdentifier identifier, Optional<String> alternateAddress) {
        return Node.create(this.coreContext, identifier, alternateAddress);
    }

    private Mono<Void> maybeRemoveNode(Node node, ClusterConfig config) {
        return Mono.defer(() -> {
            boolean stillPresentInBuckets = config.bucketConfigs().values().stream().flatMap(bc -> bc.nodes().stream()).anyMatch(ni -> ni.identifier().equals(node.identifier()));
            boolean stillPresentInGlobal = config.globalConfig() != null ? config.globalConfig().portInfos().stream().anyMatch(ni -> ni.identifier().equals(node.identifier())) : false;
            if (!stillPresentInBuckets && !stillPresentInGlobal || !node.hasServicesEnabled()) {
                return node.disconnect().doOnTerminate(() -> this.nodes.remove(node));
            }
            return Mono.empty();
        });
    }

    private Mono<Void> removeServiceFrom(NodeIdentifier identifier, ServiceType serviceType, Optional<String> bucket) {
        return Flux.fromIterable(new ArrayList<Node>(this.nodes)).filter(n -> n.identifier().equals(identifier)).filter(node -> node.serviceEnabled(serviceType)).flatMap(node -> node.removeService(serviceType, bucket)).then();
    }

    @Stability.Internal
    public Mono<Void> shutdown() {
        return this.shutdown(this.coreContext.environment().timeoutConfig().disconnectTimeout());
    }

    @Stability.Internal
    public Mono<Void> shutdown(Duration timeout) {
        return Mono.defer(() -> {
            long start = System.nanoTime();
            if (this.shutdown.compareAndSet(false, true)) {
                this.eventBus.publish(new ShutdownInitiatedEvent(this.coreContext));
                return Flux.fromIterable(this.currentConfig.bucketConfigs().keySet()).flatMap(this::closeBucket).then(this.configurationProvider.shutdown()).then(Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)this.coreContext.environment().scheduler()).takeUntil(i -> this.nodes.isEmpty()).then()).doOnTerminate(() -> this.eventBus.publish(new ShutdownCompletedEvent(Duration.ofNanos(System.nanoTime() - start), this.coreContext))).then();
            }
            return Mono.empty();
        }).timeout(timeout, this.coreContext.environment().scheduler());
    }

    private void reconfigure() {
        if (this.reconfigureInProgress.compareAndSet(false, true)) {
            ClusterConfig configForThisAttempt = this.currentConfig;
            if (configForThisAttempt.bucketConfigs().isEmpty() && configForThisAttempt.globalConfig() == null) {
                this.reconfigureDisconnectAll();
                return;
            }
            long start = System.nanoTime();
            Flux bucketConfigFlux = Flux.just((Object)configForThisAttempt).flatMap(cc -> Flux.fromIterable(cc.bucketConfigs().values()));
            this.reconfigureBuckets((Flux<BucketConfig>)bucketConfigFlux).then(this.reconfigureGlobal(configForThisAttempt.globalConfig())).then(Mono.defer(() -> Flux.fromIterable(new ArrayList<Node>(this.nodes)).flatMap(n -> this.maybeRemoveNode((Node)n, configForThisAttempt)).then())).subscribe(v -> {}, e -> {
                this.clearReconfigureInProgress();
                this.eventBus.publish(new ReconfigurationErrorDetectedEvent(this.context(), (Throwable)e));
            }, () -> {
                this.clearReconfigureInProgress();
                this.eventBus.publish(new ReconfigurationCompletedEvent(Duration.ofNanos(System.nanoTime() - start), this.coreContext));
            });
        } else {
            this.moreConfigsPending.set(true);
            this.eventBus.publish(new ReconfigurationIgnoredEvent(this.coreContext));
        }
    }

    private void reconfigureDisconnectAll() {
        long start = System.nanoTime();
        Flux.fromIterable(new ArrayList<Node>(this.nodes)).flatMap(Node::disconnect).doOnComplete(this.nodes::clear).subscribe(v -> {}, e -> {
            this.clearReconfigureInProgress();
            this.eventBus.publish(new ReconfigurationErrorDetectedEvent(this.context(), (Throwable)e));
        }, () -> {
            this.clearReconfigureInProgress();
            this.eventBus.publish(new ReconfigurationCompletedEvent(Duration.ofNanos(System.nanoTime() - start), this.coreContext));
        });
    }

    private void clearReconfigureInProgress() {
        this.reconfigureInProgress.set(false);
        if (this.moreConfigsPending.compareAndSet(true, false)) {
            this.reconfigure();
        }
    }

    private Mono<Void> reconfigureGlobal(GlobalConfig config) {
        return Mono.defer(() -> {
            if (config == null) {
                return Mono.empty();
            }
            return Flux.fromIterable(config.portInfos()).flatMap(ni -> {
                boolean tls = this.coreContext.environment().securityConfig().tlsEnabled();
                Set<Map.Entry<ServiceType, Integer>> aServices = null;
                Optional<String> alternateAddress = this.coreContext.alternateAddress();
                String aHost = null;
                if (alternateAddress.isPresent()) {
                    AlternateAddress aa = ni.alternateAddresses().get(alternateAddress.get());
                    aHost = aa.hostname();
                    Set<Map.Entry<ServiceType, Integer>> set = aServices = tls ? aa.sslServices().entrySet() : aa.services().entrySet();
                }
                if (aServices == null || aServices.isEmpty()) {
                    aServices = tls ? ni.sslPorts().entrySet() : ni.ports().entrySet();
                }
                String alternateHost = aHost;
                Set<Map.Entry<ServiceType, Integer>> services = aServices;
                Flux serviceRemoveFlux = Flux.fromIterable(Arrays.asList(ServiceType.values())).filter(s -> {
                    for (Map.Entry inConfig : services) {
                        if (inConfig.getKey() != s) continue;
                        return false;
                    }
                    return true;
                }).flatMap(s -> this.removeServiceFrom(ni.identifier(), (ServiceType)((Object)((Object)((Object)s))), Optional.empty()).onErrorResume(throwable -> {
                    this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, ni.hostname(), (ServiceType)((Object)((Object)((Object)s))), (Throwable)throwable));
                    return Mono.empty();
                }));
                Flux serviceAddFlux = Flux.fromIterable(services).flatMap(s -> this.ensureServiceAt(ni.identifier(), (ServiceType)((Object)((Object)((Object)((Object)s.getKey())))), (Integer)s.getValue(), Optional.empty(), Optional.ofNullable(alternateHost)).onErrorResume(throwable -> {
                    this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, ni.hostname(), (ServiceType)((Object)((Object)((Object)((Object)((Object)s.getKey()))))), (Throwable)throwable));
                    return Mono.empty();
                }));
                return Flux.merge((Publisher[])new Publisher[]{serviceAddFlux, serviceRemoveFlux});
            }).then();
        });
    }

    private Mono<Void> reconfigureBuckets(Flux<BucketConfig> bucketConfigs) {
        return bucketConfigs.flatMap(bc -> Flux.fromIterable(bc.nodes()).flatMap(ni -> {
            boolean tls = this.coreContext.environment().securityConfig().tlsEnabled();
            Set<Map.Entry<ServiceType, Integer>> aServices = null;
            Optional<String> alternateAddress = this.coreContext.alternateAddress();
            String aHost = null;
            if (alternateAddress.isPresent()) {
                AlternateAddress aa = ni.alternateAddresses().get(alternateAddress.get());
                aHost = aa.hostname();
                Set<Map.Entry<ServiceType, Integer>> set = aServices = tls ? aa.sslServices().entrySet() : aa.services().entrySet();
            }
            if (CbCollections.isNullOrEmpty(aServices)) {
                aServices = tls ? ni.sslServices().entrySet() : ni.services().entrySet();
            }
            String alternateHost = aHost;
            Set<Map.Entry<ServiceType, Integer>> services = aServices;
            Flux serviceRemoveFlux = Flux.fromIterable(Arrays.asList(ServiceType.values())).filter(s -> {
                for (Map.Entry inConfig : services) {
                    if (inConfig.getKey() != s) continue;
                    return false;
                }
                return true;
            }).flatMap(s -> this.removeServiceFrom(ni.identifier(), (ServiceType)((Object)((Object)((Object)s))), s.scope() == ServiceScope.BUCKET ? Optional.of(bc.name()) : Optional.empty()).onErrorResume(throwable -> {
                this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, ni.hostname(), (ServiceType)((Object)((Object)((Object)s))), (Throwable)throwable));
                return Mono.empty();
            }));
            Flux serviceAddFlux = Flux.fromIterable(services).flatMap(s -> this.ensureServiceAt(ni.identifier(), (ServiceType)((Object)((Object)((Object)((Object)s.getKey())))), (Integer)s.getValue(), ((ServiceType)((Object)((Object)((Object)((Object)s.getKey()))))).scope() == ServiceScope.BUCKET ? Optional.of(bc.name()) : Optional.empty(), Optional.ofNullable(alternateHost)).onErrorResume(throwable -> {
                this.eventBus.publish(new ServiceReconfigurationFailedEvent(this.coreContext, ni.hostname(), (ServiceType)((Object)((Object)((Object)((Object)((Object)s.getKey()))))), (Throwable)throwable));
                return Mono.empty();
            }));
            return Flux.merge((Publisher[])new Publisher[]{serviceAddFlux, serviceRemoveFlux});
        })).then();
    }

    private static Locator locator(ServiceType serviceType) {
        switch (serviceType) {
            case KV: {
                return KEY_VALUE_LOCATOR;
            }
            case MANAGER: {
                return MANAGER_LOCATOR;
            }
            case QUERY: {
                return QUERY_LOCATOR;
            }
            case ANALYTICS: {
                return ANALYTICS_LOCATOR;
            }
            case SEARCH: {
                return SEARCH_LOCATOR;
            }
            case VIEWS: {
                return VIEWS_LOCATOR;
            }
            case EVENTING: {
                return EVENTING_LOCATOR;
            }
        }
        throw new IllegalStateException("Unsupported ServiceType: " + (Object)((Object)serviceType));
    }

    private static class ResponseMetricIdentifier {
        private final ServiceType serviceType;
        private final String requestName;

        ResponseMetricIdentifier(Request<?> request) {
            this.serviceType = request.serviceType();
            this.requestName = request.name();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ResponseMetricIdentifier that = (ResponseMetricIdentifier)o;
            return this.serviceType == that.serviceType && Objects.equals(this.requestName, that.requestName);
        }

        public int hashCode() {
            return Objects.hash(new Object[]{this.serviceType, this.requestName});
        }
    }
}

