/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine;

import com.sap.cloud.servicesdk.xbem.adapter.amqp10.MessagingServiceImpl;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.MessagingAmqpException;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.AmqpMessagingUtils;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.AuthSettings;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageBaseHandler;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageEndpointFactory;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.oauth.OAuthHandler;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.proxy.ProxyTransportLayer;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.ssl.SslHelper;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.ws.WebSocketTransportLayer;
import com.sap.cloud.servicesdk.xbem.api.MessagingException;
import com.sap.cloud.servicesdk.xbem.api.MessagingRuntimeException;
import com.sap.cloud.servicesdk.xbem.api.MessagingServiceVersion;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageDriver
extends BaseHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MessageDriver.class);
    private static final int SELECTOR_TIMEOUT_MS = 500;
    private static final int DELAYED_LOCAL_CLOSE_TIMEOUT_MS = 500;
    private final Collector collector;
    private final Handler messageHandler;
    private final Selector selector;
    private final Link link;
    private boolean run;
    private CountDownLatch delayedClose;
    private ExecutorService msgDriverExecutor;
    private Exception caughtRunException;
    private ChannelHandler channelHandler;
    private boolean connected = false;
    private AuthSettings authSettings;
    private String username;
    private String password;
    private String proxyHost;
    private int proxyPort;
    private final CountDownLatch connectionLock = new CountDownLatch(2);

    MessageDriver(Collector collector, Link link, MessageBaseHandler messageHandler) throws IOException {
        this.collector = collector;
        this.link = link;
        this.messageHandler = messageHandler;
        this.selector = Selector.open();
    }

    public static MessageDriver createIncoming(String connectionUrl, String remoteAddress, MessageBaseHandler handler) throws IOException {
        Collector collector = Collector.Factory.create();
        Receiver link = new MessageEndpointFactory(collector).incoming(handler.getName(), connectionUrl, remoteAddress);
        return new MessageDriver(collector, (Link)link, handler);
    }

    public static MessageDriver createOutgoing(String connectionUrl, String remoteAddress, MessageBaseHandler handler) throws IOException {
        Collector collector = Collector.Factory.create();
        Sender link = new MessageEndpointFactory(collector).outgoing(handler.getName(), connectionUrl, remoteAddress);
        return new MessageDriver(collector, (Link)link, handler);
    }

    public void close() {
        if (this.isConnected()) {
            LOG.trace("Start close procedure");
            this.link.free();
            this.link.detach();
            this.link.close();
            Session session = this.link.getSession();
            session.close();
            session.getConnection().close();
            LOG.trace("Close procedure finished");
        }
    }

    private void terminateExecutor(ExecutorService service, String name) {
        if (service == null) {
            LOG.trace("No active (got NULL) executor to terminate with name: {}", (Object)name);
            return;
        }
        try {
            service.shutdown();
            service.awaitTermination(1L, TimeUnit.SECONDS);
            LOG.trace("Executed close for executor: {} ({})", (Object)name, (Object)service);
        }
        catch (InterruptedException e) {
            LOG.error("Executor(" + name + ")::termination interrupted: " + e.getMessage(), (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    public boolean isClosable() {
        return !this.collector.more();
    }

    public boolean isConnected() {
        if (this.connected) {
            return this.channelHandler != null && this.channelHandler.isConnected();
        }
        return false;
    }

    private void runInternal() {
        this.run = true;
        while (this.run) {
            try {
                this.processEvents();
                this.selector.selectNow();
                this.selector.selectedKeys().clear();
                this.selector.select(500L);
                for (SelectionKey key : this.selector.selectedKeys()) {
                    ChannelHandler selectable = (ChannelHandler)key.attachment();
                    selectable.selected();
                }
            }
            catch (MessagingRuntimeException | IOException e) {
                this.run = false;
                this.caughtRunException = e;
                this.connectionLock.countDown();
            }
        }
    }

    public Future<State> start(int time, TimeUnit timeUnit) {
        if (this.msgDriverExecutor != null) {
            throw new IllegalStateException("MessageDriver already started...");
        }
        this.msgDriverExecutor = Executors.newFixedThreadPool(1);
        this.msgDriverExecutor.submit(this::runInternal);
        return CompletableFuture.supplyAsync(() -> {
            try {
                this.connectionLock.await(time, timeUnit);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.getError().isPresent()) {
                this.terminateExecutor(this.msgDriverExecutor, "MessageDriver");
                LOG.error("Failed to start MessageDriver::" + this.caughtRunException.getMessage(), (Throwable)this.caughtRunException);
                return new State(false, this.caughtRunException);
            }
            if (this.isConnected()) {
                return new State(true, null);
            }
            this.terminateExecutor(this.msgDriverExecutor, "MessageDriver");
            String message = String.format("Connection timeout ('%d'ms) reached.", timeUnit.toMillis(time));
            LOG.error("Failed to start MessageDriver:: " + message);
            return new State(false, (Exception)((Object)new MessagingException(message)));
        });
    }

    public Optional<MessagingException> getError() {
        if (this.caughtRunException == null) {
            return Optional.empty();
        }
        return Optional.of(new MessagingException("MessageDriver error: " + this.caughtRunException.getMessage(), (Throwable)this.caughtRunException));
    }

    private void processEvents() {
        Event event;
        while (this.run && (event = this.collector.peek()) != null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Process event (dispatch to messageHandler {}): {}", (Object)this.messageHandler, (Object)event.getType());
            }
            event.dispatch((Handler)this);
            event.dispatch(this.messageHandler);
            this.collector.pop();
        }
    }

    public void onConnectionRemoteOpen(Event e) {
        this.connected = true;
        this.connectionLock.countDown();
    }

    public void onSessionRemoteClose(Event e) {
        this.connected = false;
        ErrorCondition condition = e.getSession().getRemoteCondition();
        if (condition == null || condition.getCondition() == null) {
            LOG.trace("Remote session closed.");
        } else {
            String message = String.format("Remote session closed due an error: %s", condition);
            this.caughtRunException = new MessagingRuntimeException(message);
            LOG.error(message);
        }
    }

    public void onConnectionUnbound(Event e) {
        this.shutdown(true);
    }

    public void onTransportClosed(Event e) {
        this.shutdown(true);
    }

    public void onConnectionLocalClose(Event e) {
        CompletableFuture.runAsync(() -> {
            this.delayedClose = new CountDownLatch(1);
            this.shutdown(false);
        });
    }

    private void shutdown(boolean force) {
        if (this.delayedClose != null) {
            if (force) {
                this.delayedClose.countDown();
                return;
            }
            try {
                while (this.delayedClose != null) {
                    LOG.trace("Start shutdown with delayed close");
                    if (!this.delayedClose.await(500L, TimeUnit.MILLISECONDS)) {
                        LOG.warn("MessageDriver shutdown without correct remote connection close.");
                    }
                    this.delayedClose = null;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.run) {
            LOG.trace("Start shutdown");
            this.run = false;
            this.terminateExecutor(this.msgDriverExecutor, "MessageDriver");
            this.msgDriverExecutor = null;
            LOG.trace("Shutdown successful");
        } else {
            LOG.trace("Not in running state => Shutdown not required (possible double call)");
        }
    }

    public void onTransportError(Event event) {
        this.connectionLock.countDown();
        ErrorCondition condition = event.getTransport().getCondition();
        if (condition == null) {
            condition = new ErrorCondition();
            condition.setDescription("(no description returned)");
        }
        this.caughtRunException = new MessagingAmqpException("TransportError occurred: " + condition.getDescription(), condition);
        LOG.error(String.format("TransportError occurred, desc=[%s], detail=[%s]", condition.getDescription(), condition.getCondition() == null ? "<no detail available>" : condition.getCondition().toString()));
    }

    public void onUnhandled(Event event) {
        LOG.trace("Got unhandled event: {}", (Object)event.getType());
        super.onUnhandled(event);
    }

    public void onTransport(Event event) {
        Transport transport = event.getTransport();
        try {
            ChannelHandler ch = (ChannelHandler)transport.getContext();
            ch.selected();
        }
        catch (MessagingRuntimeException e) {
            LOG.error("Error during transport: " + e.getMessage());
            this.run = false;
            this.channelHandler = null;
            this.caughtRunException = e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onConnectionLocalOpen(Event event) {
        Connection connection = event.getConnection();
        if (connection.getRemoteState() == EndpointState.UNINITIALIZED) {
            connection.setContainer(UUID.randomUUID().toString());
            try {
                AmqpMessagingUtils.ConnectionParameter parameter = new AmqpMessagingUtils.ConnectionParameter(connection);
                parameter.setAuthSettings(this.authSettings);
                parameter.setUser(this.username);
                parameter.setPassword(this.password);
                parameter.setProxy(this.proxyHost, this.proxyPort);
                this.channelHandler = ChannelHandler.connect(parameter, this.selector);
            }
            catch (IOException e) {
                LOG.error("Error during connection local open: " + e.getMessage());
                this.channelHandler = null;
                this.run = false;
                this.caughtRunException = e;
                this.connectionLock.countDown();
            }
            finally {
                this.connectionLock.countDown();
            }
        }
    }

    public void addAuthSettings(AuthSettings authSettings) {
        this.authSettings = authSettings;
    }

    public void addCredentials(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public void addProxy(String host, int port) {
        this.proxyHost = host;
        this.proxyPort = port;
    }

    public String toString() {
        return "MessageDriver {messageHandler(" + this.messageHandler + "), run=" + this.run + ", inError=" + this.caughtRunException + ", channelHandler=" + this.channelHandler + '}';
    }

    static final class ChannelHandler {
        static final String WS_PROTOCOL_SCHEME = "ws";
        static final String WSS_PROTOCOL_SCHEME = "wss";
        private static final int MAX_FRAME_SIZE_AMQP = 10240;
        final SocketChannel socket;
        final SelectionKey key;
        final Transport transport;
        final AmqpMessagingUtils.ConnectionParameter connectionParameter;

        private ChannelHandler(int ops, AmqpMessagingUtils.ConnectionParameter parameter, Selector selector) throws IOException {
            this.connectionParameter = parameter;
            this.socket = SocketChannel.open();
            this.socket.configureBlocking(false);
            this.key = this.socket.register(selector, ops, this);
            this.transport = this.makeTransport(parameter.getConnection(), this.connectionParameter);
            this.transport.setContext((Object)this);
            String host = this.connectionParameter.getConnectionHost();
            int port = this.connectionParameter.getConnectionPort();
            if (LOG.isDebugEnabled()) {
                LOG.debug("CONNECTING to: {}:{} (use proxy = {})", new Object[]{host, port, this.connectionParameter.isProxyEnabled() ? "true -> " + this.connectionParameter.getHost() + ":" + this.connectionParameter.getPort() : "false"});
            }
            this.socket.connect(new InetSocketAddress(host, port));
        }

        public boolean isConnected() {
            return this.socket.isConnected() || this.socket.isOpen();
        }

        static ChannelHandler connect(AmqpMessagingUtils.ConnectionParameter parameter, Selector selector) throws IOException {
            return new ChannelHandler(8, parameter, selector);
        }

        private Transport makeTransport(Connection connection, AmqpMessagingUtils.ConnectionParameter parameter) throws IOException {
            Transport transport = Transport.Factory.create();
            transport.setMaxFrameSize(10240);
            if (this.isSasl(parameter)) {
                this.enableSasl(transport, parameter);
            }
            if (this.isWebSocketTransport(parameter)) {
                this.enableWebSocket(transport, parameter);
            }
            if (this.isSsl(parameter)) {
                this.enableSsl(transport, parameter);
            }
            if (this.isProxy(parameter)) {
                this.enableProxy(transport, parameter);
            }
            transport.bind(connection);
            return transport;
        }

        private boolean isProxy(AmqpMessagingUtils.ConnectionParameter parameter) {
            String host;
            if (parameter.getProxyHost() != null && parameter.getProxyPort() != -1) {
                return true;
            }
            if (this.isSsl(parameter) && (host = System.getProperty("https.proxyHost")) != null) {
                String portParam = System.getProperty("https.proxyPort");
                int port = Integer.parseInt(portParam);
                parameter.setProxy(host, port);
                return true;
            }
            host = System.getProperty("http.proxyHost");
            if (host != null) {
                String portParam = System.getProperty("http.proxyPort");
                int port = Integer.parseInt(portParam);
                parameter.setProxy(host, port);
                return true;
            }
            return false;
        }

        private void enableProxy(Transport transport, AmqpMessagingUtils.ConnectionParameter connectionParameter) {
            ProxyTransportLayer proxyTransportLayer = new ProxyTransportLayer(connectionParameter.getHost(), connectionParameter.getPort());
            ((TransportInternal)transport).addTransportLayer((TransportLayer)proxyTransportLayer);
        }

        private void enableSasl(Transport transport, AmqpMessagingUtils.ConnectionParameter parameter) {
            Sasl sasl = transport.sasl();
            sasl.client();
            if (parameter.getUser() == null) {
                sasl.setMechanisms(new String[]{"ANONYMOUS"});
                LOG.debug("Enabled SASL without explicit user (use default 'ANONYMOUS').");
            } else if ("ANONYMOUS".equalsIgnoreCase(parameter.getUser())) {
                sasl.setMechanisms(new String[]{"ANONYMOUS"});
                LOG.debug("Enabled SASL with explicit user 'ANONYMOUS'.");
            } else {
                sasl.plain(parameter.getUser(), parameter.getPassword());
                LOG.debug("Enabled SASL with explicit user (" + parameter.getUser() + ") and password set=" + (parameter.getPassword() != null) + ".");
            }
        }

        private boolean isSasl(AmqpMessagingUtils.ConnectionParameter parameter) {
            return parameter.getUser() != null;
        }

        private boolean isSsl(AmqpMessagingUtils.ConnectionParameter parameter) {
            return parameter.isSsl();
        }

        private boolean isWebSocketTransport(AmqpMessagingUtils.ConnectionParameter parameter) {
            String protocol = parameter.getProtocol();
            return WS_PROTOCOL_SCHEME.equals(protocol) || WSS_PROTOCOL_SCHEME.equals(protocol);
        }

        private void enableSsl(Transport transport, AmqpMessagingUtils.ConnectionParameter parameter) throws IOException {
            SslDomain domain = SslDomain.Factory.create();
            domain.init(SslDomain.Mode.CLIENT);
            domain.setSslContext(this.createSslContext(parameter));
            domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
            transport.ssl(domain);
        }

        SSLContext createSslContext(AmqpMessagingUtils.ConnectionParameter parameter) throws IOException {
            if (parameter.getSslCertificate() == null) {
                if (parameter.isSslVerify()) {
                    LOG.debug("Enabled SSL without explicit certificate (use default SSLContext).");
                    return SslHelper.getSslContext();
                }
                LOG.debug("Enabled SSL without SSL verification.");
                return SslHelper.getSslContext(parameter.isSslVerify());
            }
            LOG.debug("Enabled SSL with explicit certificate.");
            return SslHelper.getSslContext(parameter.getSslCertificate());
        }

        private void enableWebSocket(Transport transport, AmqpMessagingUtils.ConnectionParameter parameter) throws IOException {
            HashMap<String, String> headers;
            if (transport instanceof TransportInternal) {
                headers = new HashMap<String, String>();
                MessagingServiceVersion messagingServiceVersion = MessagingServiceImpl.getMessagingServiceVersion();
                headers.put("User-Agent", messagingServiceVersion.getProductInfo() + " " + messagingServiceVersion.getRuntimeInfo());
                if (this.isAuthHeaderRequest(parameter)) {
                    headers.put("Authorization", parameter.getAuthSettings().getAuthHeader());
                } else if (this.isOAuthRequest(parameter)) {
                    OAuthHandler oAuthHandler = new OAuthHandler(parameter.getAuthSettings());
                    String token = oAuthHandler.doTokenRequest();
                    headers.put("Authorization", token);
                }
            } else {
                throw new MessagingRuntimeException("WebSocketTransportLayer support only works with Apache Qpid implementation of Transport");
            }
            WebSocketTransportLayer webSocketTransportLayer = new WebSocketTransportLayer(transport.getMaxFrameSize());
            webSocketTransportLayer.configure(parameter.getHost(), parameter.getPath(), parameter.getPort(), "amqp", headers);
            ((TransportInternal)transport).addTransportLayer((TransportLayer)webSocketTransportLayer);
        }

        private boolean isAuthHeaderRequest(AmqpMessagingUtils.ConnectionParameter parameter) {
            return parameter.getAuthSettings() != null && parameter.getAuthSettings().getAuthHeader() != null;
        }

        private boolean isOAuthRequest(AmqpMessagingUtils.ConnectionParameter parameter) {
            AuthSettings authSettings = parameter.getAuthSettings();
            if (authSettings != null) {
                return authSettings.getoAuthGrantType() != null && authSettings.getoAuthClientId() != null && authSettings.getoAuthClientSecret() != null;
            }
            return false;
        }

        boolean update() {
            if (this.socket.isConnected()) {
                int capacity = this.transport.capacity();
                int pending = this.transport.pending();
                if (this.key.isValid()) {
                    int ops = (capacity != 0 ? 1 : 0) | (pending > 0 ? 4 : 0);
                    this.key.interestOps(ops);
                }
                return capacity < 0 && pending < 0;
            }
            return false;
        }

        void selected() {
            if (!this.key.isValid()) {
                return;
            }
            try {
                if (this.key.isConnectable()) {
                    this.selectedHandleConnectable();
                }
                if (this.key.isReadable()) {
                    this.selectedHandleRead();
                }
                if (this.key.isWritable()) {
                    this.selectedHandleWrite();
                }
                if (this.update()) {
                    this.selectedHandleUpdate();
                }
            }
            catch (MessagingException ex) {
                LOG.error(ex.getMessage());
                this.transport.unbind();
                try {
                    this.socket.close();
                }
                catch (IOException closeEx) {
                    String errMessage = String.format("Failed to close socket (%s): %s", new Object[]{this.socket, ex});
                    LOG.error(errMessage);
                    throw new MessagingRuntimeException(errMessage, (Throwable)closeEx);
                }
                throw new MessagingRuntimeException((Throwable)ex);
            }
        }

        private void selectedHandleConnectable() throws MessagingException {
            String remote = null;
            try {
                remote = this.socket.getRemoteAddress().toString();
                this.socket.finishConnect();
            }
            catch (IOException e) {
                String remoteEndpoint = this.connectionParameter.getConnectionHost() + ":" + this.connectionParameter.getConnectionPort();
                String exErrMessage = String.format("Failed to finish connect to endpoint (%s; con=%s (proxy=%b)): %s", remote, remoteEndpoint, this.connectionParameter.isProxyEnabled(), e.getMessage());
                LOG.error(exErrMessage);
                throw new MessagingException(exErrMessage, (Throwable)e);
            }
        }

        private void selectedHandleRead() throws MessagingException {
            try {
                int c = this.transport.capacity();
                if (c > 0) {
                    ByteBuffer tail = this.transport.tail();
                    int n = this.socket.read(tail);
                    if (n > 0) {
                        this.transport.process();
                    } else if (n < 0) {
                        this.transport.close_tail();
                    }
                }
            }
            catch (IOException e) {
                String exErrMessage = String.format("Failed to read from socket (%s): %s", this.socket, e.getMessage());
                LOG.error(exErrMessage);
                throw new MessagingException(exErrMessage, (Throwable)e);
            }
            catch (TransportException e) {
                String errMessage = String.format("Failed during transport with socket (%s): %s", this.socket, e.getMessage());
                LOG.error(errMessage);
                throw new MessagingException(errMessage, (Throwable)e);
            }
        }

        private void selectedHandleWrite() throws MessagingException {
            try {
                int p = this.transport.pending();
                if (p > 0) {
                    ByteBuffer head = this.transport.head();
                    int n = this.socket.write(head);
                    if (n > 0) {
                        this.transport.pop(n);
                    } else if (n < 0) {
                        this.transport.close_head();
                    }
                }
            }
            catch (IOException e) {
                String exErrMessage = String.format("Failed to unbind socket (%s): %s", this.socket, e.getMessage());
                LOG.error(exErrMessage);
                throw new MessagingException(exErrMessage, (Throwable)e);
            }
        }

        private void selectedHandleUpdate() throws MessagingException {
            this.transport.unbind();
            try {
                this.socket.close();
            }
            catch (IOException e) {
                String exErrMessage = String.format("Failed to unbind socket (%s): %s", this.socket, e.getMessage());
                LOG.error(exErrMessage);
                throw new MessagingException(exErrMessage, (Throwable)e);
            }
        }

        public String toString() {
            return "ChannelHandler{connected='" + this.isConnected() + "', connectionParameter=" + this.connectionParameter + '}';
        }
    }

    public final class State {
        private final boolean started;
        private final Exception failure;

        State(boolean started, Exception failure) {
            this.started = started;
            this.failure = failure;
        }

        public boolean isStarted() {
            return this.started;
        }

        public Exception getFailure() {
            return this.failure;
        }
    }
}

