/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.nats;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.nats.NatsConfiguration;
import org.apache.camel.component.nats.NatsEndpoint;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NatsConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
    private final Processor processor;
    private ExecutorService executor;
    private Connection connection;
    private Dispatcher dispatcher;
    private boolean active;
    private JetStreamSubscription jetStreamSubscription;

    public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.processor = processor;
    }

    public NatsEndpoint getEndpoint() {
        return (NatsEndpoint)super.getEndpoint();
    }

    protected void doStart() throws Exception {
        super.doStart();
        LOG.debug("Starting Nats Consumer");
        this.executor = this.getEndpoint().createExecutor((Object)this);
        LOG.debug("Getting Nats Connection");
        this.connection = this.getEndpoint().getConfiguration().getConnection() != null ? this.getEndpoint().getConfiguration().getConnection() : this.getEndpoint().getConnection();
        this.executor.submit(new NatsConsumingTask(this.connection, this.getEndpoint().getConfiguration()));
    }

    protected void doStop() throws Exception {
        NatsConfiguration configuration = this.getEndpoint().getConfiguration();
        if (configuration.isFlushConnection() && ObjectHelper.isNotEmpty((Object)this.connection)) {
            LOG.debug("Flushing Messages before stopping");
            this.connection.flush(Duration.ofMillis(configuration.getFlushTimeout()));
        }
        if (ObjectHelper.isNotEmpty((Object)this.dispatcher)) {
            try {
                this.dispatcher.unsubscribe(configuration.getTopic());
            }
            catch (Exception e) {
                this.getExceptionHandler().handleException("Error during unsubscribing", (Throwable)e);
            }
        }
        LOG.debug("Stopping Nats Consumer");
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
        if (ObjectHelper.isEmpty((Object)configuration.getConnection()) && ObjectHelper.isNotEmpty((Object)this.connection)) {
            LOG.debug("Closing Nats Connection");
            if (!this.connection.getStatus().equals((Object)Connection.Status.CLOSED)) {
                this.connection.close();
            }
        }
        super.doStop();
    }

    public boolean isActive() {
        return this.active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    class NatsConsumingTask
    implements Runnable {
        private final Connection connection;
        private final NatsConfiguration configuration;

        NatsConsumingTask(Connection connection, NatsConfiguration configuration) {
            this.connection = connection;
            this.configuration = configuration;
        }

        @Override
        public void run() {
            try {
                NatsConfiguration config = NatsConsumer.this.getEndpoint().getConfiguration();
                String topic = config.getTopic();
                String queueName = config.getQueueName();
                String maxMessagesStr = config.getMaxMessages();
                Integer maxMessages = null;
                if (ObjectHelper.isNotEmpty((String)maxMessagesStr)) {
                    maxMessages = Integer.parseInt(maxMessagesStr);
                }
                if (config.isJetstreamEnabled() && this.connection.getServerInfo().isJetStreamAvailable()) {
                    this.setupJetStreamConsumer(topic, queueName);
                } else {
                    this.setupStandardNatsConsumer(topic, queueName, maxMessages);
                }
            }
            catch (Exception e) {
                NatsConsumer.this.getExceptionHandler().handleException("Error during processing", (Throwable)e);
            }
        }

        private void setupJetStreamConsumer(String topic, String queueName) throws IOException, JetStreamApiException {
            String streamName = this.configuration.getJetstreamName();
            String durableName = this.configuration.getDurableName();
            String subscriptionType = this.configuration.isPullSubscription() ? "PULL" : "PUSH";
            LOG.debug("Setting up JetStream {}/{} consumer for stream: '{}', subject: {}", new Object[]{subscriptionType, ObjectHelper.isNotEmpty((String)durableName) ? String.format("DURABLE, durableName: '%s'", durableName) : "EPHEMERAL", streamName, this.configuration.getTopic()});
            JetStreamManagement jsm = this.connection.jetStreamManagement();
            StreamConfiguration streamConfig = StreamConfiguration.builder().name(streamName).subjects(new String[]{topic}).build();
            jsm.addStream(streamConfig);
            ConsumerConfiguration cc = this.configuration.getConsumerConfiguration();
            if (cc == null) {
                ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder();
                ccBuilder.deliverSubject(null);
                if (durableName != null) {
                    ccBuilder.durable(durableName + "-durable");
                }
                cc = ccBuilder.build();
            }
            CamelNatsMessageHandler messageHandler = new CamelNatsMessageHandler();
            NatsConsumer.this.dispatcher = this.connection.createDispatcher((MessageHandler)messageHandler);
            if (this.configuration.isPullSubscription()) {
                PullSubscribeOptions pullOptions = ((PullSubscribeOptions.Builder)PullSubscribeOptions.builder().configuration(cc)).build();
                NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe(NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), NatsConsumer.this.dispatcher, (MessageHandler)messageHandler, pullOptions);
            } else {
                PushSubscribeOptions pushOptions = ((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().configuration(cc)).build();
                NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe(NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), queueName, NatsConsumer.this.dispatcher, (MessageHandler)messageHandler, true, pushOptions);
            }
            NatsConsumer.this.setActive(true);
        }

        private void setupStandardNatsConsumer(String topic, String queueName, Integer maxMessages) {
            LOG.debug("Setting up standard NATS consumer for subject: {}", (Object)topic);
            NatsConsumer.this.dispatcher = this.connection.createDispatcher((MessageHandler)new CamelNatsMessageHandler());
            NatsConsumer.this.dispatcher = ObjectHelper.isNotEmpty((String)queueName) ? NatsConsumer.this.dispatcher.subscribe(topic, queueName) : NatsConsumer.this.dispatcher.subscribe(topic);
            if (maxMessages != null) {
                NatsConsumer.this.dispatcher.unsubscribe(topic, maxMessages.intValue());
            }
            if (NatsConsumer.this.dispatcher.isActive()) {
                NatsConsumer.this.setActive(true);
            }
        }

        class CamelNatsMessageHandler
        implements MessageHandler {
            CamelNatsMessageHandler() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onMessage(Message msg) throws InterruptedException {
                LOG.debug("Received Message: {}", (Object)msg);
                Exchange exchange = NatsConsumer.this.createExchange(false);
                try {
                    exchange.getIn().setBody((Object)msg.getData());
                    exchange.getIn().setHeader("CamelNatsReplyTo", (Object)msg.getReplyTo());
                    exchange.getIn().setHeader("CamelNatsSID", (Object)msg.getSID());
                    exchange.getIn().setHeader("CamelNatsSubject", (Object)msg.getSubject());
                    exchange.getIn().setHeader("CamelNatsQueueName", (Object)msg.getSubscription().getQueueName());
                    exchange.getIn().setHeader("CamelNatsMessageTimestamp", (Object)System.currentTimeMillis());
                    if (msg.isStatusMessage() && msg.getStatus() != null) {
                        exchange.getIn().setHeader("CamelNatsStatusCode", (Object)msg.getStatus().getCode());
                        exchange.getIn().setHeader("CamelNatsStatusError", (Object)msg.getStatus().getMessage());
                    }
                    if (msg.getHeaders() != null) {
                        HeaderFilterStrategy strategy = NatsConsumer.this.getEndpoint().getConfiguration().getHeaderFilterStrategy();
                        msg.getHeaders().entrySet().forEach(entry -> {
                            if (!strategy.applyFilterToExternalHeaders((String)entry.getKey(), entry.getValue(), exchange)) {
                                if (((List)entry.getValue()).size() == 1) {
                                    exchange.getIn().setHeader((String)entry.getKey(), ((List)entry.getValue()).get(0));
                                } else {
                                    exchange.getIn().setHeader((String)entry.getKey(), entry.getValue());
                                }
                            } else {
                                LOG.debug("Excluding header {} as per strategy", entry.getKey());
                            }
                        });
                    }
                    NatsConsumer.this.processor.process(exchange);
                    if (!NatsConsumingTask.this.configuration.isReplyToDisabled() && msg.getReplyTo() != null && msg.getConnection() != null) {
                        Connection con = msg.getConnection();
                        byte[] data = (byte[])exchange.getMessage().getBody(byte[].class);
                        if (data != null) {
                            LOG.debug("Publishing replyTo: {} message", (Object)msg.getReplyTo());
                            con.publish(msg.getReplyTo(), data);
                        }
                    }
                }
                catch (Exception e) {
                    NatsConsumer.this.getExceptionHandler().handleException("Error during processing", exchange, (Throwable)e);
                }
                finally {
                    NatsConsumer.this.releaseExchange(exchange, false);
                }
            }
        }
    }
}

