/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Stopwatch;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.AuditorStats;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.AuditorTask;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.ReplicationException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditorBookieCheckTask
extends AuditorTask {
    private static final Logger LOG = LoggerFactory.getLogger(AuditorBookieCheckTask.class);
    private final BookieLedgerIndexer bookieLedgerIndexer;
    private final BiConsumer<Void, Throwable> submitCheckTask;

    public AuditorBookieCheckTask(ServerConfiguration conf, AuditorStats auditorStats, BookKeeperAdmin admin, LedgerManager ledgerManager, LedgerUnderreplicationManager ledgerUnderreplicationManager, AuditorTask.ShutdownTaskHandler shutdownTaskHandler, BookieLedgerIndexer bookieLedgerIndexer, BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask, BiConsumer<Void, Throwable> submitCheckTask) {
        super(conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
        this.bookieLedgerIndexer = bookieLedgerIndexer;
        this.submitCheckTask = submitCheckTask;
    }

    @Override
    protected void runTask() {
        if (!this.hasBookieCheckTask()) {
            this.startAudit(true);
        } else {
            LOG.info("Audit already scheduled; skipping periodic bookie check");
        }
    }

    @Override
    public void shutdown() {
    }

    void startAudit(boolean shutDownTask) {
        try {
            this.auditBookies();
            shutDownTask = false;
        }
        catch (BKException bke) {
            LOG.error("Exception getting bookie list", (Throwable)bke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while watching available bookies ", (Throwable)ie);
        }
        catch (ReplicationException.BKAuditException bke) {
            LOG.error("Exception while watching available bookies", (Throwable)bke);
        }
        if (shutDownTask) {
            this.submitShutdownTask();
        }
    }

    void auditBookies() throws ReplicationException.BKAuditException, InterruptedException, BKException {
        try {
            this.waitIfLedgerReplicationDisabled();
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
            return;
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            return;
        }
        LOG.info("Starting auditBookies");
        Stopwatch stopwatch = Stopwatch.createStarted();
        Map<String, Set<Long>> ledgerDetails = this.generateBookie2LedgersIndex();
        try {
            if (!this.isLedgerReplicationEnabled()) {
                this.submitCheckTask.accept(null, null);
                return;
            }
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication unavailable, skipping audit.Will retry after a period");
            return;
        }
        List<String> availableBookies = this.getAvailableBookies();
        Set<String> knownBookies = ledgerDetails.keySet();
        Collection<String> lostBookies = CollectionUtils.subtract(knownBookies, availableBookies);
        this.auditorStats.getBookieToLedgersMapCreationTime().registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        if (lostBookies.size() > 0) {
            try {
                FutureUtils.result(this.handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
            }
            catch (ReplicationException e) {
                throw new ReplicationException.BKAuditException(e.getMessage(), e.getCause());
            }
            this.auditorStats.getURLPublishTimeForLostBookies().registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
        LOG.info("Completed auditBookies");
        this.auditorStats.getAuditBookiesTime().registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
    }

    private Map<String, Set<Long>> generateBookie2LedgersIndex() throws ReplicationException.BKAuditException {
        return this.bookieLedgerIndexer.getBookieToLedgerIndex();
    }

    private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies, Map<String, Set<Long>> ledgerDetails) {
        LOG.info("Following are the failed bookies: {}, and searching its ledgers for re-replication", lostBookies);
        return FutureUtils.processList(Lists.newArrayList(lostBookies), bookieIP -> this.publishSuspectedLedgersAsync(Lists.newArrayList(bookieIP), (Set)ledgerDetails.get(bookieIP)), null);
    }

    protected void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        if (!this.isLedgerReplicationEnabled()) {
            LOG.info("LedgerReplication is disabled externally through Zookeeper, since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
            ReplicationEnableCb cb = new ReplicationEnableCb();
            this.ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }
}

