/*
 * Decompiled with CFR 0.152.
 */
package com.redhat.jenkins.plugins.ci.messaging;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.redhat.jenkins.plugins.ci.CIEnvironmentContributingAction;
import com.redhat.jenkins.plugins.ci.messaging.ActiveMqMessagingProvider;
import com.redhat.jenkins.plugins.ci.messaging.JMSMessagingWorker;
import com.redhat.jenkins.plugins.ci.messaging.MessagingProviderOverrides;
import com.redhat.jenkins.plugins.ci.messaging.data.SendResult;
import com.redhat.jenkins.plugins.ci.provider.data.ActiveMQPublisherProviderData;
import com.redhat.jenkins.plugins.ci.provider.data.ActiveMQSubscriberProviderData;
import com.redhat.jenkins.plugins.ci.provider.data.ProviderData;
import com.redhat.utils.OrderedProperties;
import com.redhat.utils.PluginUtils;
import hudson.EnvVars;
import hudson.model.Action;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.TaskListener;
import java.io.IOException;
import java.io.StringReader;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.sql.Time;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import jenkins.model.Jenkins;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;

public class ActiveMqMessagingWorker
extends JMSMessagingWorker {
    private static final Logger log = Logger.getLogger(ActiveMqMessagingWorker.class.getName());
    private final ActiveMqMessagingProvider provider;
    private Connection connection;
    private MessageConsumer subscriber;
    private String uuid = UUID.randomUUID().toString();

    public ActiveMqMessagingWorker(ActiveMqMessagingProvider provider, MessagingProviderOverrides overrides, String jobname) {
        this.provider = provider;
        this.overrides = overrides;
        this.jobname = jobname;
    }

    @Override
    public boolean subscribe(String jobname, String selector) {
        this.topic = this.getTopic(this.provider);
        if (this.topic != null) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (!this.isConnected() && !this.connect()) {
                        return false;
                    }
                    if (this.subscriber == null) {
                        log.info("Subscribing job '" + jobname + "' to '" + this.topic + "' topic.");
                        Session session = this.connection.createSession(false, 1);
                        if (this.provider.getUseQueues().booleanValue()) {
                            Queue destination = session.createQueue(this.topic);
                            this.subscriber = session.createConsumer((Destination)destination, selector, false);
                        } else {
                            Topic destination = session.createTopic(this.topic);
                            this.subscriber = session.createDurableSubscriber(destination, jobname, selector, false);
                        }
                        log.info("Successfully subscribed job '" + jobname + "' to '" + this.topic + "' topic with selector: " + selector);
                    } else {
                        log.fine("Already subscribed to '" + this.topic + "' topic with selector: " + selector + " for job '" + jobname);
                    }
                    return true;
                }
                catch (JMSException ex) {
                    log.log(Level.SEVERE, "JMS exception raised while subscribing job '" + jobname + "', retrying in " + RETRY_MINUTES + " minutes.", ex);
                    if (Thread.currentThread().isInterrupted()) continue;
                    this.unsubscribe(jobname);
                    try {
                        Thread.sleep(RETRY_MINUTES * 60 * 1000);
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        return false;
    }

    @Override
    public boolean connect() {
        this.connection = null;
        ActiveMQConnectionFactory connectionFactory = this.provider.getConnectionFactory();
        Connection connectiontmp = null;
        try {
            connectiontmp = connectionFactory.createConnection();
            String url = "";
            if (Jenkins.getInstance() != null) {
                url = Jenkins.getInstance().getRootUrl();
            }
            connectiontmp.setClientID(this.provider.getName() + "_" + url + "_" + this.uuid + "_" + this.jobname);
            connectiontmp.start();
        }
        catch (JMSException e) {
            log.severe("Unable to connect to " + this.provider.getBroker() + " " + e.getMessage());
            try {
                if (connectiontmp != null) {
                    connectiontmp.close();
                }
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
            return false;
        }
        log.info("Connection started");
        this.connection = connectiontmp;
        return true;
    }

    @Override
    public void unsubscribe(String jobname) {
        log.info("Unsubcribing job '" + jobname + "' from the '" + this.topic + "' topic.");
        this.disconnect();
        if (this.subscriber != null) {
            try {
                this.subscriber.close();
            }
            catch (Exception exception) {
            }
            finally {
                this.subscriber = null;
            }
        }
    }

    public static String getMessageHeaders(Message message) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode root = mapper.createObjectNode();
            Enumeration e = message.getPropertyNames();
            while (e.hasMoreElements()) {
                String s = (String)e.nextElement();
                if (message.getStringProperty(s) == null) continue;
                root.set(s, (JsonNode)mapper.convertValue(message.getObjectProperty(s), JsonNode.class));
            }
            root.set("JMSCorrelationID", (JsonNode)mapper.convertValue((Object)message.getJMSCorrelationID(), JsonNode.class));
            root.set("JMSDeliveryMode", (JsonNode)mapper.convertValue((Object)new Integer(message.getJMSDeliveryMode()), JsonNode.class));
            root.set("JMSDestination", (JsonNode)mapper.convertValue((Object)message.getJMSDestination().toString(), JsonNode.class));
            root.set("JMSExpiration", (JsonNode)mapper.convertValue((Object)message.getJMSExpiration(), JsonNode.class));
            root.set("JMSMessageID", (JsonNode)mapper.convertValue((Object)message.getJMSMessageID(), JsonNode.class));
            root.set("JMSPriority", (JsonNode)mapper.convertValue((Object)message.getJMSPriority(), JsonNode.class));
            root.set("JMSRedelivered", (JsonNode)mapper.convertValue((Object)message.getJMSRedelivered(), JsonNode.class));
            root.set("JMSReplyTo", (JsonNode)mapper.convertValue((Object)message.getJMSReplyTo(), JsonNode.class));
            root.set("JMSTimestamp", (JsonNode)mapper.convertValue((Object)message.getJMSTimestamp(), JsonNode.class));
            root.set("JMSType", (JsonNode)mapper.convertValue((Object)message.getJMSType(), JsonNode.class));
            return mapper.writer().writeValueAsString((Object)root);
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unhandled exception retrieving message headers:\n" + ActiveMqMessagingWorker.formatMessage(message), e);
            return "";
        }
    }

    public static String getMessageBody(Message message) {
        try {
            if (message instanceof MapMessage) {
                MapMessage mm = (MapMessage)message;
                ObjectMapper mapper = new ObjectMapper();
                ObjectNode root = mapper.createObjectNode();
                Enumeration e = mm.getMapNames();
                while (e.hasMoreElements()) {
                    String field = (String)e.nextElement();
                    root.set(field, (JsonNode)mapper.convertValue(mm.getObject(field), JsonNode.class));
                }
                return mapper.writer().writeValueAsString((Object)root);
            }
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage)message;
                return tm.getText();
            }
            if (message instanceof BytesMessage) {
                BytesMessage bm = (BytesMessage)message;
                bm.reset();
                byte[] bytes = new byte[(int)bm.getBodyLength()];
                bm.readBytes(bytes);
                return new String(bytes);
            }
            log.log(Level.SEVERE, "Unsupported message type:\n" + ActiveMqMessagingWorker.formatMessage(message));
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unhandled exception retrieving message body:\n" + ActiveMqMessagingWorker.formatMessage(message), e);
        }
        return "";
    }

    private void process(String jobname, Message message) {
        try {
            HashMap<String, String> params = new HashMap<String, String>();
            params.put("CI_MESSAGE", ActiveMqMessagingWorker.getMessageBody(message));
            params.put("MESSAGE_HEADERS", ActiveMqMessagingWorker.getMessageHeaders(message));
            Enumeration e = message.getPropertyNames();
            while (e.hasMoreElements()) {
                String s = (String)e.nextElement();
                if (message.getStringProperty(s) == null) continue;
                params.put(s, message.getObjectProperty(s).toString());
            }
            super.trigger(jobname, ActiveMqMessagingWorker.formatMessage(message), params);
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unhandled exception processing message:\n" + ActiveMqMessagingWorker.formatMessage(message), e);
        }
    }

    @Override
    public void receive(String jobname, ProviderData pdata) {
        block8: {
            ActiveMQSubscriberProviderData pd = (ActiveMQSubscriberProviderData)pdata;
            int timeoutInMs = (pd.getTimeout() != null ? pd.getTimeout() : ActiveMQSubscriberProviderData.DEFAULT_TIMEOUT_IN_MINUTES) * 60 * 1000;
            while (!this.subscribe(jobname, pd.getSelector())) {
                if (Thread.currentThread().isInterrupted()) continue;
                try {
                    int WAIT_SECONDS = 2;
                    Thread.sleep(WAIT_SECONDS * 1000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            try {
                Message m = this.subscriber.receive((long)timeoutInMs);
                if (m != null) {
                    if (this.provider.verify(ActiveMqMessagingWorker.getMessageContent(m), pd.getChecks())) {
                        this.process(jobname, m);
                    }
                } else {
                    log.info("No message received for the past " + timeoutInMs + " ms, re-subscribing job '" + jobname + "'.");
                    this.unsubscribe(jobname);
                }
            }
            catch (JMSException e) {
                if (Thread.currentThread().isInterrupted()) break block8;
                log.log(Level.WARNING, "JMS exception raised while receiving, going to re-subscribe job '" + jobname + "'.", e);
                this.unsubscribe(jobname);
            }
        }
    }

    @Override
    public boolean isConnected() {
        return this.connection != null;
    }

    @Override
    public boolean isConnectedAndSubscribed() {
        return this.connection != null && this.subscriber != null;
    }

    @Override
    public void disconnect() {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (JMSException jMSException) {
            }
            finally {
                this.connection = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SendResult sendMessage(Run<?, ?> build, TaskListener listener, ProviderData pdata) {
        String mesgContent;
        String mesgId;
        block45: {
            ActiveMQPublisherProviderData pd = (ActiveMQPublisherProviderData)pdata;
            Connection connection = null;
            Session session = null;
            MessageProducer publisher = null;
            TextMessage message = null;
            mesgId = "0";
            mesgContent = "";
            try {
                String ltopic = PluginUtils.getSubstitutedValue(this.getTopic(this.provider), build.getEnvironment(listener));
                if (this.provider.getAuthenticationMethod() != null && ltopic != null && this.provider.getBroker() != null) {
                    ActiveMQConnectionFactory connectionFactory = this.provider.getConnectionFactory();
                    connection = connectionFactory.createConnection();
                    connection.start();
                    session = connection.createSession(false, 1);
                    Topic destination = session.createTopic(ltopic);
                    publisher = session.createProducer((Destination)destination);
                    message = session.createTextMessage("");
                    message.setJMSType("application/json");
                    TreeMap<String, String> envVarParts = new TreeMap<String, String>();
                    message.setStringProperty("CI_NAME", build.getParent().getName());
                    envVarParts.put("CI_NAME", build.getParent().getName());
                    if (pd.getMessageType() != null) {
                        message.setStringProperty("CI_TYPE", pd.getMessageType().getMessage());
                        envVarParts.put("CI_TYPE", pd.getMessageType().getMessage());
                    }
                    if (!build.isBuilding()) {
                        String ciStatus = build.getResult() == Result.SUCCESS ? "passed" : "failed";
                        message.setStringProperty("CI_STATUS", ciStatus);
                        envVarParts.put("CI_STATUS", ciStatus);
                        envVarParts.put("BUILD_STATUS", build.getResult().toString());
                    }
                    EnvVars baseEnvVars = build.getEnvironment(listener);
                    EnvVars envVars = new EnvVars();
                    envVars.putAll((Map)baseEnvVars);
                    envVars.putAll(envVarParts);
                    if (!StringUtils.isEmpty((CharSequence)pd.getMessageProperties())) {
                        OrderedProperties p = new OrderedProperties();
                        p.load(new StringReader(pd.getMessageProperties()));
                        Enumeration e = p.propertyNames();
                        while (e.hasMoreElements()) {
                            String key = (String)e.nextElement();
                            if (key.toLowerCase().startsWith("jms") && this.setMessageHeader((Message)message, key, p.getProperty(key), session)) continue;
                            EnvVars envVars2 = new EnvVars();
                            envVars2.putAll((Map)baseEnvVars);
                            envVars2.putAll(envVarParts);
                            String val = PluginUtils.getSubstitutedValue(p.getProperty(key), envVars2);
                            message.setStringProperty(key, val);
                            envVarParts.put(key, val);
                        }
                    }
                    EnvVars envVars2 = new EnvVars();
                    envVars2.putAll((Map)baseEnvVars);
                    envVars2.putAll(envVarParts);
                    message.setText(PluginUtils.getSubstitutedValue(pd.getMessageContent(), envVars2));
                    publisher.send((Message)message);
                    log.info("Sent " + pd.getMessageType().toString() + " message for job '" + build.getParent().getName() + "' to topic '" + ltopic + "':\n" + ActiveMqMessagingWorker.formatMessage((Message)message));
                    mesgId = message.getJMSMessageID();
                    mesgContent = message.getText();
                    break block45;
                }
                log.severe("One or more of the following is invalid (null): user, password, topic, broker.");
                SendResult sendResult = new SendResult(false, mesgId, mesgContent);
                return sendResult;
            }
            catch (Exception e) {
                if (pd.isFailOnError().booleanValue()) {
                    log.severe("Unhandled exception in perform: ");
                    log.severe(ExceptionUtils.getStackTrace((Throwable)e));
                    listener.fatalError("Unhandled exception in perform: ");
                    listener.fatalError(ExceptionUtils.getStackTrace((Throwable)e));
                    SendResult sendResult = new SendResult(false, mesgId, mesgContent);
                    return sendResult;
                }
                log.warning("Unhandled exception in perform: ");
                log.warning(ExceptionUtils.getStackTrace((Throwable)e));
                listener.error("Unhandled exception in perform: ");
                listener.error(ExceptionUtils.getStackTrace((Throwable)e));
                SendResult sendResult = new SendResult(true, mesgId, mesgContent);
                return sendResult;
            }
            finally {
                if (publisher != null) {
                    try {
                        publisher.close();
                    }
                    catch (JMSException jMSException) {}
                }
                if (session != null) {
                    try {
                        session.close();
                    }
                    catch (JMSException jMSException) {}
                }
                if (connection != null) {
                    try {
                        connection.close();
                    }
                    catch (JMSException jMSException) {}
                }
            }
        }
        return new SendResult(true, mesgId, mesgContent);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public String waitForMessage(Run<?, ?> build, TaskListener listener, ProviderData pdata) {
        ActiveMQSubscriberProviderData pd = (ActiveMQSubscriberProviderData)pdata;
        String ip = null;
        try {
            ip = Inet4Address.getLocalHost().getHostAddress();
        }
        catch (UnknownHostException e) {
            log.severe("Unable to get localhost IP address.");
        }
        String ltopic = this.getTopic(this.provider);
        try {
            ltopic = PluginUtils.getSubstitutedValue(this.getTopic(this.provider), build.getEnvironment(listener));
        }
        catch (IOException e) {
            log.warning(e.getMessage());
        }
        catch (InterruptedException e) {
            log.warning(e.getMessage());
        }
        if (ip != null && this.provider.getAuthenticationMethod() != null && ltopic != null && this.provider.getBroker() != null) {
            log.info("Waiting for message with selector: " + pd.getSelector());
            listener.getLogger().println("Waiting for message with selector: " + pd.getSelector());
            Connection connection = null;
            MessageConsumer consumer = null;
            try {
                Queue destination;
                ActiveMQConnectionFactory connectionFactory = this.provider.getConnectionFactory();
                connection = connectionFactory.createConnection();
                connection.setClientID(ip + "_" + UUID.randomUUID().toString());
                connection.start();
                Session session = connection.createSession(false, 1);
                if (this.provider.getUseQueues().booleanValue()) {
                    destination = session.createQueue(ltopic);
                    consumer = session.createConsumer((Destination)destination, pd.getSelector(), false);
                } else {
                    destination = session.createTopic(ltopic);
                    consumer = session.createDurableSubscriber((Topic)destination, this.jobname, pd.getSelector(), false);
                }
                Message message = consumer.receive((long)((pd.getTimeout() != null ? pd.getTimeout() : ActiveMQSubscriberProviderData.DEFAULT_TIMEOUT_IN_MINUTES) * 60 * 1000));
                if (message != null) {
                    String value = ActiveMqMessagingWorker.getMessageBody(message);
                    if (build != null && StringUtils.isNotEmpty((CharSequence)pd.getVariable())) {
                        EnvVars vars = new EnvVars();
                        vars.put(pd.getVariable(), value);
                        build.addAction((Action)new CIEnvironmentContributingAction((Map<String, String>)vars));
                    }
                    log.info("Received message with selector: " + pd.getSelector() + "\n" + ActiveMqMessagingWorker.formatMessage(message));
                    listener.getLogger().println("Received message with selector: " + pd.getSelector() + "\n" + ActiveMqMessagingWorker.formatMessage(message));
                    String string = value;
                    return string;
                }
                log.info("Timed out waiting for message!");
                listener.getLogger().println("Timed out waiting for message!");
            }
            catch (Exception e) {
                log.log(Level.SEVERE, "Unhandled exception waiting for message.", e);
            }
            finally {
                if (consumer != null) {
                    try {
                        consumer.close();
                    }
                    catch (Exception exception) {}
                }
                if (connection != null) {
                    try {
                        connection.close();
                    }
                    catch (Exception exception) {}
                }
            }
        } else {
            log.severe("One or more of the following is invalid (null): ip, user, password, topic, broker.");
        }
        return null;
    }

    @Override
    public void prepareForInterrupt() {
    }

    @Override
    public boolean isBeingInterrupted() {
        return false;
    }

    private static String formatHeaders(Message message) {
        Destination dest = null;
        int delMode = 0;
        long expiration = 0L;
        Time expTime = null;
        int priority = 0;
        String msgID = null;
        long timestamp = 0L;
        Time timestampTime = null;
        String correlID = null;
        Destination replyTo = null;
        boolean redelivered = false;
        String type = null;
        StringBuilder sb = new StringBuilder();
        try {
            try {
                dest = message.getJMSDestination();
                sb.append("  JMSDestination: ");
                sb.append(dest);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSDestination header\n", e);
            }
            try {
                delMode = message.getJMSDeliveryMode();
                if (delMode == 1) {
                    sb.append("  JMSDeliveryMode: non-persistent\n");
                } else if (delMode == 2) {
                    sb.append("  JMSDeliveryMode: persistent\n");
                } else {
                    sb.append("  JMSDeliveryMode: neither persistent nor non-persistent; error\n");
                }
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSDeliveryMode header\n", e);
            }
            try {
                expiration = message.getJMSExpiration();
                if (expiration != 0L) {
                    expTime = new Time(expiration);
                    sb.append("  JMSExpiration: ");
                    sb.append(expTime);
                    sb.append("\n");
                } else {
                    sb.append("  JMSExpiration: 0\n");
                }
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSExpiration header\n", e);
            }
            try {
                priority = message.getJMSPriority();
                sb.append("  JMSPriority: ");
                sb.append(priority);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSPriority header\n", e);
            }
            try {
                msgID = message.getJMSMessageID();
                sb.append("  JMSMessageID: ");
                sb.append(msgID);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSMessageID header\n", e);
            }
            try {
                timestamp = message.getJMSTimestamp();
                if (timestamp != 0L) {
                    timestampTime = new Time(timestamp);
                    sb.append("  JMSTimestamp: ");
                    sb.append(timestampTime);
                    sb.append("\n");
                } else {
                    sb.append("  JMSTimestamp: 0\n");
                }
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSTimestamp header\n", e);
            }
            try {
                correlID = message.getJMSCorrelationID();
                sb.append("  JMSCorrelationID: ");
                sb.append(correlID);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSCorrelationID header\n", e);
            }
            try {
                replyTo = message.getJMSReplyTo();
                sb.append("  JMSReplyTo: ");
                sb.append(replyTo);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSReplyTo header\n", e);
            }
            try {
                redelivered = message.getJMSRedelivered();
                sb.append("  JMSRedelivered: ");
                sb.append(redelivered);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSRedelivered header\n", e);
            }
            try {
                type = message.getJMSType();
                sb.append("  JMSType: ");
                sb.append(type);
                sb.append("\n");
            }
            catch (Exception e) {
                log.log(Level.WARNING, "Unable to generate JMSType header\n", e);
            }
        }
        catch (Exception e) {
            log.log(Level.WARNING, "Unable to generate JMS headers\n", e);
        }
        return sb.toString();
    }

    public static String formatMessage(Message message) {
        StringBuilder sb = new StringBuilder();
        try {
            String headers = ActiveMqMessagingWorker.formatHeaders(message);
            if (headers.length() > 0) {
                sb.append("Message Headers:\n");
                sb.append(headers);
            }
            sb.append("Message Properties:\n");
            Enumeration propNames = message.getPropertyNames();
            while (propNames.hasMoreElements()) {
                String propertyName = (String)propNames.nextElement();
                sb.append("  ");
                sb.append(propertyName);
                sb.append(": ");
                if (message.getObjectProperty(propertyName) != null) {
                    sb.append(message.getObjectProperty(propertyName).toString());
                }
                sb.append("\n");
            }
            sb.append("Message Content:\n");
            sb.append(ActiveMqMessagingWorker.getMessageContent(message));
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unable to format message:", e);
        }
        return sb.toString();
    }

    private static String getMessageContent(Message message) {
        StringBuilder sb = new StringBuilder();
        try {
            if (message instanceof TextMessage) {
                sb.append(((TextMessage)message).getText());
            } else if (message instanceof MapMessage) {
                MapMessage mm = (MapMessage)message;
                ObjectMapper mapper = new ObjectMapper();
                ObjectNode root = mapper.createObjectNode();
                Enumeration e = mm.getMapNames();
                while (e.hasMoreElements()) {
                    String field = (String)e.nextElement();
                    root.set(field, (JsonNode)mapper.convertValue(mm.getObject(field), JsonNode.class));
                }
                sb.append(mapper.writerWithDefaultPrettyPrinter().writeValueAsString((Object)root));
            } else if (message instanceof BytesMessage) {
                BytesMessage bm = (BytesMessage)message;
                bm.reset();
                byte[] bytes = new byte[(int)bm.getBodyLength()];
                bm.readBytes(bytes);
                sb.append(new String(bytes));
            } else {
                sb.append("  Unhandled message type: " + message.getJMSType());
            }
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unable to format message:", e);
        }
        return sb.toString();
    }

    private boolean setMessageHeader(Message m, String key, String value, Session session) {
        try {
            switch (key.toLowerCase()) {
                case "jmscorrelationid": {
                    m.setJMSCorrelationID(value);
                    return true;
                }
                case "jmsreplyto": {
                    Topic destination = session.createTopic(value);
                    m.setJMSReplyTo((Destination)destination);
                    return true;
                }
                case "jmstype": {
                    m.setJMSType(value);
                    return true;
                }
            }
            log.log(Level.WARNING, "Unable to set message header '" + key + "'.");
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Unhandled exception setting message header '" + key + "'.", e);
        }
        return false;
    }
}

