package com.ning.billing.queue;

import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.ning.billing.Hostname;
import com.ning.billing.clock.Clock;
import com.ning.billing.queue.api.PersistentQueueConfig;
import com.ning.billing.queue.api.PersistentQueueEntryLifecycleState;
import com.ning.billing.queue.dao.EventEntryModelDao;
import com.ning.billing.queue.dao.QueueSqlDao;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import net.sf.ehcache.config.TimeoutBehaviorConfiguration;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/queue/DBBackedQueue.class */
/**
 * Queue of entries persisted in a database table, optionally fronted by a bounded in-memory
 * queue of record ids (the "inflight" queue).
 *
 * <p>When the inflight queue is enabled (see {@code PersistentQueueConfig#isUsingInflightQueue()}),
 * two flags drive the behavior:
 * <ul>
 *   <li>{@code isQueueOpenForWrite}: inserted entries also have their record id offered to the
 *       inflight queue; the flag is cleared when an offer is rejected (queue full).</li>
 *   <li>{@code isQueueOpenForRead}: {@link #getReadyEntries()} is served from the ids held in the
 *       inflight queue instead of polling the table for ready entries.</li>
 * </ul>
 * When the inflight queue is disabled, every call to {@link #getReadyEntries()} polls the table.
 *
 * @param <T> type of the persisted queue entries
 */
public class DBBackedQueue<T extends EventEntryModelDao> {
    private static final Logger log = LoggerFactory.getLogger(DBBackedQueue.class);

    /** Sleep before the first retry when inflight ids are not yet readable; doubled on each retry. */
    private static final long INFLIGHT_ENTRIES_INITIAL_POLL_SLEEP_MS = 10;

    /** Maximum number of lookups attempted for a batch of inflight ids. */
    private static final int INFLIGHT_ENTRIES_POLL_MAX_RETRY = 10;

    /** Prefix prepended to every log line emitted by this instance. */
    private final String dbQueueLogId;
    private final AtomicBoolean isQueueOpenForWrite;
    private final AtomicBoolean isQueueOpenForRead;
    private final QueueSqlDao<T> sqlDao;
    private final Clock clock;
    /** Record ids of entries written while the queue was open for write; {@code null} when the inflight queue is disabled. */
    private final Queue<Long> inflightEvents;
    private final PersistentQueueConfig config;
    private final String tableName;
    private final String historyTableName;
    private final boolean useInflightQueue;
    private final Counter totalInflightProcessed;
    private final Counter totalProcessed;
    private final Counter totalInflightWritten;
    private final Counter totalWritten;

    /**
     * Creates the queue with both open flags cleared; call {@link #initialize()} before use.
     *
     * @param clock            clock used to compute the timestamps passed to the DAO
     * @param sqlDao           DAO used for every database access
     * @param config           queue configuration (inflight queue usage and capacity, prefetch and claim sizes, claim time)
     * @param tableName        name of the table holding the live entries
     * @param historyTableName name of the table entries are moved to by {@link #moveEntryToHistory}
     * @param queueId          identifier used as prefix for the metric names and in the log prefix
     */
    public DBBackedQueue(final Clock clock, final QueueSqlDao<T> sqlDao, final PersistentQueueConfig config,
                         final String tableName, final String historyTableName, final String queueId) {
        this.useInflightQueue = config.isUsingInflightQueue();
        this.sqlDao = sqlDao;
        this.config = config;
        this.tableName = tableName;
        this.historyTableName = historyTableName;
        this.inflightEvents = this.useInflightQueue ? new LinkedBlockingQueue<Long>(config.getQueueCapacity()) : null;
        this.isQueueOpenForWrite = new AtomicBoolean(false);
        this.isQueueOpenForRead = new AtomicBoolean(false);
        this.clock = clock;
        this.totalInflightProcessed = Metrics.newCounter(DBBackedQueue.class, queueId + "-totalInflightProcessed");
        this.totalProcessed = Metrics.newCounter(DBBackedQueue.class, queueId + "-totalProcessed");
        this.totalInflightWritten = Metrics.newCounter(DBBackedQueue.class, queueId + "-totalInflightWritten");
        this.totalWritten = Metrics.newCounter(DBBackedQueue.class, queueId + "-totalWritten");
        this.dbQueueLogId = "DBBackedQueue-" + queueId + ": ";
    }

