/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.nats;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Message;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.component.nats.NatsConfiguration;
import org.apache.camel.component.nats.NatsEndpoint;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NatsProducer
extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
    private final ExecutorServiceManager executorServiceManager;
    private ScheduledExecutorService scheduler;
    private Connection connection;

    public NatsProducer(NatsEndpoint endpoint) {
        super((Endpoint)endpoint);
        this.executorServiceManager = endpoint.getCamelContext().getExecutorServiceManager();
    }

    public NatsEndpoint getEndpoint() {
        return (NatsEndpoint)super.getEndpoint();
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        NatsConfiguration config = this.getEndpoint().getConfiguration();
        byte[] body = (byte[])exchange.getIn().getBody(byte[].class);
        if (body == null) {
            try {
                body = ((String)exchange.getIn().getMandatoryBody(String.class)).getBytes();
            }
            catch (InvalidPayloadException e2) {
                exchange.setException((Throwable)e2);
                callback.done(true);
                return true;
            }
        }
        if (exchange.getPattern().isOutCapable()) {
            LOG.debug("Requesting to topic: {}", (Object)config.getTopic());
            NatsMessage.Builder builder = NatsMessage.builder().data(body).subject(config.getTopic()).headers(this.buildHeaders(exchange));
            CompletableFuture requestFuture = this.connection.request((Message)builder.build());
            CompletableFuture timeoutFuture = this.failAfter(exchange, Duration.ofMillis(config.getRequestTimeout()));
            CompletableFuture.anyOf(requestFuture, timeoutFuture).whenComplete((message, e) -> {
                if (e == null) {
                    Message msg = (Message)message;
                    exchange.getMessage().setBody((Object)msg.getData());
                    exchange.getMessage().setHeader("CamelNatsReplyTo", (Object)msg.getReplyTo());
                    exchange.getMessage().setHeader("CamelNatsSID", (Object)msg.getSID());
                    exchange.getMessage().setHeader("CamelNatsSubject", (Object)msg.getSubject());
                    exchange.getMessage().setHeader("CamelNatsQueueName", (Object)msg.getSubscription().getQueueName());
                    exchange.getMessage().setHeader("CamelNatsMessageTimestamp", (Object)System.currentTimeMillis());
                } else {
                    exchange.setException(e.getCause());
                }
                callback.done(false);
                if (!requestFuture.isDone()) {
                    requestFuture.cancel(true);
                }
                if (!timeoutFuture.isDone()) {
                    timeoutFuture.cancel(true);
                }
            });
            return false;
        }
        LOG.debug("Publishing to subject: {}", (Object)config.getTopic());
        try {
            if (config.isJetstreamEnabled() && this.connection.getServerInfo().isJetStreamAvailable()) {
                this.publishWithJetStream(config, body, exchange);
            } else {
                this.publishWithoutJetStream(config, body, exchange);
            }
        }
        catch (Exception e3) {
            exchange.setException((Throwable)e3);
            callback.done(true);
            return true;
        }
        callback.done(true);
        return true;
    }

    private Headers buildHeaders(Exchange exchange) {
        Headers headers = new Headers();
        HeaderFilterStrategy filteringStrategy = this.getEndpoint().getConfiguration().getHeaderFilterStrategy();
        exchange.getIn().getHeaders().forEach((key, value) -> {
            if (!filteringStrategy.applyFilterToCamelHeaders(key, value, exchange)) {
                String headerValue = value instanceof byte[] ? new String((byte[])value, StandardCharsets.UTF_8) : String.valueOf(value);
                if (headers.get(key) != null) {
                    headers.get(key).add(headerValue);
                } else {
                    headers.add(key, new String[]{headerValue});
                }
            } else {
                LOG.debug("Excluding header {} as per strategy", key);
            }
        });
        return headers;
    }

    private <T> CompletableFuture<T> failAfter(Exchange exchange, Duration duration) {
        CompletableFuture future = new CompletableFuture();
        this.scheduler.schedule(() -> {
            ExchangeTimedOutException ex = new ExchangeTimedOutException(exchange, duration.toMillis());
            return future.completeExceptionally((Throwable)ex);
        }, duration.toNanos(), TimeUnit.NANOSECONDS);
        return future;
    }

    protected void doStart() throws Exception {
        ThreadPoolProfile profile = this.executorServiceManager.getThreadPoolProfile("CamelNatsRequestTimeoutExecutor");
        if (profile == null) {
            profile = this.executorServiceManager.getDefaultThreadPoolProfile();
        }
        this.scheduler = this.executorServiceManager.newScheduledThreadPool((Object)this, "CamelNatsRequestTimeoutExecutor", profile);
        super.doStart();
        LOG.debug("Starting Nats Producer");
        LOG.debug("Getting Nats Connection");
        this.connection = this.getEndpoint().getConfiguration().getConnection() != null ? this.getEndpoint().getConfiguration().getConnection() : this.getEndpoint().getConnection();
    }

    protected void doStop() throws Exception {
        if (this.scheduler != null) {
            this.executorServiceManager.shutdownNow((ExecutorService)this.scheduler);
        }
        LOG.debug("Stopping Nats Producer");
        if (ObjectHelper.isEmpty((Object)this.getEndpoint().getConfiguration().getConnection())) {
            LOG.debug("Closing Nats Connection");
            if (this.connection != null && !this.connection.getStatus().equals((Object)Connection.Status.CLOSED)) {
                if (this.getEndpoint().getConfiguration().isFlushConnection()) {
                    LOG.debug("Flushing Nats Connection");
                    this.connection.flush(Duration.ofMillis(this.getEndpoint().getConfiguration().getFlushTimeout()));
                }
                this.connection.close();
            }
        }
        super.doStop();
    }

    private void publishWithJetStream(NatsConfiguration config, byte[] body, Exchange exchange) throws Exception {
        JetStream js;
        LOG.debug("JetStream is available");
        try {
            JetStreamManagement jsm = this.connection.jetStreamManagement();
            js = jsm.jetStream();
            if (js == null) {
                jsm.addStream(StreamConfiguration.builder().name(config.getJetstreamName()).subjects(new String[]{config.getTopic()}).build());
                js = jsm.jetStream();
            }
        }
        catch (JetStreamApiException | IOException e) {
            throw new Exception("Failed to initialize JetStream: " + e.getMessage(), e);
        }
        NatsMessage.Builder builder = NatsMessage.builder().data(body).subject(config.getTopic()).headers(this.buildHeaders(exchange));
        if (ObjectHelper.isNotEmpty((String)config.getReplySubject())) {
            String replySubject = config.getReplySubject();
            builder.replyTo(replySubject);
        }
        NatsMessage jetStreamMessage = builder.build();
        if (config.isJetstreamAsync()) {
            PublishAck pa;
            CompletableFuture future = js.publishAsync((Message)jetStreamMessage);
            try {
                pa = (PublishAck)future.get(config.getRequestTimeout(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new Exception("Failed to publish message asynchronously with JetStream: " + e.getMessage(), e);
            }
            LOG.debug("Publish Sequence async: {}", (Object)pa.getSeqno());
        } else {
            PublishAck pa;
            try {
                pa = js.publish((Message)jetStreamMessage);
            }
            catch (JetStreamApiException | IOException e) {
                throw new Exception("Failed to publish message synchronously with JetStream: " + e.getMessage(), e);
            }
            LOG.debug("Publish Sequence synchronously: {}", (Object)pa.getSeqno());
        }
    }

    private void publishWithoutJetStream(NatsConfiguration config, byte[] body, Exchange exchange) {
        NatsMessage.Builder builder = NatsMessage.builder().data(body).subject(config.getTopic()).headers(this.buildHeaders(exchange));
        if (ObjectHelper.isNotEmpty((String)config.getReplySubject())) {
            String replySubject = config.getReplySubject();
            builder.replyTo(replySubject);
        }
        this.connection.publish((Message)builder.build());
    }
}

