/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionBound;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.OpReadEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OpScan
implements AsyncCallbacks.ReadEntriesCallback {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(OpScan.class);
    private final ManagedCursorImpl cursor;
    private final ManagedLedgerImpl ledger;
    private final AsyncCallbacks.ScanCallback callback;
    private final Predicate<Entry> condition;
    private final Object ctx;
    private final AtomicLong remainingEntries = new AtomicLong();
    private final long timeOutMs;
    private final long startTime = System.currentTimeMillis();
    private final int batchSize;
    Position searchPosition;
    Position lastSeenPosition = null;

    public OpScan(ManagedCursorImpl cursor, int batchSize, Position startPosition, Predicate<Entry> condition, AsyncCallbacks.ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
        this.batchSize = batchSize;
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize " + batchSize);
        }
        this.cursor = Objects.requireNonNull(cursor);
        this.ledger = cursor.ledger;
        this.callback = callback;
        this.condition = condition;
        this.ctx = ctx;
        this.searchPosition = startPosition;
        this.remainingEntries.set(maxEntries);
        this.timeOutMs = timeOutMs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void readEntriesComplete(List<Entry> entries, Object ctx) {
        try {
            Position lastPositionForBatch;
            this.lastSeenPosition = lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
            List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries);
            int skippedEntries = entries.size() - entriesFiltered.size();
            this.remainingEntries.addAndGet(-skippedEntries);
            if (!entriesFiltered.isEmpty()) {
                for (Entry entry : entriesFiltered) {
                    if (this.remainingEntries.decrementAndGet() <= 0L) {
                        log.warn("[{}] Scan abort after reading too many entries", (Object)this.cursor);
                        this.callback.scanComplete(this.lastSeenPosition, ScanOutcome.ABORTED, this.ctx);
                        return;
                    }
                    if (this.condition.test(entry)) continue;
                    log.warn("[{}] Scan abort due to user code", (Object)this.cursor);
                    this.callback.scanComplete(this.lastSeenPosition, ScanOutcome.USER_INTERRUPTED, this.ctx);
                    return;
                }
            }
            this.searchPosition = this.ledger.getPositionAfterN(lastPositionForBatch, 1L, PositionBound.startExcluded);
            if (log.isDebugEnabled()) {
                log.debug("readEntryComplete {} at {} next is {}", (Object)lastPositionForBatch, (Object)this.searchPosition);
            }
            if (this.searchPosition.compareTo(lastPositionForBatch) == 0) {
                this.callback.scanComplete(this.lastSeenPosition, ScanOutcome.COMPLETED, this.ctx);
                return;
            }
        }
        catch (Throwable t) {
            log.error("Unhandled error", t);
            this.callback.scanFailed(ManagedLedgerException.getManagedLedgerException(t), Optional.ofNullable(this.lastSeenPosition), this.ctx);
            return;
        }
        finally {
            entries.forEach(Entry::release);
        }
        this.find();
    }

    @Override
    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        this.callback.scanFailed(exception, Optional.ofNullable(this.searchPosition), this.ctx);
    }

    public void find() {
        if (this.remainingEntries.get() <= 0L) {
            log.warn("[{}] Scan abort after reading too many entries", (Object)this.cursor);
            this.callback.scanComplete(this.lastSeenPosition, ScanOutcome.ABORTED, this.ctx);
            return;
        }
        if (System.currentTimeMillis() - this.startTime > this.timeOutMs) {
            log.warn("[{}] Scan abort after hitting the deadline", (Object)this.cursor);
            this.callback.scanComplete(this.lastSeenPosition, ScanOutcome.ABORTED, this.ctx);
            return;
        }
        if (this.cursor.hasMoreEntries(this.searchPosition)) {
            OpReadEntry opReadEntry = OpReadEntry.create(this.cursor, this.searchPosition, this.batchSize, this, this.ctx, null, null, false);
            this.ledger.asyncReadEntries(opReadEntry);
        } else {
            this.callback.scanComplete(this.lastSeenPosition, ScanOutcome.COMPLETED, this.ctx);
        }
    }
}

