/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.vertx.example.base;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.application.client.TimeUntilDisconnectNotification;
import org.eclipse.hono.application.client.amqp.AmqpApplicationClient;
import org.eclipse.hono.application.client.amqp.ProtonBasedApplicationClient;
import org.eclipse.hono.application.client.kafka.impl.KafkaApplicationClientImpl;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.kafka.CommonKafkaClientConfigProperties;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.vertx.example.base.HonoExampleConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"HARD_CODE_PASSWORD", "PREDICTABLE_RANDOM"}, justification="We use the default passwords of the Hono Sandbox installation throughout this class\nfor ease of use. The passwords are publicly documented and do not affect any\nprivate installations of Hono.\nThe values returned by the Random are only used as arbitrary values in example message\npayload.\n")
public class HonoExampleApplicationBase {
    public static final String HONO_CLIENT_USER = System.getProperty("username", "consumer@HONO");
    public static final String HONO_CLIENT_PASSWORD = System.getProperty("password", "verysecret");
    public static final Boolean USE_PLAIN_CONNECTION = Boolean.valueOf(System.getProperty("plain.connection", "false"));
    public static final Boolean SEND_ONE_WAY_COMMANDS = Boolean.valueOf(System.getProperty("sendOneWayCommands", "false"));
    public static final Boolean USE_KAFKA = Boolean.valueOf(System.getProperty("kafka", "false"));
    private static final Logger LOG = LoggerFactory.getLogger(HonoExampleApplicationBase.class);
    private static final String KAFKA_CONSUMER_GROUP_ID = "hono-example-application";
    private static final String COMMAND_SEND_LIFECYCLE_INFO = "sendLifecycleInfo";
    private static final Random RAND = new Random();
    private final Vertx vertx = Vertx.vertx();
    private final ApplicationClient<? extends MessageContext> client;
    private final int port;
    private final Map<String, Handler<Void>> periodicCommandSenderTimerCancelerMap = new HashMap<String, Handler<Void>>();
    private final Map<String, TimeUntilDisconnectNotification> pendingTtdNotification = new HashMap<String, TimeUntilDisconnectNotification>();
    private MessageConsumer eventConsumer;
    private MessageConsumer telemetryConsumer;

    public HonoExampleApplicationBase() {
        if (USE_KAFKA.booleanValue()) {
            this.port = HonoExampleConstants.HONO_KAFKA_CONSUMER_PORT;
            this.client = this.createKafkaApplicationClient();
        } else {
            this.port = HonoExampleConstants.HONO_AMQP_CONSUMER_PORT;
            this.client = this.createAmqpApplicationClient();
        }
    }

    private ApplicationClient<? extends MessageContext> createAmqpApplicationClient() {
        ClientConfigProperties props = new ClientConfigProperties();
        props.setLinkEstablishmentTimeout(5000L);
        props.setHost(HonoExampleConstants.HONO_MESSAGING_HOST);
        props.setPort(this.port);
        if (!USE_PLAIN_CONNECTION.booleanValue()) {
            props.setUsername(HONO_CLIENT_USER);
            props.setPassword(HONO_CLIENT_PASSWORD);
            props.setTlsEnabled(true);
            props.setServerRole("AMQP Messaging Network");
            props.setTrustStorePath("target/config/hono-demo-certs-jar/trusted-certs.pem");
            props.setHostnameVerificationRequired(false);
        }
        return new ProtonBasedApplicationClient(HonoConnection.newConnection((Vertx)this.vertx, (ClientConfigProperties)props));
    }

    private ApplicationClient<? extends MessageContext> createKafkaApplicationClient() {
        HashMap<String, CallSite> properties = new HashMap<String, CallSite>();
        properties.put("bootstrap.servers", (CallSite)((Object)(HonoExampleConstants.HONO_MESSAGING_HOST + ":" + this.port)));
        CommonKafkaClientConfigProperties commonClientConfig = new CommonKafkaClientConfigProperties();
        commonClientConfig.setCommonClientConfig(properties);
        MessagingKafkaConsumerConfigProperties consumerConfig = new MessagingKafkaConsumerConfigProperties();
        consumerConfig.setCommonClientConfig(commonClientConfig);
        consumerConfig.setConsumerConfig(Map.of("group.id", KAFKA_CONSUMER_GROUP_ID));
        MessagingKafkaProducerConfigProperties producerConfig = new MessagingKafkaProducerConfigProperties();
        producerConfig.setCommonClientConfig(commonClientConfig);
        CachingKafkaProducerFactory producerFactory = CachingKafkaProducerFactory.sharedFactory((Vertx)this.vertx);
        return new KafkaApplicationClientImpl(this.vertx, consumerConfig, (KafkaProducerFactory)producerFactory, producerConfig);
    }

