/*
 * Decompiled with CFR 0.152.
 */
package com.github.jcustenborder.kafka.connect.utils.data;

import com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDeque;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ConcurrentLinkedDeque} of {@link SourceRecord}s whose {@code add*} methods
 * apply back-pressure and whose {@code drain} methods hand records out in batches.
 *
 * <p>Before an element is inserted through {@link #add}, {@link #addAll},
 * {@link #addFirst} or {@link #addLast}, the caller optionally acquires permits from
 * the configured {@link RateLimiter} and then waits, sleeping in
 * {@code maximumCapacityWaitMs} steps, until {@code size()} drops below
 * {@code maximumCapacity}. If that takes longer than {@code maximumCapacityTimeoutMs}
 * a {@link TimeoutException} is thrown.
 *
 * <p>Only the four insertion methods listed above are overridden here; other inherited
 * insertion methods are not guarded by this class.
 */
class SourceRecordDequeImpl
extends ConcurrentLinkedDeque<SourceRecord>
implements SourceRecordDeque {
    private static final Logger log = LoggerFactory.getLogger(SourceRecordDequeImpl.class);
    private final Time time;
    private final int maximumCapacity;
    private final int batchSize;
    private final int emptyWaitMs;
    private final int maximumCapacityWaitMs;
    private final int maximumCapacityTimeoutMs;
    private final RateLimiter writeRateLimit;

    /**
     * @param time                     clock used for reading the current time and for sleeping
     * @param maximumCapacity          size at or above which writers wait before inserting
     * @param batchSize                maximum number of records moved by one drain call
     * @param emptyWaitMs              default sleep, in ms, when a drain finds nothing
     * @param maximumCapacityWaitMs    sleep, in ms, between capacity re-checks
     * @param maximumCapacityTimeoutMs total time, in ms, a writer may wait for capacity
     * @param writeRateLimit           optional limiter applied to writes; may be {@code null}
     */
    SourceRecordDequeImpl(Time time, int maximumCapacity, int batchSize, int emptyWaitMs, int maximumCapacityWaitMs, int maximumCapacityTimeoutMs, RateLimiter writeRateLimit) {
        this.time = time;
        this.batchSize = batchSize;
        this.maximumCapacity = maximumCapacity;
        this.emptyWaitMs = emptyWaitMs;
        this.maximumCapacityWaitMs = maximumCapacityWaitMs;
        this.maximumCapacityTimeoutMs = maximumCapacityTimeoutMs;
        this.writeRateLimit = writeRateLimit;
    }

    /** Waits for room for a single record. */
    private void waitForCapacity() {
        this.waitForCapacity(1);
    }

    /**
     * Applies the write rate limit for {@code size} records and then blocks while the
     * deque is at or above {@code maximumCapacity}.
     *
     * @param size number of records about to be inserted
     * @throws TimeoutException if the deque is still full after more than
     *                          {@code maximumCapacityTimeoutMs} ms of waiting
     */
    private void waitForCapacity(int size) {
        // RateLimiter.acquire rejects non-positive permit counts, so an empty batch
        // (e.g. addAll of an empty collection) must not reach it.
        if (null != this.writeRateLimit && size > 0) {
            this.writeRateLimit.acquire(size);
        }
        if (this.size() < this.maximumCapacity) {
            return;
        }
        final long start = this.time.milliseconds();
        // Elapsed time starts at zero. Seeding it with the absolute clock value would
        // exceed any int timeout immediately and throw before waiting at all.
        long elapsed = 0L;
        while (this.size() >= this.maximumCapacity) {
            if (elapsed > this.maximumCapacityTimeoutMs) {
                throw new TimeoutException(String.format("Timeout of %s ms exceeded while waiting for Deque to be drained below %s", this.maximumCapacityTimeoutMs, this.maximumCapacity));
            }
            this.time.sleep(this.maximumCapacityWaitMs);
            elapsed = this.time.milliseconds() - start;
        }
    }

    /**
     * Waits for capacity for one record, then appends it.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public boolean add(SourceRecord sourceRecord) {
        this.waitForCapacity();
        return super.add(sourceRecord);
    }

    /**
     * Waits for capacity (rate-limited by {@code c.size()} permits), then appends all
     * elements of {@code c}. The capacity check only requires the current size to be
     * below the maximum; it does not reserve room for the whole collection.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public boolean addAll(Collection<? extends SourceRecord> c) {
        this.waitForCapacity(c.size());
        return super.addAll(c);
    }

    /**
     * Waits for capacity for one record, then inserts it at the front.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public void addFirst(SourceRecord sourceRecord) {
        this.waitForCapacity();
        super.addFirst(sourceRecord);
    }

    /**
     * Waits for capacity for one record, then inserts it at the end.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public void addLast(SourceRecord sourceRecord) {
        this.waitForCapacity();
        super.addLast(sourceRecord);
    }

    /** Returns a new empty list pre-sized to {@code batchSize}. */
    @Override
    public List<SourceRecord> newList() {
        return new ArrayList<SourceRecord>(this.batchSize);
    }

    /**
     * Drains up to {@code batchSize} records into a fresh list, sleeping for the
     * default {@code emptyWaitMs} if none were available.
     *
     * @return the drained records; empty if nothing was available
     */
    @Override
    public List<SourceRecord> drain() {
        List<SourceRecord> result = this.newList();
        this.drain(result);
        return result;
    }

    /**
     * Drains into {@code records} using the default {@code emptyWaitMs}.
     *
     * @return {@code true} if {@code records} is non-empty afterwards
     */
    @Override
    public boolean drain(List<SourceRecord> records) {
        return this.drain(records, this.emptyWaitMs);
    }

    /**
     * Moves up to {@code min(batchSize, size())} records from the head of this deque
     * into {@code records}. If {@code records} is still empty afterwards and
     * {@code emptyWaitMs} is positive, sleeps for that long before returning.
     *
     * @param records     destination list; must not be {@code null}
     * @param emptyWaitMs sleep in ms when nothing was found; must be {@code >= 0}
     * @return {@code true} if {@code records} is non-empty afterwards (this includes
     *         elements that were already in the list before the call)
     */
    @Override
    public boolean drain(List<SourceRecord> records, int emptyWaitMs) {
        Preconditions.checkNotNull(records, "records cannot be null");
        Preconditions.checkArgument(emptyWaitMs >= 0, "emptyWaitMs should be greater than or equal to 0.");
        log.trace("drain() - Determining size for this run. batchSize={}, records.size()={}", this.batchSize, records.size());
        int count = Math.min(this.batchSize, this.size());
        log.trace("drain() - Attempting to draining {} record(s).", count);
        for (int i = 0; i < count; ++i) {
            SourceRecord record = this.poll();
            if (null == record) {
                // Another consumer emptied the deque between size() and poll().
                log.trace("drain() - Poll returned null. exiting");
                break;
            }
            records.add(record);
        }
        if (records.isEmpty() && emptyWaitMs > 0) {
            log.trace("drain() - Found no records, sleeping {} ms.", emptyWaitMs);
            this.time.sleep(emptyWaitMs);
        }
        return !records.isEmpty();
    }
}

