/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.jms;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.examples.jms.ActiveMQBroker;
import com.hazelcast.jet.examples.jms.JmsMessageProducer;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.PredicateEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.io.Serializable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsQueueSample {
    private static final String INPUT_QUEUE = "inputQueue";
    private static final String OUTPUT_QUEUE = "outputQueue";
    private ScheduledExecutorService scheduledExecutorService;
    private ActiveMQBroker activeMQBroker;
    private JmsMessageProducer producer;
    private JetInstance jet;

    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.jmsQueue((SupplierEx & Serializable)() -> new ActiveMQConnectionFactory("tcp://localhost:61616"), (String)INPUT_QUEUE)).withoutTimestamps().filter((PredicateEx & Serializable)message -> message.getJMSPriority() > 3).map((FunctionEx & Serializable)message -> (TextMessage)message).peek(TextMessage::getText).drainTo(Sinks.jmsQueueBuilder((SupplierEx & Serializable)() -> new ActiveMQConnectionFactory("tcp://localhost:61616")).destinationName(OUTPUT_QUEUE).messageFn((BiFunctionEx & Serializable)(session, message) -> {
            TextMessage textMessage = session.createTextMessage(message.getText());
            textMessage.setBooleanProperty("isActive", true);
            textMessage.setJMSPriority(8);
            return textMessage;
        }).build());
        return p;
    }

    public static void main(String[] args) throws Exception {
        new JmsQueueSample().go();
    }

    private void go() throws Exception {
        Job job = null;
        try {
            this.setup();
            job = this.jet.newJob(JmsQueueSample.buildPipeline());
            this.scheduledExecutorService.schedule(() -> ((Job)job).cancel(), 10L, TimeUnit.SECONDS);
            job.join();
        }
        catch (CancellationException e) {
            JmsQueueSample.waitForComplete(job);
        }
        finally {
            this.cleanup();
        }
    }

    private void setup() throws Exception {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.activeMQBroker = new ActiveMQBroker();
        this.activeMQBroker.start();
        this.producer = new JmsMessageProducer(INPUT_QUEUE, JmsMessageProducer.DestinationType.QUEUE);
        this.producer.start();
        this.jet = Jet.newJetInstance();
    }

    private void cleanup() {
        this.scheduledExecutorService.shutdown();
        this.producer.stop();
        this.activeMQBroker.stop();
        Jet.shutdownAll();
    }

    private static void waitForComplete(Job job) {
        while (job.getStatus() != JobStatus.COMPLETED) {
            Util.uncheckRun(() -> TimeUnit.SECONDS.sleep(1L));
        }
    }
}

