/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentMessageFinder
implements AsyncCallbacks.FindEntryCallback {
    private final ManagedCursor cursor;
    private final String subName;
    private final int ledgerCloseTimestampMaxClockSkewMillis;
    private final String topicName;
    private long timestamp = 0L;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private volatile int messageFindInProgress = 0;
    private static final AtomicIntegerFieldUpdater<PersistentMessageFinder> messageFindInProgressUpdater = AtomicIntegerFieldUpdater.newUpdater(PersistentMessageFinder.class, "messageFindInProgress");
    private static final Logger log = LoggerFactory.getLogger(PersistentMessageFinder.class);

    public PersistentMessageFinder(String topicName, ManagedCursor cursor, int ledgerCloseTimestampMaxClockSkewMillis) {
        this.topicName = topicName;
        this.cursor = cursor;
        this.subName = Codec.decode((String)cursor.getName());
        this.ledgerCloseTimestampMaxClockSkewMillis = ledgerCloseTimestampMaxClockSkewMillis;
    }

    public void findMessages(long timestamp, AsyncCallbacks.FindEntryCallback callback) {
        if (messageFindInProgressUpdater.compareAndSet(this, 0, 1)) {
            this.timestamp = timestamp;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Starting message position find at timestamp {}", (Object)this.subName, (Object)timestamp);
            }
            Pair<Position, Position> range = PersistentMessageFinder.getFindPositionRange(this.cursor.getManagedLedger().getLedgersInfo().values(), this.cursor.getManagedLedger().getLastConfirmedEntry(), timestamp, this.ledgerCloseTimestampMaxClockSkewMillis);
            this.cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
                try {
                    long entryTimestamp = Commands.getEntryTimestamp((ByteBuf)entry.getDataBuffer());
                    boolean bl = MessageImpl.isEntryPublishedEarlierThan((long)entryTimestamp, (long)timestamp);
                    return bl;
                }
                catch (Exception e) {
                    log.error("[{}][{}] Error deserializing message for message position find", new Object[]{this.topicName, this.subName, e});
                }
                finally {
                    entry.release();
                }
                return false;
            }, (Position)range.getLeft(), (Position)range.getRight(), (AsyncCallbacks.FindEntryCallback)this, (Object)callback, true);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", (Object)this.topicName, (Object)this.subName);
            }
            callback.findEntryFailed((ManagedLedgerException)new ManagedLedgerException.ConcurrentFindCursorPositionException("last find is still running"), Optional.empty(), null);
        }
    }

    @VisibleForTesting
    public static Pair<Position, Position> getFindPositionRange(Iterable<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfos, Position lastConfirmedEntry, long targetTimestamp, int ledgerCloseTimestampMaxClockSkewMillis) {
        if (ledgerCloseTimestampMaxClockSkewMillis < 0) {
            return Pair.of(null, null);
        }
        long targetTimestampMin = targetTimestamp - (long)ledgerCloseTimestampMaxClockSkewMillis;
        long targetTimestampMax = targetTimestamp + (long)ledgerCloseTimestampMaxClockSkewMillis;
        Position start = null;
        Position end = null;
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo info : ledgerInfos) {
            if (!info.hasTimestamp()) {
                return Pair.of(null, null);
            }
            long closeTimestamp = info.getTimestamp();
            if (closeTimestamp == 0L) {
                end = null;
                break;
            }
            if (closeTimestamp <= targetTimestampMin) {
                start = PositionFactory.create((long)info.getLedgerId(), (long)0L);
                continue;
            }
            if (closeTimestamp <= targetTimestampMax) continue;
            end = PositionFactory.create((long)info.getLedgerId(), (long)(info.getEntries() - 1L));
            break;
        }
        return Pair.of(start, end);
    }

    public void findEntryComplete(Position position, Object ctx) {
        Preconditions.checkArgument((boolean)(ctx instanceof AsyncCallbacks.FindEntryCallback));
        AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback)ctx;
        if (position != null) {
            log.info("[{}][{}] Found position {} closest to provided timestamp {}", new Object[]{this.topicName, this.subName, position, this.timestamp});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{}] No position found closest to provided timestamp {}", new Object[]{this.topicName, this.subName, this.timestamp});
        }
        this.messageFindInProgress = 0;
        callback.findEntryComplete(position, null);
    }

    public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
        Preconditions.checkArgument((boolean)(ctx instanceof AsyncCallbacks.FindEntryCallback));
        AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback)ctx;
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] message position find operation failed for provided timestamp {}", new Object[]{this.topicName, this.subName, this.timestamp, exception});
        }
        this.messageFindInProgress = 0;
        callback.findEntryFailed(exception, failedReadPosition, null);
    }
}

