/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.opentracing.Tracer;
import io.smallrye.config.ConfigMapping;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.MessagingClientProviders;
import org.eclipse.hono.adapter.ProtocolAdapterProperties;
import org.eclipse.hono.adapter.monitoring.ConnectionEventProducer;
import org.eclipse.hono.adapter.monitoring.ConnectionEventProducerConfig;
import org.eclipse.hono.adapter.monitoring.ConnectionEventProducerOptions;
import org.eclipse.hono.adapter.monitoring.HonoEventConnectionEventProducer;
import org.eclipse.hono.adapter.monitoring.LoggingConnectionEventProducer;
import org.eclipse.hono.adapter.resourcelimits.ConnectedDevicesAsyncCacheLoader;
import org.eclipse.hono.adapter.resourcelimits.ConnectionDurationAsyncCacheLoader;
import org.eclipse.hono.adapter.resourcelimits.DataVolumeAsyncCacheLoader;
import org.eclipse.hono.adapter.resourcelimits.LimitedResource;
import org.eclipse.hono.adapter.resourcelimits.LimitedResourceKey;
import org.eclipse.hono.adapter.resourcelimits.NoopResourceLimitChecks;
import org.eclipse.hono.adapter.resourcelimits.PrometheusBasedResourceLimitCheckOptions;
import org.eclipse.hono.adapter.resourcelimits.PrometheusBasedResourceLimitChecks;
import org.eclipse.hono.adapter.resourcelimits.PrometheusBasedResourceLimitChecksConfig;
import org.eclipse.hono.adapter.resourcelimits.ResourceLimitChecks;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.config.ClientOptions;
import org.eclipse.hono.client.amqp.config.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.amqp.config.RequestResponseClientOptions;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumerFactory;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumerFactoryImpl;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommandResponseSender;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommandRouterClient;
import org.eclipse.hono.client.command.amqp.ProtonBasedInternalCommandConsumer;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedInternalCommandConsumer;
import org.eclipse.hono.client.command.pubsub.PubSubBasedCommandResponseSender;
import org.eclipse.hono.client.command.pubsub.PubSubBasedInternalCommandConsumer;
import org.eclipse.hono.client.kafka.CommonKafkaClientOptions;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaAdminClientOptions;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerOptions;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.subscriber.CachingPubSubSubscriberFactory;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.registry.CredentialsClient;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedCredentialsClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedDeviceRegistrationClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedTenantClient;
import org.eclipse.hono.client.telemetry.EventSender;
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.telemetry.amqp.ProtonBasedDownstreamSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedEventSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedTelemetrySender;
import org.eclipse.hono.client.telemetry.pubsub.PubSubBasedDownstreamSender;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.service.HealthCheckServer;
import org.eclipse.hono.service.NotificationSupportingServiceApplication;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.util.CredentialsObject;
import org.eclipse.hono.util.CredentialsResult;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationResult;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.eclipse.hono.util.WrappedLifecycleComponentVerticle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractProtocolAdapterApplication<C extends ProtocolAdapterProperties>
extends NotificationSupportingServiceApplication {
    // Logger shared by all instances of this class.
    private static final Logger LOG = LoggerFactory.getLogger(AbstractProtocolAdapterApplication.class);
    // Injected; handed to every AMQP (Proton) based client created by this class.
    @Inject
    protected SendMessageSampler.Factory messageSamplerFactory;
    // Injected; the adapter specific properties (queried for the "defaults enabled"
    // and "JMS vendor properties enabled" flags when creating senders).
    @Inject
    protected C protocolAdapterProperties;
    // Injected; set on the Kafka producer factory and the Kafka based internal command consumer.
    @Inject
    protected KafkaClientMetricsSupport kafkaClientMetricsSupport;
    // Populated by setCommandConsumerClientOptions (prefix "hono.command").
    private ClientConfigProperties commandConsumerConfig;
    // Populated by setDownstreamSenderOptions (prefix "hono.messaging").
    private ClientConfigProperties downstreamSenderConfig;
    // Populated by setTenantServiceClientConfig (prefix "hono.tenant").
    private RequestResponseClientConfigProperties tenantClientConfig;
    // Populated by setDeviceRegistrationClientConfig (prefix "hono.registration").
    private RequestResponseClientConfigProperties deviceRegistrationClientConfig;
    // Populated by setCredentialsServiceClientConfig (prefix "hono.credentials").
    private RequestResponseClientConfigProperties credentialsClientConfig;
    // Populated by setCommandRouterClientConfig (prefix "hono.commandRouter").
    private RequestResponseClientConfigProperties commandRouterConfig;
    // Populated by setResourceLimitCheckConfig.
    private PrometheusBasedResourceLimitChecksConfig resourceLimitChecksConfig;
    // Populated by setConnectionEventProducerConfig.
    private ConnectionEventProducerConfig connectionEventsConfig;
    // Populated by setTelemetryKafkaClientOptions (prefixes "hono.kafka" and "hono.kafka.telemetry").
    private MessagingKafkaProducerConfigProperties kafkaTelemetryConfig;
    // Populated by setEventKafkaClientOptions (prefixes "hono.kafka" and "hono.kafka.event").
    private MessagingKafkaProducerConfigProperties kafkaEventConfig;
    // Populated by setCommandResponseKafkaClientOptions (prefixes "hono.kafka" and "hono.kafka.commandResponse").
    private MessagingKafkaProducerConfigProperties kafkaCommandResponseConfig;
    // Populated by setCommandConsumerKafkaClientOptions (prefixes "hono.kafka" and "hono.kafka.command").
    private MessagingKafkaConsumerConfigProperties kafkaCommandConfig;
    // Populated by setCommandInternalKafkaClientOptions (prefixes "hono.kafka" and "hono.kafka.commandInternal").
    private KafkaAdminClientConfigProperties kafkaCommandInternalConfig;
    // Populated by setNotificationKafkaClientOptions (prefixes "hono.kafka" and "hono.kafka.notification").
    private NotificationKafkaConsumerConfigProperties kafkaNotificationConfig;
    // Created on first use by tenantResponseCache().
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache;
    // Created on first use by registrationResponseCache().
    private Cache<Object, RegistrationResult> registrationResponseCache;
    // Created on first use by credentialsResponseCache().
    private Cache<Object, CredentialsResult<CredentialsObject>> credentialsResponseCache;
    // Populated by setPubSubClientOptions.
    private PubSubConfigProperties pubSubConfigProperties;

    /**
     * Provides the protocol adapter to deploy.
     * <p>
     * {@code doStart()} passes this method to Vert.x as the verticle supplier when
     * deploying the configured number of adapter instances.
     * NOTE(review): implementations presumably need to return a new, fully wired
     * instance on every invocation (e.g. by means of {@code setCollaborators}) - confirm.
     *
     * @return The adapter.
     */
    protected abstract AbstractProtocolAdapterBase<C> adapter();

    /**
     * Sets the configuration of the AMQP connection used by the internal command consumer.
     * <p>
     * The server role defaults to "Command &amp; Control" and the client name
     * defaults to this component's name.
     *
     * @param options The options read from the {@code hono.command} configuration prefix.
     */
    @Inject
    void setCommandConsumerClientOptions(@ConfigMapping(prefix="hono.command") ClientOptions options) {
        final ClientConfigProperties commandConfig = new ClientConfigProperties(options);
        commandConfig.setServerRoleIfUnknown("Command & Control");
        commandConfig.setNameIfNotSet(getComponentName());
        commandConsumerConfig = commandConfig;
    }

    /**
     * Sets the configuration of the AMQP connection used for sending messages downstream.
     * <p>
     * The server role defaults to "Downstream" and the client name defaults to
     * this component's name.
     *
     * @param options The options read from the {@code hono.messaging} configuration prefix.
     */
    @Inject
    void setDownstreamSenderOptions(@ConfigMapping(prefix="hono.messaging") ClientOptions options) {
        final ClientConfigProperties downstreamConfig = new ClientConfigProperties(options);
        downstreamConfig.setServerRoleIfUnknown("Downstream");
        downstreamConfig.setNameIfNotSet(getComponentName());
        downstreamSenderConfig = downstreamConfig;
    }

    /**
     * Sets the Pub/Sub configuration used by the Pub/Sub based messaging clients.
     *
     * @param options The Pub/Sub publisher options to derive the configuration from.
     */
    @Inject
    void setPubSubClientOptions(PubSubPublisherOptions options) {
        final PubSubConfigProperties pubSubProps = new PubSubConfigProperties(options);
        pubSubConfigProperties = pubSubProps;
    }

    /**
     * Sets the configuration of the Tenant service client.
     * <p>
     * The server role defaults to "Tenant" and the client name defaults to
     * this component's name.
     *
     * @param options The options read from the {@code hono.tenant} configuration prefix.
     */
    @Inject
    void setTenantServiceClientConfig(@ConfigMapping(prefix="hono.tenant") RequestResponseClientOptions options) {
        final RequestResponseClientConfigProperties tenantConfig = new RequestResponseClientConfigProperties(options);
        tenantConfig.setServerRoleIfUnknown("Tenant");
        tenantConfig.setNameIfNotSet(getComponentName());
        tenantClientConfig = tenantConfig;
    }

    /**
     * Sets the configuration of the Device Registration service client.
     * <p>
     * The server role defaults to "Device Registration" and the client name
     * defaults to this component's name.
     *
     * @param options The options read from the {@code hono.registration} configuration prefix.
     */
    @Inject
    void setDeviceRegistrationClientConfig(@ConfigMapping(prefix="hono.registration") RequestResponseClientOptions options) {
        final RequestResponseClientConfigProperties registrationConfig = new RequestResponseClientConfigProperties(options);
        registrationConfig.setServerRoleIfUnknown("Device Registration");
        registrationConfig.setNameIfNotSet(getComponentName());
        deviceRegistrationClientConfig = registrationConfig;
    }

    /**
     * Sets the configuration of the Credentials service client.
     * <p>
     * The server role defaults to "Credentials" and the client name defaults to
     * this component's name.
     *
     * @param options The options read from the {@code hono.credentials} configuration prefix.
     */
    @Inject
    void setCredentialsServiceClientConfig(@ConfigMapping(prefix="hono.credentials") RequestResponseClientOptions options) {
        final RequestResponseClientConfigProperties credentialsConfig = new RequestResponseClientConfigProperties(options);
        credentialsConfig.setServerRoleIfUnknown("Credentials");
        credentialsConfig.setNameIfNotSet(getComponentName());
        credentialsClientConfig = credentialsConfig;
    }

    /**
     * Sets the configuration of the Command Router service client.
     * <p>
     * The server role defaults to "Command Router" and the client name defaults
     * to this component's name.
     *
     * @param options The options read from the {@code hono.commandRouter} configuration prefix.
     */
    @Inject
    void setCommandRouterClientConfig(@ConfigMapping(prefix="hono.commandRouter") RequestResponseClientOptions options) {
        final RequestResponseClientConfigProperties routerConfig = new RequestResponseClientConfigProperties(options);
        routerConfig.setServerRoleIfUnknown("Command Router");
        routerConfig.setNameIfNotSet(getComponentName());
        commandRouterConfig = routerConfig;
    }

    /**
     * Sets the configuration of the Prometheus based resource limit checks.
     * <p>
     * The server role defaults to "Prometheus".
     *
     * @param options The options to derive the configuration from.
     */
    @Inject
    void setResourceLimitCheckConfig(PrometheusBasedResourceLimitCheckOptions options) {
        final PrometheusBasedResourceLimitChecksConfig limitChecksConfig = new PrometheusBasedResourceLimitChecksConfig(options);
        limitChecksConfig.setServerRoleIfUnknown("Prometheus");
        resourceLimitChecksConfig = limitChecksConfig;
    }

    /**
     * Sets the configuration that determines which connection event producer is used.
     *
     * @param options The options to derive the configuration from.
     */
    @Inject
    void setConnectionEventProducerConfig(ConnectionEventProducerOptions options) {
        final ConnectionEventProducerConfig eventProducerConfig = new ConnectionEventProducerConfig(options);
        connectionEventsConfig = eventProducerConfig;
    }

    /**
     * Sets the configuration of the Kafka producer used for sending telemetry messages.
     *
     * @param commonOptions The options read from the {@code hono.kafka} configuration prefix.
     * @param telemetryOptions The options read from the {@code hono.kafka.telemetry} configuration prefix.
     */
    @Inject
    void setTelemetryKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.telemetry") KafkaProducerOptions telemetryOptions) {
        final MessagingKafkaProducerConfigProperties telemetryConfig =
                new MessagingKafkaProducerConfigProperties(commonOptions, telemetryOptions);
        kafkaTelemetryConfig = telemetryConfig;
    }

    /**
     * Sets the configuration of the Kafka producer used for sending events.
     *
     * @param commonOptions The options read from the {@code hono.kafka} configuration prefix.
     * @param eventOptions The options read from the {@code hono.kafka.event} configuration prefix.
     */
    @Inject
    void setEventKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.event") KafkaProducerOptions eventOptions) {
        final MessagingKafkaProducerConfigProperties eventConfig =
                new MessagingKafkaProducerConfigProperties(commonOptions, eventOptions);
        kafkaEventConfig = eventConfig;
    }

    /**
     * Sets the configuration of the Kafka producer used for sending command responses.
     *
     * @param commonOptions The options read from the {@code hono.kafka} configuration prefix.
     * @param commandResponseOptions The options read from the {@code hono.kafka.commandResponse} configuration prefix.
     */
    @Inject
    void setCommandResponseKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.commandResponse") KafkaProducerOptions commandResponseOptions) {
        final MessagingKafkaProducerConfigProperties responseConfig =
                new MessagingKafkaProducerConfigProperties(commonOptions, commandResponseOptions);
        kafkaCommandResponseConfig = responseConfig;
    }

    /**
     * Sets the consumer configuration used by the Kafka based internal command consumer.
     *
     * @param commonOptions The options read from the {@code hono.kafka} configuration prefix.
     * @param commandOptions The options read from the {@code hono.kafka.command} configuration prefix.
     */
    @Inject
    void setCommandConsumerKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.command") KafkaConsumerOptions commandOptions) {
        final MessagingKafkaConsumerConfigProperties commandConfig =
                new MessagingKafkaConsumerConfigProperties(commonOptions, commandOptions);
        kafkaCommandConfig = commandConfig;
    }

    /**
     * Sets the admin client configuration used by the Kafka based internal command consumer.
     *
     * @param commonOptions The options read from the {@code hono.kafka} configuration prefix.
     * @param commandInternalOptions The options read from the {@code hono.kafka.commandInternal} configuration prefix.
     */
    @Inject
    void setCommandInternalKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.commandInternal") KafkaAdminClientOptions commandInternalOptions) {
        final KafkaAdminClientConfigProperties adminConfig =
                new KafkaAdminClientConfigProperties(commonOptions, commandInternalOptions);
        kafkaCommandInternalConfig = adminConfig;
    }

    /**
     * Sets the Kafka consumer configuration passed to the notification receiver.
     *
     * @param commonOptions The options read from the {@code hono.kafka} configuration prefix.
     * @param notificationOptions The options read from the {@code hono.kafka.notification} configuration prefix.
     */
    @Inject
    void setNotificationKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.notification") KafkaConsumerOptions notificationOptions) {
        final NotificationKafkaConsumerConfigProperties notificationConfig =
                new NotificationKafkaConsumerConfigProperties(commonOptions, notificationOptions);
        kafkaNotificationConfig = notificationConfig;
    }

    /**
     * Deploys the protocol adapter verticle(s) and the notification receiver verticle.
     * <p>
     * The adapter is deployed with as many instances as configured by the application's
     * <em>maxInstances</em> property. Once both deployments have succeeded, the inherited
     * {@code deploymentCheck} handler is completed with a map describing what has been
     * deployed; if any deployment fails, the handler is failed instead.
     */
    protected void doStart() {
        final int instanceCount = this.appConfig.getMaxInstances();
        LOG.info("deploying {} {} instances ...", instanceCount, this.getComponentName());

        // filled by the success callbacks below, handed to the deployment check at the end
        final HashMap<String, String> deploymentResult = new HashMap<>();

        final DeploymentOptions adapterOptions = new DeploymentOptions().setInstances(instanceCount);
        final Future<String> adapterTracker = this.vertx.deployVerticle(this::adapter, adapterOptions)
                .onSuccess(ok -> {
                    LOG.info("successfully deployed adapter verticle(s)");
                    deploymentResult.put("adapter verticle(s)", "successfully deployed");
                })
                .onFailure(t -> LOG.error("failed to deploy adapter verticle(s)", t));

        final NotificationReceiver notificationReceiver = this.notificationReceiver(
                this.kafkaNotificationConfig,
                this.downstreamSenderConfig,
                this.pubSubConfigProperties);
        final Verticle receiverVerticle = new WrappedLifecycleComponentVerticle((Lifecycle) notificationReceiver);
        final Future<String> notificationReceiverTracker = this.vertx.deployVerticle(receiverVerticle)
                .onSuccess(ok -> {
                    LOG.info("successfully deployed notification receiver verticle(s)");
                    deploymentResult.put("notification receiver verticle", "successfully deployed");
                })
                .onFailure(t -> LOG.error("failed to deploy notification receiver verticle(s)", t));

        CompositeFuture.all(adapterTracker, notificationReceiverTracker)
                .map(deploymentResult)
                .onComplete((Handler) this.deploymentCheck);
    }

    /**
     * Sets the clients and other collaborators that the given adapter requires.
     * <p>
     * Telemetry, event and command response senders are registered for each messaging
     * type (Kafka, AMQP 1.0, Pub/Sub) that has not been disabled in the application
     * configuration and for which client configuration is present. In the same way, an
     * internal command consumer is registered with the command consumer factory for each
     * such messaging type.
     *
     * @param adapter The adapter to set the collaborators on.
     * @throws NullPointerException if adapter is {@code null}.
     * @throws IllegalStateException if no Command Router connection is configured.
     */
    protected void setCollaborators(AbstractProtocolAdapterBase<?> adapter) {
        Objects.requireNonNull(adapter);

        final DeviceRegistrationClient registrationClient = this.registrationClient();
        final MessagingClientProvider<TelemetrySender> telemetrySenderProvider = new MessagingClientProvider<>();
        final MessagingClientProvider<EventSender> eventSenderProvider = new MessagingClientProvider<>();
        final MessagingClientProvider<CommandResponseSender> commandResponseSenderProvider = new MessagingClientProvider<>();
        final TenantClient tenantClient = this.tenantClient();

        if (!this.appConfig.isKafkaMessagingDisabled() && this.kafkaEventConfig.isConfigured()) {
            LOG.info("Kafka client configuration present, adding Kafka messaging clients");
            // all Kafka based senders share the same producer factory
            final CachingKafkaProducerFactory factory = CachingKafkaProducerFactory.sharedFactory((Vertx) this.vertx);
            factory.setMetricsSupport(this.kafkaClientMetricsSupport);
            telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(
                    this.vertx,
                    (KafkaProducerFactory) factory,
                    this.kafkaTelemetryConfig,
                    this.protocolAdapterProperties.isDefaultsEnabled(),
                    this.tracer));
            eventSenderProvider.setClient(new KafkaBasedEventSender(
                    this.vertx,
                    (KafkaProducerFactory) factory,
                    this.kafkaEventConfig,
                    this.protocolAdapterProperties.isDefaultsEnabled(),
                    this.tracer));
            commandResponseSenderProvider.setClient(new KafkaBasedCommandResponseSender(
                    this.vertx,
                    (KafkaProducerFactory) factory,
                    this.kafkaCommandResponseConfig,
                    this.tracer));
        }

        if (!this.appConfig.isAmqpMessagingDisabled() && this.downstreamSenderConfig.isHostConfigured()) {
            LOG.info("AMQP 1.0 client configuration present, adding AMQP 1.0 based messaging clients");
            // telemetry and events each get their own sender (and thus their own connection)
            telemetrySenderProvider.setClient(this.downstreamSender());
            eventSenderProvider.setClient(this.downstreamSender());
            commandResponseSenderProvider.setClient(new ProtonBasedCommandResponseSender(
                    HonoConnection.newConnection(this.vertx, this.commandResponseSenderConfig(), this.tracer),
                    this.messageSamplerFactory,
                    this.protocolAdapterProperties.isJmsVendorPropsEnabled()));
        }

        if (!this.appConfig.isPubSubMessagingDisabled() && this.pubSubConfigProperties.isProjectIdConfigured()) {
            LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients");
            PubSubMessageHelper.getCredentialsProvider().ifPresentOrElse(provider -> {
                final CachingPubSubPublisherFactory pubSubFactory = new CachingPubSubPublisherFactory(
                        this.vertx,
                        this.pubSubConfigProperties.getProjectId(),
                        provider);
                telemetrySenderProvider.setClient(this.pubSubDownstreamSender((PubSubPublisherFactory) pubSubFactory, "telemetry"));
                eventSenderProvider.setClient(this.pubSubDownstreamSender((PubSubPublisherFactory) pubSubFactory, "event"));
                commandResponseSenderProvider.setClient(new PubSubBasedCommandResponseSender(
                        this.vertx,
                        (PubSubPublisherFactory) pubSubFactory,
                        this.pubSubConfigProperties.getProjectId(),
                        this.tracer));
            }, () -> LOG.error("Could not initialize Pub/Sub messaging clients, no Credentials Provider present."));
        }

        final MessagingClientProviders messagingClientProviders = new MessagingClientProviders(
                telemetrySenderProvider,
                eventSenderProvider,
                commandResponseSenderProvider);

        if (!this.commandRouterConfig.isHostConfigured()) {
            throw new IllegalStateException("No Command Router connection configured");
        }

        final CommandRouterClient commandRouterClient = this.commandRouterClient();
        adapter.setCommandRouterClient(commandRouterClient);
        final ProtocolAdapterCommandConsumerFactoryImpl commandConsumerFactory = this.commandConsumerFactory(commandRouterClient);

        if (!this.appConfig.isAmqpMessagingDisabled() && this.commandConsumerConfig.isHostConfigured()) {
            commandConsumerFactory.registerInternalCommandConsumer(
                    (id, handlers) -> new ProtonBasedInternalCommandConsumer(this.commandConsumerConnection(), id, handlers));
        }

        if (!this.appConfig.isKafkaMessagingDisabled()) {
            // the Kafka based consumer needs the Kafka based sender (if one has been registered above)
            final CommandResponseSender kafkaCommandResponseSender = (CommandResponseSender) messagingClientProviders
                    .getCommandResponseSenderProvider()
                    .getClient(MessagingType.kafka);
            if (kafkaCommandResponseSender != null
                    && this.kafkaCommandInternalConfig.isConfigured()
                    && this.kafkaCommandConfig.isConfigured()) {
                commandConsumerFactory.registerInternalCommandConsumer(
                        (id, handlers) -> new KafkaBasedInternalCommandConsumer(
                                this.vertx,
                                this.kafkaCommandInternalConfig,
                                this.kafkaCommandConfig,
                                tenantClient,
                                kafkaCommandResponseSender,
                                id,
                                handlers,
                                this.tracer).setMetricsSupport(this.kafkaClientMetricsSupport));
            }
        }

        // Only look up the Pub/Sub credentials if a project ID is configured, mirroring the
        // check made for the Pub/Sub messaging clients. Without a project ID no consumer can
        // be registered anyway and a missing credentials provider would otherwise be reported
        // as an error in deployments that do not use Pub/Sub at all.
        if (!this.appConfig.isPubSubMessagingDisabled() && this.pubSubConfigProperties.isProjectIdConfigured()) {
            PubSubMessageHelper.getCredentialsProvider().ifPresentOrElse(provider -> {
                final CommandResponseSender pubsubCommandResponseSender = (CommandResponseSender) messagingClientProviders
                        .getCommandResponseSenderProvider()
                        .getClient(MessagingType.pubsub);
                if (pubsubCommandResponseSender != null) {
                    final CachingPubSubSubscriberFactory subscriberFactory = new CachingPubSubSubscriberFactory(
                            this.vertx,
                            this.pubSubConfigProperties.getProjectId(),
                            provider);
                    commandConsumerFactory.registerInternalCommandConsumer(
                            (id, handlers) -> new PubSubBasedInternalCommandConsumer(
                                    pubsubCommandResponseSender,
                                    this.vertx,
                                    id,
                                    handlers,
                                    tenantClient,
                                    this.tracer,
                                    (PubSubSubscriberFactory) subscriberFactory,
                                    this.pubSubConfigProperties.getProjectId(),
                                    provider));
                }
            }, () -> LOG.error("Could not initialize Pub/Sub based internal command consumer, no Credentials Provider present."));
        }

        adapter.setCommandConsumerFactory((ProtocolAdapterCommandConsumerFactory) commandConsumerFactory);
        adapter.setMessagingClientProviders(messagingClientProviders);
        // no producer is set if the configured type does not map to one
        Optional.ofNullable(this.connectionEventProducer()).ifPresent(adapter::setConnectionEventProducer);
        adapter.setCredentialsClient(this.credentialsClient());
        adapter.setHealthCheckServer((HealthCheckServer) this.healthCheckServer);
        adapter.setRegistrationClient(registrationClient);
        adapter.setResourceLimitChecks(this.prometheusResourceLimitChecks(this.resourceLimitChecksConfig, tenantClient));
        adapter.setTenantClient(tenantClient);
        adapter.setTracer(this.tracer);
    }

    /**
     * Creates the connection event producer matching the configured producer type.
     *
     * @return A logging producer for type {@code LOGGING}, a Hono event based producer for
     *         type {@code EVENTS}, or {@code null} for any other type.
     */
    protected ConnectionEventProducer connectionEventProducer() {
        final ConnectionEventProducer producer;
        switch (this.connectionEventsConfig.getType()) {
            case LOGGING:
                producer = new LoggingConnectionEventProducer(this.connectionEventsConfig);
                break;
            case EVENTS:
                producer = new HonoEventConnectionEventProducer();
                break;
            default:
                producer = null;
                break;
        }
        return producer;
    }

    /**
     * Gets the cache for Tenant service responses, creating it from the tenant client
     * configuration if it has not been created before.
     *
     * @return The cache as returned by {@code Caches.newCaffeineCache}.
     */
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache() {
        if (this.tenantResponseCache != null) {
            return this.tenantResponseCache;
        }
        this.tenantResponseCache = Caches.newCaffeineCache(this.tenantClientConfig);
        return this.tenantResponseCache;
    }

    /**
     * Creates a new AMQP based client for the Tenant service.
     * <p>
     * The client uses a new connection based on the tenant client configuration
     * and the (shared) tenant response cache.
     *
     * @return The client.
     */
    protected TenantClient tenantClient() {
        final HonoConnection connection = HonoConnection.newConnection(this.vertx, this.tenantClientConfig, this.tracer);
        return new ProtonBasedTenantClient(connection, this.messageSamplerFactory, this.tenantResponseCache());
    }

    /**
     * Gets the cache for Device Registration service responses, creating it from the
     * device registration client configuration if it has not been created before.
     *
     * @return The cache as returned by {@code Caches.newCaffeineCache}.
     */
    private Cache<Object, RegistrationResult> registrationResponseCache() {
        if (this.registrationResponseCache != null) {
            return this.registrationResponseCache;
        }
        this.registrationResponseCache = Caches.newCaffeineCache(this.deviceRegistrationClientConfig);
        return this.registrationResponseCache;
    }

    /**
     * Creates a new AMQP based client for the Device Registration service.
     * <p>
     * The client uses a new connection based on the device registration client
     * configuration and the (shared) registration response cache.
     *
     * @return The client.
     */
    protected DeviceRegistrationClient registrationClient() {
        final HonoConnection connection = HonoConnection.newConnection(this.vertx, this.deviceRegistrationClientConfig, this.tracer);
        return new ProtonBasedDeviceRegistrationClient(connection, this.messageSamplerFactory, this.registrationResponseCache());
    }

    /**
     * Gets the cache for Credentials service responses, creating it from the
     * credentials client configuration if it has not been created before.
     *
     * @return The cache as returned by {@code Caches.newCaffeineCache}.
     */
    private Cache<Object, CredentialsResult<CredentialsObject>> credentialsResponseCache() {
        if (this.credentialsResponseCache != null) {
            return this.credentialsResponseCache;
        }
        this.credentialsResponseCache = Caches.newCaffeineCache(this.credentialsClientConfig);
        return this.credentialsResponseCache;
    }

    /**
     * Creates a new AMQP based client for the Credentials service.
     * <p>
     * The client uses a new connection based on the credentials client
     * configuration and the (shared) credentials response cache.
     *
     * @return The client.
     */
    protected CredentialsClient credentialsClient() {
        final HonoConnection connection = HonoConnection.newConnection(this.vertx, this.credentialsClientConfig, this.tracer);
        return new ProtonBasedCredentialsClient(connection, this.messageSamplerFactory, this.credentialsResponseCache());
    }

    /**
     * Creates a new AMQP based client for the Command Router service.
     * <p>
     * The client uses a new connection based on the command router configuration.
     *
     * @return The client.
     */
    protected CommandRouterClient commandRouterClient() {
        final HonoConnection connection = HonoConnection.newConnection(this.vertx, this.commandRouterConfig, this.tracer);
        return new ProtonBasedCommandRouterClient(connection, this.messageSamplerFactory);
    }

    /**
     * Creates a new AMQP based downstream sender.
     * <p>
     * The sender uses a new connection based on the downstream sender configuration
     * and is given the adapter's "defaults enabled" and "JMS vendor properties
     * enabled" flags.
     *
     * @return The sender.
     */
    private ProtonBasedDownstreamSender downstreamSender() {
        final HonoConnection connection = HonoConnection.newConnection(this.vertx, this.downstreamSenderConfig, this.tracer);
        return new ProtonBasedDownstreamSender(
                connection,
                this.messageSamplerFactory,
                this.protocolAdapterProperties.isDefaultsEnabled(),
                this.protocolAdapterProperties.isJmsVendorPropsEnabled());
    }

    /**
     * Creates a new Pub/Sub based downstream sender for the configured project.
     *
     * @param pubSubFactory The factory that the sender should use for creating publishers.
     * @param topic The topic name passed to the sender (e.g. "telemetry" or "event").
     * @return The sender.
     */
    private PubSubBasedDownstreamSender pubSubDownstreamSender(PubSubPublisherFactory pubSubFactory, String topic) {
        final String projectId = this.pubSubConfigProperties.getProjectId();
        final boolean defaultsEnabled = this.protocolAdapterProperties.isDefaultsEnabled();
        return new PubSubBasedDownstreamSender(this.vertx, pubSubFactory, topic, projectId, defaultsEnabled, this.tracer);
    }

    /**
     * Creates a new connection based on the command consumer configuration.
     *
     * @return The connection.
     */
    protected HonoConnection commandConsumerConnection() {
        final HonoConnection connection = HonoConnection.newConnection(this.vertx, this.commandConsumerConfig, this.tracer);
        return connection;
    }

    /**
     * Creates a new command consumer factory that uses the given Command Router client.
     *
     * @param commandRouterClient The client for the Command Router service.
     * @return The factory, created with this component's name.
     */
    protected ProtocolAdapterCommandConsumerFactoryImpl commandConsumerFactory(CommandRouterClient commandRouterClient) {
        final String factoryClassName = ProtocolAdapterCommandConsumerFactoryImpl.class.getName();
        LOG.debug("using Command Router service client, configuring CommandConsumerFactory [{}]", (Object) factoryClassName);
        return new ProtocolAdapterCommandConsumerFactoryImpl(commandRouterClient, this.getComponentName());
    }

    /**
     * Creates the configuration for the AMQP based command response sender.
     *
     * @return A copy of the downstream sender configuration with its server role
     *         set to "Command Response".
     */
    private ClientConfigProperties commandResponseSenderConfig() {
        final ClientConfigProperties responseSenderConfig = new ClientConfigProperties(this.downstreamSenderConfig);
        responseSenderConfig.setServerRole("Command Response");
        return responseSenderConfig;
    }

    /**
     * Creates the resource limit checks to use for the adapter.
     * <p>
     * If the given configuration contains a host, Prometheus based checks are created
     * which query that server by means of a web client and keep the results in three
     * Caffeine caches (connected devices, connection duration, data volume). Cache
     * entries expire after the configured cache timeout and are refreshed after half
     * of that time.
     *
     * @param config The configuration of the Prometheus based checks.
     * @param tenantClient The client for the Tenant service.
     * @return Prometheus based checks if a host is configured, no-op checks otherwise.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected ResourceLimitChecks prometheusResourceLimitChecks(PrometheusBasedResourceLimitChecksConfig config, TenantClient tenantClient) {
        Objects.requireNonNull(config);
        Objects.requireNonNull(tenantClient);

        if (!config.isHostConfigured()) {
            return new NoopResourceLimitChecks();
        }

        final WebClient prometheusClient = WebClient.create(this.vertx, newPrometheusClientOptions(config));
        final Duration cacheTimeout = Duration.ofSeconds(config.getCacheTimeout());
        // all three caches are built from the same builder and thus share the
        // single threaded executor, whose daemon thread does not keep the JVM alive
        final Caffeine builder = Caffeine.newBuilder()
                .executor(Executors.newSingleThreadExecutor(runnable -> {
                    final Thread worker = new Thread(runnable);
                    worker.setDaemon(true);
                    return worker;
                }))
                .initialCapacity(config.getCacheMinSize())
                .maximumSize(config.getCacheMaxSize())
                .expireAfterWrite(cacheTimeout)
                .refreshAfterWrite(cacheTimeout.dividedBy(2L));

        final AsyncLoadingCache<LimitedResourceKey, LimitedResource<Long>> connectedDevicesCache =
                (AsyncLoadingCache<LimitedResourceKey, LimitedResource<Long>>) builder.buildAsync(
                        (AsyncCacheLoader) new ConnectedDevicesAsyncCacheLoader(prometheusClient, config, this.tracer));
        final AsyncLoadingCache<LimitedResourceKey, LimitedResource<Duration>> connectionDurationCache =
                (AsyncLoadingCache<LimitedResourceKey, LimitedResource<Duration>>) builder.buildAsync(
                        (AsyncCacheLoader) new ConnectionDurationAsyncCacheLoader(prometheusClient, config, this.tracer));
        final AsyncLoadingCache<LimitedResourceKey, LimitedResource<Long>> dataVolumeCache =
                (AsyncLoadingCache<LimitedResourceKey, LimitedResource<Long>>) builder.buildAsync(
                        (AsyncCacheLoader) new DataVolumeAsyncCacheLoader(prometheusClient, config, this.tracer));

        return new PrometheusBasedResourceLimitChecks(
                connectedDevicesCache,
                connectionDurationCache,
                dataVolumeCache,
                tenantClient,
                this.tracer);
    }

    /**
     * Creates the options for the web client used to query the Prometheus server.
     *
     * @param config The configuration to take connect timeout, host, port and TLS settings from.
     * @return The options.
     */
    private static WebClientOptions newPrometheusClientOptions(PrometheusBasedResourceLimitChecksConfig config) {
        final WebClientOptions clientOptions = new WebClientOptions();
        clientOptions.setConnectTimeout(config.getConnectTimeout());
        clientOptions.setDefaultHost(config.getHost());
        clientOptions.setDefaultPort(config.getPort());
        clientOptions.setTrustOptions(config.getTrustOptions());
        clientOptions.setKeyCertOptions(config.getKeyCertOptions());
        clientOptions.setSsl(config.isTlsEnabled());
        return clientOptions;
    }
}

