/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine;

import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageBaseHandler;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageConfig;
import com.sap.cloud.servicesdk.xbem.adapter.amqp10.driver.engine.MessageDriver;
import com.sap.cloud.servicesdk.xbem.api.Message;
import com.sap.cloud.servicesdk.xbem.api.MessagingException;
import com.sap.cloud.servicesdk.xbem.api.MessagingRuntimeException;
import com.sap.cloud.servicesdk.xbem.api.MessagingSetting;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes messages through an AMQP 1.0 {@link Sender} link that is owned by a
 * {@link MessageDriver}.
 *
 * <p>The publisher lazily creates the driver on first use, throttles the caller when too
 * many messages are in flight (or, in {@code AT_MOST_ONCE} mode, when the link has no
 * credit left) and supports a delayed close that waits until outstanding messages are
 * settled.
 *
 * <p>NOTE(review): the {@code on...} callbacks are presumably invoked from the driver's
 * event thread while {@link #publish(Message)} runs on the caller's thread; the state
 * shared between both sides is therefore {@code volatile} / atomic. Confirm against
 * {@code MessageDriver}.
 */
public class MessagePublisher extends MessageBaseHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MessagePublisher.class);
    /** Amount by which the flow-control sleep grows on every wait step. */
    private static final int FLOW_CONTROL_SLEEP_STEPS_MS = 75;
    /** Number of flow-control wait steps after which publishing is reported as failed. */
    private static final int FLOW_CONTROL_MAX_TILL_ERROR = 1000;
    /** Polling interval while waiting for the sender link to become active. */
    private static final int CONNECT_WAIT_STEP_MS = 100;
    private static final String FLOW_CONTROL_ERROR_MESSAGE = "flowControl: Message send failed due low credit/slow receiver.";
    private static final String FLOW_CONTROL_INTERRUPTED_MESSAGE = "flowControl: Interrupted while waiting for the receiver to catch up.";
    private static final String NO_ACTIVE_SENDER_MESSAGE = "Failed to ensure an active sender.";

    private int maxFlowMessages = MessageConfig.DEFAULT_MAX_MESSAGES_IN_FLOW;
    private int flowControlResumeThreshold = 300;
    // Incremented by the publishing thread and decremented from onDelivery(); the flow-control
    // wait loop only terminates if it observes those decrements, hence an atomic counter.
    private final AtomicInteger currentMessagesInFlow = new AtomicInteger();
    private volatile Sender activeSender;
    private volatile boolean publisherActive = false;
    private int lastSendTag = 0;
    volatile MessageDriver messageDriver;
    private volatile ScheduledExecutorService shutdownService;
    private final CountDownLatch messageDriverOpen = new CountDownLatch(1);

    private MessagePublisher(String connectionUrl, String endpoint) {
        super("snd", connectionUrl, endpoint);
        this.messagingReliableMode = MessagingSetting.MessagingReliableMode.AT_LEAST_ONCE;
    }

    /**
     * Creates a publisher from the given configuration.
     *
     * <p>A non-positive "max messages in flow" is treated as {@code 0}; the resume threshold
     * is a tenth of the maximum.
     *
     * @param config connection, endpoint and flow-control settings
     * @return a new, not yet connected publisher
     */
    public static MessagePublisher with(MessageConfig config) {
        MessagePublisher publisher = new MessagePublisher(config.getConnectionUrl(), config.getEndpointAddress());
        publisher.initBaseConfig(config);
        publisher.maxFlowMessages = Math.max(0, config.getMaxMessagesInFlow());
        publisher.flowControlResumeThreshold = publisher.maxFlowMessages / 10;
        publisher.initialConnectionTimeoutMs = config.getInitialConnectionTimeoutMs();
        return publisher;
    }

    /**
     * @return {@code true} when the publisher is active, a sender link is present and the
     *         driver reports a connection
     */
    @Override
    public boolean isConnected() {
        if (!this.publisherActive || this.activeSender == null) {
            return false;
        }
        // Work on a snapshot: shutdown() may null the field concurrently.
        MessageDriver driver = this.messageDriver;
        boolean connected = driver != null && driver.isConnected();
        if (LOG.isTraceEnabled()) {
            LOG.trace("isConnected connection={}", (Object)(driver == null ? "<no driver>" : Boolean.valueOf(connected)));
        }
        return connected;
    }

    /**
     * @return {@code true} when connected and there is still outstanding work, i.e. the
     *         publisher is not yet closable
     */
    @Override
    public boolean isActive() {
        return this.publisherActive && this.isConnected() && !this.isClosable();
    }

    /**
     * Establishes the sender link if it is not connected yet.
     *
     * @throws MessagingException if the link could not be established
     */
    @Override
    public void connect() throws MessagingException {
        if (!this.isConnected()) {
            this.ensureActiveSender();
        }
    }

    /**
     * Sends a message, applies flow control and reports any error recorded by the driver or
     * this handler.
     *
     * @param message the message to send
     * @throws MessagingException if no sender is available or an error was recorded
     * @throws MessagingRuntimeException if flow control gave up waiting or was interrupted
     */
    public void publish(Message<?> message) throws MessagingException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Publish message: " + message.toString());
        }
        this.ensureActiveSender();
        Sender sender = this.activeSender;
        if (sender == null) {
            // The link was closed between the connectivity check and the send.
            throw new MessagingException(NO_ACTIVE_SENDER_MESSAGE);
        }
        this.sendMessage(sender, message);
        this.handleFlowControl();
        Optional<MessagingException> messagingException = this.checkForError();
        if (messagingException.isPresent()) {
            LOG.error("Error during message send with message: " + messagingException.get().getMessage());
            throw messagingException.get();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Published message: " + message.toString());
        }
    }

    /**
     * Starts the driver when not connected and polls until the sender link is active or the
     * initial connection timeout has elapsed.
     *
     * @throws MessagingException on driver errors, timeout, or when the waiting thread is
     *         interrupted
     */
    private void ensureActiveSender() throws MessagingException {
        if (this.isConnected()) {
            return;
        }
        this.initMessageDriver();
        long retries = this.initialConnectionTimeoutMs / CONNECT_WAIT_STEP_MS;
        while (!this.isConnected() && retries > 0L) {
            LOG.trace("Wait for active sender ({}).", (Object)Long.valueOf(retries));
            try {
                Thread.sleep(CONNECT_WAIT_STEP_MS);
            }
            catch (InterruptedException e) {
                // Continuing here would make every following sleep() throw immediately without
                // consuming a retry, i.e. spin forever. Restore the flag and give up instead.
                Thread.currentThread().interrupt();
                throw new MessagingException("Interrupted while waiting for an active sender.", e);
            }
            --retries;
            this.checkForErrorAndThrow();
        }
        if (!this.isConnected()) {
            throw new MessagingException(NO_ACTIVE_SENDER_MESSAGE);
        }
    }

    /** Throws the error returned by {@link #checkForError()}, if there is one. */
    private void checkForErrorAndThrow() throws MessagingException {
        Optional<MessagingException> messagingException = this.checkForError();
        if (messagingException.isPresent()) {
            throw messagingException.get();
        }
    }

    /**
     * @return the driver's error if present, otherwise this handler's {@code lastError},
     *         otherwise empty
     */
    private Optional<MessagingException> checkForError() {
        MessageDriver driver = this.messageDriver;
        if (driver != null) {
            Optional<MessagingException> error = driver.getError();
            if (error.isPresent()) {
                return error;
            }
        }
        if (this.lastError != null) {
            return Optional.of(this.lastError);
        }
        return Optional.empty();
    }

    /**
     * Creates, configures and starts a new {@link MessageDriver}.
     *
     * <p>Runs while holding the monitor of {@code messageDriverOpen}, which
     * {@link #onConnectionRemoteOpen(Event)} also acquires, so that callback cannot mark the
     * publisher active while initialization is still in progress. A driver that fails to
     * start is closed before the exception is propagated.
     *
     * @throws MessagingException if a driver already exists or start-up fails
     */
    private void initMessageDriver() throws MessagingException {
        if (this.messageDriver != null) {
            if (this.lastError != null) {
                throw this.lastError;
            }
            throw new MessagingException("Invalid initialization of MessageDriver");
        }
        synchronized (this.messageDriverOpen) {
            MessageDriver driver = null;
            boolean started = false;
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Init NEW MessageDriver:\n\turl={}\n\tdest={}\n\tmessagingReliableMode={};\n\tmaxFlow={}\n\tcreds={}\n\tauth={}\n", new Object[]{this.connectionUrl, this.endpoint, this.messagingReliableMode, this.maxFlowMessages, this.username != null, this.authSettings != null});
                }
                driver = MessageDriver.createOutgoing(this.connectionUrl, this.endpoint, this);
                driver.addAuthSettings(this.authSettings);
                if (this.username != null) {
                    driver.addCredentials(this.username, this.password);
                }
                driver.addProxy(this.proxyHost, this.proxyPort);
                MessageDriver.State state = driver.start(this.initialConnectionTimeoutMs, TimeUnit.MILLISECONDS).get();
                if (!state.isStarted()) {
                    throw new MessagingException("Messaging sender connection (endpoint=" + this.endpoint + "; url=" + this.connectionUrl + ") init failed: " + state.getFailure().getMessage(), state.getFailure());
                }
                this.messageDriver = driver;
                started = true;
                LOG.trace("Init successful NEW MessageDriver::{}", (Object)this.connectionUrl);
            }
            catch (IOException e) {
                throw new MessagingException(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Error during startup from type: {} with message: {}.", e.getClass(), (Object)e.getMessage());
                throw new MessagingException("Error during Startup: " + e.getMessage(), e);
            }
            catch (ExecutionException e) {
                LOG.error("Error during startup from type: {} with message: {}.", e.getClass(), (Object)e.getMessage());
                throw new MessagingException("Error during Startup: " + e.getMessage(), e);
            }
            finally {
                // Do not leak a driver that was created but never published to the field.
                if (!started && driver != null) {
                    driver.close();
                }
            }
        }
    }

    /**
     * Encodes the message, hands it to the sender under a fresh numeric delivery tag and
     * counts it as in flight. In {@code AT_MOST_ONCE} mode the delivery is settled locally
     * right away.
     */
    private void sendMessage(Sender sender, Message<?> message) {
        BinaryContent content = this.toAmqpBinaryContent(message);
        String tag = String.valueOf(++this.lastSendTag);
        // Encode with the same charset getDeliveryTag() uses for decoding.
        Delivery delivery = sender.delivery(tag.getBytes(StandardCharsets.ISO_8859_1));
        sender.send(content.content, 0, content.length);
        if (this.messagingReliableMode == MessagingSetting.MessagingReliableMode.AT_MOST_ONCE) {
            delivery.setDefaultDeliveryState(Accepted.getInstance());
            delivery.settle();
        }
        sender.advance();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Message (tag={}) send to: {} [remaining messages={}]", new Object[]{tag, sender.getName(), sender.getQueued()});
            LOG.trace("Sender stats (after message:tag={} [queued={}, credit={}, remoteCredit={}, drained={}]", new Object[]{tag, sender.getQueued(), sender.getCredit(), sender.getRemoteCredit(), sender.getDrain()});
        }
        this.currentMessagesInFlow.incrementAndGet();
    }

    /**
     * Converts the message into an encoded AMQP message: the content becomes a binary
     * {@link Data} body and every header becomes an application property.
     *
     * @return the encoded bytes together with the number of valid bytes in the buffer
     */
    private BinaryContent toAmqpBinaryContent(Message<?> message) {
        org.apache.qpid.proton.message.Message amqpMessage = Proton.message();
        byte[] content = (byte[])message.getContentAs(byte[].class);
        amqpMessage.setBody(new Data(new Binary(content)));
        HashMap<String, Object> headers = new HashMap<>();
        message.getHeaderNames().forEach(name -> headers.put(name, message.getHeader(name).get()));
        amqpMessage.setApplicationProperties(new ApplicationProperties(headers));
        // Initial buffer size is an estimate; the buffer grows whenever encoding overflows.
        int bestGuessedSize = 12 + content.length + headers.size() * 100;
        byte[] messageData = new byte[bestGuessedSize];
        int length = 0;
        boolean encoded = false;
        while (!encoded) {
            try {
                length = amqpMessage.encode(messageData, length, messageData.length);
                encoded = true;
            }
            catch (BufferOverflowException e) {
                messageData = Arrays.copyOf(messageData, messageData.length + 2000);
            }
        }
        return new BinaryContent(length, messageData);
    }

    /** Selects the flow-control strategy matching the reliable mode. */
    private void handleFlowControl() {
        if (this.messagingReliableMode == MessagingSetting.MessagingReliableMode.AT_MOST_ONCE) {
            this.handleFlowControlByCredit();
        } else {
            this.handleFlowControlByMessagesInFlow();
        }
    }

    /**
     * Blocks the caller once more than {@code maxFlowMessages} are in flight until the count
     * has dropped to the resume threshold, sleeping a little longer on every step.
     *
     * @throws MessagingRuntimeException if the maximum number of steps is reached or the
     *         thread is interrupted
     */
    private void handleFlowControlByMessagesInFlow() {
        if (this.currentMessagesInFlow.get() <= this.maxFlowMessages) {
            return;
        }
        int counter = 0;
        int flowControlSleepMs = 0;
        while (this.currentMessagesInFlow.get() > this.flowControlResumeThreshold) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("flowControl: high messages in flow count '{}', start slow down (sleep={}, step={}).", new Object[]{this.currentMessagesInFlow.get(), flowControlSleepMs, counter});
            }
            flowControlSleepMs += FLOW_CONTROL_SLEEP_STEPS_MS;
            this.pauseForFlowControl(flowControlSleepMs);
            if (counter++ == FLOW_CONTROL_MAX_TILL_ERROR) {
                LOG.error(FLOW_CONTROL_ERROR_MESSAGE);
                throw new MessagingRuntimeException(FLOW_CONTROL_ERROR_MESSAGE);
            }
        }
    }

    /**
     * Blocks the caller when the sender has no credit until the credit exceeds the resume
     * threshold or the link goes away, sleeping a little longer on every step.
     *
     * @throws MessagingRuntimeException if the maximum number of steps is reached or the
     *         thread is interrupted
     */
    private void handleFlowControlByCredit() {
        Sender sender = this.activeSender;
        if (!this.publisherActive || sender == null || sender.getCredit() > 0) {
            return;
        }
        int counter = 0;
        int flowControlSleepMs = 0;
        // Re-read the field on every step: once the link is closed no credit will ever arrive.
        while ((sender = this.activeSender) != null && sender.getCredit() <= this.flowControlResumeThreshold) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("flowControl: low credit {}, start slow down (sleep={}, step={}).", new Object[]{sender.getCredit(), flowControlSleepMs, counter});
            }
            flowControlSleepMs += FLOW_CONTROL_SLEEP_STEPS_MS;
            this.pauseForFlowControl(flowControlSleepMs);
            if (counter++ == FLOW_CONTROL_MAX_TILL_ERROR) {
                LOG.error(FLOW_CONTROL_ERROR_MESSAGE);
                throw new MessagingRuntimeException(FLOW_CONTROL_ERROR_MESSAGE);
            }
        }
    }

    /**
     * Sleeps for one flow-control step. An interrupt restores the interrupt flag and aborts
     * the wait, because with the flag set every further sleep would return immediately.
     */
    private void pauseForFlowControl(int sleepMs) {
        try {
            Thread.sleep(sleepMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error(FLOW_CONTROL_INTERRUPTED_MESSAGE);
            throw new MessagingRuntimeException(FLOW_CONTROL_INTERRUPTED_MESSAGE);
        }
    }

    /**
     * Settles a delivery locally once the remote side has settled it and decrements the
     * in-flight counter.
     */
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        if (LOG.isTraceEnabled()) {
            LOG.trace("OnDelivery...");
            LOG.trace("Delivery settled state={}, remote={}, tag={}", new Object[]{delivery.isSettled(), delivery.remotelySettled(), this.getDeliveryTag(delivery)});
            LOG.trace("Sender has {} unsettled messages (sender settle mode: {})", (Object)Integer.valueOf(event.getSender().getUnsettled()), (Object)event.getSender().getSenderSettleMode());
        }
        if (delivery.remotelySettled()) {
            delivery.settle();
            int inFlow = this.currentMessagesInFlow.decrementAndGet();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Settled tag={} (remote state={}, local unsettled={}, inFlow={})", new Object[]{this.getDeliveryTag(delivery), delivery.getRemoteState(), event.getSender().getUnsettled(), inFlow});
            }
        } else if (LOG.isTraceEnabled()) {
            LOG.trace(String.format("Not yet settled tag=%s (remote state=%s)", this.getDeliveryTag(delivery), delivery.getRemoteState()));
        }
    }

    /** Decodes a delivery tag for logging. */
    private String getDeliveryTag(Delivery delivery) {
        return new String(delivery.getTag(), StandardCharsets.ISO_8859_1);
    }

    /**
     * Remembers the sender of the initialized link after applying the reliable mode to it.
     *
     * @throws MessagingRuntimeException if the event carries no sender
     */
    public void onLinkInit(Event e) {
        Sender sender = e.getSender();
        if (sender == null) {
            throw new MessagingRuntimeException("No sender found on link init.");
        }
        this.updateLinkReliableMode((Link)sender);
        this.activeSender = sender;
    }

    /** Drops the sender and deactivates the publisher when a sender link is closed locally. */
    public void onLinkLocalClose(Event e) {
        if (e.getSender() != null) {
            LOG.trace("onLinkLocalClose: close.");
            this.activeSender = null;
            this.publisherActive = false;
        }
        super.onLinkLocalClose(e);
    }

    /**
     * Marks the publisher active and releases {@code messageDriverOpen}. Synchronizes on the
     * latch so that it cannot interleave with {@link #initMessageDriver()}.
     */
    public void onConnectionRemoteOpen(Event e) {
        synchronized (this.messageDriverOpen) {
            LOG.trace("onConnectionRemoteOpen: open");
            this.publisherActive = true;
            this.messageDriverOpen.countDown();
        }
    }

    /**
     * Closes the publisher, passing a five second wait to
     * {@link #close(boolean, long, TimeUnit)}.
     *
     * @param force {@code true} to close immediately without waiting for outstanding messages
     * @throws MessagingException if an error was recorded
     */
    public void close(boolean force) throws MessagingException {
        this.close(force, 5L, TimeUnit.SECONDS);
    }

    /**
     * Closes the publisher.
     *
     * <p>Without a connected driver, or when {@code force} is set, the driver is shut down
     * immediately. Otherwise a delayed close is scheduled that shuts the driver down once all
     * outstanding messages are handled, and the call is delegated to the base handler with
     * the given wait time.
     *
     * @param force {@code true} to close immediately
     * @param time wait time passed on to the base handler's close
     * @param unit unit of {@code time}
     * @throws MessagingException if the handler or the driver recorded an error
     */
    public synchronized void close(boolean force, long time, TimeUnit unit) throws MessagingException {
        if (this.messageDriver == null || !this.messageDriver.isConnected()) {
            LOG.info("No active MessageDriver (driver={}) to close (skip close).", (Object)this.messageDriver);
            this.publisherActive = false;
            this.shutdown();
            if (this.lastError != null) {
                throw this.lastError;
            }
            return;
        }
        if (force) {
            this.checkForErrorAndHandleClose(this.messageDriver);
            this.shutdown();
            return;
        }
        if (this.messageDriverOpen.getCount() > 0L) {
            // Must NOT hold the latch's monitor while waiting: onConnectionRemoteOpen() needs
            // that monitor to count the latch down, so the wait could only ever time out.
            try {
                LOG.warn("Close before active - wait for close till publisher gets active.");
                if (!this.messageDriverOpen.await(2L, TimeUnit.SECONDS)) {
                    LOG.warn("Close before active - Publisher still not active but max wait time reached.");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.publisherActive) {
            if (this.shutdownService == null) {
                LOG.info("Shutdown publisher delayed...");
                this.delayedClose();
            } else {
                LOG.info("Shutdown already requested and ongoing...");
            }
        }
        this.checkForErrorAndHandleClose(this.messageDriver);
        super.close(time, unit);
    }

    /**
     * Shuts everything down and rethrows if the given driver has recorded an error; does
     * nothing for a {@code null} driver or a driver without error.
     */
    private void checkForErrorAndHandleClose(MessageDriver messageDriver) throws MessagingException {
        if (messageDriver == null) {
            return;
        }
        Optional<MessagingException> possibleException = messageDriver.getError();
        if (possibleException.isPresent()) {
            MessagingException exception = possibleException.get();
            LOG.error("MessageDriver error occurred -> start forced close (msg: " + exception.getMessage() + ")." + exception);
            this.shutdown();
            throw exception;
        }
    }

    /**
     * Schedules the periodic graceful-shutdown check (every 250 ms) unless one is already
     * running. The initial delay is derived from the number of queued and unsettled
     * messages of the current sender.
     */
    private synchronized void delayedClose() {
        MessageDriver driver = this.messageDriver;
        if (driver == null || !driver.isConnected()) {
            LOG.info("No active MessageDriver to close (skip delayed close).");
            this.publisherActive = false;
            return;
        }
        if (this.shutdownService != null) {
            LOG.trace("Close scheduler already started: {}", (Object)this.shutdownService);
            return;
        }
        Runnable gracefulShutdown = this.createGracefulShutdown(driver);
        // One worker is enough for the single periodic task; a pool with zero core threads
        // is discouraged for scheduled executors.
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        this.shutdownService = service;
        Sender sender = this.activeSender;
        int initialWait = sender == null ? 0 : sender.getQueued() + sender.getUnsettled();
        if (initialWait > 100) {
            initialWait /= 50;
        }
        ScheduledFuture<?> scheduledFuture = service.scheduleAtFixedRate(gracefulShutdown, initialWait, 250L, TimeUnit.MILLISECONDS);
        LOG.trace("Start Close scheduler: {}", scheduledFuture);
    }

    /**
     * Builds the periodic task that shuts the driver down as soon as both this publisher and
     * the driver report being closable.
     */
    private Runnable createGracefulShutdown(MessageDriver driver) {
        return () -> {
            boolean closable = this.isClosable();
            if (LOG.isDebugEnabled()) {
                LOG.debug("ShutdownService : {}", (Object)this.shutdownService);
                if (this.messagingReliableMode == MessagingSetting.MessagingReliableMode.AT_MOST_ONCE) {
                    // The sender may already be gone; an exception thrown here would silently
                    // cancel all further runs of this scheduled task.
                    Sender sender = this.activeSender;
                    LOG.debug("Try graceful close (closable={}); queued: {}.", (Object)Boolean.valueOf(closable), (Object)(sender == null ? "<no sender>" : Integer.valueOf(sender.getQueued())));
                } else {
                    LOG.debug("Try graceful close (closable={}); inFlow: {}.", (Object)Boolean.valueOf(closable), (Object)Integer.valueOf(this.currentMessagesInFlow.get()));
                }
            }
            if (closable && driver != null) {
                this.publisherActive = false;
                if (driver.isClosable()) {
                    this.shutdown();
                } else {
                    LOG.debug("Driver not yet closable.");
                }
            }
        };
    }

    /** Closes and forgets the driver, then stops and forgets the shutdown scheduler. */
    private void shutdown() {
        MessageDriver driver = this.messageDriver;
        if (driver == null) {
            LOG.info("No active MessageDriver to close.");
        } else {
            this.publisherActive = false;
            LOG.trace("Close MessageDriver: {}", (Object)driver);
            driver.close();
            LOG.trace("Executed close for driver: {}", (Object)driver);
            this.messageDriver = null;
            LOG.info("Closed MessageDriver.");
        }
        ScheduledExecutorService service = this.shutdownService;
        if (service == null) {
            LOG.debug("All closed and no active shutdown service => finished");
        } else {
            LOG.debug("All closed, shutdown Shutdown service...");
            service.shutdown();
            this.shutdownService = null;
            LOG.trace("Shutdown service shutdown => finished");
        }
    }

    /**
     * @return {@code true} when there is no sender, or nothing is queued
     *         ({@code AT_MOST_ONCE}) respectively nothing is in flight (other modes)
     */
    private boolean isClosable() {
        Sender sender = this.activeSender;
        if (sender == null) {
            return true;
        }
        if (this.messagingReliableMode == MessagingSetting.MessagingReliableMode.AT_MOST_ONCE) {
            return sender.getQueued() <= 0;
        }
        return this.currentMessagesInFlow.get() <= 0;
    }

    /**
     * @return {@code true} when the publisher is inactive and neither a driver nor a shutdown
     *         scheduler is left
     */
    @Override
    public boolean isClosed() {
        if (this.publisherActive) {
            return false;
        }
        return this.messageDriver == null && this.shutdownService == null;
    }

    /** Logs the {@link String#format(String, Object...) formatted} message at error level. */
    @Override
    protected void logError(String message, Object ... test) {
        LOG.error(String.format(message, test));
    }

    /** An encoded message: a buffer plus the number of valid bytes at its start. */
    private static class BinaryContent {
        final int length;
        final byte[] content;

        public BinaryContent(int length, byte[] content) {
            this.length = length;
            this.content = content;
        }
    }
}

