/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.app;

import com.github.benmanes.caffeine.cache.Cache;
import io.opentracing.Tracer;
import io.smallrye.config.ConfigMapping;
import io.smallrye.health.api.HealthRegistry;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.config.ClientOptions;
import org.eclipse.hono.client.amqp.config.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.amqp.config.RequestResponseClientOptions;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.kafka.CommonKafkaClientOptions;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaAdminClientOptions;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerOptions;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.subscriber.CachingPubSubSubscriberFactory;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedDeviceRegistrationClient;
import org.eclipse.hono.client.registry.amqp.ProtonBasedTenantClient;
import org.eclipse.hono.client.telemetry.EventSender;
import org.eclipse.hono.client.telemetry.amqp.ProtonBasedDownstreamSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedEventSender;
import org.eclipse.hono.client.telemetry.pubsub.PubSubBasedDownstreamSender;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.commandrouter.AdapterInstanceStatusService;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterAmqpServer;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandRouterService;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.CommandRouterServiceImpl;
import org.eclipse.hono.commandrouter.impl.DelegatingCommandRouterAmqpEndpoint;
import org.eclipse.hono.commandrouter.impl.UnknownStatusProvidingService;
import org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.commandrouter.impl.kafka.InternalKafkaTopicCleanupService;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.commandrouter.impl.pubsub.PubSubBasedCommandConsumerFactoryImpl;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.config.ServiceOptions;
import org.eclipse.hono.deviceconnection.common.DeviceConnectionInfo;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.HealthCheckServer;
import org.eclipse.hono.service.NotificationSupportingServiceApplication;
import org.eclipse.hono.service.amqp.AmqpEndpoint;
import org.eclipse.hono.service.auth.AuthenticationService;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.RegistrationResult;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.eclipse.hono.util.WrappedLifecycleComponentVerticle;
import org.eclipse.microprofile.health.Readiness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class of the Command Router application.
 * <p>
 * The class receives its configuration through CDI initializer methods (the {@code @Inject}
 * annotated setters below), converts the injected option mappings into the corresponding
 * {@code *ConfigProperties} objects and, in {@link #doStart()}, deploys the authentication
 * service, the AMQP server, the notification receiver and (optionally) the Kafka topic
 * clean-up service as vert.x verticles.
 * <p>
 * Subclasses provide the {@link DeviceConnectionInfo} implementation to use via
 * {@link #getDeviceConnectionInfo()}.
 */
public abstract class AbstractApplication extends NotificationSupportingServiceApplication {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractApplication.class);

    /** Value recorded in the deployment result map for each successfully deployed component. */
    private static final String DEPLOYMENT_SUCCESS = "successfully deployed";

    @Inject
    ProtonSaslAuthenticatorFactory saslAuthenticatorFactory;
    @Inject
    AuthenticationService authenticationService;
    @Inject
    AdapterInstanceStatusService adapterInstanceStatusService;
    @Inject
    CommandRouterMetrics metrics;
    @Inject
    KafkaClientMetricsSupport kafkaClientMetricsSupport;
    @Readiness
    @Inject
    HealthRegistry readinessChecks;

    private ServiceConfigProperties amqpServerProperties;
    private ClientConfigProperties commandConsumerConnectionConfig;
    private RequestResponseClientConfigProperties deviceRegistrationClientConfig;
    private ClientConfigProperties downstreamSenderConfig;
    private RequestResponseClientConfigProperties tenantClientConfig;
    private MessagingKafkaProducerConfigProperties commandInternalKafkaProducerConfig;
    private MessagingKafkaProducerConfigProperties commandResponseKafkaProducerConfig;
    private MessagingKafkaConsumerConfigProperties kafkaConsumerConfig;
    private MessagingKafkaProducerConfigProperties kafkaEventConfig;
    private KafkaAdminClientConfigProperties kafkaAdminClientConfig;
    private NotificationKafkaConsumerConfigProperties kafkaNotificationConfig;
    /** Lazily created by {@link #registrationResponseCache()}. */
    private Cache<Object, RegistrationResult> registrationResponseCache;
    /** Lazily created by {@link #tenantResponseCache()}. */
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache;
    private PubSubConfigProperties pubSubConfigProperties;

    /**
     * Sets the Pub/Sub configuration.
     *
     * @param options The injected Pub/Sub publisher options.
     */
    @Inject
    void setPubSubClientOptions(PubSubPublisherOptions options) {
        this.pubSubConfigProperties = new PubSubConfigProperties(options);
    }

    /**
     * Sets the configuration of the Command Router's AMQP server.
     *
     * @param options The options mapped from prefix {@code hono.commandRouter.amqp}.
     */
    @Inject
    void setAmqpServerOptions(@ConfigMapping(prefix="hono.commandRouter.amqp") ServiceOptions options) {
        this.amqpServerProperties = new ServiceConfigProperties(options);
    }

    /**
     * Sets the configuration of the AMQP connection used for consuming commands.
     *
     * @param options The options mapped from prefix {@code hono.command}.
     */
    @Inject
    void setCommandClientOptions(@ConfigMapping(prefix="hono.command") ClientOptions options) {
        this.commandConsumerConnectionConfig = this.applyClientDefaults(
                new ClientConfigProperties(options), "Command & Control");
    }

    /**
     * Sets the configuration of the AMQP connection used for sending messages downstream.
     *
     * @param options The options mapped from prefix {@code hono.messaging}.
     */
    @Inject
    void setDownstreamSenderOptions(@ConfigMapping(prefix="hono.messaging") ClientOptions options) {
        this.downstreamSenderConfig = this.applyClientDefaults(
                new ClientConfigProperties(options), "Downstream");
    }

    /**
     * Sets the configuration of the Tenant service client.
     *
     * @param options The options mapped from prefix {@code hono.tenant}.
     */
    @Inject
    void setTenantServiceClientConfig(@ConfigMapping(prefix="hono.tenant") RequestResponseClientOptions options) {
        this.tenantClientConfig = this.applyClientDefaults(
                new RequestResponseClientConfigProperties(options), "Tenant");
    }

    /**
     * Sets the configuration of the Device Registration service client.
     *
     * @param options The options mapped from prefix {@code hono.registration}.
     */
    @Inject
    void setDeviceRegistrationClientConfig(@ConfigMapping(prefix="hono.registration") RequestResponseClientOptions options) {
        this.deviceRegistrationClientConfig = this.applyClientDefaults(
                new RequestResponseClientConfigProperties(options), "Device Registration");
    }

    /**
     * Sets the configuration of the Kafka producer for the internal command topic.
     *
     * @param commonOptions The options mapped from prefix {@code hono.kafka}.
     * @param commandInternalProducerOptions The options mapped from prefix {@code hono.kafka.commandInternal}.
     */
    @Inject
    void setCommandInternalKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.commandInternal") KafkaProducerOptions commandInternalProducerOptions) {
        this.commandInternalKafkaProducerConfig = new MessagingKafkaProducerConfigProperties(commonOptions, commandInternalProducerOptions);
    }

    /**
     * Sets the configuration of the Kafka producer for command responses.
     *
     * @param commonOptions The options mapped from prefix {@code hono.kafka}.
     * @param commandResponseProducerOptions The options mapped from prefix {@code hono.kafka.commandResponse}.
     */
    @Inject
    void setCommandResponseKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.commandResponse") KafkaProducerOptions commandResponseProducerOptions) {
        this.commandResponseKafkaProducerConfig = new MessagingKafkaProducerConfigProperties(commonOptions, commandResponseProducerOptions);
    }

    /**
     * Sets the configuration of the Kafka consumer for commands.
     *
     * @param commonOptions The options mapped from prefix {@code hono.kafka}.
     * @param consumerOptions The options mapped from prefix {@code hono.kafka.command}.
     */
    @Inject
    void setCommandConsumerKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.command") KafkaConsumerOptions consumerOptions) {
        this.kafkaConsumerConfig = new MessagingKafkaConsumerConfigProperties(commonOptions, consumerOptions);
    }

    /**
     * Sets the configuration of the Kafka producer for events.
     *
     * @param commonOptions The options mapped from prefix {@code hono.kafka}.
     * @param eventOptions The options mapped from prefix {@code hono.kafka.event}.
     */
    @Inject
    void setEventKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.event") KafkaProducerOptions eventOptions) {
        this.kafkaEventConfig = new MessagingKafkaProducerConfigProperties(commonOptions, eventOptions);
    }

    /**
     * Sets the configuration of the Kafka admin client used by the topic clean-up service.
     *
     * @param commonOptions The options mapped from prefix {@code hono.kafka}.
     * @param adminClientOptions The options mapped from prefix {@code hono.kafka.cleanup}.
     */
    @Inject
    void setAdminKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.cleanup") KafkaAdminClientOptions adminClientOptions) {
        this.kafkaAdminClientConfig = new KafkaAdminClientConfigProperties(commonOptions, adminClientOptions);
    }

    /**
     * Sets the configuration of the Kafka consumer for notifications.
     *
     * @param commonOptions The options mapped from prefix {@code hono.kafka}.
     * @param notificationOptions The options mapped from prefix {@code hono.kafka.notification}.
     */
    @Inject
    void setNotificationKafkaClientOptions(@ConfigMapping(prefix="hono.kafka") CommonKafkaClientOptions commonOptions, @ConfigMapping(prefix="hono.kafka.notification") KafkaConsumerOptions notificationOptions) {
        this.kafkaNotificationConfig = new NotificationKafkaConsumerConfigProperties(commonOptions, notificationOptions);
    }

    /**
     * Applies the defaults shared by all AMQP client configurations of this application:
     * the given server role (unless one is already known) and this component's name
     * (unless a name is already set).
     *
     * @param props The client configuration to complete.
     * @param serverRole The role name of the server the client connects to.
     * @param <T> The concrete type of client configuration.
     * @return The given configuration object.
     */
    private <T extends ClientConfigProperties> T applyClientDefaults(T props, String serverRole) {
        props.setServerRoleIfUnknown(serverRole);
        props.setNameIfNotSet(this.componentName);
        return props;
    }

    /**
     * Deploys the application's verticles.
     * <p>
     * The authentication service is deployed once, the AMQP server is deployed with the
     * configured maximum number of instances. A notification receiver is always deployed,
     * the Kafka topic clean-up service only if {@link #createKafkaTopicCleanUpService()}
     * provides one. Each successful deployment is recorded in a result map which is handed
     * to the inherited {@code deploymentCheck} handler together with the combined outcome.
     *
     * @throws IllegalStateException if the injected authentication service is not a {@link Verticle}.
     */
    protected void doStart() {
        if (!(this.authenticationService instanceof Verticle)) {
            throw new IllegalStateException("Authentication service must be a vert.x Verticle");
        }
        int instancesToDeploy = this.appConfig.getMaxInstances();
        LOG.info("deploying {} {} instances ...", instancesToDeploy, this.componentName);
        HashMap<String, String> deploymentResult = new HashMap<>();

        Future<String> authServiceDeploymentTracker = this.vertx
                .deployVerticle((Verticle) this.authenticationService)
                .onSuccess(ok -> {
                    LOG.info("successfully deployed authentication service verticle");
                    deploymentResult.put("authentication service verticle", DEPLOYMENT_SUCCESS);
                    this.registerHealthCheckProvider(this.authenticationService);
                });

        Future<String> amqpServerDeploymentTracker = this.vertx
                .deployVerticle(this::amqpServer, new DeploymentOptions().setInstances(instancesToDeploy))
                .onSuccess(ok -> {
                    LOG.info("successfully deployed AMQP server verticle(s)");
                    deploymentResult.put("AMQP server verticle(s)", DEPLOYMENT_SUCCESS);
                });

        NotificationReceiver notificationReceiver = this.notificationReceiver(
                this.kafkaNotificationConfig,
                this.commandConsumerConnectionConfig,
                this.pubSubConfigProperties);
        Future<String> notificationReceiverTracker = this.vertx
                .deployVerticle(new WrappedLifecycleComponentVerticle(notificationReceiver))
                .onSuccess(ok -> {
                    LOG.info("successfully deployed notification receiver verticle(s)");
                    deploymentResult.put("notification receiver verticle", DEPLOYMENT_SUCCESS);
                });

        // the clean-up service is optional; an already succeeded future stands in when it is absent
        Future<String> topicCleanUpServiceDeploymentTracker = this.createKafkaTopicCleanUpService()
                .map(service -> this.vertx.deployVerticle(service).onSuccess(ok -> {
                    LOG.info("successfully deployed Kafka topic clean-up service verticle");
                    deploymentResult.put("Kafka topic clean-up service verticle", DEPLOYMENT_SUCCESS);
                    this.readinessChecks.register(service::checkReadiness);
                }))
                .orElse(Future.succeededFuture());

        // NOTE(review): the declared type of the inherited deploymentCheck field is not visible
        // in this file, so the raw Handler cast is kept - replace it once the type is confirmed.
        Future.all(
                authServiceDeploymentTracker,
                amqpServerDeploymentTracker,
                notificationReceiverTracker,
                topicCleanUpServiceDeploymentTracker)
            .map(deploymentResult)
            .onComplete((Handler) this.deploymentCheck);
    }

    /**
     * Creates a new, fully configured AMQP server instance.
     * <p>
     * Used as the verticle supplier in {@link #doStart()}, i.e. it is invoked once per
     * server instance to deploy.
     *
     * @return The server.
     */
    private CommandRouterAmqpServer amqpServer() {
        CommandRouterAmqpServer server = new CommandRouterAmqpServer();
        server.setConfig(this.amqpServerProperties);
        server.setHealthCheckServer(this.healthCheckServer);
        server.setSaslAuthenticatorFactory(this.saslAuthenticatorFactory);
        server.setTracer(this.tracer);
        server.addEndpoint(this.commandRouterAmqpEndpoint());
        return server;
    }

    /**
     * Creates the AMQP endpoint exposing a newly created Command Router service.
     * <p>
     * The endpoint forwards health check registration to the service if the service
     * is a {@link HealthCheckProvider}.
     *
     * @return The endpoint.
     */
    private AmqpEndpoint commandRouterAmqpEndpoint() {
        CommandRouterService service = this.commandRouterService();
        DelegatingCommandRouterAmqpEndpoint<CommandRouterService> endpoint = new DelegatingCommandRouterAmqpEndpoint<CommandRouterService>(this.vertx, service) {

            public void registerLivenessChecks(HealthCheckHandler handler) {
                if (this.service instanceof HealthCheckProvider) {
                    ((HealthCheckProvider) this.service).registerLivenessChecks(handler);
                }
            }

            public void registerReadinessChecks(HealthCheckHandler handler) {
                if (this.service instanceof HealthCheckProvider) {
                    ((HealthCheckProvider) this.service).registerReadinessChecks(handler);
                }
            }
        };
        endpoint.setConfiguration(this.amqpServerProperties);
        endpoint.setTracer(this.tracer);
        return endpoint;
    }

    /**
     * Gets the component to use for accessing device connection information.
     *
     * @return The device connection info.
     */
    protected abstract DeviceConnectionInfo getDeviceConnectionInfo();

    /**
     * Creates a new Command Router service together with the clients it depends on.
     *
     * @return The service.
     */
    private CommandRouterService commandRouterService() {
        DeviceRegistrationClient registrationClient = this.registrationClient();
        TenantClient tenantClient = this.tenantClient();
        // resolve once so that the target mapper and the service are guaranteed to share
        // the same instance, regardless of how a subclass implements the getter
        DeviceConnectionInfo deviceConnectionInfo = this.getDeviceConnectionInfo();
        CommandTargetMapper commandTargetMapper = CommandTargetMapper.create(
                registrationClient, deviceConnectionInfo, this.tracer);
        return new CommandRouterServiceImpl(
                this.amqpServerProperties,
                registrationClient,
                tenantClient,
                deviceConnectionInfo,
                this.commandConsumerFactoryProvider(tenantClient, commandTargetMapper),
                this.eventSenderProvider(),
                this.adapterInstanceStatusService,
                this.tracer);
    }

    /**
     * Creates the service for cleaning up internal Kafka topics, if applicable.
     *
     * @return The service, or an empty optional if Kafka messaging is disabled, the Kafka admin
     *         client is not configured or the adapter instance status service is an
     *         {@link UnknownStatusProvidingService}.
     */
    private Optional<InternalKafkaTopicCleanupService> createKafkaTopicCleanUpService() {
        if (!this.appConfig.isKafkaMessagingDisabled()
                && this.kafkaAdminClientConfig.isConfigured()
                && !(this.adapterInstanceStatusService instanceof UnknownStatusProvidingService)) {
            return Optional.of(new InternalKafkaTopicCleanupService(this.adapterInstanceStatusService, this.kafkaAdminClientConfig));
        }
        return Optional.empty();
    }

    /**
     * Creates the provider of command consumer factories for all messaging types that are
     * enabled and configured (Kafka, AMQP, Pub/Sub).
     *
     * @param tenantClient The client for accessing tenant information.
     * @param commandTargetMapper The component for determining the target of a command.
     * @return The provider.
     */
    private MessagingClientProvider<CommandConsumerFactory> commandConsumerFactoryProvider(TenantClient tenantClient, CommandTargetMapper commandTargetMapper) {
        MessagingClientProvider<CommandConsumerFactory> commandConsumerFactoryProvider = new MessagingClientProvider<>();
        if (!this.appConfig.isKafkaMessagingDisabled()
                && this.kafkaConsumerConfig.isConfigured()
                && this.commandResponseKafkaProducerConfig.isConfigured()
                && this.commandInternalKafkaProducerConfig.isConfigured()) {
            commandConsumerFactoryProvider.setClient(new KafkaBasedCommandConsumerFactoryImpl(
                    this.vertx,
                    tenantClient,
                    commandTargetMapper,
                    this.kafkaProducerFactory(),
                    this.commandInternalKafkaProducerConfig,
                    this.commandResponseKafkaProducerConfig,
                    this.kafkaConsumerConfig,
                    this.metrics,
                    this.kafkaClientMetricsSupport,
                    this.tracer));
        }
        if (!this.appConfig.isAmqpMessagingDisabled() && this.commandConsumerConnectionConfig.isHostConfigured()) {
            commandConsumerFactoryProvider.setClient(new ProtonBasedCommandConsumerFactoryImpl(
                    this.newConnection(this.commandConsumerConnectionConfig),
                    tenantClient,
                    commandTargetMapper,
                    this.metrics,
                    SendMessageSampler.Factory.noop()));
        }
        if (!this.appConfig.isPubSubMessagingDisabled() && this.pubSubConfigProperties.isProjectIdConfigured()) {
            PubSubMessageHelper.getCredentialsProvider().ifPresentOrElse(provider -> {
                LOG.debug("Configuring Pub/Sub based command consumer factory");
                PubSubPublisherFactory publisherFactory = new CachingPubSubPublisherFactory(this.vertx, this.pubSubConfigProperties.getProjectId(), provider);
                PubSubSubscriberFactory subscriberFactory = new CachingPubSubSubscriberFactory(this.vertx, this.pubSubConfigProperties.getProjectId(), provider);
                commandConsumerFactoryProvider.setClient(new PubSubBasedCommandConsumerFactoryImpl(
                        this.vertx,
                        tenantClient,
                        this.tracer,
                        publisherFactory,
                        this.pubSubConfigProperties.getProjectId(),
                        commandTargetMapper,
                        this.metrics,
                        subscriberFactory));
            }, () -> LOG.error("Could not initialize Pub/Sub based command consumer factory, no Credentials Provider present."));
        }
        return commandConsumerFactoryProvider;
    }

    /**
     * Creates the provider of event senders for all messaging types that are
     * enabled and configured (Kafka, AMQP, Pub/Sub).
     *
     * @return The provider.
     */
    private MessagingClientProvider<EventSender> eventSenderProvider() {
        MessagingClientProvider<EventSender> eventSenderProvider = new MessagingClientProvider<>();
        if (!this.appConfig.isKafkaMessagingDisabled() && this.kafkaEventConfig.isConfigured()) {
            eventSenderProvider.setClient(new KafkaBasedEventSender(
                    this.vertx, this.kafkaProducerFactory(), this.kafkaEventConfig, true, this.tracer));
        }
        if (!this.appConfig.isAmqpMessagingDisabled() && this.downstreamSenderConfig.isHostConfigured()) {
            eventSenderProvider.setClient(new ProtonBasedDownstreamSender(
                    this.newConnection(this.downstreamSenderConfig),
                    SendMessageSampler.Factory.noop(),
                    true,
                    false));
        }
        if (!this.appConfig.isPubSubMessagingDisabled() && this.pubSubConfigProperties.isProjectIdConfigured()) {
            PubSubMessageHelper.getCredentialsProvider().ifPresentOrElse(provider -> {
                PubSubPublisherFactory pubSubFactory = new CachingPubSubPublisherFactory(this.vertx, this.pubSubConfigProperties.getProjectId(), provider);
                eventSenderProvider.setClient(new PubSubBasedDownstreamSender(
                        this.vertx, pubSubFactory, "event", this.pubSubConfigProperties.getProjectId(), true, this.tracer));
            }, () -> LOG.error("Could not initialize Pub/Sub based downstream sender, no Credentials Provider present."));
        }
        return eventSenderProvider;
    }

    /**
     * Obtains the Kafka producer factory from {@code CachingKafkaProducerFactory.sharedFactory}
     * and sets the injected Kafka client metrics support on it.
     *
     * @return The factory.
     */
    private KafkaProducerFactory<String, Buffer> kafkaProducerFactory() {
        CachingKafkaProducerFactory<String, Buffer> factory = CachingKafkaProducerFactory.sharedFactory(this.vertx);
        factory.setMetricsSupport(this.kafkaClientMetricsSupport);
        return factory;
    }

    /**
     * Creates a new AMQP connection object for the given client configuration using this
     * application's vert.x instance and tracer.
     *
     * @param config The client configuration.
     * @return The connection.
     */
    private HonoConnection newConnection(ClientConfigProperties config) {
        return HonoConnection.newConnection(this.vertx, config, this.tracer);
    }

    /**
     * Gets the cache for Device Registration service responses, creating it from the
     * registration client configuration on first invocation.
     *
     * @return The cache.
     */
    private Cache<Object, RegistrationResult> registrationResponseCache() {
        if (this.registrationResponseCache == null) {
            this.registrationResponseCache = Caches.newCaffeineCache(this.deviceRegistrationClientConfig);
        }
        return this.registrationResponseCache;
    }

    /**
     * Gets the cache for Tenant service responses, creating it from the tenant client
     * configuration on first invocation.
     *
     * @return The cache.
     */
    private Cache<Object, TenantResult<TenantObject>> tenantResponseCache() {
        if (this.tenantResponseCache == null) {
            this.tenantResponseCache = Caches.newCaffeineCache(this.tenantClientConfig);
        }
        return this.tenantResponseCache;
    }

    /**
     * Creates a new AMQP based client for the Device Registration service.
     * <p>
     * Each invocation creates a new client with a new connection; the response cache is shared.
     *
     * @return The client.
     */
    protected DeviceRegistrationClient registrationClient() {
        return new ProtonBasedDeviceRegistrationClient(
                this.newConnection(this.deviceRegistrationClientConfig),
                SendMessageSampler.Factory.noop(),
                this.registrationResponseCache());
    }

    /**
     * Creates a new AMQP based client for the Tenant service.
     * <p>
     * Each invocation creates a new client with a new connection; the response cache is shared.
     *
     * @return The client.
     */
    protected TenantClient tenantClient() {
        return new ProtonBasedTenantClient(
                this.newConnection(this.tenantClientConfig),
                SendMessageSampler.Factory.noop(),
                this.tenantResponseCache());
    }
}

