package com.couchbase.client.core.transaction;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.error.transaction.RetryTransactionException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionCommitAmbiguousException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionExpiredException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionFailedException;
import com.couchbase.client.core.msg.query.QueryResponse;
import com.couchbase.client.core.retry.reactor.DefaultRetry;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.forwards.Supported;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.threadlocal.TransactionMarker;
import com.couchbase.client.core.transaction.util.CoreTransactionAttemptContextHooks;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.QueryUtil;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

@Stability.Internal
/* loaded from: input_file:com/couchbase/client/core/transaction/CoreTransactionsReactive.class */
public class CoreTransactionsReactive {
    static final int MAX_ATTEMPTS = 100;
    private final Core core;
    private final CoreTransactionsConfig config;

    public CoreTransactionsReactive(Core core, CoreTransactionsConfig coreTransactionsConfig) {
        this.core = (Core) Objects.requireNonNull(core);
        this.config = (CoreTransactionsConfig) Objects.requireNonNull(coreTransactionsConfig);
    }

    public Mono<CoreTransactionResult> executeTransaction(Mono<CoreTransactionAttemptContext> mono, CoreMergedTransactionConfig coreMergedTransactionConfig, CoreTransactionContext coreTransactionContext, Function<CoreTransactionAttemptContext, Mono<Void>> function, boolean z) {
        AtomicReference atomicReference = new AtomicReference();
        return mono.publishOn(this.core.context().environment().transactionsSchedulers().schedulerBlocking()).doOnSubscribe(subscription -> {
            if (atomicReference.get() == null) {
                atomicReference.set(Long.valueOf(System.nanoTime()));
            }
        }).doOnNext(coreTransactionAttemptContext -> {
            coreTransactionContext.incAttempts();
            coreTransactionAttemptContext.LOGGER.info(coreTransactionAttemptContext.attemptId(), "starting attempt %d/%s/%s", Integer.valueOf(coreTransactionContext.numAttempts()), coreTransactionAttemptContext.transactionId(), coreTransactionAttemptContext.attemptId());
        }).flatMap(coreTransactionAttemptContext2 -> {
            return ((Mono) function.apply(coreTransactionAttemptContext2)).contextWrite(context -> {
                return context.put(TransactionMarker.class, new TransactionMarker(coreTransactionAttemptContext2));
            }).onErrorResume(th -> {
                return Mono.error(coreTransactionAttemptContext2.convertToOperationFailedIfNeeded(th, z));
            }).then(coreTransactionAttemptContext2.implicitCommit(z)).onErrorResume(th2 -> {
                return coreTransactionAttemptContext2.lambdaEnd(core().transactionsCleanup(), th2, z);
            }).then(coreTransactionAttemptContext2.lambdaEnd(core().transactionsCleanup(), null, z)).then(coreTransactionAttemptContext2.transactionEnd(null, z)).onErrorResume(th3 -> {
                if (!(th3 instanceof RetryTransactionException) && !(th3 instanceof CoreTransactionFailedException)) {
                    return coreTransactionAttemptContext2.transactionEnd(th3, z);
                }
                return Mono.error(th3);
            }).doOnNext(coreTransactionResult -> {
                coreTransactionContext.span().attribute("db.couchbase.transactions.retries", Integer.valueOf(coreTransactionContext.numAttempts())).finish();
            }).doOnError(th4 -> {
                coreTransactionContext.span().attribute("db.couchbase.transactions.retries", Integer.valueOf(coreTransactionContext.numAttempts())).failWith(th4);
            });
        }).retryWhen(executeCreateRetryWhen(coreTransactionContext)).doOnTerminate(() -> {
            coreTransactionContext.LOGGER.info("finished txn in %dus", Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - ((Long) atomicReference.get()).longValue())));
        });
    }

    private Retry executeCreateRetryWhen(CoreTransactionContext coreTransactionContext) {
        return DefaultRetry.create(retryContext -> {
            return retryContext.exception() instanceof RetryTransactionException;
        }).exponentialBackoff(Duration.ofMillis(1L), Duration.ofMillis(100L)).doOnRetry(retryContext2 -> {
            coreTransactionContext.LOGGER.info("<>", "retrying transaction after backoff %dmillis", Long.valueOf(retryContext2.backoff().toMillis()));
        }).jitter(Jitter.random()).retryMax(100L).toReactorRetry();
    }

    public CoreTransactionAttemptContext createAttemptContext(CoreTransactionContext coreTransactionContext, CoreMergedTransactionConfig coreMergedTransactionConfig, String str) {
        return coreMergedTransactionConfig.attemptContextFactory().create(this.core, coreTransactionContext, coreMergedTransactionConfig, str, this, Optional.of(coreTransactionContext.span()));
    }

    public Mono<CoreTransactionResult> run(Function<CoreTransactionAttemptContext, Mono<?>> function, @Nullable CoreTransactionOptions coreTransactionOptions) {
        return Mono.defer(() -> {
            CoreMergedTransactionConfig coreMergedTransactionConfig = new CoreMergedTransactionConfig(this.config, Optional.ofNullable(coreTransactionOptions));
            CoreTransactionContext coreTransactionContext = new CoreTransactionContext(this.core.context().environment().requestTracer(), this.core.context().environment().eventBus(), UUID.randomUUID().toString(), coreMergedTransactionConfig, this.core.transactionsCleanup());
            coreTransactionContext.LOGGER.info(configDebug(this.config, coreTransactionOptions, this.core));
            return executeTransaction(Mono.fromCallable(() -> {
                return createAttemptContext(coreTransactionContext, coreMergedTransactionConfig, UUID.randomUUID().toString());
            }), coreMergedTransactionConfig, coreTransactionContext, coreTransactionAttemptContext -> {
                return Mono.defer(() -> {
                    return (Mono) function.apply(coreTransactionAttemptContext);
                }).then();
            }, false);
        });
    }

    private void logElidedStacktrace(CoreTransactionAttemptContext coreTransactionAttemptContext, Throwable th) {
        coreTransactionAttemptContext.LOGGER.info(coreTransactionAttemptContext.attemptId(), DebugUtil.createElidedStacktrace(th));
    }

    private static String configDebug(CoreTransactionsConfig coreTransactionsConfig, @Nullable CoreTransactionOptions coreTransactionOptions, Core core) {
        StringBuilder sb = new StringBuilder();
        sb.append("SDK version: ");
        sb.append(core.context().environment().clientVersion().orElse("-"));
        sb.append(" config: ");
        sb.append("atrs=");
        sb.append(coreTransactionsConfig.numAtrs());
        sb.append(", metadataCollection=");
        sb.append(coreTransactionsConfig.metadataCollection());
        sb.append(", expiry=");
        if (coreTransactionOptions != null) {
            sb.append(coreTransactionOptions.timeout().orElse(coreTransactionsConfig.transactionExpirationTime()).toMillis());
        } else {
            sb.append(coreTransactionsConfig.transactionExpirationTime().toMillis());
        }
        sb.append("ms durability=");
        sb.append(coreTransactionsConfig.durabilityLevel());
        if (coreTransactionOptions != null) {
            sb.append(" per-txn config=");
            sb.append(" durability=");
            sb.append(coreTransactionOptions.durabilityLevel());
        }
        sb.append(", supported=");
        sb.append(Supported.SUPPORTED);
        return sb.toString();
    }

    public CoreTransactionsConfig config() {
        return this.config;
    }

    public Core core() {
        return this.core;
    }

    @Stability.Uncommitted
    public Mono<QueryResponse> query(String str, String str2, String str3, ObjectNode objectNode, Optional<RequestSpan> optional, Consumer<RuntimeException> consumer) {
        return Mono.defer(() -> {
            CoreMergedTransactionConfig coreMergedTransactionConfig = new CoreMergedTransactionConfig(this.config, Optional.empty());
            CoreTransactionContext coreTransactionContext = new CoreTransactionContext(this.core.context().environment().requestTracer(), this.core.context().environment().eventBus(), UUID.randomUUID().toString(), coreMergedTransactionConfig, this.core.transactionsCleanup());
            coreTransactionContext.LOGGER.info(configDebug(this.config, null, this.core));
            Mono<CoreTransactionAttemptContext> fromCallable = Mono.fromCallable(() -> {
                return createAttemptContext(coreTransactionContext, coreMergedTransactionConfig, UUID.randomUUID().toString());
            });
            AtomicReference atomicReference = new AtomicReference();
            Function<CoreTransactionAttemptContext, Mono<Void>> function = coreTransactionAttemptContext -> {
                return Mono.defer(() -> {
                    return coreTransactionAttemptContext.doQueryOperation("single query streaming", (num, atomicReference2) -> {
                        return coreTransactionAttemptContext.queryWrapperLocked(0, str2, str3, str, objectNode, CoreTransactionAttemptContextHooks.HOOK_QUERY, false, true, null, null, (SpanWrapper) optional.map(requestSpan -> {
                            return new SpanWrapper(this.core.context().environment().requestTracer(), requestSpan);
                        }).orElse(null), true, null, true).doOnNext(queryResponse -> {
                            atomicReference.set(queryResponse);
                        });
                    }).then();
                });
            };
            Consumer<Throwable> singleQueryHandleErrorDuringRowStreaming = singleQueryHandleErrorDuringRowStreaming(coreTransactionContext, consumer);
            return executeTransaction(fromCallable, coreMergedTransactionConfig, coreTransactionContext, function, true).then(Mono.defer(() -> {
                QueryResponse queryResponse = (QueryResponse) atomicReference.get();
                if (queryResponse == null) {
                    return Mono.error(new CoreTransactionFailedException(new IllegalStateException("No query has been run"), coreTransactionContext.LOGGER, coreTransactionContext.transactionId()));
                }
                return Mono.just(new QueryResponse(queryResponse.status(), queryResponse.header(), queryResponse.rows().onErrorResume(th -> {
                    singleQueryHandleErrorDuringRowStreaming.accept(th);
                    return Mono.empty();
                }), queryResponse.trailer()));
            }));
        });
    }

    private static Consumer<Throwable> singleQueryHandleErrorDuringRowStreaming(CoreTransactionContext coreTransactionContext, Consumer<RuntimeException> consumer) {
        return th -> {
            RuntimeException convertQueryError = QueryUtil.convertQueryError(th);
            coreTransactionContext.LOGGER.warn("", "got error on rows stream %s, converted from %s", DebugUtil.dbg(convertQueryError), DebugUtil.dbg(th));
            RuntimeException runtimeException = convertQueryError;
            if (convertQueryError instanceof TransactionOperationFailedException) {
                TransactionOperationFailedException transactionOperationFailedException = (TransactionOperationFailedException) convertQueryError;
                switch (transactionOperationFailedException.toRaise()) {
                    case TRANSACTION_FAILED_POST_COMMIT:
                        runtimeException = new CoreTransactionFailedException(transactionOperationFailedException, coreTransactionContext.LOGGER, coreTransactionContext.transactionId());
                        break;
                    case TRANSACTION_EXPIRED:
                        runtimeException = new CoreTransactionExpiredException(th, coreTransactionContext.LOGGER, coreTransactionContext.transactionId(), "Transaction has expired configured timeout of " + coreTransactionContext.expirationTime().toMillis() + "ms.  The transaction is not committed.");
                        break;
                    case TRANSACTION_COMMIT_AMBIGUOUS:
                        runtimeException = new CoreTransactionCommitAmbiguousException(th, coreTransactionContext.LOGGER, coreTransactionContext.transactionId(), "It is ambiguous whether the transaction committed");
                        break;
                    default:
                        runtimeException = new CoreTransactionFailedException(th, coreTransactionContext.LOGGER, coreTransactionContext.transactionId());
                        break;
                }
            }
            consumer.accept(runtimeException);
        };
    }

    public Mono<CoreTransactionAttemptContext.BufferedQueryResponse> queryBlocking(String str, String str2, String str3, ObjectNode objectNode, Optional<RequestSpan> optional) {
        return Mono.defer(() -> {
            objectNode.put("tximplicit", true);
            CoreMergedTransactionConfig coreMergedTransactionConfig = new CoreMergedTransactionConfig(this.config, optional.map(CoreTransactionOptions::create));
            CoreTransactionContext coreTransactionContext = new CoreTransactionContext(this.core.context().environment().requestTracer(), this.core.context().environment().eventBus(), UUID.randomUUID().toString(), coreMergedTransactionConfig, this.core.transactionsCleanup());
            coreTransactionContext.LOGGER.info(configDebug(this.config, null, this.core));
            Mono<CoreTransactionAttemptContext> fromCallable = Mono.fromCallable(() -> {
                return createAttemptContext(coreTransactionContext, coreMergedTransactionConfig, UUID.randomUUID().toString());
            });
            AtomicReference atomicReference = new AtomicReference();
            return executeTransaction(fromCallable, coreMergedTransactionConfig, coreTransactionContext, coreTransactionAttemptContext -> {
                return Mono.defer(() -> {
                    return coreTransactionAttemptContext.queryBlocking(str, str2, str3, objectNode, true).doOnNext(bufferedQueryResponse -> {
                        atomicReference.set(bufferedQueryResponse);
                    }).then();
                });
            }, true).then(Mono.defer(() -> {
                return atomicReference.get() != null ? Mono.just((CoreTransactionAttemptContext.BufferedQueryResponse) atomicReference.get()) : Mono.error(new CoreTransactionFailedException(new IllegalStateException("No query has been run"), coreTransactionContext.LOGGER, coreTransactionContext.transactionId()));
            }));
        });
    }
}
