/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.mongodb.batch;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mongodb.DuplicateKeyException;
import com.mongodb.MongoBulkWriteException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
import org.apache.rya.mongodb.batch.collection.CollectionType;

/**
 * Buffers objects in a bounded queue and writes them to a {@link CollectionType}
 * in batches.
 * <p>
 * Queued objects are written out when one of the following happens:
 * <ul>
 *   <li>the queue has no remaining capacity (detected by a polling thread that
 *       wakes up every {@link #CHECK_QUEUE_INTERVAL_MS} milliseconds),</li>
 *   <li>the configured batch flush time elapses without a new object being
 *       added (every add re-arms the timer), or</li>
 *   <li>{@link #flush()} is called directly.</li>
 * </ul>
 * {@link #start()} must be called before objects are added; adding while the
 * flush timer is not armed fails with a {@link MongoDbBatchWriterException}.
 *
 * @param <T> the type of object that is queued and inserted.
 */
public class MongoDbBatchWriter<T> {
    private static final Logger log = Logger.getLogger(MongoDbBatchWriter.class);

    /** Pause, in milliseconds, between two capacity checks of the queue. */
    private static final int CHECK_QUEUE_INTERVAL_MS = 10;

    private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("Queue Full Checker Thread - %d").build();

    private final CollectionType<T> collectionType;
    private final long batchFlushTimeMs;
    private final ArrayBlockingQueue<T> statementInsertionQueue;
    // Core pool size 0 is kept from the original; this class never shuts the executor down.
    private final ScheduledThreadPoolExecutor scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(0);
    private ScheduledFuture<?> flushBatchFuture;
    private final Runnable flushBatchTask;
    private Thread queueFullCheckerThread;
    private final AtomicBoolean isInit = new AtomicBoolean();

    /**
     * Creates a new, not yet started, batch writer.
     *
     * @param collectionType the target that batches are inserted into. (not null)
     * @param mongoDbBatchWriterConfig supplies the batch flush time and the
     *        batch size, which is used as the capacity of the queue. (not null)
     */
    public MongoDbBatchWriter(CollectionType<T> collectionType, MongoDbBatchWriterConfig mongoDbBatchWriterConfig) {
        this.collectionType = Preconditions.checkNotNull(collectionType);
        this.batchFlushTimeMs = Preconditions.checkNotNull(mongoDbBatchWriterConfig).getBatchFlushTimeMs();
        this.statementInsertionQueue = new ArrayBlockingQueue<>(mongoDbBatchWriterConfig.getBatchSize());
        this.flushBatchTask = new BatchFlusher();
    }

    /**
     * Arms the flush timer and starts the thread that empties the queue when it
     * is full. Does nothing if the writer is already started.
     *
     * @throws MongoDbBatchWriterException if the flush timer cannot be scheduled.
     */
    public void start() throws MongoDbBatchWriterException {
        if (isInit.get()) {
            return;
        }
        if (flushBatchFuture == null) {
            flushBatchFuture = startFlushTimer();
        }
        if (queueFullCheckerThread == null) {
            queueFullCheckerThread = QUEUE_THREAD_FACTORY.newThread(new QueueFullChecker());
        }
        // The flag must be set before the thread starts: the checker loop exits
        // as soon as it observes the flag as false.
        isInit.set(true);
        queueFullCheckerThread.start();
    }

