/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.collect.Sets;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.proxy.extensions.ProxyExtensions;
import org.apache.pulsar.proxy.server.BrokerDiscoveryProvider;
import org.apache.pulsar.proxy.server.BrokerProxyValidator;
import org.apache.pulsar.proxy.server.LookupProxyHandler;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ServiceChannelInitializer;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyService
implements Closeable {
    // Configuration supplied to the constructor; never null (checked there).
    private final ProxyConfiguration proxyConfig;
    // Authentication plugin handed in by the caller and exposed via getProxyClientAuthenticationPlugin().
    private final Authentication proxyClientAuthentication;
    // DNS resolver group built in the constructor and closed in close().
    private final DnsAddressResolverGroup dnsAddressResolverGroup;
    // Validator built from the allowed host names / IP addresses / target ports in the configuration.
    private final BrokerProxyValidator brokerProxyValidator;
    // Advertised URLs, computed in start(); each is null when the matching port is not configured.
    private String serviceUrl;
    private String serviceUrlTls;
    private final AuthenticationService authenticationService;
    // The following four are only created in start() when both metadata store URLs are non-blank.
    private AuthorizationService authorizationService;
    private MetadataStoreExtended localMetadataStore;
    private MetadataStoreExtended configMetadataStore;
    private PulsarResources pulsarResources;
    // Loaded and initialized in the constructor, started in start().
    private ProxyExtensions proxyExtensions = null;
    private final EventLoopGroup acceptorGroup;
    private final EventLoopGroup workerGroup;
    // Dedicated worker groups created per extension when separate thread pools are enabled; shut down in close().
    private final List<EventLoopGroup> extensionsWorkerGroups = new ArrayList<EventLoopGroup>();
    // Listening channels bound in start(); remain null when the matching port is not configured.
    private Channel listenChannel;
    private Channel listenChannelTls;
    private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-proxy-acceptor");
    private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-proxy-io");
    private BrokerDiscoveryProvider discoveryProvider;
    // Holds a non-fair semaphore sized from getMaxConcurrentLookupRequests().
    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
    // Defaults to 0 when the configuration does not specify a level.
    protected int proxyLogLevel;
    // Set in start() when zero-copy is enabled in the configuration and the server channel class is epoll-based.
    protected boolean proxyZeroCopyModeEnabled;
    // Single-threaded scheduler on which the stats-refresh task is scheduled.
    private final ScheduledExecutorService statsExecutor;
    // Prometheus collectors, created and registered when this class is initialized.
    static final Gauge ACTIVE_CONNECTIONS = (Gauge)Gauge.build((String)"pulsar_proxy_active_connections", (String)"Number of connections currently active in the proxy").create().register();
    static final Counter NEW_CONNECTIONS = (Counter)Counter.build((String)"pulsar_proxy_new_connections", (String)"Counter of connections being opened in the proxy").create().register();
    static final Counter REJECTED_CONNECTIONS = (Counter)Counter.build((String)"pulsar_proxy_rejected_connections", (String)"Counter for connections rejected due to throttling").create().register();
    static final Counter OPS_COUNTER = (Counter)Counter.build((String)"pulsar_proxy_binary_ops", (String)"Counter of proxy operations").create().register();
    static final Counter BYTES_COUNTER = (Counter)Counter.build((String)"pulsar_proxy_binary_bytes", (String)"Counter of proxy bytes").create().register();
    // Concurrent set of client connections; iterated by the stats task and by closeAllConnections().
    private final Set<ProxyConnection> clientCnxs;
    // Per-topic statistics keyed by a String (presumably the topic name - confirm against writers of this map).
    private final Map<String, TopicStats> topicStats;
    private AdditionalServlets proxyAdditionalServlets;
    // Created in start() via createMetricsServlet(); reset to null in close().
    private PrometheusMetricsServlet metricsServlet;
    // Providers registered before the metrics servlet exists; drained into it by createMetricsServlet().
    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
    private final ConnectionController connectionController;
    private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);

    /**
     * Builds the proxy service: allocates the acceptor/worker event loop groups, the DNS
     * resolver and broker-target validator, loads and initializes proxy extensions and
     * additional servlets, and schedules the periodic statistics refresh.
     *
     * <p>No sockets are bound here; call {@link #start()} for that.
     *
     * @param proxyConfig proxy configuration; must not be null
     * @param authenticationService authentication service exposed via {@link #getAuthenticationService()}
     * @param proxyClientAuthentication authentication plugin exposed via
     *        {@link #getProxyClientAuthenticationPlugin()}
     * @throws NullPointerException if {@code proxyConfig} is null
     * @throws Exception if any of the components created here fails to initialize
     */
    public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService, Authentication proxyClientAuthentication) throws Exception {
        Objects.requireNonNull(proxyConfig);
        this.proxyConfig = proxyConfig;
        this.clientCnxs = Sets.newConcurrentHashSet();
        this.topicStats = new ConcurrentHashMap<String, TopicStats>();
        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));
        this.proxyLogLevel = proxyConfig.getProxyLogLevel().isPresent() ? proxyConfig.getProxyLogLevel().get() : 0;
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumAcceptorThreads(), false, this.acceptorThreadFactory);
        this.workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), false, this.workersThreadFactory);
        this.authenticationService = authenticationService;

        // The resolver uses channel types matching the worker group's transport.
        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder()
                .channelType(EventLoopUtil.getDatagramChannelClass(this.workerGroup))
                .socketChannelType(EventLoopUtil.getClientSocketChannelClass(this.workerGroup), true);
        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
        this.dnsAddressResolverGroup = new DnsAddressResolverGroup(dnsNameResolverBuilder);
        this.brokerProxyValidator = new BrokerProxyValidator(
                (AddressResolver<InetSocketAddress>) this.dnsAddressResolverGroup.getResolver((EventExecutor) this.workerGroup.next()),
                proxyConfig.getBrokerProxyAllowedHostNames(),
                proxyConfig.getBrokerProxyAllowedIPAddresses(),
                proxyConfig.getBrokerProxyAllowedTargetPorts());

        this.proxyExtensions = ProxyExtensions.load(proxyConfig);
        this.proxyExtensions.initialize(proxyConfig);

        this.statsExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor"));
        // Refresh every 60 seconds. A one-shot schedule() would compute the rates a single
        // time and then leave them stale for the lifetime of the proxy.
        this.statsExecutor.scheduleAtFixedRate(() -> {
            try {
                this.clientCnxs.forEach(cnx -> {
                    if (cnx.getDirectProxyHandler() != null && cnx.getDirectProxyHandler().getInboundChannelRequestsRate() != null) {
                        cnx.getDirectProxyHandler().getInboundChannelRequestsRate().calculateRate();
                    }
                });
                this.topicStats.forEach((topic, stats) -> stats.calculate());
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task cancels all future runs, so contain it.
                LOG.warn("Failed to refresh proxy stats", e);
            }
        }, 60L, 60L, TimeUnit.SECONDS);

        this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
        this.proxyClientAuthentication = proxyClientAuthentication;
        this.connectionController = new ConnectionController.DefaultConnectionController(proxyConfig.getMaxConcurrentInboundConnections(), proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
    }

    /**
     * Starts the proxy: validates the auth configuration, optionally creates the metadata
     * stores and the services built on them, binds the plaintext and/or TLS listeners,
     * computes the advertised service URLs, creates the metrics servlet and finally starts
     * the proxy extensions and binds their listeners.
     *
     * @throws IllegalStateException if authorization is enabled while authentication is disabled
     * @throws IOException if the plaintext or TLS listener cannot be bound
     * @throws Exception if any other component fails to start
     */
    public void start() throws Exception {
        if (this.proxyConfig.isAuthorizationEnabled() && !this.proxyConfig.isAuthenticationEnabled()) {
            throw new IllegalStateException("Invalid proxy configuration. Authentication must be enabled with authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
        }
        // Metadata-backed services are only created when BOTH store URLs are configured.
        if (!StringUtils.isBlank(this.proxyConfig.getMetadataStoreUrl()) && !StringUtils.isBlank(this.proxyConfig.getConfigurationMetadataStoreUrl())) {
            this.localMetadataStore = this.createLocalMetadataStore();
            this.configMetadataStore = this.createConfigurationMetadataStore();
            this.pulsarResources = new PulsarResources((MetadataStore) this.localMetadataStore, (MetadataStore) this.configMetadataStore);
            this.discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, this.pulsarResources);
            this.authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom((PulsarConfiguration) this.proxyConfig), this.pulsarResources);
        }

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        bootstrap.group(this.acceptorGroup, this.workerGroup);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16384, 0x100000));
        Class serverSocketChannelClass = EventLoopUtil.getServerSocketChannelClass(this.workerGroup);
        bootstrap.channel(serverSocketChannelClass);
        EventLoopUtil.enableTriggeredMode(bootstrap);
        // Zero-copy mode is only switched on when the server channel class is epoll-based.
        if (this.proxyConfig.isProxyZeroCopyModeEnabled() && EpollServerSocketChannel.class.isAssignableFrom(serverSocketChannelClass)) {
            this.proxyZeroCopyModeEnabled = true;
        }

        bootstrap.childHandler(new ServiceChannelInitializer(this, this.proxyConfig, false));
        if (this.proxyConfig.getServicePort().isPresent()) {
            try {
                this.listenChannel = bootstrap.bind(this.proxyConfig.getBindAddress(), this.proxyConfig.getServicePort().get().intValue()).sync().channel();
                LOG.info("Started Pulsar Proxy at {}", this.listenChannel.localAddress());
            } catch (Exception e) {
                throw new IOException("Failed to bind Pulsar Proxy on port " + this.proxyConfig.getServicePort().get(), e);
            }
        }
        if (this.proxyConfig.getServicePortTls().isPresent()) {
            ServerBootstrap tlsBootstrap = bootstrap.clone();
            tlsBootstrap.childHandler(new ServiceChannelInitializer(this, this.proxyConfig, true));
            // Report TLS bind failures the same way as plaintext ones, naming the port.
            try {
                this.listenChannelTls = tlsBootstrap.bind(this.proxyConfig.getBindAddress(), this.proxyConfig.getServicePortTls().get().intValue()).sync().channel();
                LOG.info("Started Pulsar TLS Proxy on {}", this.listenChannelTls.localAddress());
            } catch (Exception e) {
                throw new IOException("Failed to bind Pulsar Proxy on TLS port " + this.proxyConfig.getServicePortTls().get(), e);
            }
        }

        // The URLs use the actually bound ports, which may differ from the configured ones.
        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(this.proxyConfig.getAdvertisedAddress());
        this.serviceUrl = this.proxyConfig.getServicePort().isPresent() ? String.format("pulsar://%s:%d/", hostname, this.getListenPort().get()) : null;
        this.serviceUrlTls = this.proxyConfig.getServicePortTls().isPresent() ? String.format("pulsar+ssl://%s:%d/", hostname, this.getListenPortTls().get()) : null;

        this.createMetricsServlet();
        this.proxyExtensions.start(this);
        Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers = this.proxyExtensions.newChannelInitializers();
        this.startProxyExtensions(protocolHandlerChannelInitializers, bootstrap);
    }

    /**
     * Creates the Prometheus metrics servlet from the configured timeout and cluster name,
     * then hands it every raw metrics provider that was queued before the servlet existed
     * and discards the queue.
     */
    private synchronized void createMetricsServlet() {
        final PrometheusMetricsServlet servlet = new PrometheusMetricsServlet(
                this.proxyConfig.getMetricsServletTimeoutMs(), this.proxyConfig.getClusterName());
        this.metricsServlet = servlet;

        final List<PrometheusRawMetricsProvider> queuedProviders = this.pendingMetricsProviders;
        if (queuedProviders == null) {
            return;
        }
        for (PrometheusRawMetricsProvider queuedProvider : queuedProviders) {
            servlet.addRawMetricsProvider(queuedProvider);
        }
        this.pendingMetricsProviders = null;
    }

    /**
     * Binds a listener for every (address, channel initializer) pair of every extension.
     *
     * @param protocolHandlers channel initializers keyed by extension name, then by bind address
     * @param serverBootstrap bootstrap passed through to each extension start
     * @throws RuntimeException if an extension fails to start; it carries the failure's
     *         message and the failure's underlying cause
     */
    public void startProxyExtensions(Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
        for (Map.Entry<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> extension : protocolHandlers.entrySet()) {
            final String extensionName = extension.getKey();
            for (Map.Entry<InetSocketAddress, ChannelInitializer<SocketChannel>> binding : extension.getValue().entrySet()) {
                try {
                    this.startProxyExtension(extensionName, binding.getKey(), binding.getValue(), serverBootstrap);
                } catch (IOException e) {
                    // The IOException is only a wrapper; log and rethrow with its underlying cause.
                    LOG.error("{}", e.getMessage(), e.getCause());
                    throw new RuntimeException(e.getMessage(), e.getCause());
                }
            }
        }
    }

    /**
     * Binds one extension listener on {@code address}.
     *
     * <p>When separate thread pools for extensions are enabled, a fresh bootstrap with its
     * own worker group (tracked in {@code extensionsWorkerGroups}) is used; otherwise the
     * supplied bootstrap is cloned.
     *
     * @param extensionName name of the extension, used for thread naming and log messages
     * @param address address to bind
     * @param initializer child channel initializer provided by the extension
     * @param serverBootstrap bootstrap to clone when no dedicated thread pool is used
     * @throws IOException if the bind fails
     */
    private void startProxyExtension(String extensionName, SocketAddress address, ChannelInitializer<SocketChannel> initializer, ServerBootstrap serverBootstrap) throws IOException {
        final ServerBootstrap extensionBootstrap;
        if (!this.proxyConfig.isUseSeparateThreadPoolForProxyExtensions()) {
            extensionBootstrap = serverBootstrap.clone();
        } else {
            extensionBootstrap = new ServerBootstrap()
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16384, 0x100000));
            EventLoopUtil.enableTriggeredMode(extensionBootstrap);

            final DefaultThreadFactory extensionThreads = new DefaultThreadFactory("pulsar-ext-" + extensionName);
            final EventLoopGroup extensionWorkers = EventLoopUtil.newEventLoopGroup(this.proxyConfig.getNumIOThreads(), false, extensionThreads);
            // Track the group before binding so close() can shut it down.
            this.extensionsWorkerGroups.add(extensionWorkers);
            extensionBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(extensionWorkers));
            extensionBootstrap.group(this.acceptorGroup, extensionWorkers);
        }

        extensionBootstrap.childHandler(initializer);
        try {
            extensionBootstrap.bind(address).sync();
        } catch (Exception e) {
            throw new IOException("Failed to bind extension `" + extensionName + "` on " + address, e);
        }
        LOG.info("Successfully bound extension `{}` on {}", extensionName, address);
    }

    /**
     * Returns the broker discovery provider.
     *
     * @return the provider, or {@code null} if {@link #start()} has not created one
     *         (it is only created when both metadata store URLs are configured)
     */
    public BrokerDiscoveryProvider getDiscoveryProvider() {
        return this.discoveryProvider;
    }

    /**
     * Shuts the proxy down: closes the listening channels, stops the acceptor group, closes
     * all client connections, then releases the DNS resolver, discovery provider, stats
     * executor, additional servlets, metrics servlet, metadata stores and finally the worker
     * groups.
     *
     * <p>A failure while closing a metadata store no longer aborts the shutdown: the
     * remaining resources are still released and the failure is rethrown at the end. If both
     * stores fail, the second failure is attached to the first as a suppressed exception.
     *
     * <p>If the calling thread is interrupted while waiting, the wait in question is
     * abandoned and the thread's interrupt flag is restored.
     *
     * @throws IOException if closing the discovery provider or a metadata store fails
     */
    @Override
    public void close() throws IOException {
        if (this.listenChannel != null) {
            try {
                this.listenChannel.close().sync();
            } catch (InterruptedException e) {
                LOG.info("Shutdown of listenChannel interrupted");
                Thread.currentThread().interrupt();
            }
        }
        if (this.listenChannelTls != null) {
            try {
                this.listenChannelTls.close().sync();
            } catch (InterruptedException e) {
                LOG.info("Shutdown of listenChannelTls interrupted");
                Thread.currentThread().interrupt();
            }
        }
        // Stop accepting before tearing down the established connections.
        try {
            this.acceptorGroup.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            LOG.info("Shutdown of acceptorGroup interrupted");
            Thread.currentThread().interrupt();
        }
        this.closeAllConnections();
        this.dnsAddressResolverGroup.close();
        if (this.discoveryProvider != null) {
            this.discoveryProvider.close();
        }
        if (this.statsExecutor != null) {
            this.statsExecutor.shutdown();
        }
        if (this.proxyAdditionalServlets != null) {
            this.proxyAdditionalServlets.close();
            this.proxyAdditionalServlets = null;
        }
        this.resetMetricsServlet();

        // Remember metadata store failures instead of throwing right away, so that the
        // second store and the event loop groups below are still released.
        IOException closeFailure = null;
        if (this.localMetadataStore != null) {
            try {
                this.localMetadataStore.close();
            } catch (Exception e) {
                closeFailure = new IOException(e);
            }
        }
        if (this.configMetadataStore != null) {
            try {
                this.configMetadataStore.close();
            } catch (Exception e) {
                IOException wrapped = new IOException(e);
                if (closeFailure == null) {
                    closeFailure = wrapped;
                } else {
                    closeFailure.addSuppressed(wrapped);
                }
            }
        }

        try {
            this.workerGroup.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            LOG.info("Shutdown of workerGroup interrupted");
            Thread.currentThread().interrupt();
        }
        for (EventLoopGroup group : this.extensionsWorkerGroups) {
            try {
                group.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                LOG.info("Shutdown of {} interrupted", group);
                Thread.currentThread().interrupt();
            }
        }

        if (closeFailure != null) {
            throw closeFailure;
        }
        LOG.info("ProxyService closed.");
    }

    /**
     * Closes the channel context of every tracked client connection. The work is submitted
     * to the worker group and this method waits for it to finish; an interruption while
     * waiting is logged and the interrupt flag restored.
     */
    private void closeAllConnections() {
        try {
            this.workerGroup.submit(() -> {
                if (this.clientCnxs.isEmpty()) {
                    LOG.info("No proxy connections to close");
                    return;
                }
                LOG.info("Closing {} proxy connections, including connections to brokers", this.clientCnxs.size());
                this.clientCnxs.forEach(connection -> connection.ctx().close());
            }).sync();
        } catch (InterruptedException interrupted) {
            LOG.info("Closing of connections interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drops the reference to the metrics servlet. Providers added afterwards are queued
     * by {@link #addPrometheusRawMetricsProvider} until a new servlet is created.
     */
    private synchronized void resetMetricsServlet() {
        this.metricsServlet = null;
    }

    /**
     * Returns the plaintext service URL computed by {@link #start()}.
     *
     * @return the {@code pulsar://} URL, or {@code null} if not started or no service port is configured
     */
    public String getServiceUrl() {
        return this.serviceUrl;
    }

    /**
     * Returns the TLS service URL computed by {@link #start()}.
     *
     * @return the {@code pulsar+ssl://} URL, or {@code null} if not started or no TLS port is configured
     */
    public String getServiceUrlTls() {
        return this.serviceUrlTls;
    }

    /**
     * Returns the proxy configuration this service was constructed with.
     *
     * @return the configuration; never {@code null}
     */
    public ProxyConfiguration getConfiguration() {
        return this.proxyConfig;
    }

    /**
     * Returns the authentication service passed to the constructor.
     *
     * @return the authentication service as supplied by the caller (not null-checked here)
     */
    public AuthenticationService getAuthenticationService() {
        return this.authenticationService;
    }

    /**
     * Returns the authorization service.
     *
     * @return the service, or {@code null} if {@link #start()} has not created one
     *         (it is only created when both metadata store URLs are configured)
     */
    public AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    /**
     * Returns the semaphore currently held in {@code lookupRequestSemaphore}.
     *
     * @return the current lookup-request semaphore
     */
    public Semaphore getLookupRequestSemaphore() {
        return this.lookupRequestSemaphore.get();
    }

    /**
     * Returns the worker event loop group created in the constructor.
     *
     * @return the worker group
     */
    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }

    /**
     * Returns the local port of the plaintext listener.
     *
     * @return the bound port, or an empty {@code Optional} when no plaintext channel has been bound
     */
    public Optional<Integer> getListenPort() {
        if (this.listenChannel == null) {
            return Optional.empty();
        }
        final InetSocketAddress boundAddress = (InetSocketAddress) this.listenChannel.localAddress();
        return Optional.of(boundAddress.getPort());
    }

    /**
     * Returns the local port of the TLS listener.
     *
     * @return the bound port, or an empty {@code Optional} when no TLS channel has been bound
     */
    public Optional<Integer> getListenPortTls() {
        if (this.listenChannelTls == null) {
            return Optional.empty();
        }
        final InetSocketAddress boundAddress = (InetSocketAddress) this.listenChannelTls.localAddress();
        return Optional.of(boundAddress.getPort());
    }

    /**
     * Creates the local metadata store from the configured URL, session timeout and
     * read-only-operations flag.
     *
     * @return a newly created local metadata store
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
        final String storeUrl = this.proxyConfig.getMetadataStoreUrl();
        final int sessionTimeoutMillis = this.proxyConfig.getMetadataStoreSessionTimeoutMillis();
        final boolean allowReadOnlyOperations = this.proxyConfig.isMetadataStoreAllowReadOnlyOperations();
        return PulsarResources.createLocalMetadataStore(storeUrl, sessionTimeoutMillis, allowReadOnlyOperations);
    }

    /**
     * Creates the configuration metadata store from the configured URL, session timeout
     * and read-only-operations flag.
     *
     * @return a newly created configuration metadata store
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
        final String storeUrl = this.proxyConfig.getConfigurationMetadataStoreUrl();
        final int sessionTimeoutMillis = this.proxyConfig.getMetadataStoreSessionTimeoutMillis();
        final boolean allowReadOnlyOperations = this.proxyConfig.isMetadataStoreAllowReadOnlyOperations();
        return PulsarResources.createConfigMetadataStore(storeUrl, sessionTimeoutMillis, allowReadOnlyOperations);
    }

    /**
     * Returns the client authentication plugin passed to the constructor.
     *
     * @return the authentication plugin as supplied by the caller (not null-checked here)
     */
    public Authentication getProxyClientAuthenticationPlugin() {
        return this.proxyClientAuthentication;
    }

    /**
     * Returns the Prometheus metrics servlet.
     *
     * @return the servlet, or {@code null} before {@link #start()} has created it or
     *         after {@link #close()} has reset it
     */
    public synchronized PrometheusMetricsServlet getMetricsServlet() {
        return this.metricsServlet;
    }

    /**
     * Registers a raw metrics provider. If the metrics servlet already exists the provider
     * is added to it directly; otherwise it is queued until the servlet is created.
     *
     * @param metricsProvider provider to register
     */
    public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
        if (this.metricsServlet != null) {
            this.metricsServlet.addRawMetricsProvider(metricsProvider);
            return;
        }
        // No servlet yet: lazily create the queue and park the provider there.
        if (this.pendingMetricsProviders == null) {
            this.pendingMetricsProviders = new LinkedList<>();
        }
        this.pendingMetricsProviders.add(metricsProvider);
    }

    /**
     * Creates a lookup handler bound to this service and the given connection.
     * Being {@code protected}, it can be overridden by subclasses to supply a different handler.
     *
     * @param proxyConnection the client connection the handler will serve
     * @return a new {@link LookupProxyHandler}
     */
    protected LookupProxyHandler newLookupProxyHandler(ProxyConnection proxyConnection) {
        return new LookupProxyHandler(this, proxyConnection);
    }

    /**
     * Updates the proxy log level. When the new level is below 2 the collected
     * per-topic statistics are discarded.
     *
     * @param proxyLogLevel the new log level
     */
    public void setProxyLogLevel(int proxyLogLevel) {
        this.proxyLogLevel = proxyLogLevel;
        if (proxyLogLevel >= 2) {
            return;
        }
        this.topicStats.clear();
    }

    /** Returns the DNS address resolver group created in the constructor. */
    @Generated
    public DnsAddressResolverGroup getDnsAddressResolverGroup() {
        return this.dnsAddressResolverGroup;
    }

    /** Returns the broker target validator created in the constructor. */
    @Generated
    public BrokerProxyValidator getBrokerProxyValidator() {
        return this.brokerProxyValidator;
    }

    /** Returns the proxy extensions loaded in the constructor. */
    @Generated
    public ProxyExtensions getProxyExtensions() {
        return this.proxyExtensions;
    }

    /** Returns the current proxy log level (0 when none was configured and none has been set since). */
    @Generated
    public int getProxyLogLevel() {
        return this.proxyLogLevel;
    }

    /**
     * Returns whether zero-copy mode is active; this is only set to {@code true} in
     * {@link #start()} when enabled in the configuration and the server channel class is epoll-based.
     */
    @Generated
    public boolean isProxyZeroCopyModeEnabled() {
        return this.proxyZeroCopyModeEnabled;
    }

    /** Returns the live concurrent set of client connections (the internal set itself, not a copy). */
    @Generated
    public Set<ProxyConnection> getClientCnxs() {
        return this.clientCnxs;
    }

    /** Returns the live concurrent map of per-topic statistics (the internal map itself, not a copy). */
    @Generated
    public Map<String, TopicStats> getTopicStats() {
        return this.topicStats;
    }

    /** Returns the additional servlets loaded in the constructor, or {@code null} once {@link #close()} has released them. */
    @Generated
    public AdditionalServlets getProxyAdditionalServlets() {
        return this.proxyAdditionalServlets;
    }

    /** Returns the connection controller built from the configured inbound connection limits. */
    @Generated
    public ConnectionController getConnectionController() {
        return this.connectionController;
    }
}

