/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupAttemptEvent;
import com.couchbase.client.core.cnc.tracing.RequestTracerAndDecorator;
import com.couchbase.client.core.cnc.tracing.TracingAttribute;
import com.couchbase.client.core.cnc.tracing.TracingDecorator;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.CodecFlags;
import com.couchbase.client.core.msg.kv.SubdocCommandType;
import com.couchbase.client.core.msg.kv.SubdocMutateRequest;
import com.couchbase.client.core.transaction.CoreTransactionGetResult;
import com.couchbase.client.core.transaction.cleanup.CleanerHooks;
import com.couchbase.client.core.transaction.cleanup.CleanupRequest;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordEntry;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordUtil;
import com.couchbase.client.core.transaction.components.DocRecord;
import com.couchbase.client.core.transaction.components.DocumentGetter;
import com.couchbase.client.core.transaction.components.DurabilityLevelUtil;
import com.couchbase.client.core.transaction.error.internal.ErrorClass;
import com.couchbase.client.core.transaction.forwards.CoreTransactionsSupportedExtensions;
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
import com.couchbase.client.core.transaction.forwards.ForwardCompatibilityStage;
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
import com.couchbase.client.core.transaction.support.AttemptState;
import com.couchbase.client.core.transaction.support.OptionsUtil;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.MeteringUnits;
import com.couchbase.client.core.transaction.util.TransactionKVHandler;
import com.couchbase.client.core.transaction.util.TriFunction;
import com.couchbase.client.core.util.Bytes;
import com.couchbase.client.core.util.CbPreconditions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

@Stability.Internal
public class TransactionsCleaner {
    private final Core core;
    private final CleanerHooks hooks;
    private final CoreTransactionsSupportedExtensions supported;
    private static final int BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES = 120;

    public TransactionsCleaner(Core core, CleanerHooks hooks, CoreTransactionsSupportedExtensions supported) {
        this.core = Objects.requireNonNull(core);
        this.hooks = Objects.requireNonNull(hooks);
        this.supported = Objects.requireNonNull(supported);
    }

    private Duration kvDurableTimeout() {
        return this.core.context().environment().timeoutConfig().kvDurableTimeout();
    }

    private Duration kvNonMutatingTimeout() {
        return this.core.context().environment().timeoutConfig().kvTimeout();
    }

    Mono<Void> cleanupDocs(CoreTransactionLogger perEntryLog, CleanupRequest req, SpanWrapper pspan) {
        String attemptId = req.attemptId();
        switch (req.state()) {
            case COMMITTED: {
                Mono<Void> inserts = this.commitDocs(perEntryLog, attemptId, req.stagedInserts(), req, pspan);
                Mono<Void> replaces = this.commitDocs(perEntryLog, attemptId, req.stagedReplaces(), req, pspan);
                Mono<Void> removes = this.removeDocsStagedForRemoval(perEntryLog, attemptId, req.stagedRemoves(), req, pspan);
                return inserts.then(replaces).then(removes);
            }
            case ABORTED: {
                Mono<Void> inserts = this.removeDocs(perEntryLog, attemptId, req.stagedInserts(), req, pspan);
                Mono<Void> replaces = this.removeTxnLinks(perEntryLog, attemptId, req.stagedReplaces(), req, pspan);
                Mono<Void> removes = this.removeTxnLinks(perEntryLog, attemptId, req.stagedRemoves(), req, pspan);
                return inserts.then(replaces).then(removes);
            }
            case PENDING: {
                perEntryLog.logDefer(req.attemptId(), "No docs cleanup possible as txn in state {}, just removing", Event.Severity.DEBUG, new Object[]{req.state()});
                return Mono.empty();
            }
        }
        perEntryLog.logDefer(req.attemptId(), "No docs cleanup to do as txn in state {}, just removing", Event.Severity.DEBUG, new Object[]{req.state()});
        return Mono.empty();
    }

