/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;

/**
 * Commit log executor that runs all queued work on one dedicated writer thread
 * and triggers a {@code CommitLog.sync()} from a second, periodic thread.
 *
 * <p>Two threads are started by the constructor:
 * <ul>
 *   <li>{@code COMMIT-LOG-WRITER} polls the bounded task queue and runs each
 *       task in submission order; when asked to stop it drains whatever is
 *       still queued and performs a final sync.</li>
 *   <li>{@code PERIODIC-COMMIT-LOG-SYNCER} repeatedly submits a sync task to
 *       that same queue, waits for it to finish, then sleeps for
 *       {@code DatabaseDescriptor.getCommitLogSyncPeriod()} milliseconds.</li>
 * </ul>
 *
 * <p>The queue capacity comes from
 * {@code DatabaseDescriptor.getCommitLogPeriodicQueueSize()}, so {@link #add}
 * and {@link #submit} block while the queue is full.
 */
class PeriodicCommitLogExecutorService
implements ICommitLogExecutorService {
    /** Bounded FIFO of pending work; consumed only by the writer thread. */
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
    /**
     * Number of tasks the writer thread has finished. Incremented only by the
     * writer thread (single writer), so the non-atomic {@code ++} is safe;
     * {@code volatile} makes the value visible to readers on other threads.
     */
    protected volatile long completedTaskCount = 0L;
    /** The thread that executes every queued task. */
    private final Thread appendingThread;
    /** Cleared by {@link #shutdown()} to stop both background loops. */
    private volatile boolean run = true;

    /**
     * Creates the service and immediately starts the writer and syncer threads.
     *
     * @param commitLog the commit log that is synced periodically and once
     *                  more when the writer thread stops
     */
    public PeriodicCommitLogExecutorService(final CommitLog commitLog) {
        WrappedRunnable writer = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                while (PeriodicCommitLogExecutorService.this.run) {
                    // Timed poll so the loop re-checks the run flag at least every 100 ms.
                    Runnable task = PeriodicCommitLogExecutorService.this.queue.poll(100L, TimeUnit.MILLISECONDS);
                    if (task == null) {
                        continue;
                    }
                    task.run();
                    ++PeriodicCommitLogExecutorService.this.completedTaskCount;
                }
                // shutdown() clears the run flag only after it has seen an empty queue,
                // but producers may enqueue between that check and this point. Run
                // those stragglers instead of silently dropping them (a dropped
                // FutureTask would leave its submitter waiting forever).
                Runnable leftover;
                while ((leftover = PeriodicCommitLogExecutorService.this.queue.poll()) != null) {
                    leftover.run();
                    ++PeriodicCommitLogExecutorService.this.completedTaskCount;
                }
                commitLog.sync();
            }
        };
        this.appendingThread = new Thread((Runnable)writer, "COMMIT-LOG-WRITER");
        this.appendingThread.start();

        // Sync task executed on the writer thread, so it is serialized with appends.
        final Callable<Object> syncer = new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                commitLog.sync();
                return null;
            }
        };
        new Thread(new Runnable(){

            @Override
            public void run() {
                while (PeriodicCommitLogExecutorService.this.run) {
                    // NOTE(review): if the writer thread has already finished draining
                    // when this submit lands, the future is never completed and this
                    // thread stays blocked here - confirm whether that matters at shutdown.
                    FBUtilities.waitOnFuture(PeriodicCommitLogExecutorService.this.submit(syncer));
                    Uninterruptibles.sleepUninterruptibly((long)DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
                }
            }
        }, "PERIODIC-COMMIT-LOG-SYNCER").start();
    }

    /**
     * Queues a log record adder for execution on the writer thread, blocking
     * while the queue is full.
     *
     * @param adder the task to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         calling thread is interrupted while waiting for queue space; the
     *         thread's interrupt status is restored before throwing
     */
    @Override
    public void add(CommitLog.LogRecordAdder adder) {
        try {
            this.queue.put(adder);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Wraps {@code task} in a {@link FutureTask} and queues it for execution on
     * the writer thread, blocking while the queue is full.
     *
     * @param task the computation to run
     * @param <T>  the result type of the task
     * @return a future that completes once the writer thread has run the task
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         calling thread is interrupted while waiting for queue space; the
     *         thread's interrupt status is restored before throwing
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> ft = new FutureTask<T>(task);
        try {
            this.queue.put(ft);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return ft;
    }

    /**
     * Starts an asynchronous shutdown and returns immediately.
     *
     * <p>A helper thread waits (checking every 100 ms) until the queue is empty,
     * clears the run flag, and then joins the writer thread. Use
     * {@link #awaitTermination()} to block until the writer has actually stopped.
     */
    @Override
    public void shutdown() {
        new Thread((Runnable)new WrappedRunnable(){

            @Override
            public void runMayThrow() throws InterruptedException, IOException {
                while (!PeriodicCommitLogExecutorService.this.queue.isEmpty()) {
                    Thread.sleep(100L);
                }
                PeriodicCommitLogExecutorService.this.run = false;
                PeriodicCommitLogExecutorService.this.appendingThread.join();
            }
        }, "Commitlog Shutdown").start();
    }

    /**
     * Blocks until the writer thread has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void awaitTermination() throws InterruptedException {
        this.appendingThread.join();
    }

    /**
     * @return the number of tasks currently waiting in the queue
     */
    @Override
    public long getPendingTasks() {
        return this.queue.size();
    }

    /**
     * @return the number of tasks the writer thread has finished running
     */
    @Override
    public long getCompletedTasks() {
        return this.completedTaskCount;
    }
}