    protected void consumeData() {
        CompletableFuture startup = new CompletableFuture();
        if (this.client instanceof AmqpApplicationClient) {
            AmqpApplicationClient ac = (AmqpApplicationClient)this.client;
            ac.addDisconnectListener(c -> LOG.info("lost connection to Hono, trying to reconnect ..."));
            ac.addReconnectListener(c -> LOG.info("reconnected to Hono"));
        }
        Promise readyTracker = Promise.promise();
        this.client.addOnClientReadyHandler((Handler)readyTracker);
        this.client.start().compose(ok -> readyTracker.future()).compose(v -> Future.all(this.createEventConsumer(), this.createTelemetryConsumer())).onSuccess(ok -> startup.complete(this.client)).onFailure(startup::completeExceptionally);
        try {
            startup.join();
            LOG.info("Consumer ready for telemetry and event messages");
            System.in.read();
        }
        catch (CompletionException e) {
            LOG.error("{} consumer failed to start [{}:{}]", new Object[]{USE_KAFKA != false ? "Kafka" : "AMQP", HonoExampleConstants.HONO_MESSAGING_HOST, this.port, e.getCause()});
        }
        catch (IOException e) {
            // empty catch block
        }
        CompletableFuture shutDown = new CompletableFuture();
        ArrayList closeFutures = new ArrayList();
        Optional.ofNullable(this.eventConsumer).map(MessageConsumer::close).ifPresent(closeFutures::add);
        Optional.ofNullable(this.telemetryConsumer).map(MessageConsumer::close).ifPresent(closeFutures::add);
        Optional.of(this.client).map(Lifecycle::stop).ifPresent(closeFutures::add);
        Future.join(closeFutures).compose(ok -> this.vertx.close()).recover(t -> this.vertx.close()).onComplete(ar -> shutDown.complete(this.client));
        shutDown.join();
        LOG.info("Consumer has been shut down");
    }

    private Future<MessageConsumer> createEventConsumer() {
        return this.client.createEventConsumer("DEFAULT_TENANT", msg -> {
            msg.getTimeUntilDisconnectNotification().ifPresent(this::handleCommandReadinessNotification);
            HonoExampleApplicationBase.handleEventMessage((DownstreamMessage<? extends MessageContext>)msg);
        }, cause -> LOG.error("event consumer closed by remote", cause)).onSuccess(consumer -> {
            this.eventConsumer = consumer;
        });
    }

    private Future<MessageConsumer> createTelemetryConsumer() {
        return this.client.createTelemetryConsumer("DEFAULT_TENANT", msg -> {
            msg.getTimeUntilDisconnectNotification().ifPresent(this::handleCommandReadinessNotification);
            HonoExampleApplicationBase.handleTelemetryMessage((DownstreamMessage<? extends MessageContext>)msg);
        }, cause -> LOG.error("telemetry consumer closed by remote", cause)).onSuccess(consumer -> {
            this.telemetryConsumer = consumer;
        });
    }

    private void handleCommandReadinessNotification(TimeUntilDisconnectNotification notification) {
        if (notification.getTtd() <= 0) {
            this.handlePermanentlyConnectedCommandReadinessNotification(notification);
        } else {
            LOG.info("Device is ready to receive a command : [{}].", (Object)notification);
            this.sendCommand(notification);
        }
    }

    private void handlePermanentlyConnectedCommandReadinessNotification(TimeUntilDisconnectNotification notification) {
        String keyForDevice = notification.getTenantAndDeviceId();
        TimeUntilDisconnectNotification previousNotification = this.pendingTtdNotification.get(keyForDevice);
        if (previousNotification != null) {
            if (notification.getCreationTime().isAfter(previousNotification.getCreationTime())) {
                LOG.info("Set new ttd value [{}] of notification for [{}]", (Object)notification.getTtd(), (Object)notification.getTenantAndDeviceId());
                this.pendingTtdNotification.put(keyForDevice, notification);
            } else {
                LOG.trace("Received notification for [{}] that was already superseded by newer [{}]", (Object)notification, (Object)previousNotification);
            }
        } else {
            this.pendingTtdNotification.put(keyForDevice, notification);
            this.vertx.setTimer(1000L, timerId -> {
                LOG.debug("Handle device notification for [{}].", (Object)notification.getTenantAndDeviceId());
                TimeUntilDisconnectNotification notificationToHandle = this.pendingTtdNotification.remove(keyForDevice);
                if (notificationToHandle != null) {
                    if (notificationToHandle.getTtd() == -1) {
                        LOG.info("Device notified as being ready to receive a command until further notice : [{}].", (Object)notificationToHandle);
                        this.cancelPeriodicCommandSender(notification);
                        this.sendCommand(notificationToHandle);
                        this.vertx.setPeriodic((long)HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000L, id -> {
                            this.sendCommand(notificationToHandle);
                            this.setPeriodicCommandSenderTimerCanceler((Long)id, notification);
                        });
                    } else {
                        LOG.info("Device notified as not being ready to receive a command (anymore) : [{}].", (Object)notification);
                        this.cancelPeriodicCommandSender(notificationToHandle);
                        LOG.debug("Device will not receive further commands : [{}].", (Object)notification.getTenantAndDeviceId());
                    }
                }
            });
        }
    }

