/*
 * Decompiled with CFR 0.152.
 */
package org.fusesource.fabric.stream.log;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fusesource.fabric.stream.log.Callback;
import org.fusesource.fabric.stream.log.Processor;
import org.fusesource.fabric.stream.log.Support;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LogStreamer {
    private static final transient Logger LOG = LoggerFactory.getLogger(LogStreamer.class);
    static final Object EOF = new Object();
    final ExecutorService inputReader = Executors.newSingleThreadExecutor();
    final ExecutorService batchReader = Executors.newSingleThreadExecutor();
    private final AtomicBoolean runAllowed = new AtomicBoolean(false);
    final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue(1024);
    public String logFilePattern = null;
    public File positionFile = null;
    public int batchSize = 262144;
    public long batchTimeout = 1000L;
    public long tailRetry = 500L;
    public InputStream is;
    public boolean exitOnEOF;
    public Processor processor;
    public Semaphore sendSemaphore = new Semaphore(10);

    LogStreamer() {
    }

    private void updateLogPosition(long file, long offset) {
        if (this.positionFile != null) {
            try {
                Support.writeText(this.positionFile, String.format("%d:%d\n", file, offset));
            }
            catch (IOException e) {
                new RuntimeException(e);
            }
        }
    }

    private boolean logFileExists(long file) {
        return new File(String.format(this.logFilePattern, file)).exists();
    }

    private boolean isRunAllowed() {
        return this.runAllowed.get();
    }

    public void start() {
        if (this.runAllowed.compareAndSet(false, true)) {
            try {
                this.processor.start();
            }
            catch (Exception e) {
                this.runAllowed.set(false);
                return;
            }
            this.inputReader.execute(new Runnable(){

                @Override
                public void run() {
                    LogStreamer.this.readInput();
                }
            });
            this.batchReader.execute(new Runnable(){

                @Override
                public void run() {
                    LogStreamer.this.drainBatchQueue();
                }
            });
        }
    }

    public void stop() {
        if (this.runAllowed.compareAndSet(true, false)) {
            this.inputReader.shutdown();
            this.batchReader.shutdown();
            this.processor.stop();
        }
        this.runAllowed.set(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readInput() {
        if (this.logFilePattern != null) {
            long currentFile = -1L;
            long currentOffset = -1L;
            try {
                if (!this.positionFile.exists()) {
                    Support.writeText(this.positionFile, "0:0");
                }
                String data = Support.readText(this.positionFile).trim();
                String[] split = data.split(":");
                currentFile = Long.parseLong(split[0]);
                currentOffset = Long.parseLong(split[1]);
                while (this.runAllowed.get()) {
                    File current = new File(String.format(this.logFilePattern, currentFile));
                    FileInputStream is = new FileInputStream(current);
                    try {
                        this.process(is, currentFile, currentOffset);
                        ++currentFile;
                        currentOffset = 0L;
                    }
                    finally {
                        is.close();
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                return;
            }
        }
        if (this.is == null) {
            this.is = System.in;
        }
        try {
            this.process(this.is, 0L, 0L);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private boolean process(InputStream is, long file, long offset) throws IOException, InterruptedException {
        if (offset > 0L) {
            is.skip(offset);
        }
        int pos = 0;
        byte[] batch = new byte[4096];
        boolean eof_possible = false;
        while (this.isRunAllowed()) {
            int count = is.read(batch, pos, batch.length - pos);
            if (count < 0) {
                if (this.logFilePattern == null || eof_possible) {
                    if (pos > 0) {
                        this.queue.put(new QueueEntry(Arrays.copyOf(batch, pos), file, offset, pos));
                    }
                    this.queue.put(EOF);
                    return true;
                }
                if (this.logFileExists(file + 1L)) {
                    eof_possible = true;
                    continue;
                }
                eof_possible = false;
                Thread.sleep(this.tailRetry);
            } else {
                eof_possible = false;
                int at = Support.lastnlposition(batch, pos += count);
                if (at >= 0) {
                    int len = at + 1;
                    byte[] data = Arrays.copyOf(batch, len);
                    int remaining = pos - len;
                    System.arraycopy(batch, len, batch, 0, remaining);
                    pos = remaining;
                    this.queue.put(new QueueEntry(data, file, offset, len));
                }
                if (pos == batch.length) {
                    this.queue.put(new QueueEntry(batch, file, offset, pos));
                    batch = new byte[batch.length];
                    pos = 0;
                }
            }
            offset += (long)count;
        }
        return false;
    }

    private void drainBatchQueue() {
        while (this.isRunAllowed()) {
            boolean atEOF = false;
            while (this.isRunAllowed() && !atEOF) {
                QueueEntry firstEntry = null;
                ByteArrayOutputStream batch = new ByteArrayOutputStream((int)((double)this.batchSize * 1.5));
                try {
                    Object obj = this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (obj == null) continue;
                    if (obj == EOF) {
                        atEOF = true;
                        continue;
                    }
                    long start = System.currentTimeMillis();
                    long timeout = start + this.batchTimeout;
                    QueueEntry lastEntry = (QueueEntry)obj;
                    if (firstEntry == null) {
                        firstEntry = lastEntry;
                    }
                    batch.write(lastEntry.data);
                    while (batch.size() < this.batchSize && !atEOF) {
                        obj = this.queue.poll();
                        if (obj != null) {
                            if (obj == EOF) {
                                atEOF = true;
                                continue;
                            }
                            lastEntry = (QueueEntry)obj;
                            batch.write(lastEntry.data);
                            continue;
                        }
                        long remaining = timeout - System.currentTimeMillis();
                        if (remaining <= 0L || (obj = this.queue.poll(remaining, TimeUnit.MILLISECONDS)) == null) break;
                        if (obj == EOF) {
                            atEOF = true;
                            continue;
                        }
                        lastEntry = (QueueEntry)obj;
                        batch.write(lastEntry.data);
                    }
                    if (batch.size() <= 0) continue;
                    byte[] body = batch.toByteArray();
                    batch.reset();
                    assert (firstEntry.file == lastEntry.file);
                    HashMap<String, String> headers = new HashMap<String, String>();
                    headers.put("at", String.format("%d:%d", firstEntry.file, firstEntry.offset));
                    final QueueEntry entry = lastEntry;
                    this.send(headers, lastEntry, body, new Runnable(){

                        @Override
                        public void run() {
                            LogStreamer.this.updateLogPosition(entry.file, entry.offset + (long)entry.size);
                        }
                    });
                }
                catch (Exception e) {
                    e.printStackTrace();
                    this.stop();
                }
            }
            if (!atEOF || !this.isRunAllowed()) continue;
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("EOF", "true");
            this.send(headers, null, new byte[0], new Runnable(){

                @Override
                public void run() {
                    if (LogStreamer.this.exitOnEOF) {
                        System.exit(0);
                    }
                }
            });
        }
    }

    private void send(HashMap<String, String> headers, QueueEntry lastEntry, byte[] body, final Runnable onComplete) {
        try {
            this.sendSemaphore.acquire();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        this.processor.send(headers, body, new Callback(){

            @Override
            public void onSuccess() {
                try {
                    LogStreamer.this.sendSemaphore.release();
                    if (onComplete != null) {
                        onComplete.run();
                    }
                }
                catch (Throwable e) {
                    this.onFailure(e);
                }
            }

            @Override
            public void onFailure(Throwable e) {
                e.printStackTrace();
                LogStreamer.this.stop();
            }
        });
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    public void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    public InputStream getIs() {
        return this.is;
    }

    public void setIs(InputStream is) {
        this.is = is;
    }

    public boolean isExitOnEOF() {
        return this.exitOnEOF;
    }

    public void setExitOnEOF(boolean exitOnEOF) {
        this.exitOnEOF = exitOnEOF;
    }

    public String getLogFilePattern() {
        return this.logFilePattern;
    }

    public void setLogFilePattern(String logFilePattern) {
        this.logFilePattern = logFilePattern;
    }

    public File getPositionFile() {
        return this.positionFile;
    }

    public void setPositionFile(File positionFile) {
        this.positionFile = positionFile;
    }

    public Processor getProcessor() {
        return this.processor;
    }

    public void setProcessor(Processor processor) {
        this.processor = processor;
    }

    public long getTailRetry() {
        return this.tailRetry;
    }

    public void setTailRetry(long tailRetry) {
        this.tailRetry = tailRetry;
    }

    static class QueueEntry {
        private final byte[] data;
        private final long file;
        private final long offset;
        private final int size;

        QueueEntry(byte[] data, long file, long offset, int size) {
            this.data = data;
            this.file = file;
            this.offset = offset;
            this.size = size;
        }
    }
}