    /**
     * Resets the open flags, the inflight queue and the counters based on the number of ready
     * entries currently in the table:
     * <ul>
     *   <li>none: open for write and for read;</li>
     *   <li>fewer than the configured prefetch size: open for write only;</li>
     *   <li>otherwise: closed for both.</li>
     * </ul>
     */
    public void initialize() {
        final int prefetchEntries = config.getPrefetchEntries();
        final int readyCount = fetchReadyEntries(prefetchEntries).size();

        isQueueOpenForWrite.set(readyCount == 0 || readyCount < prefetchEntries);
        isQueueOpenForRead.set(readyCount == 0);

        if (useInflightQueue) {
            inflightEvents.clear();
        }
        totalInflightProcessed.clear();
        totalProcessed.clear();
        totalInflightWritten.clear();
        totalWritten.clear();

        log.info(dbQueueLogId + "Initialized with isQueueOpenForWrite = " + isQueueOpenForWrite.get()
                 + ", isQueueOpenForRead = " + isQueueOpenForRead.get());
    }

    /**
     * Inserts the entry in its own transaction.
     *
     * @param entry entry to persist
     * @see #insertEntryFromTransaction(QueueSqlDao, EventEntryModelDao)
     */
    public void insertEntry(final T entry) {
        sqlDao.inTransaction(new Transaction<Void, QueueSqlDao<T>>() {
            @Override
            public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionStatus status) throws Exception {
                insertEntryFromTransaction(transactional, entry);
                return null;
            }
        });
    }

    /**
     * Inserts the entry using the supplied DAO and, when the inflight queue is enabled and open
     * for write, offers the id returned by {@code getLastInsertId()} to the inflight queue.
     * A rejected offer closes the queue for write.
     *
     * @param transactional DAO to use for the insert
     * @param entry         entry to persist
     */
    public void insertEntryFromTransaction(final QueueSqlDao<T> transactional, final T entry) {
        transactional.insertEntry(entry, tableName);
        totalWritten.inc();

        if (!useInflightQueue || !isQueueOpenForWrite.get()) {
            return;
        }

        final Long recordId = transactional.getLastInsertId();
        final boolean addedToInflightQueue = inflightEvents.offer(recordId);
        if (log.isDebugEnabled()) {
            log.debug(dbQueueLogId + "Inserting entry " + recordId + (addedToInflightQueue ? " into inflightQ" : " into disk")
                      + " [" + entry.getEventJson() + "]");
        }

        if (addedToInflightQueue) {
            totalInflightWritten.inc();
        } else if (isQueueOpenForWrite.compareAndSet(true, false)) {
            // Only the caller that actually flips the flag reports the overflow.
            log.info(dbQueueLogId + "Closing Q for write: Overflowed with recordId = " + recordId);
        }
    }

    /**
     * Returns the entries that were successfully claimed for processing.
     *
     * <p>Without the inflight queue, ready entries are polled from the table. With it, entries are
     * looked up from the inflight ids while the queue is open for read; once the inflight queue is
     * drained and closed for write, the queue is closed for read and the table is polled instead
     * until the open flags are re-enabled.
     *
     * @return the claimed entries, possibly empty, never {@code null}
     */
    public List<T> getReadyEntries() {
        if (!useInflightQueue) {
            final List<T> entriesToClaim = fetchReadyEntries(config.getMaxEntriesClaimed());
            return entriesToClaim.isEmpty() ? ImmutableList.<T>of() : claimEntries(entriesToClaim);
        }

        if (isQueueOpenForRead.get()) {
            final List<T> candidates = fetchReadyEntriesFromIds();
            if (!candidates.isEmpty()) {
                return claimEntries(candidates);
            }
            // Nothing usable in the inflight queue and nothing more will be added to it: fall back to the table.
            if (!isQueueOpenForWrite.get() && isQueueOpenForRead.compareAndSet(true, false)) {
                log.info(dbQueueLogId + " Closing Q for read");
            }
        }

        if (isQueueOpenForRead.get()) {
            return ImmutableList.<T>of();
        }

        final int prefetchEntries = config.getPrefetchEntries();
        final List<T> prefetched = fetchReadyEntries(prefetchEntries);
        // Log only on an actual transition so that repeated polls do not flood the log.
        if (prefetched.size() < prefetchEntries && isQueueOpenForWrite.compareAndSet(false, true)) {
            log.info(dbQueueLogId + " Opening Q for write");
        }

        final List<T> entriesToClaim = prefetched.subList(0, Math.min(prefetched.size(), config.getMaxEntriesClaimed()));
        totalProcessed.inc(entriesToClaim.size());

        if (removeInflightEventsWhenSwitchingToQueueOpenForRead(entriesToClaim)
            && isQueueOpenForRead.compareAndSet(false, true)) {
            log.info(dbQueueLogId + " Opening Q for read");
        }
        return claimEntries(entriesToClaim);
    }

    /**
     * Removes the record id of every candidate from the inflight queue.
     *
     * <p>NOTE(review): the returned flag reflects only the removal of the <em>last</em> candidate
     * (and is {@code false} for an empty list); confirm this is the intended signal for re-opening
     * the queue for read.
     *
     * @param candidates entries about to be claimed from the table
     * @return whether the last candidate's record id was present in the inflight queue
     */
    private boolean removeInflightEventsWhenSwitchingToQueueOpenForRead(final List<T> candidates) {
        boolean lastEntryWasInflight = false;
        for (final T candidate : candidates) {
            lastEntryWasInflight = inflightEvents.remove(candidate.getRecordId());
        }
        return lastEntryWasInflight;
    }

    /**
     * Moves the entry to the history table in its own transaction.
     *
     * @param entry entry to move
     * @see #moveEntryToHistoryFromTransaction(QueueSqlDao, EventEntryModelDao)
     */
    public void moveEntryToHistory(final T entry) {
        sqlDao.inTransaction(new Transaction<Void, QueueSqlDao<T>>() {
            @Override
            public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionStatus status) throws Exception {
                moveEntryToHistoryFromTransaction(transactional, entry);
                return null;
            }
        });
    }

    /**
     * Inserts the entry into the history table and removes it from the live table, using the supplied DAO.
     *
     * @param transactional DAO to use for both statements
     * @param entry         entry to move
     */
    public void moveEntryToHistoryFromTransaction(final QueueSqlDao<T> transactional, final T entry) {
        transactional.insertEntry(entry, historyTableName);
        transactional.removeEntry(entry.getRecordId(), tableName);
    }

    /**
     * Polls up to {@code getMaxEntriesClaimed()} ids from the inflight queue, loads the matching
     * entries and keeps those whose processing state is {@code AVAILABLE}.
     *
     * @return immutable list of available entries, possibly empty
     */
    private List<T> fetchReadyEntriesFromIds() {
        final int batchSize = Math.min(config.getMaxEntriesClaimed(), inflightEvents.size());
        final List<Long> recordIds = new ArrayList<Long>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            final Long recordId = inflightEvents.poll();
            // The queue may have been drained concurrently since size() was read.
            if (recordId != null) {
                totalInflightProcessed.inc();
                totalProcessed.inc();
                recordIds.add(recordId);
            }
        }

        if (recordIds.isEmpty()) {
            return ImmutableList.<T>of();
        }

        if (log.isDebugEnabled()) {
            log.debug(dbQueueLogId + "fetchReadyEntriesFromIds, size = " + batchSize + ", ids = " + Joiner.on(", ").join(recordIds));
        }

        final List<T> available = new ArrayList<T>(recordIds.size());
        for (final T entry : getEntriesFromIds(recordIds)) {
            if (entry.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
                available.add(entry);
            }
        }
        return ImmutableList.copyOf(available);
    }

    /**
     * Loads the entries for the given record ids, retrying with an exponential back-off (starting
     * at {@link #INFLIGHT_ENTRIES_INITIAL_POLL_SLEEP_MS}) while some ids are not found, for at most
     * {@link #INFLIGHT_ENTRIES_POLL_MAX_RETRY} lookups. The supplied list is not modified.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the entries
     * found so far are returned.
     *
     * @param list record ids to load
     * @return the entries found, possibly fewer than requested
     */
    private List<T> getEntriesFromIds(final List<Long> list) {
        final int expectedSize = list.size();
        final List<T> result = new ArrayList<T>(expectedSize);
        if (expectedSize == 0) {
            return result;
        }

        // Work on a copy so the ids still missing can be tracked without mutating the caller's list.
        final List<Long> remaining = new ArrayList<Long>(list);
        long sleepTimeMs = INFLIGHT_ENTRIES_INITIAL_POLL_SLEEP_MS;
        int attempts = 0;
        while (true) {
            final List<T> found = sqlDao.getEntriesFromIds(remaining, tableName);
            for (final T entry : found) {
                remaining.remove(entry.getRecordId());
            }
            result.addAll(found);
            attempts++;

            // Stop before sleeping when everything was found or no further lookup will be made.
            if (result.size() >= expectedSize || attempts >= INFLIGHT_ENTRIES_POLL_MAX_RETRY) {
                break;
            }

            log.info(dbQueueLogId + "Sleeping " + sleepTimeMs + " for IDS = "
                     + Joiner.on(TimeoutBehaviorConfiguration.DEFAULT_PROPERTY_SEPARATOR).join(remaining));
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                log.warn(dbQueueLogId + "Thread " + Thread.currentThread() + " got interrupted");
                Thread.currentThread().interrupt();
                return result;
            }
            sleepTimeMs *= 2;
        }

        if (!remaining.isEmpty()) {
            log.warn(dbQueueLogId + " Missing inflight entries from disk, recordIds = ["
                     + Joiner.on(TimeoutBehaviorConfiguration.DEFAULT_PROPERTY_SEPARATOR).join(remaining) + " ]");
        }
        return result;
    }

    /**
     * Queries the table for ready entries as of the current clock time for this host.
     *
     * @param i maximum number of entries requested from the DAO
     * @return the entries returned by the DAO
     */
    private List<T> fetchReadyEntries(final int i) {
        return sqlDao.getReadyEntries(clock.getUTCNow().toDate(), Hostname.get(), i, tableName);
    }

    /**
     * Attempts to claim each candidate exactly once, in order, and returns those that were claimed.
     *
     * @param candidates entries to claim
     * @return immutable list of the entries for which {@link #claimEntry} succeeded
     */
    private List<T> claimEntries(final List<T> candidates) {
        // Explicit loop rather than a lazily evaluated filter: claiming issues a database update,
        // so it must run once per entry regardless of how the resulting collection is traversed.
        final List<T> claimed = new ArrayList<T>(candidates.size());
        for (final T candidate : candidates) {
            if (claimEntry(candidate)) {
                claimed.add(candidate);
            }
        }
        return ImmutableList.copyOf(claimed);
    }

    /**
     * Claims the entry for this host until now plus the configured claimed time.
     *
     * @param entry entry to claim
     * @return {@code true} if the DAO reports exactly one row updated
     */
    public boolean claimEntry(final T entry) {
        final int updatedRows = sqlDao.claimEntry(entry.getRecordId(),
                                                  clock.getUTCNow().toDate(),
                                                  Hostname.get(),
                                                  clock.getUTCNow().plus(config.getClaimedTime().getMillis()).toDate(),
                                                  tableName);
        final boolean claimed = updatedRows == 1;
        if (claimed && log.isDebugEnabled()) {
            log.debug(dbQueueLogId + "Claiming entry " + entry.getRecordId());
        }
        return claimed;
    }

    /** @return the DAO backing this queue */
    public QueueSqlDao<T> getSqlDao() {
        return sqlDao;
    }

    /** @return whether inserted entries are currently also offered to the inflight queue */
    public boolean isQueueOpenForWrite() {
        return isQueueOpenForWrite.get();
    }

    /** @return whether ready entries are currently read from the inflight queue */
    public boolean isQueueOpenForRead() {
        return isQueueOpenForRead.get();
    }

    /** @return number of record ids polled from the inflight queue since the last {@link #initialize()} */
    public long getTotalInflightProcessed() {
        return totalInflightProcessed.count();
    }

    /** @return current value of the total-processed counter */
    public long getTotalProcessed() {
        return totalProcessed.count();
    }

    /** @return number of record ids accepted by the inflight queue since the last {@link #initialize()} */
    public long getTotalInflightWritten() {
        return totalInflightWritten.count();
    }

    /** @return number of entries inserted since the last {@link #initialize()} */
    public long getTotalWritten() {
        return totalWritten.count();
    }
}
