/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.transaction.CleanupFailedEvent;
import com.couchbase.client.core.error.transaction.internal.ThreadStopRequestedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.cleanup.CleanerFactory;
import com.couchbase.client.core.transaction.cleanup.CleanupRequest;
import com.couchbase.client.core.transaction.cleanup.LostCleanupDistributed;
import com.couchbase.client.core.transaction.cleanup.TransactionsCleaner;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.log.SimpleEventBusLogger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class CoreTransactionsCleanup {
    public static final String CATEGORY = "com.couchbase.transactions.cleanup";
    public static final String CATEGORY_STATS = "com.couchbase.transactions.cleanup.stats";
    public static final String CATEGORY_CLIENT_RECORD = "com.couchbase.transactions.clientrecord";
    public static final String LOST_CATEGORY = "com.couchbase.transactions.cleanup.lost";
    public static final String REGULAR_CATEGORY = "com.couchbase.transactions.cleanup.regular";
    private final Core core;
    private final CoreTransactionsConfig config;
    private final DelayQueue<CleanupRequest> cleanupQueue = new DelayQueue();
    private volatile boolean stop = false;
    private final CountDownLatch stopLatch;
    @Nullable
    private final LostCleanupDistributed lostCleanup;
    private final SimpleEventBusLogger LOGGER;
    private final SimpleEventBusLogger LOGGER_REGULAR;
    private final CleanerFactory cleanerFactory;

    public CoreTransactionsCleanup(Core core, CoreTransactionsConfig config) {
        this.core = Objects.requireNonNull(core);
        this.config = Objects.requireNonNull(config);
        this.LOGGER = new SimpleEventBusLogger(core.context().environment().eventBus(), CATEGORY);
        this.LOGGER_REGULAR = new SimpleEventBusLogger(core.context().environment().eventBus(), REGULAR_CATEGORY);
        this.lostCleanup = config.cleanupConfig().runLostAttemptsCleanupThread() ? new LostCleanupDistributed(core, config, this::getCleaner) : null;
        int countdown = 0;
        this.cleanerFactory = config.cleanerFactory();
        if (config.cleanupConfig().runRegularAttemptsCleanupThread()) {
            this.runRegularAttemptsCleanupThread();
            ++countdown;
        }
        this.stopLatch = new CountDownLatch(countdown);
        config.metadataCollection().ifPresent(mc -> {
            core.openBucket(mc.bucket());
            this.addToCleanupSet((CollectionIdentifier)mc);
        });
        config.cleanupConfig().cleanupSet().forEach(coll -> {
            core.openBucket(coll.bucket());
            this.addToCleanupSet((CollectionIdentifier)coll);
        });
    }

    public void addToCleanupSet(CollectionIdentifier coll) {
        if (this.lostCleanup != null) {
            this.lostCleanup.addToCleanupSet(coll);
        }
    }

    public Set<CollectionIdentifier> cleanupSet() {
        return this.lostCleanup != null ? this.lostCleanup.cleanupSet() : new HashSet();
    }

    Mono<Void> stopBackgroundProcesses(Duration timeout) {
        return Mono.defer(() -> {
            this.stop = true;
            this.LOGGER.info(String.format("Waiting for %d regular background threads to exit", this.stopLatch.getCount()), new Object[0]);
            try {
                if (!this.stopLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    this.LOGGER.info("Background threads did not stop in expected time {}", timeout);
                }
            }
            catch (InterruptedException e) {
                this.LOGGER.warn("Interrupted while waiting for background threads " + e, new Object[0]);
            }
            if (this.lostCleanup != null) {
                return this.lostCleanup.shutdown(timeout);
            }
            return Mono.empty();
        }).doOnTerminate(() -> this.LOGGER.info("Background threads have exitted", new Object[0]));
    }

    private void runRegularAttemptsCleanupThread() {
        Objects.requireNonNull(this.LOGGER);
        this.LOGGER_REGULAR.debug("Starting background cleanup thread to find transactions from this client", new Object[0]);
        Flux.interval((Duration)Duration.ofMillis(100L), (Scheduler)this.core.context().environment().transactionsSchedulers().schedulerCleanup()).flatMap(v -> {
            if (this.stop) {
                this.LOGGER_REGULAR.info("Stopping background cleanup thread for transactions from this client", new Object[0]);
                this.stopLatch.countDown();
                return Mono.error((Throwable)new ThreadStopRequestedException());
            }
            return Mono.just((Object)v);
        }).flatMap(v -> {
            ArrayList<CleanupRequest> requests = new ArrayList<CleanupRequest>();
            CleanupRequest head = null;
            do {
                if ((head = (CleanupRequest)this.cleanupQueue.poll()) == null) continue;
                requests.add(head);
            } while (head != null);
            return Flux.fromIterable(requests).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup());
        }).flatMap(req -> {
            TransactionsCleaner cleaner = this.getCleaner();
            return cleaner.performCleanup((CleanupRequest)req, true, null).doOnSuccess(result -> this.LOGGER_REGULAR.debug(String.format("result of cleanup request %s: success=%s", req, result.success()), new Object[0])).onErrorResume(err -> {
                CleanupFailedEvent ev = new CleanupFailedEvent((CleanupRequest)req, (Throwable)err);
                this.core.context().environment().eventBus().publish(ev);
                this.LOGGER_REGULAR.debug(String.format("error while handling cleanup request %s, leaving for lost cleanup: '%s'", req, err), new Object[0]);
                return Mono.empty();
            });
        }).retryWhen(Retry.allBut(ThreadStopRequestedException.class).exponentialBackoff(Duration.ofMillis(10L), Duration.ofMillis(2000L)).doOnRetry(v -> this.LOGGER_REGULAR.debug(String.format("retrying regular cleanup on error '%s'", v.exception()), new Object[0])).retryMax(100000L).toReactorRetry()).subscribe(next -> {}, err -> {
            if (!(err instanceof ThreadStopRequestedException)) {
                this.LOGGER_REGULAR.warn("regular cleanup thread ended with exception " + err, new Object[0]);
            }
        }, () -> this.LOGGER_REGULAR.warn("regular cleanup thread ending", new Object[0]));
    }

    public TransactionsCleaner getCleaner() {
        return this.cleanerFactory.create(this.core);
    }

    public Optional<Integer> cleanupQueueLength() {
        if (this.config.cleanupConfig().runRegularAttemptsCleanupThread()) {
            return Optional.of(this.cleanupQueue.size());
        }
        return Optional.empty();
    }

    public void add(CleanupRequest cleanupRequest) {
        this.cleanupQueue.add(cleanupRequest);
    }

    public Mono<Void> shutdown(Duration timeout) {
        return this.stopBackgroundProcesses(timeout);
    }
}

