/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.command;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonDelivery;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.impl.HonoClientImpl;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.command.CommandConfigProperties;
import org.eclipse.hono.service.command.CommandConnection;
import org.eclipse.hono.service.command.CommandConsumer;
import org.eclipse.hono.service.command.CommandResponseSender;
import org.eclipse.hono.service.command.CommandResponseSenderImpl;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandConnectionImpl
extends HonoClientImpl
implements CommandConnection {
    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
    private final Map<String, MessageConsumer> commandReceivers = new HashMap<String, MessageConsumer>();

    public CommandConnectionImpl(Vertx vertx, CommandConfigProperties clientConfigProperties) {
        super(vertx, (ClientConfigProperties)clientConfigProperties);
    }

    protected void clearState() {
        super.clearState();
        this.commandReceivers.clear();
    }

    @Override
    public final Future<MessageConsumer> getOrCreateCommandConsumer(String tenantId, String deviceId, BiConsumer<ProtonDelivery, Message> commandConsumer, Handler<Void> closeHandler) {
        MessageConsumer messageConsumer = this.commandReceivers.get(Device.asAddress(tenantId, deviceId));
        if (messageConsumer != null) {
            Future result = Future.future();
            result.complete((Object)messageConsumer);
            return result;
        }
        return this.createConsumer(tenantId, () -> this.newCommandConsumer(tenantId, deviceId, commandConsumer, closeHandler));
    }

    private Future<MessageConsumer> newCommandConsumer(String tenantId, String deviceId, BiConsumer<ProtonDelivery, Message> messageConsumer, Handler<Void> closeHandler) {
        return this.checkConnected().compose(con -> {
            Future result = Future.future();
            CommandConsumer.create(this.context, this.clientConfigProperties, this.connection, tenantId, deviceId, messageConsumer, (Handler<String>)((Handler)closeHook -> this.closeCommandConsumer(tenantId, deviceId)), (Handler<AsyncResult<MessageConsumer>>)((Handler)creation -> {
                if (creation.succeeded()) {
                    this.commandReceivers.put(Device.asAddress(tenantId, deviceId), (MessageConsumer)creation.result());
                }
                result.complete(creation.result());
            }));
            return result;
        });
    }

    @Override
    public Future<Void> closeCommandConsumer(String tenantId, String deviceId) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        String deviceAddress = Device.asAddress(tenantId, deviceId);
        MessageConsumer commandReceiverLink = this.commandReceivers.get(deviceAddress);
        Future future = Future.future();
        if (commandReceiverLink != null) {
            commandReceiverLink.close(closeHandler -> {
                if (closeHandler.failed()) {
                    this.LOG.error("Command receiver link close failed: {}", closeHandler.cause());
                    future.fail(closeHandler.cause());
                } else {
                    future.complete();
                }
            });
            this.commandReceivers.remove(deviceAddress);
        } else {
            this.LOG.debug("Command receiver should be closed but could not be found for tenant: [{}], device: [{}] - possibly already closed?", (Object)tenantId, (Object)deviceId);
            future.fail("Command receiver should be closed but could not be found for tenant");
        }
        return future;
    }

    @Override
    public Future<CommandResponseSender> getOrCreateCommandResponseSender(String tenantId, String replyId) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(replyId);
        Future result = Future.future();
        this.getOrCreateSender(CommandResponseSenderImpl.getTargetAddress(tenantId, replyId), () -> this.createCommandResponseSender(tenantId, replyId)).setHandler(h -> {
            if (h.succeeded()) {
                result.complete((Object)((CommandResponseSender)h.result()));
            } else {
                result.fail(h.cause());
            }
        });
        return result;
    }

    private Future<MessageSender> createCommandResponseSender(String tenantId, String replyId) {
        return this.checkConnected().compose(connected -> {
            Future result = Future.future();
            CommandResponseSenderImpl.create(this.context, this.clientConfigProperties, this.connection, tenantId, replyId, (Handler<String>)((Handler)onSenderClosed -> this.activeSenders.remove(CommandResponseSenderImpl.getTargetAddress(tenantId, replyId))), (Handler<AsyncResult<MessageSender>>)result.completer());
            return result;
        });
    }

    @Override
    public Future<Void> closeCommandResponseSender(String tenantId, String replyId) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(replyId);
        MessageSender commandResponseSender = (MessageSender)this.activeSenders.get(CommandResponseSenderImpl.getTargetAddress(tenantId, replyId));
        Future future = Future.future();
        if (commandResponseSender != null) {
            commandResponseSender.close(closeHandler -> {
                if (closeHandler.failed()) {
                    this.LOG.error("Command response sender link close failed: {}", closeHandler.cause());
                    future.fail(closeHandler.cause());
                } else {
                    future.complete();
                }
            });
        } else {
            this.LOG.error("Command response sender should be closed but could not be found for tenant: [{}], replyId: [{}]", (Object)tenantId, (Object)replyId);
            future.fail("Command response sender should be closed but could not be found");
        }
        return future;
    }

    @Override
    public Future<Void> closeCommandResponseSenders(String tenantId, String deviceId) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Future future = Future.future();
        String controlAddress = ResourceIdentifier.from((String)"control", (String)tenantId, (String)deviceId).toString();
        Set keys = this.activeSenders.keySet();
        for (String key : keys) {
            if (!key.startsWith(controlAddress)) continue;
            Future sub = Future.future();
            ((MessageSender)this.activeSenders.get(key)).close(c -> {
                if (c.succeeded()) {
                    sub.succeeded();
                } else {
                    sub.fail(c.cause());
                }
            });
            future.compose(v -> sub);
        }
        return future;
    }
}

