/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwoPhaseCompactor
extends Compactor {
    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
    private static final int MAX_OUTSTANDING = 500;
    private final Duration phaseOneLoopReadTimeout;
    private final boolean topicCompactionRetainNullKey;

    public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, ScheduledExecutorService scheduler) {
        super(conf, pulsar, bk, scheduler);
        this.phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
        this.topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey();
    }

    @Override
    protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
        return reader.hasMessageAvailableAsync().thenCompose(available -> {
            if (available.booleanValue()) {
                return this.phaseOne(reader).thenCompose(r -> this.phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
            }
            log.info("Skip compaction of the empty topic {}", (Object)reader.getTopic());
            return CompletableFuture.completedFuture(-1L);
        });
    }

    private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
        HashMap latestForKey = new HashMap();
        CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<PhaseOneResult>();
        ((CompletableFuture)reader.getLastMessageIdAsync().thenAccept(lastMessageId -> {
            log.info("Commencing phase one of compaction for {}, reading to {}", (Object)reader.getTopic(), lastMessageId);
            MessageIdImpl lastImpl = (MessageIdImpl)lastMessageId;
            MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex());
            this.phaseOneLoop(reader, Optional.empty(), Optional.empty(), (MessageId)lastEntryMessageId, latestForKey, loopPromise);
        })).exceptionally(ex -> {
            loopPromise.completeExceptionally((Throwable)ex);
            return null;
        });
        return loopPromise;
    }

    private void phaseOneLoop(RawReader reader, Optional<MessageId> firstMessageId, Optional<MessageId> toMessageId, MessageId lastMessageId, Map<String, MessageId> latestForKey, CompletableFuture<PhaseOneResult> loopPromise) {
        if (loopPromise.isDone()) {
            return;
        }
        CompletableFuture<RawMessage> future = reader.readNextAsync();
        FutureUtil.addTimeoutHandling(future, (Duration)this.phaseOneLoopReadTimeout, (ScheduledExecutorService)this.scheduler, () -> FutureUtil.createTimeoutException((String)"Timeout", this.getClass(), (String)"phaseOneLoop(...)"));
        ((CompletableFuture)future.thenAcceptAsync(m -> {
            try (RawMessage rawMessage = m;){
                MessageId to;
                MessageId id = m.getMessageId();
                boolean deletedMessage = false;
                boolean replaceMessage = false;
                this.mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                MessageMetadata metadata = Commands.parseMessageMetadata((ByteBuf)m.getHeadersAndPayload());
                if (Markers.isServerOnlyMarker((MessageMetadata)metadata)) {
                    this.mxBean.addCompactionRemovedEvent(reader.getTopic());
                    deletedMessage = true;
                } else if (RawBatchConverter.isReadableBatch(metadata)) {
                    try {
                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
                        int deleteCnt = 0;
                        for (ImmutableTriple<MessageId, String, Integer> e : this.extractIdsAndKeysAndSizeFromBatch((RawMessage)m)) {
                            if (e == null) continue;
                            if (e.getMiddle() == null) {
                                if (this.topicCompactionRetainNullKey) continue;
                                ++deleteCnt;
                                this.mxBean.addCompactionRemovedEvent(reader.getTopic());
                                continue;
                            }
                            if ((Integer)e.getRight() > 0) {
                                MessageId old = latestForKey.put((String)e.getMiddle(), (MessageId)e.getLeft());
                                if (old == null) continue;
                                this.mxBean.addCompactionRemovedEvent(reader.getTopic());
                                continue;
                            }
                            latestForKey.remove(e.getMiddle());
                            ++deleteCnt;
                            this.mxBean.addCompactionRemovedEvent(reader.getTopic());
                        }
                        if (deleteCnt == numMessagesInBatch) {
                            deletedMessage = true;
                        }
                    }
                    catch (IOException ioe) {
                        log.info("Error decoding batch for message {}. Whole batch will be included in output", (Object)id, (Object)ioe);
                    }
                } else {
                    Pair<String, Integer> keyAndSize = this.extractKeyAndSize((RawMessage)m);
                    if (keyAndSize != null) {
                        if ((Integer)keyAndSize.getRight() > 0) {
                            MessageId old = latestForKey.put((String)keyAndSize.getLeft(), id);
                            replaceMessage = old != null;
                        } else {
                            deletedMessage = true;
                            latestForKey.remove(keyAndSize.getLeft());
                        }
                    } else if (!this.topicCompactionRetainNullKey) {
                        deletedMessage = true;
                    }
                    if (replaceMessage || deletedMessage) {
                        this.mxBean.addCompactionRemovedEvent(reader.getTopic());
                    }
                }
                MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
                MessageId messageId = to = deletedMessage ? (MessageId)toMessageId.orElse(null) : id;
                if (id.compareTo((Object)lastMessageId) == 0) {
                    loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to, lastMessageId, latestForKey));
                } else {
                    this.phaseOneLoop(reader, Optional.ofNullable(first), Optional.ofNullable(to), lastMessageId, latestForKey, loopPromise);
                }
            }
        }, (Executor)this.scheduler)).exceptionally(ex -> {
            loopPromise.completeExceptionally((Throwable)ex);
            return null;
        });
    }

    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk) {
        Map metadata = LedgerMetadataUtils.buildMetadataForCompactedLedger((String)reader.getTopic(), (byte[])to.toByteArray());
        return this.createLedger(bk, metadata).thenCompose(ledger -> {
            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", new Object[]{reader.getTopic(), from, to, latestForKey.size(), ledger.getId()});
            return this.phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, (LedgerHandle)ledger);
        });
    }

    private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
        CompletableFuture<Long> promise = new CompletableFuture<Long>();
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)reader.seekAsync(from).thenCompose(v -> {
            Semaphore outstanding = new Semaphore(500);
            CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
            this.phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest);
            return loopPromise;
        })).thenCompose(v -> this.closeLedger(ledger))).thenCompose(v -> reader.acknowledgeCumulativeAsync(lastReadId, Map.of("CompactedTopicLedger", ledger.getId())))).whenComplete((res, exception) -> {
            if (exception != null) {
                this.deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
                    if (exception2 != null) {
                        log.warn("Cleanup of ledger {} for failed", (Object)ledger, exception2);
                    }
                    promise.completeExceptionally((Throwable)exception);
                });
            } else {
                promise.complete(ledger.getId());
            }
        });
        return promise;
    }

    private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey, LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise, MessageId lastCompactedMessageId) {
        if (promise.isDone()) {
            return;
        }
        ((CompletableFuture)reader.readNextAsync().thenAcceptAsync(arg_0 -> this.lambda$phaseTwoLoop$16(promise, lastCompactedMessageId, reader, to, latestForKey, lh, outstanding, arg_0), (Executor)this.scheduler)).exceptionally(ex -> {
            promise.completeExceptionally((Throwable)ex);
            return null;
        });
    }

    protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
        CompletableFuture<LedgerHandle> bkf = new CompletableFuture<LedgerHandle>();
        try {
            bk.asyncCreateLedger(this.conf.getManagedLedgerDefaultEnsembleSize(), this.conf.getManagedLedgerDefaultWriteQuorum(), this.conf.getManagedLedgerDefaultAckQuorum(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, (rc, ledger, ctx) -> {
                if (rc != 0) {
                    bkf.completeExceptionally(BKException.create((int)rc));
                } else {
                    bkf.complete(ledger);
                }
            }, null, metadata);
        }
        catch (Throwable t) {
            log.error("Encountered unexpected error when creating compaction ledger", t);
            return FutureUtil.failedFuture((Throwable)t);
        }
        return bkf;
    }

    protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
        CompletableFuture<Void> bkf = new CompletableFuture<Void>();
        try {
            bk.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
                if (rc != 0) {
                    bkf.completeExceptionally(BKException.create((int)rc));
                } else {
                    bkf.complete(null);
                }
            }, null);
        }
        catch (Throwable t) {
            return FutureUtil.failedFuture((Throwable)t);
        }
        return bkf;
    }

    protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
        CompletableFuture<Void> bkf = new CompletableFuture<Void>();
        try {
            lh.asyncClose((rc, ledger, ctx) -> {
                if (rc != 0) {
                    bkf.completeExceptionally(BKException.create((int)rc));
                } else {
                    bkf.complete(null);
                }
            }, null);
        }
        catch (Throwable t) {
            return FutureUtil.failedFuture((Throwable)t);
        }
        return bkf;
    }

    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) {
        CompletableFuture<Void> bkf = new CompletableFuture<Void>();
        ByteBuf serialized = m.serialize();
        try {
            this.mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes());
            long start = System.nanoTime();
            lh.asyncAddEntry(serialized, (rc, ledger, eid, ctx) -> {
                this.mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
                if (rc != 0) {
                    bkf.completeExceptionally(BKException.create((int)rc));
                } else {
                    bkf.complete(null);
                }
            }, null);
        }
        catch (Throwable t) {
            return FutureUtil.failedFuture((Throwable)t);
        }
        return bkf;
    }

    protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
        ByteBuf headersAndPayload = m.getHeadersAndPayload();
        MessageMetadata msgMetadata = Commands.parseMessageMetadata((ByteBuf)headersAndPayload);
        if (msgMetadata.hasPartitionKey()) {
            int size = headersAndPayload.readableBytes();
            if (msgMetadata.hasUncompressedSize()) {
                size = msgMetadata.getUncompressedSize();
            }
            return Pair.of((Object)msgMetadata.getPartitionKey(), (Object)size);
        }
        return null;
    }

    protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(RawMessage msg) throws IOException {
        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
    }

    protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg, BiPredicate<String, MessageId> filter, boolean retainNullKey) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Rebatching message {} for topic {}", (Object)msg.getMessageId(), (Object)topic);
        }
        return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
    }

    public long getPhaseOneLoopReadTimeoutInSeconds() {
        return this.phaseOneLoopReadTimeout.getSeconds();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     * Could not resolve type clashes
     */
    private /* synthetic */ void lambda$phaseTwoLoop$16(CompletableFuture promise, MessageId lastCompactedMessageId, RawReader reader, MessageId to, Map latestForKey, LedgerHandle lh, Semaphore outstanding, RawMessage m) {
        if (promise.isDone()) {
            m.close();
            return;
        }
        if (m.getMessageId().compareTo((Object)lastCompactedMessageId) <= 0) {
            m.close();
            this.phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId);
            return;
        }
        try {
            id = m.getMessageId();
            messageToAdd /* !! */  = Optional.empty();
            this.mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
            metadata = Commands.parseMessageMetadata((ByteBuf)m.getHeadersAndPayload());
            if (Markers.isServerOnlyMarker((MessageMetadata)metadata)) {
                messageToAdd /* !! */  = Optional.empty();
            } else if (RawBatchConverter.isReadableBatch(metadata)) {
                try {
                    messageToAdd /* !! */  = this.rebatchMessage(reader.getTopic(), m, (BiPredicate<String, MessageId>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;Ljava/lang/Object;)Z, lambda$phaseTwoLoop$13(java.util.Map java.lang.String org.apache.pulsar.client.api.MessageId ), (Ljava/lang/String;Lorg/apache/pulsar/client/api/MessageId;)Z)((Map)latestForKey), this.topicCompactionRetainNullKey);
                }
                catch (IOException ioe) {
                    TwoPhaseCompactor.log.info("Error decoding batch for message {}. Whole batch will be included in output", (Object)id, (Object)ioe);
                    messageToAdd /* !! */  = Optional.of(m);
                }
            } else {
                keyAndSize = this.extractKeyAndSize(m);
                if (keyAndSize == null) {
                    messageToAdd /* !! */  = this.topicCompactionRetainNullKey != false ? Optional.of(m) : Optional.empty();
                } else {
                    msg = (MessageId)latestForKey.get(keyAndSize.getLeft());
                    if (msg != null && msg.equals(id)) {
                        if ((Integer)keyAndSize.getRight() <= 0) {
                            promise.completeExceptionally(new IllegalArgumentException("Compaction phase found empty record from sorted key-map"));
                        }
                        messageToAdd /* !! */  = Optional.of(m);
                    }
                }
            }
            if (messageToAdd /* !! */ .isPresent()) {
                message = (RawMessage)messageToAdd /* !! */ .get();
                try {
                    outstanding.acquire();
                    addFuture = this.addToCompactedLedger(lh, message, reader.getTopic()).whenComplete((BiConsumer)(BiConsumer<Void, Throwable>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;Ljava/lang/Object;)V, lambda$phaseTwoLoop$14(java.util.concurrent.Semaphore java.util.concurrent.CompletableFuture java.lang.Void java.lang.Throwable ), (Ljava/lang/Void;Ljava/lang/Throwable;)V)((Semaphore)outstanding, (CompletableFuture)promise));
                    if (!to.equals(id)) ** GOTO lbl64
                    outstanding.acquire(500);
                    addFuture.whenComplete((BiConsumer<Void, Throwable>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;Ljava/lang/Object;)V, lambda$phaseTwoLoop$15(java.util.concurrent.CompletableFuture java.lang.Void java.lang.Throwable ), (Ljava/lang/Void;Ljava/lang/Throwable;)V)((CompletableFuture)promise));
                    return;
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    promise.completeExceptionally(ie);
                }
                finally {
                    if (message != m) {
                        message.close();
                    }
                }
            } else if (to.equals(id)) {
                try {
                    outstanding.acquire(500);
                    promise.complete(null);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    promise.completeExceptionally(e);
                }
                return;
            }
lbl64:
            // 4 sources

            this.phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, m.getMessageId());
        }
        finally {
            m.close();
        }
    }

    private static /* synthetic */ void lambda$phaseTwoLoop$15(CompletableFuture promise, Void res, Throwable exception2) {
        if (exception2 == null) {
            promise.complete(null);
        }
    }

    private static /* synthetic */ void lambda$phaseTwoLoop$14(Semaphore outstanding, CompletableFuture promise, Void res, Throwable exception2) {
        outstanding.release();
        if (exception2 != null) {
            promise.completeExceptionally(exception2);
        }
    }

    private static /* synthetic */ boolean lambda$phaseTwoLoop$13(Map latestForKey, String key, MessageId subid) {
        return subid.equals(latestForKey.get(key));
    }

    private static class PhaseOneResult {
        final MessageId from;
        final MessageId to;
        final MessageId lastReadId;
        final Map<String, MessageId> latestForKey;

        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey) {
            this.from = from;
            this.to = to;
            this.lastReadId = lastReadId;
            this.latestForKey = latestForKey;
        }
    }
}

