/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.transaction.CleanupFailedEvent;
import com.couchbase.client.core.error.transaction.internal.ThreadStopRequestedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.cleanup.CleanerFactory;
import com.couchbase.client.core.transaction.cleanup.CleanupRequest;
import com.couchbase.client.core.transaction.cleanup.LostCleanupDistributed;
import com.couchbase.client.core.transaction.cleanup.TransactionsCleaner;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class CoreTransactionsCleanup {
    public static final String CATEGORY = "com.couchbase.transactions.cleanup";
    public static final String CATEGORY_STATS = "com.couchbase.transactions.cleanup.stats";
    public static final String CATEGORY_CLIENT_RECORD = "com.couchbase.transactions.clientrecord";
    public static final String LOST_CATEGORY = "com.couchbase.transactions.cleanup.lost";
    public static final String REGULAR_CATEGORY = "com.couchbase.transactions.cleanup.regular";
    private static final Logger LOGGER = LoggerFactory.getLogger((String)"com.couchbase.transactions.cleanup");
    private static final Logger LOGGER_REGULAR = LoggerFactory.getLogger((String)"com.couchbase.transactions.cleanup.regular");
    private final Core core;
    private final CoreTransactionsConfig config;
    private final DelayQueue<CleanupRequest> cleanupQueue = new DelayQueue();
    private volatile boolean stop = false;
    private final CountDownLatch stopLatch;
    @Nullable
    private final LostCleanupDistributed lostCleanup;
    private final CleanerFactory cleanerFactory;

    public CoreTransactionsCleanup(Core core, CoreTransactionsConfig config) {
        this.core = Objects.requireNonNull(core);
        this.config = Objects.requireNonNull(config);
        this.lostCleanup = config.cleanupConfig().runLostAttemptsCleanupThread() ? new LostCleanupDistributed(core, config, this::getCleaner) : null;
        int countdown = 0;
        this.cleanerFactory = config.cleanerFactory();
        if (config.cleanupConfig().runRegularAttemptsCleanupThread()) {
            this.runRegularAttemptsCleanupThread();
            ++countdown;
        }
        this.stopLatch = new CountDownLatch(countdown);
        config.metadataCollection().ifPresent(mc -> {
            core.openBucket(mc.bucket());
            this.addToCleanupSet((CollectionIdentifier)mc);
        });
        config.cleanupConfig().cleanupSet().forEach(coll -> {
            core.openBucket(coll.bucket());
            this.addToCleanupSet((CollectionIdentifier)coll);
        });
    }

    public void addToCleanupSet(CollectionIdentifier coll) {
        if (this.lostCleanup != null) {
            this.lostCleanup.addToCleanupSet(coll);
        }
    }

    public Set<CollectionIdentifier> cleanupSet() {
        return this.lostCleanup != null ? this.lostCleanup.cleanupSet() : new HashSet();
    }

    Mono<Void> stopBackgroundProcesses(Duration timeout) {
        return Mono.defer(() -> {
            this.stop = true;
            LOGGER.info("Waiting for {} regular background threads to exit", (Object)this.stopLatch.getCount());
            try {
                if (!this.stopLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    LOGGER.info("Background threads did not stop in expected time {}", (Object)timeout);
                }
            }
            catch (InterruptedException e) {
                LOGGER.warn("Interrupted while waiting for background threads " + e);
            }
            if (this.lostCleanup != null) {
                return this.lostCleanup.shutdown(timeout);
            }
            return Mono.empty();
        }).doOnTerminate(() -> LOGGER.info("Background threads have exitted"));
    }

    private void runRegularAttemptsCleanupThread() {
        Objects.requireNonNull(LOGGER);
        LOGGER_REGULAR.debug("Starting background cleanup thread to find transactions from this client");
        Reactor.safeInterval(Duration.ofMillis(100L), this.core.context().environment().transactionsSchedulers().schedulerCleanup()).flatMap(v -> {
            if (this.stop) {
                LOGGER_REGULAR.info("Stopping background cleanup thread for transactions from this client");
                this.stopLatch.countDown();
                return Mono.error((Throwable)new ThreadStopRequestedException());
            }
            return Mono.just((Object)v);
        }).flatMap(v -> {
            ArrayList<CleanupRequest> requests = new ArrayList<CleanupRequest>();
            CleanupRequest head = null;
            do {
                if ((head = (CleanupRequest)this.cleanupQueue.poll()) == null) continue;
                requests.add(head);
            } while (head != null);
            return Flux.fromIterable(requests).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup());
        }).flatMap(req -> {
            TransactionsCleaner cleaner = this.getCleaner();
            return cleaner.performCleanup((CleanupRequest)req, true, null).doOnSuccess(result -> LOGGER_REGULAR.debug("result of cleanup request {}: success={}", req, (Object)result.success())).onErrorResume(err -> {
                CleanupFailedEvent ev = new CleanupFailedEvent((CleanupRequest)req, (Throwable)err);
                this.core.context().environment().eventBus().publish(ev);
                LOGGER_REGULAR.debug("error while handling cleanup request {}, leaving for lost cleanup: '{}'", req, err);
                return Mono.empty();
            });
        }).retryWhen(Retry.allBut(ThreadStopRequestedException.class).exponentialBackoff(Duration.ofMillis(10L), Duration.ofMillis(2000L)).doOnRetry(v -> LOGGER_REGULAR.debug("retrying regular cleanup on error '{}'", (Object)String.valueOf(v.exception()))).retryMax(100000L).toReactorRetry()).subscribe(next -> {}, err -> {
            if (!(err instanceof ThreadStopRequestedException)) {
                LOGGER_REGULAR.warn("regular cleanup thread ended with exception " + err);
            }
        }, () -> LOGGER_REGULAR.warn("regular cleanup thread ending"));
    }

    public TransactionsCleaner getCleaner() {
        return this.cleanerFactory.create(this.core, this.config.supported());
    }

    public Optional<Integer> cleanupQueueLength() {
        if (this.config.cleanupConfig().runRegularAttemptsCleanupThread()) {
            return Optional.of(this.cleanupQueue.size());
        }
        return Optional.empty();
    }

    public void add(CleanupRequest cleanupRequest) {
        this.cleanupQueue.add(cleanupRequest);
    }

    public Mono<Void> shutdown(Duration timeout) {
        return this.stopBackgroundProcesses(timeout);
    }
}

