/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.AbstractLogCompactor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.storage.EntryLogScanner;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionalEntryLogCompactor
extends AbstractLogCompactor {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionalEntryLogCompactor.class);
    final EntryLogger entryLogger;
    final CompactableLedgerStorage ledgerStorage;
    final List<EntryLocation> offsets = new ArrayList<EntryLocation>();
    public static final String COMPACTING_SUFFIX = ".log.compacting";
    public static final String COMPACTED_SUFFIX = ".compacted";

    public TransactionalEntryLogCompactor(ServerConfiguration conf, EntryLogger entryLogger, CompactableLedgerStorage ledgerStorage, AbstractLogCompactor.LogRemovalListener logRemover) {
        super(conf, logRemover);
        this.entryLogger = entryLogger;
        this.ledgerStorage = ledgerStorage;
    }

    @Override
    public void cleanUpAndRecover() {
        for (CompactionEntryLog log : this.entryLogger.incompleteCompactionLogs()) {
            LOG.info("Found compacted log file {} has partially flushed index, recovering index.", (Object)log);
            UpdateIndexPhase updateIndex = new UpdateIndexPhase(log, true);
            updateIndex.run();
        }
    }

    @Override
    public boolean compact(EntryLogMetadata metadata) {
        if (metadata != null) {
            CompactionEntryLog compactionLog;
            LOG.info("Compacting entry log {} with usage {}.", (Object)metadata.getEntryLogId(), (Object)metadata.getUsage());
            try {
                compactionLog = this.entryLogger.newCompactionLog(metadata.getEntryLogId());
            }
            catch (IOException ioe) {
                LOG.error("Exception creating new compaction entry log", (Throwable)ioe);
                return false;
            }
            ScanEntryLogPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog);
            if (!scanEntryLog.run()) {
                LOG.info("Compaction for entry log {} end in ScanEntryLogPhase.", (Object)metadata.getEntryLogId());
                return false;
            }
            FlushCompactionLogPhase flushCompactionLog = new FlushCompactionLogPhase(compactionLog);
            if (!flushCompactionLog.run()) {
                LOG.info("Compaction for entry log {} end in FlushCompactionLogPhase.", (Object)metadata.getEntryLogId());
                return false;
            }
            UpdateIndexPhase updateIndex = new UpdateIndexPhase(compactionLog);
            if (!updateIndex.run()) {
                LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", (Object)metadata.getEntryLogId());
                return false;
            }
            LOG.info("Compacted entry log : {}.", (Object)metadata.getEntryLogId());
            return true;
        }
        return false;
    }

    class UpdateIndexPhase
    extends CompactionPhase {
        final CompactionEntryLog compactionLog;
        private final boolean isInRecovery;

        public UpdateIndexPhase(CompactionEntryLog compactionLog) {
            this(compactionLog, false);
        }

        public UpdateIndexPhase(CompactionEntryLog compactionLog, boolean isInRecovery) {
            super("UpdateIndexPhase");
            this.compactionLog = compactionLog;
            this.isInRecovery = isInRecovery;
        }

        @Override
        void start() throws IOException {
            this.compactionLog.makeAvailable();
            if (this.isInRecovery) {
                this.recoverEntryLocations(this.compactionLog);
            }
            if (!TransactionalEntryLogCompactor.this.offsets.isEmpty()) {
                TransactionalEntryLogCompactor.this.ledgerStorage.updateEntriesLocations(TransactionalEntryLogCompactor.this.offsets);
                TransactionalEntryLogCompactor.this.ledgerStorage.flushEntriesLocationsIndex();
            }
        }

        @Override
        boolean complete() {
            TransactionalEntryLogCompactor.this.offsets.clear();
            this.compactionLog.finalizeAndCleanup();
            TransactionalEntryLogCompactor.this.logRemovalListener.removeEntryLog(this.compactionLog.getSrcLogId());
            return true;
        }

        @Override
        void abort() {
            TransactionalEntryLogCompactor.this.offsets.clear();
        }

        private void recoverEntryLocations(final CompactionEntryLog compactionLog) throws IOException {
            compactionLog.scan(new EntryLogScanner(){

                @Override
                public boolean accept(long ledgerId) {
                    return true;
                }

                @Override
                public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
                    long lid = entry.getLong(entry.readerIndex());
                    long entryId = entry.getLong(entry.readerIndex() + 8);
                    if (lid != ledgerId || entryId < -1L) {
                        LOG.warn("Scanning expected ledgerId {}, but found invalid entry with ledgerId {} entryId {} at offset {}", new Object[]{ledgerId, lid, entryId, offset});
                        throw new IOException("Invalid entry found @ offset " + offset);
                    }
                    long location = compactionLog.getDstLogId() << 32 | offset + 4L;
                    TransactionalEntryLogCompactor.this.offsets.add(new EntryLocation(lid, entryId, location));
                }
            });
            LOG.info("Recovered {} entry locations from compacted log {}", (Object)TransactionalEntryLogCompactor.this.offsets.size(), (Object)compactionLog.getDstLogId());
        }
    }

    class FlushCompactionLogPhase
    extends CompactionPhase {
        final CompactionEntryLog compactionLog;

        FlushCompactionLogPhase(CompactionEntryLog compactionLog) {
            super("FlushCompactionLogPhase");
            this.compactionLog = compactionLog;
        }

        @Override
        void start() throws IOException {
            this.compactionLog.flush();
        }

        @Override
        boolean complete() throws IOException {
            try {
                this.compactionLog.markCompacted();
                return true;
            }
            catch (IOException ioe) {
                LOG.warn("Error marking compaction as done", (Throwable)ioe);
                return false;
            }
        }

        @Override
        void abort() {
            TransactionalEntryLogCompactor.this.offsets.clear();
            this.compactionLog.abort();
        }
    }

    class ScanEntryLogPhase
    extends CompactionPhase {
        private final EntryLogMetadata metadata;
        private final CompactionEntryLog compactionLog;

        ScanEntryLogPhase(EntryLogMetadata metadata, CompactionEntryLog compactionLog) {
            super("ScanEntryLogPhase");
            this.metadata = metadata;
            this.compactionLog = compactionLog;
        }

        @Override
        void start() throws IOException {
            TransactionalEntryLogCompactor.this.entryLogger.scanEntryLog(this.metadata.getEntryLogId(), new EntryLogScanner(){

                @Override
                public boolean accept(long ledgerId) {
                    return ScanEntryLogPhase.this.metadata.containsLedger(ledgerId);
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
                    TransactionalEntryLogCompactor.this.throttler.acquire(entry.readableBytes());
                    TransactionalEntryLogCompactor transactionalEntryLogCompactor = TransactionalEntryLogCompactor.this;
                    synchronized (transactionalEntryLogCompactor) {
                        long lid = entry.getLong(entry.readerIndex());
                        long entryId = entry.getLong(entry.readerIndex() + 8);
                        if (lid != ledgerId || entryId < -1L) {
                            LOG.warn("Scanning expected ledgerId {}, but found invalid entry with ledgerId {} entryId {} at offset {}", new Object[]{ledgerId, lid, entryId, offset});
                            throw new IOException("Invalid entry found @ offset " + offset);
                        }
                        long newOffset = ScanEntryLogPhase.this.compactionLog.addEntry(ledgerId, entry);
                        TransactionalEntryLogCompactor.this.offsets.add(new EntryLocation(ledgerId, entryId, newOffset));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Compact add entry : lid = {}, eid = {}, offset = {}", new Object[]{ledgerId, entryId, newOffset});
                        }
                    }
                }
            });
        }

        @Override
        boolean complete() {
            if (TransactionalEntryLogCompactor.this.offsets.isEmpty()) {
                LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
                TransactionalEntryLogCompactor.this.logRemovalListener.removeEntryLog(this.metadata.getEntryLogId());
                this.compactionLog.abort();
                return false;
            }
            return true;
        }

        @Override
        void abort() {
            TransactionalEntryLogCompactor.this.offsets.clear();
            this.compactionLog.abort();
        }
    }

    static abstract class CompactionPhase {
        private String phaseName = "";

        CompactionPhase(String phaseName) {
            this.phaseName = phaseName;
        }

        boolean run() {
            try {
                this.start();
                return this.complete();
            }
            catch (IOException e) {
                LOG.error("Encounter exception in compaction phase {}. Abort current compaction.", (Object)this.phaseName, (Object)e);
                this.abort();
                return false;
            }
        }

        abstract void start() throws IOException;

        abstract boolean complete() throws IOException;

        abstract void abort();
    }
}