    private Mono<Void> commitDocs(CoreTransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, true, (collection, doc, lir) -> {
            CbPreconditions.check(doc.links() != null);
            CbPreconditions.check(doc.links().isDocumentInTransaction());
            CbPreconditions.check(doc.links().stagedContentJsonOrBinary().isPresent());
            byte[] content = doc.links().stagedContentJsonOrBinary().get();
            return this.hooks.beforeCommitDoc.apply(doc.id()).then(Mono.defer(() -> {
                if (lir.tombstone()) {
                    return TransactionKVHandler.insert(this.core, collection, doc.id(), content, doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS), this.kvDurableTimeout(), req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocsInsert"), pspan);
                }
                List<SubdocMutateRequest.Command> commands = Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 0), new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", content, false, false, false, 1));
                return TransactionKVHandler.mutateIn(this.core, collection, doc.id(), this.kvDurableTimeout(), false, false, false, lir.tombstone(), false, doc.cas(), doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS), req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), pspan, commands);
            })).doOnSubscribe(v -> perEntryLog.logDefer(attemptId, "removing txn links and writing content to doc {}", Event.Severity.DEBUG, DebugUtil.docId(doc))).then();
        });
    }

    private Mono<Void> removeTxnLinks(CoreTransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, false, (collectionIdentifier, doc, lir) -> this.hooks.beforeRemoveLinks.apply(doc.id()).then(TransactionKVHandler.mutateIn(this.core, collectionIdentifier, doc.id(), this.kvDurableTimeout(), false, false, false, lir.tombstone(), false, doc.cas(), doc.userFlags(), req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeTxnLinks"), pspan, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", Bytes.EMPTY_BYTE_ARRAY, false, true, false, 0)))).doOnSubscribe(v -> perEntryLog.logDefer(attemptId, "removing txn links from doc {}", Event.Severity.DEBUG, DebugUtil.docId(doc))).then());
    }

    private Mono<Void> removeDocsStagedForRemoval(CoreTransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, true, (collection, doc, lir) -> {
            if (doc.links().isDocumentBeingRemoved()) {
                return this.hooks.beforeRemoveDocStagedForRemoval.apply(doc.id()).then(TransactionKVHandler.remove(this.core, collection, doc.id(), this.kvDurableTimeout(), doc.cas(), req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeDoc"), pspan)).doOnSubscribe(v -> perEntryLog.debug(attemptId, "removing doc {}", doc.id())).then();
            }
            return Mono.create(v -> {
                perEntryLog.debug(attemptId, "doc {} does not have expected remove indication, skipping", DebugUtil.docId(doc));
                v.success();
            });
        });
    }

    private Mono<Void> removeDocs(CoreTransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, false, (collection, doc, lir) -> this.hooks.beforeRemoveDoc.apply(doc.id()).then(Mono.defer(() -> {
            if (lir.tombstone()) {
                return TransactionKVHandler.mutateIn(this.core, collection, doc.id(), this.kvDurableTimeout(), false, false, false, true, false, doc.cas(), doc.userFlags(), req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), pspan, Collections.singletonList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", Bytes.EMPTY_BYTE_ARRAY, false, true, false, 0)));
            }
            return TransactionKVHandler.remove(this.core, collection, doc.id(), this.kvDurableTimeout(), doc.cas(), req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeDocs"), pspan);
        })).doOnSubscribe(v -> perEntryLog.debug(attemptId, "removing doc {}", DebugUtil.docId(doc))).then());
    }

    private Mono<Void> doPerDoc(CoreTransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, SpanWrapper pspan, boolean requireCrc32ToMatchStaging, TriFunction<CollectionIdentifier, CoreTransactionGetResult, CoreSubdocGetResult, Mono<Void>> perDoc) {
        return Flux.fromIterable(docs).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).concatMap(docRecord -> {
            CollectionIdentifier collection = new CollectionIdentifier(docRecord.bucketName(), Optional.of(docRecord.scopeName()), Optional.of(docRecord.collectionName()));
            MeteringUnits.MeteringUnitsBuilder units = new MeteringUnits.MeteringUnitsBuilder();
            return this.hooks.beforeDocGet.apply(docRecord.id()).then(this.doPerDocGotDoc(perEntryLog, attemptId, pspan, requireCrc32ToMatchStaging, perDoc, (DocRecord)docRecord, collection, units));
        }).then();
    }

    private Mono<Void> doPerDocGotDoc(CoreTransactionLogger perEntryLog, String attemptId, SpanWrapper pspan, boolean requireCrc32ToMatchStaging, TriFunction<CollectionIdentifier, CoreTransactionGetResult, CoreSubdocGetResult, Mono<Void>> perDoc, DocRecord docRecord, CollectionIdentifier collection, MeteringUnits.MeteringUnitsBuilder units) {
        return DocumentGetter.justGetDoc(this.core, collection, docRecord.id(), this.kvNonMutatingTimeout(), pspan, true, perEntryLog, units, false).flatMap(docOpt -> {
            if (docOpt.isPresent()) {
                CoreTransactionGetResult doc = (CoreTransactionGetResult)((Tuple2)docOpt.get()).getT1();
                CoreSubdocGetResult lir = (CoreSubdocGetResult)((Tuple2)docOpt.get()).getT2();
                MeteringUnits built = units.build();
                perEntryLog.debug(attemptId, "handling doc {} with cas {} and links {}, isTombstone={}{}", DebugUtil.docId(doc), doc.cas(), doc.links(), lir.tombstone(), DebugUtil.dbg(built));
                if (!doc.links().isDocumentInTransaction()) {
                    perEntryLog.debug(attemptId, "no staged content for doc {}, assuming it was committed and skipping", DebugUtil.docId(doc));
                    return Mono.empty();
                }
                if (!doc.links().stagedAttemptId().get().equals(attemptId)) {
                    perEntryLog.debug(attemptId, "for doc {}, staged version is for a different attempt {}, skipping", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
                    return Mono.empty();
                }
                if (requireCrc32ToMatchStaging && doc.links().crc32OfStaging().isPresent()) {
                    String crc32WhenStaging = doc.links().crc32OfStaging().get();
                    String crc32Now = doc.crc32OfGet().get();
                    perEntryLog.debug(attemptId, "checking whether document {} has changed since staging, crc32 then {} now {}", DebugUtil.docId(doc), crc32WhenStaging, crc32Now);
                    if (!crc32Now.equals(crc32WhenStaging)) {
                        perEntryLog.warn(attemptId, "document {} has changed since staging, ignoring it to avoid data loss", DebugUtil.docId(doc));
                        return Mono.empty();
                    }
                }
                return (Mono)perDoc.apply(collection, doc, lir);
            }
            perEntryLog.debug(attemptId, "could not get doc {}, skipping", DebugUtil.docId(collection, docRecord.id()));
            return Mono.empty();
        }).onErrorResume(err -> {
            ErrorClass ec = ErrorClass.classify(err);
            perEntryLog.debug(attemptId, "got exception while handling doc {}: {}", DebugUtil.docId(collection, docRecord.id()), DebugUtil.dbg(err));
            if (ec == ErrorClass.FAIL_CAS_MISMATCH) {
                perEntryLog.debug(attemptId, "got CAS mismatch while cleaning up doc {}, failing this cleanup attempt (it will be retried)", DebugUtil.docId(collection, docRecord.id()));
                return Mono.error((Throwable)err);
            }
            return Mono.error((Throwable)err);
        });
    }

    private RequestTracerAndDecorator tracer() {
        return this.core.context().coreResources().requestTracerAndDecorator();
    }

    public Mono<TransactionCleanupAttemptEvent> cleanupATREntry(CollectionIdentifier atrCollection, String atrId, String attemptId, ActiveTransactionRecordEntry atrEntry, boolean isRegularCleanup) {
        CleanupRequest req = CleanupRequest.fromAtrEntry(atrCollection, atrEntry);
        return this.performCleanup(req, isRegularCleanup, null);
    }

    public Mono<TransactionCleanupAttemptEvent> performCleanup(CleanupRequest req, boolean isRegularCleanup, @Nullable SpanWrapper pspan) {
        SpanWrapper span = SpanWrapperUtil.createOp(null, this.tracer(), req.atrCollection(), req.atrId(), "transaction_cleanup", pspan);
        TracingDecorator tip = this.core.context().coreResources().tracingDecorator();
        tip.provideAttr(TracingAttribute.TRANSACTION_ATTEMPT_ID, span.span(), req.attemptId());
        tip.provideAttr(TracingAttribute.TRANSACTION_AGE, span.span(), req.ageMillis());
        tip.provideAttr(TracingAttribute.TRANSACTION_STATE, span.span(), req.state().name());
        req.durabilityLevel().ifPresent(v -> tip.provideLowCardinalityAttr(TracingAttribute.DURABILITY, span.span(), DurabilityLevelUtil.convertDurabilityLevel(v)));
        return Mono.defer(() -> {
            CollectionIdentifier atrCollection = req.atrCollection();
            String atrId = req.atrId();
            String attemptId = req.attemptId();
            CoreTransactionLogger perEntryLog = new CoreTransactionLogger(this.core.context().environment().eventBus(), ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId).toString());
            Event.Severity logLevel = Event.Severity.DEBUG;
            perEntryLog.logDefer(attemptId, "Cleaning up ATR entry (isRegular={}) {}", logLevel, isRegularCleanup, req);
            Mono<Void> cleanupDocs = this.cleanupDocs(perEntryLog, req, span);
            Mono<Object> cleanupEntry = this.removeATREntry(req.state(), atrCollection, atrId, attemptId, perEntryLog, span, req);
            return ForwardCompatibility.check(this.core, ForwardCompatibilityStage.CLEANUP_ENTRY, req.forwardCompatibility(), perEntryLog, this.supported).then(cleanupDocs).then(cleanupEntry).then(Mono.fromCallable(() -> {
                TransactionCleanupAttemptEvent event = new TransactionCleanupAttemptEvent(Event.Severity.DEBUG, true, isRegularCleanup, perEntryLog.logs(), attemptId, atrId, atrCollection, req, "");
                this.core.context().environment().eventBus().publish(event);
                return event;
            })).onErrorResume(err -> {
                long ageInMinutes = TimeUnit.MILLISECONDS.toMinutes(req.ageMillis());
                perEntryLog.logDefer(attemptId, "error while attempting to cleanup ATR entry {}, entry is {} mins old, cleanup will retry later: {}", Event.Severity.WARN, ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId), ageInMinutes, DebugUtil.dbg(err));
                Event.Severity level = Event.Severity.DEBUG;
                String addlDebug = "";
                if (ageInMinutes >= 120L) {
                    level = Event.Severity.WARN;
                    addlDebug = "despite being " + ageInMinutes + " mins old which could indicate a serious error - please raise with support.  Diagnostics: ";
                }
                TransactionCleanupAttemptEvent event = new TransactionCleanupAttemptEvent(level, false, isRegularCleanup, perEntryLog.logs(), attemptId, atrId, atrCollection, req, addlDebug);
                this.core.context().environment().eventBus().publish(event);
                return Mono.just((Object)event);
            }).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    Mono<Object> removeATREntry(AttemptState state, CollectionIdentifier atrCollection, String atrId, String attemptId, CoreTransactionLogger perEntryLog, SpanWrapper pspan, CleanupRequest req) {
        ArrayList<SubdocMutateRequest.Command> specs = new ArrayList<SubdocMutateRequest.Command>();
        if (state == AttemptState.PENDING) {
            specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, "attempts." + attemptId + "." + "p", new byte[]{48}, false, true, false, 0));
        }
        specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "attempts." + attemptId, Bytes.EMPTY_BYTE_ARRAY, false, true, false, specs.size()));
        return this.hooks.beforeAtrRemove.get().then(TransactionKVHandler.mutateIn(this.core, atrCollection, atrId, this.kvDurableTimeout(), false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::removeATREntry"), pspan, specs)).doOnNext(v -> perEntryLog.debug(attemptId, "successfully removed ATR entry")).onErrorResume(err -> {
            ErrorClass ec = ErrorClass.classify(err);
            perEntryLog.debug(attemptId, "got exception while removing ATR entry {}: {}", atrId, DebugUtil.dbg(err));
            if (ec == ErrorClass.FAIL_PATH_NOT_FOUND) {
                perEntryLog.logDefer(attemptId, "failed to remove {} as entry isn't there, likely due to concurrent cleanup", Event.Severity.DEBUG, ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId));
                return Mono.empty();
            }
            if (ec == ErrorClass.FAIL_PATH_ALREADY_EXISTS) {
                perEntryLog.logDefer(attemptId, "not removing {} as it has changed from PENDING to COMMITTED", Event.Severity.DEBUG, ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId));
                return Mono.error((Throwable)err);
            }
            return Mono.error((Throwable)err);
        }).map(v -> v);
    }

    public CleanerHooks hooks() {
        return this.hooks;
    }
}

