/*
 * Decompiled with CFR 0.152.
 */
package com.indeed.lsmtree.recordlog;

import com.google.common.collect.Lists;
import com.indeed.lsmtree.recordlog.RecordFile;
import com.indeed.lsmtree.recordlog.RecordLogDirectory;
import com.indeed.util.core.io.Closeables2;
import com.indeed.util.io.checkpointer.Checkpointer;
import com.indeed.util.varexport.Export;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

public class GenericRecordLogDirectoryPoller<T>
implements Runnable,
Closeable {
    private static final Logger log = Logger.getLogger(GenericRecordLogDirectoryPoller.class);
    private static final int SYNC_FREQUENCY = 10000;
    private long lastPosition;
    private final RecordLogDirectory<T> recordLogDirectory;
    private final Checkpointer<Long> checkpointer;
    private final boolean loop;
    private final boolean gc;
    private final boolean skipFirst;
    private final AtomicBoolean isClosed;
    private final List<Functions<T>> functionsList;
    private Thread pollerThread;

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer) throws IOException {
        this(recordLogDirectory, checkpointer, true);
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer, boolean loop) throws IOException {
        this(recordLogDirectory, checkpointer, loop, false);
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer, boolean loop, boolean gc) throws IOException {
        this(recordLogDirectory, checkpointer, loop, gc, false);
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer, boolean loop, boolean gc, boolean skipFirst) throws IOException {
        this.recordLogDirectory = recordLogDirectory;
        this.checkpointer = checkpointer;
        this.loop = loop;
        this.gc = gc;
        this.skipFirst = skipFirst;
        this.isClosed = new AtomicBoolean(false);
        this.functionsList = Lists.newArrayList();
        this.lastPosition = (Long)checkpointer.getCheckpoint();
    }

    public void registerFunctions(Functions<T> functions) {
        this.functionsList.add(functions);
    }

    public void start() {
        this.pollerThread = new Thread(this);
        this.pollerThread.start();
    }

    @Override
    public void run() {
        do {
            try {
                long start;
                RecordFile.Reader<T> poller;
                block19: {
                    try {
                        poller = this.recordLogDirectory.reader(this.lastPosition);
                    }
                    catch (Exception e) {
                        log.error((Object)"error seeking to last valid position", (Throwable)e);
                        Thread.sleep(5000L);
                        continue;
                    }
                    int count = 0;
                    long lastKnownGoodPosition = start = this.lastPosition;
                    try {
                        boolean doLoop = this.skipFirst ? poller.next() : true;
                        if (!doLoop) break block19;
                        while (poller.next()) {
                            this.lastPosition = poller.getPosition();
                            T op = poller.get();
                            for (Functions<T> functions : this.functionsList) {
                                functions.process(this.lastPosition, op);
                            }
                            if (++count % 10000 == 0) {
                                for (Functions<T> functions : this.functionsList) {
                                    functions.sync();
                                }
                                this.checkpointPosition(this.lastPosition);
                            }
                            lastKnownGoodPosition = this.lastPosition;
                        }
                    }
                    catch (Exception e) {
                        log.error((Object)"error reading segment file", (Throwable)e);
                        this.lastPosition = lastKnownGoodPosition;
                        Closeables2.closeQuietly(poller, (Logger)log);
                        Thread.sleep(5000L);
                        continue;
                    }
                }
                try {
                    poller.close();
                }
                catch (IOException e) {
                    log.error((Object)"error closing reader", (Throwable)e);
                }
                if (start != this.lastPosition) {
                    for (Functions<T> functions : this.functionsList) {
                        try {
                            functions.sync();
                        }
                        catch (Exception e) {
                            log.error((Object)"sync error", (Throwable)e);
                            throw new RuntimeException(e);
                        }
                    }
                    try {
                        this.checkpointPosition(this.lastPosition);
                    }
                    catch (Exception e) {
                        log.error((Object)"error writing last position file", (Throwable)e);
                    }
                }
                if (!this.loop) continue;
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                return;
            }
        } while (!this.isClosed.get() && this.loop);
    }

    private void checkpointPosition(long lastPosition) throws IOException {
        this.checkpointer.setCheckpoint((Object)lastPosition);
        if (this.gc) {
            this.recordLogDirectory.garbageCollect(lastPosition);
        }
    }

    @Export(name="last-position", doc="The last position poller has read")
    public long getLastPosition() {
        return this.lastPosition;
    }

    @Export(name="current-segment-timestamp", doc="Timestamp of the current segment being read")
    public long getCurrentSegmentTimestamp() throws IOException {
        return this.recordLogDirectory.getSegmentTimestamp(this.getCurrentSegmentNum());
    }

    @Export(name="current-segment-timestring", doc="Same as current-segment-timestamp in human readable form")
    public String getCurrentSegmentTimestring() throws IOException {
        return new DateTime(this.getCurrentSegmentTimestamp()).toString();
    }

    @Export(name="max-segment-timestamp", doc="Timestamp of the max segment that exists in directory")
    public long getMaxSegmentTimestamp() throws IOException {
        return this.recordLogDirectory.getSegmentTimestamp(this.getMaxSegmentNum());
    }

    @Export(name="max-segment-timestring", doc="Same as max-segment-timestamp in human readable form")
    public String getMaxSegmentTimestring() throws IOException {
        return new DateTime(this.getMaxSegmentTimestamp()).toString();
    }

    @Export(name="current-segment-num", doc="The current segment that the poller is reading")
    public int getCurrentSegmentNum() {
        return this.recordLogDirectory.getSegmentNum(this.getLastPosition());
    }

    @Export(name="max-segment-num", doc="The max segment number that exists in directory")
    public int getMaxSegmentNum() throws IOException {
        return this.recordLogDirectory.getMaxSegmentNum();
    }

    @Override
    public void close() throws IOException {
        this.isClosed.set(true);
        if (this.pollerThread != null) {
            while (this.pollerThread.isAlive()) {
                Thread.yield();
            }
        }
    }

    public boolean isAlive() {
        return this.pollerThread.isAlive();
    }

    public static interface Functions<T> {
        public void process(long var1, T var3) throws IOException;

        public void sync() throws IOException;
    }
}

