/*
 * Decompiled with CFR 0.152.
 */
package org.fusesource.fabric.stream.log;

import java.io.File;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.command.ActiveMQDestination;
import org.fusesource.fabric.stream.log.Callback;
import org.fusesource.fabric.stream.log.LogStreamer;
import org.fusesource.fabric.stream.log.Processor;
import org.fusesource.fabric.stream.log.Support;

public class Producer {
    private String logFilePattern = null;
    private File positionFile = null;
    private String broker;
    private String destination;
    private int batchSize = 65536;
    private long batchTimeout = 5000L;
    private boolean compress = true;
    private InputStream is = null;

    public static void main(String[] args) throws Exception {
        Producer producer = new Producer();
        LinkedList<String> argl = new LinkedList<String>(Arrays.asList(args));
        while (!argl.isEmpty()) {
            try {
                String arg = argl.removeFirst();
                if ("--help".equals(arg)) {
                    Producer.displayHelpAndExit(0);
                    continue;
                }
                if ("--broker".equals(arg)) {
                    producer.broker = Producer.shift(argl);
                    continue;
                }
                if ("--destination".equals(arg)) {
                    producer.destination = Producer.shift(argl);
                    continue;
                }
                if ("--batch-size".equals(arg)) {
                    producer.batchSize = Integer.parseInt(Producer.shift(argl));
                    continue;
                }
                if ("--batch-timeout".equals(arg)) {
                    producer.batchTimeout = Long.parseLong(Producer.shift(argl));
                    continue;
                }
                if ("--compress".equals(arg)) {
                    producer.compress = Boolean.parseBoolean(Producer.shift(argl));
                    continue;
                }
                if ("--log-file".equals(arg)) {
                    producer.logFilePattern = Producer.shift(argl);
                    continue;
                }
                if ("--position-file".equals(arg)) {
                    producer.positionFile = new File(Producer.shift(argl));
                    continue;
                }
                System.err.println("Invalid usage: unknown option: " + arg);
                Producer.displayHelpAndExit(1);
            }
            catch (NumberFormatException e) {
                System.err.println("Invalid usage: argument not a number");
                Producer.displayHelpAndExit(1);
            }
        }
        if (producer.logFilePattern != null ^ producer.positionFile != null) {
            System.err.println("Invalid usage: --log-file and --position-file but both be set.");
            Producer.displayHelpAndExit(1);
        }
        if (producer.broker == null) {
            System.err.println("Invalid usage: --broker option not specified.");
            Producer.displayHelpAndExit(1);
        }
        if (producer.destination == null) {
            System.err.println("Invalid usage: --destination option not specified.");
            Producer.displayHelpAndExit(1);
        }
        producer.execute();
        System.exit(0);
    }

    private static String shift(LinkedList<String> argl) {
        if (argl.isEmpty()) {
            System.err.println("Invalid usage: Missing argument");
            Producer.displayHelpAndExit(1);
        }
        return argl.removeFirst();
    }

    private static void displayHelpAndExit(int exitCode) {
        Support.displayResourceFile("producer-usage.txt");
        System.exit(exitCode);
    }

    private void execute() throws Exception {
        LogStreamer source = this.configure();
        source.start();
        Producer producer = this;
        synchronized (producer) {
            while (true) {
                this.wait();
            }
        }
    }

    public LogStreamer configure() throws Exception {
        Processor processor = new Processor(){
            Connection connection;
            Session session;
            ActiveMQMessageProducer producer;

            @Override
            public void start() throws JMSException {
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Producer.this.broker);
                this.connection = factory.createConnection();
                this.connection.start();
                this.session = this.connection.createSession(false, 1);
                this.producer = (ActiveMQMessageProducer)this.session.createProducer(ActiveMQDestination.createDestination(Producer.this.destination, (byte)1));
                if (Producer.this.positionFile != null) {
                    this.producer.setDeliveryMode(2);
                } else {
                    this.producer.setDeliveryMode(1);
                }
            }

            @Override
            public void stop() {
                new Thread(){

                    @Override
                    public void run() {
                        try {
                            connection.stop();
                        }
                        catch (JMSException jMSException) {
                            // empty catch block
                        }
                    }
                }.start();
            }

            @Override
            public void send(HashMap<String, String> headers, byte[] data, final Callback onComplete) {
                try {
                    BytesMessage msg = this.session.createBytesMessage();
                    this.producer.send(msg, new AsyncCallback(){

                        @Override
                        public void onSuccess() {
                            onComplete.onSuccess();
                        }

                        @Override
                        public void onException(JMSException exception) {
                            onComplete.onFailure(exception);
                        }
                    });
                }
                catch (JMSException e) {
                    onComplete.onFailure(e);
                }
            }
        };
        if (this.compress) {
            final Processor next = processor;
            processor = new Processor(){

                @Override
                public void start() throws Exception {
                    next.start();
                }

                @Override
                public void stop() {
                    next.stop();
                }

                @Override
                public void send(HashMap<String, String> headers, byte[] data, Callback onComplete) {
                    next.send(headers, Support.compress(data), onComplete);
                }
            };
        }
        LogStreamer streamer = new LogStreamer();
        streamer.setBatchSize(this.batchSize);
        streamer.setBatchTimeout(this.batchTimeout);
        streamer.setIs(this.is);
        streamer.setLogFilePattern(this.logFilePattern);
        streamer.setPositionFile(this.positionFile);
        if (this.positionFile == null) {
            streamer.setExitOnEOF(true);
        }
        streamer.setProcessor(processor);
        return streamer;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    public void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    public String getBroker() {
        return this.broker;
    }

    public void setBroker(String broker) {
        this.broker = broker;
    }

    public boolean isCompress() {
        return this.compress;
    }

    public void setCompress(boolean compress) {
        this.compress = compress;
    }

    public String getDestination() {
        return this.destination;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public InputStream getIs() {
        return this.is;
    }

    public void setIs(InputStream is) {
        this.is = is;
    }

    public String getLogFilePattern() {
        return this.logFilePattern;
    }

    public void setLogFilePattern(String logFilePattern) {
        this.logFilePattern = logFilePattern;
    }

    public File getPositionFile() {
        return this.positionFile;
    }

    public void setPositionFile(File positionFile) {
        this.positionFile = positionFile;
    }
}

