/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.jms;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.examples.jms.ActiveMQBroker;
import com.hazelcast.jet.examples.jms.JmsMessageProducer;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.io.Serializable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Sample that runs a Jet pipeline reading JMS messages from {@code inputTopic},
 * keeping only those with a JMS priority above 3, and republishing their text
 * to {@code outputTopic} as new text messages.
 *
 * <p>The job is left running for ten seconds and is then cancelled; all
 * resources created by {@link #setup()} are released afterwards.
 */
public class JmsTopicSample {
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    /** URL both the JMS source and the JMS sink connect to. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    private ActiveMQBroker broker;
    private JmsMessageProducer producer;
    private JetInstance jet;

    /**
     * Builds the pipeline: JMS topic source, priority filter, cast to
     * {@link TextMessage}, peek at the text, JMS topic sink.
     *
     * @return the assembled pipeline
     */
    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();

        // The lambdas are deliberately left uncast: casting them to raw
        // functional-interface types would erase the parameter types to Object
        // and the JMS method calls below would no longer type-check.
        p.readFrom(Sources.jmsTopic(() -> new ActiveMQConnectionFactory(BROKER_URL), INPUT_TOPIC))
         .withoutTimestamps()
         .filter(message -> message.getJMSPriority() > 3)
         .map(message -> (TextMessage) message)
         .peek(TextMessage::getText)
         .writeTo(Sinks.<TextMessage>jmsTopicBuilder(() -> new ActiveMQConnectionFactory(BROKER_URL))
                 .destinationName(OUTPUT_TOPIC)
                 .messageFn((session, message) -> {
                     // New message carrying the same text plus an extra
                     // property and a fixed priority.
                     TextMessage textMessage = session.createTextMessage(message.getText());
                     textMessage.setBooleanProperty("isActive", true);
                     textMessage.setJMSPriority(8);
                     return textMessage;
                 })
                 .build());
        return p;
    }

    /**
     * Entry point: runs the sample once.
     *
     * @param args ignored
     * @throws Exception if setup, job execution or cleanup fails
     */
    public static void main(String[] args) throws Exception {
        new JmsTopicSample().go();
    }

    /**
     * Sets up the environment, submits the job, lets it run for ten seconds,
     * cancels it and waits for it to finish. Cleanup runs even on failure.
     *
     * @throws Exception if setup, job execution or cleanup fails
     */
    private void go() throws Exception {
        try {
            setup();
            Job job = jet.newJob(buildPipeline());
            TimeUnit.SECONDS.sleep(10L);
            job.cancel();
            try {
                job.join();
            } catch (CancellationException ignored) {
                // Expected: the job was cancelled just above, so join()
                // reports the cancellation instead of returning normally.
            }
        } finally {
            cleanup();
        }
    }

    /**
     * Creates the broker, the message producer for {@code inputTopic} and the
     * Jet instance, in that order.
     *
     * @throws Exception if any of the components cannot be created
     */
    private void setup() throws Exception {
        // NOTE(review): presumably the ActiveMQBroker constructor starts an
        // embedded broker and the producer starts publishing on construction;
        // confirm against those classes.
        broker = new ActiveMQBroker();
        producer = new JmsMessageProducer(INPUT_TOPIC, false);
        jet = Jet.bootstrappedInstance();
    }

    /**
     * Stops the producer, shuts down all Jet instances and stops the broker.
     *
     * <p>Each step runs even if an earlier one throws, and components that
     * {@link #setup()} never got to create are skipped, so a failure during
     * setup is not masked by a {@link NullPointerException} raised here.
     *
     * @throws Exception if stopping the producer or the broker fails
     */
    private void cleanup() throws Exception {
        try {
            if (producer != null) {
                producer.stop();
            }
        } finally {
            try {
                Jet.shutdownAll();
            } finally {
                if (broker != null) {
                    broker.stop();
                }
            }
        }
    }
}