    /**
     * Cancels the pending flush timer and waits briefly (twice the check
     * interval) for the queue checker thread to finish. Objects still in the
     * queue are not flushed by this method.
     *
     * @throws MongoDbBatchWriterException declared for API compatibility.
     */
    public void shutdown() throws MongoDbBatchWriterException {
        isInit.set(false);
        if (flushBatchFuture != null) {
            flushBatchFuture.cancel(true);
            flushBatchFuture = null;
        }
        if (queueFullCheckerThread != null) {
            if (queueFullCheckerThread.isAlive()) {
                try {
                    queueFullCheckerThread.join(2L * CHECK_QUEUE_INTERVAL_MS);
                } catch (final InterruptedException e) {
                    log.error("Error waiting for thread to finish", e);
                    // Keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                }
            }
            // Always drop the reference, even if the thread had already died:
            // a Thread cannot be started twice, so start() needs a fresh one.
            queueFullCheckerThread = null;
        }
    }

    /**
     * Re-arms the flush timer and adds the object to the queue, blocking while
     * the queue is full. A {@code null} object is ignored.
     *
     * @param object the object to queue.
     * @throws MongoDbBatchWriterException if the timer cannot be reset (for
     *         example because the writer is not started) or if the calling
     *         thread is interrupted while waiting for space in the queue.
     */
    public void addObjectToQueue(T object) throws MongoDbBatchWriterException {
        if (object == null) {
            return;
        }
        try {
            resetFlushTimer();
            statementInsertionQueue.put(object);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MongoDbBatchWriterException("Error adding object to batch queue.", e);
        } catch (final Exception e) {
            throw new MongoDbBatchWriterException("Error adding object to batch queue.", e);
        }
    }

    /**
     * Adds every object of the list to the queue, in iteration order, via
     * {@link #addObjectToQueue(Object)}. A {@code null} list is ignored.
     *
     * @param objects the objects to queue.
     * @throws MongoDbBatchWriterException if adding any object fails; objects
     *         after the failing one are not added.
     */
    public void addObjectsToQueue(List<T> objects) throws MongoDbBatchWriterException {
        if (objects == null) {
            return;
        }
        for (final T object : objects) {
            addObjectToQueue(object);
        }
    }

    /**
     * Drains everything currently in the queue and inserts it as one batch.
     * Nothing is inserted when the queue is empty.
     * <p>
     * Duplicate key failures are logged as warnings and otherwise ignored,
     * whether they arrive as a {@link DuplicateKeyException} or as a
     * {@link MongoBulkWriteException} whose message mentions
     * "duplicate key error".
     *
     * @throws MongoDbBatchWriterException if the insert fails for any reason
     *         other than a duplicate key.
     */
    public void flush() throws MongoDbBatchWriterException {
        final List<T> batch = new ArrayList<>();
        try {
            statementInsertionQueue.drainTo(batch);
            if (!batch.isEmpty()) {
                collectionType.insertMany(batch);
            }
        } catch (final DuplicateKeyException e) {
            log.warn(e);
        } catch (final MongoBulkWriteException e) {
            final String message = e.getMessage();
            if (message != null && message.contains("duplicate key error")) {
                // Same tolerance as the DuplicateKeyException branch: warn only.
                log.warn(e);
            } else {
                throw new MongoDbBatchWriterException("Error flushing statements", e);
            }
        } catch (final Exception e) {
            throw new MongoDbBatchWriterException("Error flushing statements", e);
        }
    }

    /**
     * Cancels the pending flush task without interrupting it if it is already
     * running, then schedules a new one.
     *
     * @throws MongoDbBatchWriterException if the new task cannot be scheduled.
     */
    private void resetFlushTimer() throws MongoDbBatchWriterException {
        flushBatchFuture.cancel(false);
        flushBatchFuture = startFlushTimer();
    }

    /**
     * Schedules the flush task to run once after the configured flush time.
     *
     * @return the future of the scheduled task.
     * @throws MongoDbBatchWriterException if scheduling fails.
     */
    private ScheduledFuture<?> startFlushTimer() throws MongoDbBatchWriterException {
        try {
            return scheduledExecutor.schedule(flushBatchTask, batchFlushTimeMs, TimeUnit.MILLISECONDS);
        } catch (final Exception e) {
            throw new MongoDbBatchWriterException("Error starting batch flusher", e);
        }
    }

    /**
     * Polls the queue while the writer is started and flushes it whenever it
     * has no remaining capacity.
     */
    private class QueueFullChecker implements Runnable {
        @Override
        public void run() {
            try {
                while (isInit.get()) {
                    if (statementInsertionQueue.remainingCapacity() == 0) {
                        log.trace("Statement queue is FULL -> going to empty it");
                        try {
                            flush();
                        } catch (final MongoDbBatchWriterException e) {
                            // A failed flush must not stop the checker loop.
                            log.error("Error emptying queue", e);
                        }
                    }
                    Thread.sleep(CHECK_QUEUE_INTERVAL_MS);
                }
            } catch (final InterruptedException e) {
                log.error("Encountered an unexpected error while checking the batch queue.", e);
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Timer task that flushes the queue if it holds anything. Failures are
     * logged rather than propagated.
     */
    private class BatchFlusher implements Runnable {
        @Override
        public void run() {
            try {
                if (!statementInsertionQueue.isEmpty()) {
                    log.trace("Running statement insertion flush task. Too much time has passed without any object insertions so all queued data is being flushed.");
                    flush();
                }
            } catch (final Exception e) {
                log.error("Error flush out the statements", e);
            }
        }
    }
}

