/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine;

import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.AmqpMessagingUtils;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageBaseHandler;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageConfig;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageDriver;
import com.sap.cloud.servicesdk.xbem.api.Message;
import com.sap.cloud.servicesdk.xbem.api.MessagingException;
import com.sap.cloud.servicesdk.xbem.api.MessagingRuntimeException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Stack;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageSubscription
extends MessageBaseHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MessageSubscription.class);
    private static final int LOG_WAIT_MESSAGE_EVERY_X = 10;
    private final AmqpMessagingUtils messagingUtils = new AmqpMessagingUtils();
    private final Object messageDriverLock = new Object();
    private static final int MAX_WAIT_TIME = 10000;
    private static final int RETRY_COUNTER = 10;
    private static final int RETRY_COUNTER_DAEMON_MODE = -1;
    private static final int DAEMON_MODE_TIME = -1;
    private final Stack<Message<?>> messageStack = new Stack();
    private int messageGetWaitTimeInMs = -1;
    private boolean daemonMode;
    private boolean active = false;
    private String clientId;
    MessageDriver messageDriver;

    private MessageSubscription(String clientId, String connectionUrl, String endpoint) {
        super("rec::" + clientId, connectionUrl, endpoint);
        this.clientId = clientId;
    }

    public static MessageSubscription with(String clientId, MessageConfig config) {
        MessageSubscription subscription = new MessageSubscription(clientId, config.getConnectionUrl(), config.getEndpointAddress());
        subscription.initBaseConfig(config);
        subscription.messageGetWaitTimeInMs = config.getMessageGetWaitTimeInMs();
        return subscription;
    }

    @Override
    protected void logError(String message, Object ... test) {
        LOG.error(String.format(message, test));
    }

    @Override
    public boolean isActive() {
        if (this.daemonMode) {
            return this.isConnected() && this.active;
        }
        return this.isConnected() && (this.active || !this.messageStack.isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isConnected() {
        Object object = this.messageDriverLock;
        synchronized (object) {
            return this.messageDriver != null && this.messageDriver.isConnected();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isClosed() {
        Object object = this.messageDriverLock;
        synchronized (object) {
            return this.messageDriver == null;
        }
    }

    @Override
    public void connect() throws MessagingException {
        this.init(this.messageGetWaitTimeInMs);
    }

    public void onLinkInit(Event e) {
        Receiver receiver = e.getReceiver();
        if (receiver == null) {
            throw new MessagingRuntimeException("No receiver found on link init.");
        }
        this.updateLinkReliableMode((Link)receiver);
    }

    public void onDelivery(Event event) {
        Receiver receiver = event.getReceiver();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Message arrived at: {}", (Object)receiver.getName());
        }
        Delivery delivery = receiver.current();
        delivery.setDefaultDeliveryState((DeliveryState)this.getDefaultDeliveryState());
        if (delivery.isReadable() && !delivery.isPartial()) {
            int size = delivery.pending();
            byte[] buffer = new byte[size];
            int read = receiver.recv(buffer, 0, buffer.length);
            org.apache.qpid.proton.message.Message amqpMessage = Proton.message();
            amqpMessage.decode(buffer, 0, read);
            byte[] content = this.getMessageContent(amqpMessage);
            Message emMessage = this.createMessage(receiver, amqpMessage, content);
            this.messageStack.push(emMessage);
            receiver.advance();
            delivery.settle();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Handled and settled message arrived at: {}", (Object)receiver.getName());
            }
        } else if (LOG.isTraceEnabled()) {
            LOG.trace("Got delivery of message (@{}) with: readable={}; partial={}", new Object[]{receiver.getName(), delivery.isReadable(), delivery.isPartial()});
        }
    }

    private Message createMessage(Receiver receiver, org.apache.qpid.proton.message.Message amqpMessage, byte[] content) {
        return this.messagingUtils.createMessage(this.endpoint, receiver, amqpMessage, content);
    }

    private Accepted getDefaultDeliveryState() {
        return Accepted.getInstance();
    }

    private byte[] getMessageContent(org.apache.qpid.proton.message.Message message) {
        Section body = message.getBody();
        if (body instanceof Data) {
            Data data = (Data)body;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received 'Data' message => [{}]", (Object)this.toLoggable(data));
            }
            return data.getValue().getArray();
        }
        if (body instanceof AmqpValue) {
            AmqpValue value = (AmqpValue)body;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received 'AmqpValue' message => [{}]", value.getValue());
            }
            String stringValue = value.getValue().toString();
            return stringValue.getBytes(StandardCharsets.UTF_8);
        }
        throw new MessagingRuntimeException("Unsupported message content of type: " + (body == null ? "NULL" : body.getClass()));
    }

    private String toLoggable(Data data) {
        int len = data.getValue().getLength();
        if (len > 20) {
            return data.getValue().subBinary(0, 20).toString() + "[l=" + len + "]";
        }
        return data.getValue().toString() + "[l=" + len + "]";
    }

    public Message getOrWait() throws MessagingException {
        if (this.daemonMode) {
            return this.getOrWait(-1);
        }
        return this.getOrWait(10);
    }

    private Message getOrWait(int counter) throws MessagingException {
        this.active = true;
        while (this.active) {
            if (!this.messageStack.isEmpty()) {
                return this.messageStack.pop();
            }
            if (counter == 0) {
                this.active = false;
                return null;
            }
            if (LOG.isTraceEnabled() && counter % 10 == 0) {
                LOG.trace("[" + this.clientId + "->" + this.endpoint + "] " + "Waiting for messages (" + this.messageGetWaitTimeInMs + "ms (" + (10 - counter) + ")).");
            }
            this.silentWait(this.messageGetWaitTimeInMs);
            --counter;
            if (this.lastError == null) continue;
            throw this.lastError;
        }
        return null;
    }

    private void init(int waitTimeInMs) throws MessagingException {
        boolean bl = this.daemonMode = waitTimeInMs < 0;
        this.messageGetWaitTimeInMs = waitTimeInMs <= 0 ? 1000 : (waitTimeInMs < 10000 ? waitTimeInMs : 10000) / 10;
        this.add((Handler)new Handshaker());
        this.add((Handler)new FlowController());
        this.initMessageDriver();
    }

    private void initMessageDriver() throws MessagingException {
        if (this.messageDriver == null) {
            try {
                MessageDriver driver = MessageDriver.createIncoming(this.connectionUrl, this.endpoint, this);
                driver.addAuthSettings(this.authSettings);
                if (this.username != null) {
                    driver.addCredentials(this.username, this.password);
                }
                driver.addProxy(this.proxyHost, this.proxyPort);
                LOG.trace("Created MessageDriver for service connectionUrl='{}' and endpoint='{}'{}.", new Object[]{this.connectionUrl, this.endpoint, this.authSettings == null ? "" : " and enabled 'authSettings'"});
                MessageDriver.State state = driver.start(this.initialConnectionTimeoutMs, TimeUnit.MILLISECONDS).get();
                if (!state.isStarted()) {
                    throw new MessagingException("Messaging receiving connection (endpoint=" + this.endpoint + "; url=" + this.connectionUrl + ") init failed: " + state.getFailure().getMessage(), (Throwable)state.getFailure());
                }
                this.messageDriver = driver;
            }
            catch (IOException | InterruptedException | ExecutionException e) {
                LOG.error("Failed to start message driver with error: " + e.getMessage(), (Throwable)e);
                this.close(500L, TimeUnit.MILLISECONDS);
                throw new MessagingException("Messaging Service connection init failed due an unexpected error. Check logs for further details.");
            }
        }
    }

    private void silentWait(int waitInMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(waitInMs);
        }
        catch (InterruptedException e) {
            LOG.warn("Silent messageGetWaitTimeInMs was interrupted -> keep quiet");
            Thread.currentThread().interrupt();
        }
    }

    public void close() throws MessagingException {
        this.close(5L, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(long time, TimeUnit unit) throws MessagingException {
        Object object = this.messageDriverLock;
        synchronized (object) {
            LOG.info("Shutdown subscription.");
            if (this.messageDriver == null) {
                LOG.info("No subscription started?! Do nothing.");
            } else {
                this.messageDriver.close();
                this.messageDriver = null;
                LOG.info("Closed subscription.");
                this.active = false;
                super.close(time, unit);
            }
        }
    }
}