    private void sendCommand(TimeUntilDisconnectNotification notification) {
        if (SEND_ONE_WAY_COMMANDS.booleanValue()) {
            this.sendOneWayCommandToAdapter(notification.getTenantId(), notification.getDeviceId(), notification);
        } else {
            this.sendCommandToAdapter(notification.getTenantId(), notification.getDeviceId(), notification);
        }
    }

    private Duration calculateCommandTimeout(TimeUntilDisconnectNotification notification) {
        if (notification.getTtd() == -1) {
            return Duration.ofMillis((long)HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000L);
        }
        return Duration.ofMillis(notification.getMillisecondsUntilExpiry());
    }

    private void setPeriodicCommandSenderTimerCanceler(Long timerId, TimeUntilDisconnectNotification ttdNotification) {
        this.periodicCommandSenderTimerCancelerMap.put(ttdNotification.getTenantAndDeviceId(), (Handler<Void>)((Handler)v -> {
            this.vertx.cancelTimer(timerId.longValue());
            this.periodicCommandSenderTimerCancelerMap.remove(ttdNotification.getTenantAndDeviceId());
        }));
    }

    private void cancelPeriodicCommandSender(TimeUntilDisconnectNotification notification) {
        if (this.isPeriodicCommandSenderActiveForDevice(notification)) {
            LOG.debug("Cancelling periodic sender for {}", (Object)notification.getTenantAndDeviceId());
            this.periodicCommandSenderTimerCancelerMap.get(notification.getTenantAndDeviceId()).handle(null);
        } else {
            LOG.debug("Wanted to cancel periodic sender for {}, but could not find one", (Object)notification.getTenantAndDeviceId());
        }
    }

    private boolean isPeriodicCommandSenderActiveForDevice(TimeUntilDisconnectNotification notification) {
        return this.periodicCommandSenderTimerCancelerMap.containsKey(notification.getTenantAndDeviceId());
    }

    private void sendCommandToAdapter(String tenantId, String deviceId, TimeUntilDisconnectNotification ttdNotification) {
        Duration commandTimeout = this.calculateCommandTimeout(ttdNotification);
        Buffer commandBuffer = HonoExampleApplicationBase.buildCommandPayload();
        String command = "setBrightness";
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending command [{}] to [{}].", (Object)"setBrightness", (Object)ttdNotification.getTenantAndDeviceId());
        }
        this.client.sendCommand(tenantId, deviceId, "setBrightness", commandBuffer, "application/json", null, commandTimeout, null).onSuccess(result -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully sent command payload: [{}].", (Object)commandBuffer.toString());
                LOG.debug("And received response: [{}].", (Object)Optional.ofNullable(result.getPayload()).orElseGet(Buffer::buffer).toString());
            }
        }).onFailure(t -> {
            if (t instanceof ServiceInvocationException) {
                int errorCode = ((ServiceInvocationException)t).getErrorCode();
                LOG.debug("Command was replied with error code [{}].", (Object)errorCode);
            } else {
                LOG.debug("Could not send command : {}.", (Object)t.getMessage());
            }
        });
    }

    private void sendOneWayCommandToAdapter(String tenantId, String deviceId, TimeUntilDisconnectNotification ttdNotification) {
        Buffer commandBuffer = HonoExampleApplicationBase.buildOneWayCommandPayload();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending one-way command [{}] to [{}].", (Object)COMMAND_SEND_LIFECYCLE_INFO, (Object)ttdNotification.getTenantAndDeviceId());
        }
        this.client.sendOneWayCommand(tenantId, deviceId, COMMAND_SEND_LIFECYCLE_INFO, commandBuffer).onSuccess(statusResult -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully sent one-way command payload: [{}] and received status [{}].", (Object)commandBuffer.toString(), statusResult);
            }
        }).onFailure(t -> {
            if (t instanceof ServiceInvocationException) {
                int errorCode = ((ServiceInvocationException)t).getErrorCode();
                LOG.debug("One-way command was replied with error code [{}].", (Object)errorCode);
            } else {
                LOG.debug("Could not send one-way command : {}.", (Object)t.getMessage());
            }
        });
    }

    private static Buffer buildCommandPayload() {
        JsonObject jsonCmd = new JsonObject().put("brightness", (Object)RAND.nextInt(100));
        return Buffer.buffer((String)jsonCmd.encodePrettily());
    }

    private static Buffer buildOneWayCommandPayload() {
        JsonObject jsonCmd = new JsonObject().put("info", (Object)"app restarted.");
        return Buffer.buffer((String)jsonCmd.encodePrettily());
    }

    private static void handleTelemetryMessage(DownstreamMessage<? extends MessageContext> msg) {
        LOG.debug("received telemetry data [tenant: {}, device: {}, content-type: {}]: [{}].", new Object[]{msg.getTenantId(), msg.getDeviceId(), msg.getContentType(), msg.getPayload()});
    }

    private static void handleEventMessage(DownstreamMessage<? extends MessageContext> msg) {
        LOG.debug("received event [tenant: {}, device: {}, content-type: {}]: [{}].", new Object[]{msg.getTenantId(), msg.getDeviceId(), msg.getContentType(), msg.getPayload()});
    }
}

