/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.amqp.connection.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.MechanismMismatchException;
import io.vertx.proton.sasl.SaslSystemException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
import javax.security.sasl.AuthenticationException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.ConnectionFactory;
import org.eclipse.hono.client.amqp.connection.DisconnectListener;
import org.eclipse.hono.client.amqp.connection.ErrorConverter;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.client.amqp.connection.ReconnectListener;
import org.eclipse.hono.client.amqp.connection.impl.DeferredConnectionCheckHandler;
import org.eclipse.hono.util.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class HonoConnectionImpl
implements HonoConnection {
    private static final String MSG_NOT_CONNECTED = "not connected";
    private static final Logger LOG = LoggerFactory.getLogger(HonoConnectionImpl.class);
    private final ClientConfigProperties clientConfigProperties;
    private final Vertx vertx;
    private final List<DisconnectListener<HonoConnection>> disconnectListeners = new ArrayList<DisconnectListener<HonoConnection>>();
    private final List<DisconnectListener<HonoConnection>> oneTimeDisconnectListeners = Collections.synchronizedList(new ArrayList());
    private final List<ReconnectListener<HonoConnection>> reconnectListeners = new ArrayList<ReconnectListener<HonoConnection>>();
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final AtomicBoolean disconnecting = new AtomicBoolean(false);
    private final ConnectionFactory connectionFactory;
    private final Object connectionLock = new Object();
    private final AtomicReference<ConnectionAttempt> currentConnectionAttempt = new AtomicReference();
    private final String containerId;
    private final DeferredConnectionCheckHandler deferredConnectionCheckHandler;
    private Context context;
    private ProtonClientOptions lastUsedClientOptions;
    private List<Symbol> offeredCapabilities = Collections.emptyList();
    private Tracer tracer = NoopTracerFactory.create();
    private ProtonSession session;
    private ProtonConnection connection;

    public HonoConnectionImpl(Vertx vertx, ClientConfigProperties clientConfigProperties) {
        this(vertx, clientConfigProperties, null);
    }

    public HonoConnectionImpl(Vertx vertx, ClientConfigProperties clientConfigProperties, ConnectionFactory connectionFactory) {
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(clientConfigProperties);
        this.vertx = vertx;
        this.deferredConnectionCheckHandler = new DeferredConnectionCheckHandler(vertx);
        this.connectionFactory = connectionFactory != null ? connectionFactory : ConnectionFactory.newConnectionFactory(this.vertx, clientConfigProperties);
        this.containerId = ConnectionFactory.createContainerId(clientConfigProperties.getName(), clientConfigProperties.getServerRole(), UUID.randomUUID());
        this.clientConfigProperties = new ClientConfigProperties(clientConfigProperties);
    }

    @Override
    public Vertx getVertx() {
        return this.vertx;
    }

    public void setTracer(Tracer opentracingTracer) {
        this.tracer = Objects.requireNonNull(opentracingTracer);
    }

    @Override
    public Tracer getTracer() {
        return this.tracer;
    }

    @Override
    public ClientConfigProperties getConfig() {
        return this.clientConfigProperties;
    }

    @Override
    public void addDisconnectListener(DisconnectListener<HonoConnection> listener) {
        this.disconnectListeners.add(listener);
    }

    @Override
    public void addReconnectListener(ReconnectListener<HonoConnection> listener) {
        this.reconnectListeners.add(listener);
    }

    @Override
    public <T> Future<T> executeOnContext(Handler<Promise<T>> codeToRun) {
        if (this.context == null) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, MSG_NOT_CONNECTED));
        }
        return Futures.executeOnContextWithSameRoot((Context)this.context, codeToRun);
    }

    @Override
    public Future<Void> isConnected() {
        return this.executeOnContext(this::checkConnected);
    }

    private Future<Void> checkConnected() {
        Promise result = Promise.promise();
        this.checkConnected((Handler<AsyncResult<Void>>)result);
        return result.future();
    }

    private void checkConnected(Handler<AsyncResult<Void>> resultHandler) {
        if (this.isConnectedInternal()) {
            resultHandler.handle((Object)Future.succeededFuture());
        } else {
            resultHandler.handle((Object)Future.failedFuture((Throwable)new ServerErrorException(503, MSG_NOT_CONNECTED)));
        }
    }

    @Override
    public Future<Void> isConnected(long waitForCurrentConnectAttemptTimeout) {
        return this.executeOnContext(result -> this.checkConnected((Handler<AsyncResult<Void>>)result, waitForCurrentConnectAttemptTimeout));
    }

    private void checkConnected(Handler<AsyncResult<Void>> resultHandler, long waitForCurrentConnectAttemptTimeout) {
        if (this.isConnectedInternal()) {
            resultHandler.handle((Object)Future.succeededFuture());
        } else if (waitForCurrentConnectAttemptTimeout > 0L && this.deferredConnectionCheckHandler.isConnectionAttemptInProgress()) {
            LOG.debug("connection attempt to server [{}:{}] in progress, connection check will be completed with its result", (Object)this.connectionFactory.getHost(), (Object)this.connectionFactory.getPort());
            boolean added = this.deferredConnectionCheckHandler.addConnectionCheck(resultHandler, waitForCurrentConnectAttemptTimeout);
            if (!added) {
                this.checkConnected(resultHandler);
            }
        } else {
            resultHandler.handle((Object)Future.failedFuture((Throwable)new ServerErrorException(503, MSG_NOT_CONNECTED)));
        }
    }

    private boolean isConnectedInternal() {
        return this.connection != null && !this.connection.isDisconnected() && this.session != null;
    }

    @Override
    public boolean isShutdown() {
        return this.shuttingDown.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setConnection(ProtonConnection connection, ProtonSession session) {
        Object object = this.connectionLock;
        synchronized (object) {
            this.connection = connection;
            this.session = session;
            if (connection == null) {
                this.offeredCapabilities = Collections.emptyList();
                this.context = null;
            } else {
                this.offeredCapabilities = Optional.ofNullable(connection.getRemoteOfferedCapabilities()).map(caps -> Collections.unmodifiableList(Arrays.asList(caps))).orElse(Collections.emptyList());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean supportsCapability(Symbol capability) {
        if (capability == null) {
            return false;
        }
        Object object = this.connectionLock;
        synchronized (object) {
            return this.offeredCapabilities.contains(capability);
        }
    }

    @Override
    public Future<HonoConnection> connect() {
        return this.connect(null);
    }

    @Override
    public Future<HonoConnection> connect(ProtonClientOptions options) {
        Promise result = Promise.promise();
        this.connect(options, (Handler<AsyncResult<HonoConnection>>)result, false);
        return result.future();
    }

    private void connect(ProtonClientOptions options, Handler<AsyncResult<HonoConnection>> connectionHandler, boolean isReconnect) {
        if (this.shuttingDown.get()) {
            connectionHandler.handle((Object)Future.failedFuture((Throwable)new ClientErrorException(409, "client is already shut down")));
            return;
        }
        this.context = this.vertx.getOrCreateContext();
        LOG.trace("running on vert.x context [event-loop context: {}]", (Object)this.context.isEventLoopContext());
        this.executeOnContext(ignore -> {
            if (this.isConnectedInternal()) {
                LOG.debug("already connected to server [{}:{}, role: {}]", new Object[]{this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole()});
                connectionHandler.handle((Object)Future.succeededFuture((Object)this));
            } else {
                ConnectionAttempt connectionAttempt = new ConnectionAttempt(options, connectionHandler);
                if (connectionAttempt.start(isReconnect)) {
                    this.lastUsedClientOptions = options;
                } else {
                    LOG.debug("already trying to connect to server ...");
                    connectionHandler.handle((Object)Future.failedFuture((Throwable)new ClientErrorException(409, "already connecting to server")));
                }
            }
        });
    }

    private void onRemoteClose(AsyncResult<ProtonConnection> remoteClose) {
        if (remoteClose.failed()) {
            LOG.info("remote server [{}:{}, role: {}] closed connection: {}", new Object[]{this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole(), remoteClose.cause().getMessage()});
        } else {
            LOG.info("remote server [{}:{}, role: {}] closed connection", new Object[]{this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole()});
        }
        this.connection.disconnectHandler(null);
        this.connection.close();
        this.handleConnectionLoss();
    }

    private void onRemoteDisconnect(ProtonConnection con) {
        if (con != this.connection) {
            LOG.warn("cannot handle failure of unknown connection");
        } else {
            LOG.debug("lost connection to server [{}:{}, role: {}]", new Object[]{this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole()});
            this.handleConnectionLoss();
        }
    }

    private void handleConnectionLoss() {
        if (this.isConnectedInternal()) {
            this.connection.disconnect();
        }
        this.notifyDisconnectHandlers();
        this.clearState();
        if (!this.shuttingDown.get() && this.clientConfigProperties.getReconnectAttempts() != 0) {
            this.connect(this.lastUsedClientOptions, (Handler<AsyncResult<HonoConnection>>)((Handler)this::notifyReconnectHandlers), true);
        }
    }

    private void notifyReconnectHandlers(AsyncResult<HonoConnection> reconnectAttempt) {
        if (reconnectAttempt.succeeded()) {
            for (ReconnectListener<HonoConnection> listener : this.reconnectListeners) {
                listener.onReconnect(this);
            }
        }
    }

    private void clearState() {
        this.setConnection(null, null);
    }

    private void notifyDisconnectHandlers() {
        for (DisconnectListener<HonoConnection> listener : this.disconnectListeners) {
            this.notifyDisconnectHandler(listener);
        }
        Iterator<DisconnectListener<HonoConnection>> iter = this.oneTimeDisconnectListeners.iterator();
        while (iter.hasNext()) {
            this.notifyDisconnectHandler(iter.next());
            iter.remove();
        }
    }

    private void notifyDisconnectHandler(DisconnectListener<HonoConnection> listener) {
        try {
            listener.onDisconnect(this);
        }
        catch (Exception ex) {
            LOG.warn("error executing disconnectHandler", (Throwable)ex);
        }
    }

    long getReconnectMaxDelay(int reconnectAttempt) {
        if (reconnectAttempt <= 0) {
            return 0L;
        }
        if (reconnectAttempt <= 31) {
            long currentMaxDelay = (long)(1 << reconnectAttempt - 1) * this.clientConfigProperties.getReconnectDelayIncrement();
            return currentMaxDelay >= 0L ? Math.min(this.clientConfigProperties.getReconnectMaxDelay(), currentMaxDelay) : this.clientConfigProperties.getReconnectMaxDelay();
        }
        return this.clientConfigProperties.getReconnectMaxDelay();
    }

    @Override
    public void closeAndFree(ProtonLink<?> link, Handler<Void> closeHandler) {
        if (this.context == null) {
            closeHandler.handle(null);
        } else {
            HonoProtonHelper.closeAndFree(this.context, link, closeHandler);
        }
    }

    @Override
    public void closeAndFree(ProtonLink<?> link, long detachTimeOut, Handler<Void> closeHandler) {
        if (this.context == null) {
            closeHandler.handle(null);
        } else {
            HonoProtonHelper.closeAndFree(this.context, link, detachTimeOut, closeHandler);
        }
    }

    @Override
    public Future<ProtonSender> createSender(String targetAddress, ProtonQoS qos, Handler<String> closeHook) {
        Objects.requireNonNull(qos);
        return this.executeOnContext(result -> this.checkConnected().compose(v -> {
            if (targetAddress == null && !this.supportsCapability(AmqpUtils.CAP_ANONYMOUS_RELAY)) {
                return Future.failedFuture((Throwable)new ServerErrorException(501, "server does not support anonymous terminus"));
            }
            Promise senderPromise = Promise.promise();
            ProtonSender sender = this.session.createSender(targetAddress);
            sender.setQoS(qos);
            sender.setAutoSettle(true);
            DisconnectListener<HonoConnection> disconnectBeforeOpenListener = con -> {
                LOG.debug("opening sender [{}] failed: got disconnected", (Object)targetAddress);
                senderPromise.tryFail((Throwable)new ServerErrorException(503, MSG_NOT_CONNECTED));
            };
            this.oneTimeDisconnectListeners.add(disconnectBeforeOpenListener);
            sender.openHandler(senderOpen -> {
                this.oneTimeDisconnectListeners.remove(disconnectBeforeOpenListener);
                if (senderPromise.future().isComplete()) {
                    LOG.debug("ignoring server response for opening sender [{}]: sender creation already timed out", (Object)targetAddress);
                } else if (senderOpen.failed()) {
                    ErrorCondition error = sender.getRemoteCondition();
                    if (error == null) {
                        LOG.debug("opening sender [{}] failed", (Object)targetAddress, (Object)senderOpen.cause());
                        senderPromise.tryFail((Throwable)new ClientErrorException(404, "cannot open sender", senderOpen.cause()));
                    } else {
                        LOG.debug("opening sender [{}] failed: {} - {}", new Object[]{targetAddress, error.getCondition(), error.getDescription()});
                        senderPromise.tryFail((Throwable)ErrorConverter.fromAttachError(error));
                    }
                } else if (HonoProtonHelper.isLinkEstablished(sender)) {
                    LOG.debug("sender open [target: {}, sendQueueFull: {}, remote max-message-size: {}]", new Object[]{targetAddress, sender.sendQueueFull(), sender.getRemoteMaxMessageSize()});
                    long remoteMaxMessageSize = Optional.ofNullable(sender.getRemoteMaxMessageSize()).map(UnsignedLong::longValue).orElse(0L);
                    if (remoteMaxMessageSize > 0L && remoteMaxMessageSize < this.clientConfigProperties.getMinMaxMessageSize()) {
                        sender.close();
                        String msg = String.format("peer does not support minimum max-message-size [required: %d, supported: %d", this.clientConfigProperties.getMinMaxMessageSize(), remoteMaxMessageSize);
                        LOG.debug(msg);
                        senderPromise.tryFail((Throwable)new ClientErrorException(412, msg));
                    } else if (sender.getCredit() <= 0) {
                        long waitOnCreditsTimerId = this.vertx.setTimer(this.clientConfigProperties.getFlowLatency(), timerID -> {
                            LOG.debug("sender [target: {}] has {} credits after grace period of {}ms", new Object[]{targetAddress, sender.getCredit(), this.clientConfigProperties.getFlowLatency()});
                            sender.sendQueueDrainHandler(null);
                            senderPromise.tryComplete((Object)sender);
                        });
                        sender.sendQueueDrainHandler(replenishedSender -> {
                            LOG.debug("sender [target: {}] has received {} initial credits", (Object)targetAddress, (Object)replenishedSender.getCredit());
                            if (this.vertx.cancelTimer(waitOnCreditsTimerId)) {
                                result.tryComplete(replenishedSender);
                                replenishedSender.sendQueueDrainHandler(null);
                            }
                        });
                    } else {
                        senderPromise.tryComplete((Object)sender);
                    }
                } else {
                    LOG.debug("peer did not create terminus for target [{}] and will detach the link", (Object)targetAddress);
                    senderPromise.tryFail((Throwable)new ServerErrorException(503));
                }
            });
            HonoProtonHelper.setDetachHandler(sender, remoteDetached -> this.onRemoteDetach((ProtonLink<?>)sender, this.connection.getRemoteContainer(), false, closeHook));
            HonoProtonHelper.setCloseHandler(sender, remoteClosed -> this.onRemoteDetach((ProtonLink<?>)sender, this.connection.getRemoteContainer(), true, closeHook));
            sender.open();
            this.vertx.setTimer(this.clientConfigProperties.getLinkEstablishmentTimeout(), tid -> {
                boolean notOpenedAndNotDisconnectedYet = this.oneTimeDisconnectListeners.remove(disconnectBeforeOpenListener);
                if (notOpenedAndNotDisconnectedYet) {
                    this.onLinkEstablishmentTimeout((ProtonLink<?>)sender, this.clientConfigProperties, (Promise<?>)senderPromise);
                }
            });
            return senderPromise.future();
        }).onComplete((Handler)result));
    }

    @Override
    public Future<ProtonReceiver> createReceiver(String sourceAddress, ProtonQoS qos, ProtonMessageHandler messageHandler, Handler<String> remoteCloseHook) {
        return this.createReceiver(sourceAddress, qos, messageHandler, this.clientConfigProperties.getInitialCredits(), remoteCloseHook);
    }

    @Override
    public Future<ProtonReceiver> createReceiver(String sourceAddress, ProtonQoS qos, ProtonMessageHandler messageHandler, int preFetchSize, Handler<String> remoteCloseHook) {
        return this.createReceiver(sourceAddress, qos, messageHandler, preFetchSize, true, remoteCloseHook);
    }

    @Override
    public Future<ProtonReceiver> createReceiver(String sourceAddress, ProtonQoS qos, ProtonMessageHandler messageHandler, int preFetchSize, boolean autoAccept, Handler<String> remoteCloseHook) {
        Objects.requireNonNull(sourceAddress);
        Objects.requireNonNull(qos);
        Objects.requireNonNull(messageHandler);
        if (preFetchSize < 0) {
            throw new IllegalArgumentException("pre-fetch size must be >= 0");
        }
        return this.executeOnContext(result -> this.checkConnected().compose(v -> {
            Promise receiverPromise = Promise.promise();
            ProtonReceiver receiver = this.session.createReceiver(sourceAddress);
            if (this.clientConfigProperties.getMaxMessageSize() > -1L) {
                receiver.setMaxMessageSize(new UnsignedLong(this.clientConfigProperties.getMaxMessageSize()));
            }
            receiver.setAutoAccept(autoAccept);
            receiver.setQoS(qos);
            receiver.setPrefetch(preFetchSize);
            receiver.handler((delivery, message) -> {
                HonoProtonHelper.onReceivedMessageDeliveryUpdatedFromRemote(delivery, (Handler<ProtonDelivery>)((Handler)d -> LOG.debug("got unexpected disposition update for received message [remote state: {}]", (Object)delivery.getRemoteState())));
                try {
                    messageHandler.handle(delivery, message);
                    if (LOG.isTraceEnabled()) {
                        int remainingCredits = receiver.getCredit() - receiver.getQueued();
                        LOG.trace("handling message [remotely settled: {}, queued messages: {}, remaining credit: {}]", new Object[]{delivery.remotelySettled(), receiver.getQueued(), remainingCredits});
                    }
                }
                catch (Exception ex) {
                    LOG.warn("error handling message", (Throwable)ex);
                    ProtonHelper.released((ProtonDelivery)delivery, (boolean)true);
                }
            });
            DisconnectListener<HonoConnection> disconnectBeforeOpenListener = con -> {
                LOG.debug("opening receiver [{}] failed: got disconnected", (Object)sourceAddress);
                receiverPromise.tryFail((Throwable)new ServerErrorException(503, MSG_NOT_CONNECTED));
            };
            this.oneTimeDisconnectListeners.add(disconnectBeforeOpenListener);
            receiver.openHandler(recvOpen -> {
                this.oneTimeDisconnectListeners.remove(disconnectBeforeOpenListener);
                if (receiverPromise.future().isComplete()) {
                    LOG.debug("ignoring server response for opening receiver [{}]: receiver creation already timed out", (Object)sourceAddress);
                } else if (recvOpen.failed()) {
                    ErrorCondition error = receiver.getRemoteCondition();
                    if (error == null) {
                        LOG.debug("opening receiver [{}] failed", (Object)sourceAddress, (Object)recvOpen.cause());
                        receiverPromise.tryFail((Throwable)new ClientErrorException(404, "cannot open receiver", recvOpen.cause()));
                    } else {
                        LOG.debug("opening receiver [{}] failed: {} - {}", new Object[]{sourceAddress, error.getCondition(), error.getDescription()});
                        receiverPromise.tryFail((Throwable)ErrorConverter.fromAttachError(error));
                    }
                } else if (HonoProtonHelper.isLinkEstablished(receiver)) {
                    LOG.debug("receiver open [source: {}]", (Object)sourceAddress);
                    receiverPromise.tryComplete((Object)((ProtonReceiver)recvOpen.result()));
                } else {
                    LOG.debug("peer did not create terminus for source [{}] and will detach the link", (Object)sourceAddress);
                    receiverPromise.tryFail((Throwable)new ServerErrorException(503));
                }
            });
            HonoProtonHelper.setDetachHandler(receiver, remoteDetached -> this.onRemoteDetach((ProtonLink<?>)receiver, this.connection.getRemoteContainer(), false, remoteCloseHook));
            HonoProtonHelper.setCloseHandler(receiver, remoteClosed -> this.onRemoteDetach((ProtonLink<?>)receiver, this.connection.getRemoteContainer(), true, remoteCloseHook));
            receiver.open();
            this.vertx.setTimer(this.clientConfigProperties.getLinkEstablishmentTimeout(), tid -> {
                boolean notOpenedAndNotDisconnectedYet = this.oneTimeDisconnectListeners.remove(disconnectBeforeOpenListener);
                if (notOpenedAndNotDisconnectedYet) {
                    this.onLinkEstablishmentTimeout((ProtonLink<?>)receiver, this.clientConfigProperties, (Promise<?>)receiverPromise);
                }
            });
            return receiverPromise.future();
        }).onComplete((Handler)result));
    }

    private void onLinkEstablishmentTimeout(ProtonLink<?> link, ClientConfigProperties clientConfig, Promise<?> result) {
        if (link.isOpen() && !HonoProtonHelper.isLinkEstablished(link)) {
            LOG.info("link establishment [peer: {}] timed out after {}ms", (Object)clientConfig.getHost(), (Object)clientConfig.getLinkEstablishmentTimeout());
            link.close();
            result.tryFail((Throwable)new ServerErrorException(503));
        }
    }

    private void onRemoteDetach(ProtonLink<?> link, String remoteContainer, boolean closed, Handler<String> closeHook) {
        String address;
        ErrorCondition error = link.getRemoteCondition();
        String type = link instanceof ProtonSender ? "sender" : "receiver";
        String string = address = link instanceof ProtonSender ? link.getTarget().getAddress() : link.getSource().getAddress();
        if (error == null) {
            LOG.debug("{} [{}] detached (with closed={}) by peer [{}]", new Object[]{type, address, closed, remoteContainer});
        } else {
            LOG.debug("{} [{}] detached (with closed={}) by peer [{}]: {} - {}", new Object[]{type, address, closed, remoteContainer, error.getCondition(), error.getDescription()});
        }
        link.close();
        if (HonoProtonHelper.isLinkEstablished(link) && closeHook != null) {
            closeHook.handle((Object)address);
        }
    }

    @Override
    public void shutdown() {
        CountDownLatch latch = Context.isOnEventLoopThread() ? null : new CountDownLatch(1);
        this.shutdown((Handler<AsyncResult<Void>>)((Handler)done -> {
            if (!done.succeeded()) {
                LOG.warn("could not close connection to server", done.cause());
            }
            if (latch != null) {
                latch.countDown();
            }
        }));
        if (latch != null) {
            try {
                int timeout = this.clientConfigProperties.getCloseConnectionTimeout() + 20;
                if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                    LOG.warn("shutdown of client timed out after {}ms", (Object)timeout);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void shutdown(Handler<AsyncResult<Void>> completionHandler) {
        Objects.requireNonNull(completionHandler);
        this.cancelCurrentConnectionAttempt("client is getting shut down");
        if (this.shuttingDown.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
            this.closeConnection(completionHandler);
        } else {
            completionHandler.handle((Object)Future.failedFuture((Throwable)new ClientErrorException(409, "already shutting down")));
        }
    }

    @Override
    public void disconnect() {
        CountDownLatch latch = Context.isOnEventLoopThread() ? null : new CountDownLatch(1);
        this.disconnect((Handler<AsyncResult<Void>>)((Handler)disconnectResult -> {
            if (disconnectResult.succeeded()) {
                LOG.info("successfully disconnected from the server [{}:{}]", (Object)this.connectionFactory.getHost(), (Object)this.connectionFactory.getPort());
            } else {
                LOG.warn("could not disconnect from the server [{}:{}]", (Object)this.connectionFactory.getHost(), (Object)this.connectionFactory.getPort());
            }
            if (latch != null) {
                latch.countDown();
            }
        }));
        if (latch != null) {
            try {
                int timeout = this.clientConfigProperties.getCloseConnectionTimeout() + 20;
                if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                    LOG.warn("Disconnecting from server [{}:{}, role: {}] timed out after {}ms", new Object[]{this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole(), timeout});
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void disconnect(Handler<AsyncResult<Void>> completionHandler) {
        Objects.requireNonNull(completionHandler);
        this.cancelCurrentConnectionAttempt("client got disconnected");
        if (this.disconnecting.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
            this.closeConnection(completionHandler);
        } else {
            completionHandler.handle((Object)Future.failedFuture((Throwable)new ClientErrorException(409, "already disconnecting")));
        }
    }

    private void cancelCurrentConnectionAttempt(String errorMessage) {
        Optional.ofNullable(this.currentConnectionAttempt.get()).ifPresent(attempt -> attempt.cancel(errorMessage));
    }

    @Override
    public String getRemoteContainerId() {
        if (!this.isConnectedInternal()) {
            return null;
        }
        return this.connection.getRemoteContainer();
    }

    @Override
    public String getContainerId() {
        return this.containerId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeConnection(Handler<AsyncResult<Void>> completionHandler) {
        Handler handler = attempt -> {
            this.disconnecting.compareAndSet(Boolean.TRUE, Boolean.FALSE);
            completionHandler.handle(attempt);
        };
        Object object = this.connectionLock;
        synchronized (object) {
            if (this.isConnectedInternal()) {
                ProtonConnection connectionToClose = this.connection;
                LOG.info("closing connection to container [{}] at [{}:{}, role: {}] ...", new Object[]{connectionToClose.getRemoteContainer(), this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole()});
                HonoProtonHelper.closeConnection(connectionToClose, this.clientConfigProperties.getCloseConnectionTimeout(), this.context).onSuccess(con -> LOG.info("closed connection to container [{}] at [{}:{}, role: {}]", new Object[]{connectionToClose.getRemoteContainer(), this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole()})).onFailure(t -> LOG.info("closed connection to container [{}] at [{}:{}, role: {}]", new Object[]{connectionToClose.getRemoteContainer(), this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole(), t})).map((Object)null).onComplete(result -> {
                    this.notifyDisconnectHandlers();
                    this.clearState();
                }).onComplete(completionHandler);
            } else {
                LOG.info("connection to server [{}:{}, role: {}] already closed", new Object[]{this.connectionFactory.getHost(), this.connectionFactory.getPort(), this.connectionFactory.getServerRole()});
                handler.handle((Object)Future.succeededFuture());
            }
        }
    }

    private class ConnectionAttempt {
        private final ProtonClientOptions clientOptions;
        private final Handler<AsyncResult<HonoConnection>> connectionHandler;
        private final AtomicInteger connectAttempts = new AtomicInteger(0);
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private Long reconnectTimerId;

        ConnectionAttempt(ProtonClientOptions clientOptions, Handler<AsyncResult<HonoConnection>> connectionHandler) {
            this.clientOptions = clientOptions;
            this.connectionHandler = ar -> {
                if (ar.failed()) {
                    HonoConnectionImpl.this.clearState();
                }
                HonoConnectionImpl.this.currentConnectionAttempt.compareAndSet(this, null);
                connectionHandler.handle(ar);
                HonoConnectionImpl.this.deferredConnectionCheckHandler.setConnectionAttemptFinished((AsyncResult<HonoConnection>)ar);
            };
        }

        public boolean start(boolean isReconnect) {
            if (!HonoConnectionImpl.this.currentConnectionAttempt.compareAndSet(null, this)) {
                return false;
            }
            HonoConnectionImpl.this.deferredConnectionCheckHandler.setConnectionAttemptInProgress();
            if (isReconnect) {
                this.reconnect(null);
            } else {
                this.connect();
            }
            return true;
        }

        public void cancel(String errorMessage) {
            if (HonoConnectionImpl.this.currentConnectionAttempt.get() != this || !this.cancelled.compareAndSet(false, true)) {
                return;
            }
            boolean timerCancelled = Optional.ofNullable(this.reconnectTimerId).map(arg_0 -> ((Vertx)HonoConnectionImpl.this.vertx).cancelTimer(arg_0)).orElse(false);
            LOG.debug("cancelled {} connection attempt [#{}] to server [{}:{}, role: {}]", new Object[]{timerCancelled ? "upcoming" : "ongoing", this.connectAttempts.get() + 1, HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole()});
            ClientErrorException ex = new ClientErrorException(409, errorMessage);
            this.connectionHandler.handle((Object)Future.failedFuture((Throwable)ex));
        }

        private void connect() {
            if (this.cancelled.get()) {
                return;
            }
            LOG.debug("starting attempt [#{}] to connect to server [{}:{}, role: {}]", new Object[]{this.connectAttempts.get() + 1, HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole()});
            HonoConnectionImpl.this.connectionFactory.connect(this.clientOptions, null, null, HonoConnectionImpl.this.containerId, (Handler<AsyncResult<ProtonConnection>>)((Handler)HonoConnectionImpl.this::onRemoteClose), (Handler<ProtonConnection>)((Handler)HonoConnectionImpl.this::onRemoteDisconnect)).onFailure(this::reconnect).onSuccess(newConnection -> {
                if (this.cancelled.get()) {
                    LOG.debug("attempt [#{}]: connected but will directly be closed because attempt got cancelled; server [{}:{}, role: {}]", new Object[]{this.connectAttempts.get() + 1, HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole()});
                    newConnection.closeHandler(null);
                    newConnection.disconnectHandler(null);
                    newConnection.close();
                } else {
                    LOG.debug("attempt [#{}]: connected to server [{}:{}, role: {}]; remote container: {}", new Object[]{this.connectAttempts.get() + 1, HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole(), newConnection.getRemoteContainer()});
                    ProtonSession defaultSession = this.createDefaultSession((ProtonConnection)newConnection);
                    HonoConnectionImpl.this.setConnection((ProtonConnection)newConnection, defaultSession);
                    this.connectionHandler.handle((Object)Future.succeededFuture((Object)HonoConnectionImpl.this));
                }
            });
        }

        private ProtonSession createDefaultSession(ProtonConnection connection) {
            if (connection == null) {
                throw new IllegalStateException("no connection to create session for");
            }
            LOG.debug("establishing AMQP session with server [{}:{}, role: {}]", new Object[]{HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole()});
            ProtonSession newSession = connection.createSession();
            newSession.closeHandler(remoteClose -> {
                StringBuilder msgBuilder = new StringBuilder("the connection's session closed unexpectedly");
                Optional.ofNullable(newSession.getRemoteCondition()).ifPresent(error -> msgBuilder.append(String.format(" [condition: %s, description: %s]", error.getCondition(), error.getDescription())));
                newSession.close();
                HonoConnectionImpl.this.onRemoteClose((AsyncResult<ProtonConnection>)Future.failedFuture((String)msgBuilder.toString()));
            });
            newSession.setIncomingCapacity(HonoConnectionImpl.this.clientConfigProperties.getMaxSessionWindowSize());
            newSession.open();
            return newSession;
        }

        @SuppressFBWarnings(value={"PREDICTABLE_RANDOM"}, justification="The values returned by the ThreadLocalRandom are only used for calculating a\nrandom amount of time to wait before trying to reconnect.\n")
        private void reconnect(Throwable connectionFailureCause) {
            VertxInternal vertxInternal;
            Vertx vertx;
            if (this.cancelled.get()) {
                return;
            }
            if (connectionFailureCause != null) {
                this.logConnectionError(connectionFailureCause);
            }
            if ((vertx = HonoConnectionImpl.this.vertx) instanceof VertxInternal && (vertxInternal = (VertxInternal)vertx).closeFuture().isClosed()) {
                LOG.info("stopping attempts to re-connect to server [{}:{}, role: {}], vertx instance is closed", new Object[]{HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole()});
                this.connectionHandler.handle((Object)Future.failedFuture((Throwable)this.mapConnectionAttemptFailure(connectionFailureCause)));
            } else if (HonoConnectionImpl.this.clientConfigProperties.getReconnectAttempts() - this.connectAttempts.get() == 0) {
                LOG.info("max number of attempts [{}] to re-connect to server [{}:{}, role: {}] have been made, giving up", new Object[]{HonoConnectionImpl.this.clientConfigProperties.getReconnectAttempts(), HonoConnectionImpl.this.connectionFactory.getHost(), HonoConnectionImpl.this.connectionFactory.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole()});
                this.connectionHandler.handle((Object)Future.failedFuture((Throwable)this.mapConnectionAttemptFailure(connectionFailureCause)));
            } else {
                long reconnectDelay;
                int reconnectAttempt = this.connectAttempts.getAndIncrement();
                long reconnectMaxDelay = HonoConnectionImpl.this.getReconnectMaxDelay(reconnectAttempt);
                long l = reconnectDelay = reconnectMaxDelay > HonoConnectionImpl.this.clientConfigProperties.getReconnectMinDelay() ? ThreadLocalRandom.current().nextLong(HonoConnectionImpl.this.clientConfigProperties.getReconnectMinDelay(), reconnectMaxDelay) : HonoConnectionImpl.this.clientConfigProperties.getReconnectMinDelay();
                if (reconnectDelay > 0L) {
                    LOG.trace("scheduling new connection attempt in {}ms ...", (Object)reconnectDelay);
                    this.reconnectTimerId = HonoConnectionImpl.this.vertx.setTimer(reconnectDelay, tid -> {
                        this.reconnectTimerId = null;
                        this.connect();
                    });
                } else {
                    this.connect();
                }
            }
        }

        private ServiceInvocationException mapConnectionAttemptFailure(Throwable connectionFailureCause) {
            Object serviceInvocationException = connectionFailureCause == null ? new ServerErrorException(503, "failed to connect") : (connectionFailureCause instanceof AuthenticationException ? new ClientErrorException(401, "failed to authenticate with server") : (connectionFailureCause instanceof MechanismMismatchException ? new ClientErrorException(401, "no suitable SASL mechanism found for authentication with server") : (connectionFailureCause instanceof SSLException ? new ClientErrorException(400, "TLS handshake with server failed: " + connectionFailureCause.getMessage(), connectionFailureCause) : new ServerErrorException(503, "failed to connect", connectionFailureCause))));
            return serviceInvocationException;
        }

        private void logConnectionError(Throwable connectionFailureCause) {
            if (this.isNoteworthyConnectionError(connectionFailureCause)) {
                LOG.warn("attempt [#{}] to connect to server [{}:{}, role: {}] failed", new Object[]{this.connectAttempts.get() + 1, HonoConnectionImpl.this.clientConfigProperties.getHost(), HonoConnectionImpl.this.clientConfigProperties.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole(), connectionFailureCause});
            } else {
                LOG.debug("attempt [#{}] to connect to server [{}:{}, role: {}] failed", new Object[]{this.connectAttempts.get() + 1, HonoConnectionImpl.this.clientConfigProperties.getHost(), HonoConnectionImpl.this.clientConfigProperties.getPort(), HonoConnectionImpl.this.connectionFactory.getServerRole(), connectionFailureCause});
            }
        }

        private boolean isNoteworthyConnectionError(Throwable connectionFailureCause) {
            SaslSystemException saslSystemEx;
            return connectionFailureCause instanceof SSLException || connectionFailureCause instanceof AuthenticationException || connectionFailureCause instanceof MechanismMismatchException || connectionFailureCause instanceof SaslSystemException && (saslSystemEx = (SaslSystemException)connectionFailureCause).isPermanent();
        }
    }
}

