/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.config.BucketCapabilities;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.error.AuthenticationFailureException;
import com.couchbase.client.core.error.CollectionNotFoundException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.InternalServerFailureException;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.MutationTokenOutdatedException;
import com.couchbase.client.core.error.RangeScanPartitionFailedException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.error.context.KeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.kv.CoreRangeScan;
import com.couchbase.client.core.kv.CoreRangeScanId;
import com.couchbase.client.core.kv.CoreRangeScanItem;
import com.couchbase.client.core.kv.CoreSamplingScan;
import com.couchbase.client.core.kv.CoreScanOptions;
import com.couchbase.client.core.kv.LastCoreRangeScanItem;
import com.couchbase.client.core.kv.RangeScanContext;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.kv.RangeScanCancelRequest;
import com.couchbase.client.core.msg.kv.RangeScanContinueRequest;
import com.couchbase.client.core.msg.kv.RangeScanCreateRequest;
import com.couchbase.client.core.util.Validators;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;

@Stability.Internal
public class RangeScanOrchestrator {
    public static final int RANGE_SCAN_DEFAULT_BATCH_BYTE_LIMIT = 15000;
    public static final int RANGE_SCAN_DEFAULT_BATCH_ITEM_LIMIT = 50;
    private final Core core;
    private final CollectionIdentifier collectionIdentifier;
    private volatile BucketConfig currentBucketConfig;
    private volatile boolean capabilityEnabled = false;

    public RangeScanOrchestrator(Core core, CollectionIdentifier collectionIdentifier) {
        this.core = Validators.notNull(core, "Core");
        this.collectionIdentifier = Validators.notNull(collectionIdentifier, "CollectionIdentifier");
        core.configurationProvider().configs().subscribe(cc -> {
            BucketConfig bucketConfig = cc.bucketConfig(collectionIdentifier.bucket());
            if (bucketConfig != null) {
                this.currentBucketConfig = bucketConfig;
                this.capabilityEnabled = bucketConfig.bucketCapabilities().contains((Object)BucketCapabilities.RANGE_SCAN);
            }
        });
    }

    public Flux<CoreRangeScanItem> rangeScan(CoreRangeScan rangeScan, CoreScanOptions options) {
        return Flux.defer(() -> {
            if (this.currentBucketConfig == null) {
                return Mono.delay((Duration)Duration.ofMillis(100L), (Scheduler)this.core.context().environment().scheduler()).flatMapMany(ign -> this.rangeScan(rangeScan, options));
            }
            if (!(this.currentBucketConfig instanceof CouchbaseBucketConfig)) {
                return Flux.error((Throwable)new IllegalStateException("Only Couchbase buckets are supported with KV Range Scan"));
            }
            Map<Short, MutationToken> consistencyMap = options.consistencyMap();
            return this.streamForPartitions((partition, start) -> {
                byte[] actualStartTerm = start == null ? rangeScan.from().id().getBytes(StandardCharsets.UTF_8) : start;
                return RangeScanCreateRequest.forRangeScan(actualStartTerm, rangeScan, options, partition, this.core.context(), this.collectionIdentifier, consistencyMap);
            }, options);
        });
    }

    public Flux<CoreRangeScanItem> samplingScan(CoreSamplingScan samplingScan, CoreScanOptions options) {
        return Flux.defer(() -> {
            if (this.currentBucketConfig == null) {
                return Mono.delay((Duration)Duration.ofMillis(100L), (Scheduler)this.core.context().environment().scheduler()).flatMapMany(ign -> this.samplingScan(samplingScan, options));
            }
            if (!(this.currentBucketConfig instanceof CouchbaseBucketConfig)) {
                return Flux.error((Throwable)new IllegalStateException("Only Couchbase buckets are supported with KV Range Scan"));
            }
            Map<Short, MutationToken> consistencyMap = options.consistencyMap();
            return this.streamForPartitions((partition, ignored) -> RangeScanCreateRequest.forSamplingScan(samplingScan, options, partition, this.core.context(), this.collectionIdentifier, consistencyMap), options);
        }).take(samplingScan.limit());
    }

    private Flux<CoreRangeScanItem> streamForPartitions(BiFunction<Short, byte[], RangeScanCreateRequest> createSupplier, CoreScanOptions options) {
        if (!this.capabilityEnabled) {
            return Flux.error((Throwable)FeatureNotAvailableException.rangeScan());
        }
        AtomicLong itemsStreamed = new AtomicLong();
        short s = ((CouchbaseBucketConfig)this.currentBucketConfig).numberOfPartitions();
        ArrayList<Flux<CoreRangeScanItem>> partitionStreams = new ArrayList<Flux<CoreRangeScanItem>>(s);
        for (short i = 0; i < s; i = (short)(i + 1)) {
            partitionStreams.add(this.streamForPartition(i, createSupplier, options));
        }
        Flux stream = Flux.concat(partitionStreams);
        return stream.doOnNext(item -> itemsStreamed.incrementAndGet()).timeout(options.commonOptions().timeout().orElse(this.core.context().environment().timeoutConfig().kvScanTimeout()), (Publisher)Mono.defer(() -> Mono.error((Throwable)new UnambiguousTimeoutException("RangeScan timed out", new CancellationErrorContext(new RangeScanContext(itemsStreamed.get()))))));
    }

    private Flux<CoreRangeScanItem> streamForPartition(short partition, BiFunction<Short, byte[], RangeScanCreateRequest> createSupplier, CoreScanOptions options) {
        AtomicReference lastStreamed = new AtomicReference();
        AtomicBoolean needToCancel = new AtomicBoolean(false);
        return Flux.defer(() -> {
            RangeScanCreateRequest request = (RangeScanCreateRequest)createSupplier.apply(partition, (byte[])lastStreamed.get());
            this.core.send(request);
            Flux inner = Reactor.wrap(request, request.response(), true).flatMapMany(res -> {
                if (res.status().success()) {
                    if (needToCancel.get()) {
                        return this.cancel(res.rangeScanId(), partition, options).thenMany((Publisher)Flux.empty());
                    }
                    return this.continueScan(partition, res.rangeScanId(), options, needToCancel);
                }
                KeyValueErrorContext errorContext = KeyValueErrorContext.completedRequest(request, res);
                switch (res.status()) {
                    case NOT_FOUND: {
                        return Flux.empty();
                    }
                    case INTERNAL_SERVER_ERROR: {
                        return Flux.error((Throwable)new InternalServerFailureException(errorContext));
                    }
                    case VBUUID_NOT_EQUAL: {
                        return Flux.error((Throwable)new MutationTokenOutdatedException(errorContext));
                    }
                }
                return Flux.error((Throwable)new CouchbaseException(res.toString(), errorContext));
            });
            return Reactor.shieldFromCancellation(inner);
        }).doOnNext(item -> lastStreamed.set(item.keyBytes())).retryWhen(Retry.from(companion -> companion.map(rs -> {
            if (rs.failure() instanceof RangeScanPartitionFailedException && ((RangeScanPartitionFailedException)rs.failure()).status() == ResponseStatus.NOT_MY_VBUCKET) {
                return true;
            }
            throw Exceptions.propagate((Throwable)rs.failure());
        }))).doOnCancel(() -> needToCancel.set(true));
    }

    private Flux<CoreRangeScanItem> continueScan(short partition, CoreRangeScanId id, CoreScanOptions options, AtomicBoolean needToCancel) {
        AtomicBoolean complete = new AtomicBoolean(false);
        return Flux.defer(() -> {
            RangeScanContinueRequest request = new RangeScanContinueRequest(id, (Sinks.Many<CoreRangeScanItem>)Sinks.many().unicast().onBackpressureBuffer(), null, options, partition, this.core.context(), this.collectionIdentifier);
            this.core.send(request);
            return Reactor.wrap(request, request.response(), true).flatMapMany(res -> {
                if (res.status() == ResponseStatus.SUCCESS || res.status() == ResponseStatus.COMPLETE || res.status() == ResponseStatus.CONTINUE) {
                    if (needToCancel.get()) {
                        complete.set(true);
                        return this.cancel(id, partition, options).thenMany((Publisher)Flux.empty());
                    }
                    return res.items();
                }
                KeyValueErrorContext errorContext = KeyValueErrorContext.completedRequest(request, res);
                switch (res.status()) {
                    case NOT_FOUND: {
                        return Flux.error((Throwable)new CouchbaseException("The range scan internal partition UUID could not be found on the server", errorContext));
                    }
                    case INVALID_REQUEST: {
                        return Flux.error((Throwable)new InvalidArgumentException("The request failed the server-side input validation check.", null, errorContext));
                    }
                    case NO_ACCESS: {
                        return Flux.error((Throwable)new AuthenticationFailureException("The user is no longer authorized to perform this operation", errorContext, null));
                    }
                    case CANCELED: {
                        return Flux.error((Throwable)new RequestCanceledException("The range scan was cancelled.", CancellationReason.OTHER, new CancellationErrorContext(errorContext)));
                    }
                    case NOT_MY_VBUCKET: {
                        return Flux.error((Throwable)new RangeScanPartitionFailedException("Received \"Not My VBucket\" for the continue response", res.status()));
                    }
                    case UNKNOWN_COLLECTION: {
                        return Flux.error((Throwable)new CollectionNotFoundException(request.collectionIdentifier().collection().orElse("_default"), errorContext));
                    }
                    case SERVER_BUSY: {
                        return Flux.error((Throwable)new CouchbaseException("The range scan for this partition is already streaming on another connection - this is a SDK bug please report.", errorContext));
                    }
                }
                return Flux.error((Throwable)new CouchbaseException(res.toString(), errorContext));
            });
        }).map(item -> {
            if (item instanceof LastCoreRangeScanItem) {
                complete.set(true);
            }
            return item;
        }).repeat(() -> !complete.get()).filter(item -> !(item instanceof LastCoreRangeScanItem));
    }

    private Mono<Void> cancel(CoreRangeScanId scanId, short partition, CoreScanOptions options) {
        return Mono.defer(() -> {
            RangeScanCancelRequest cancelRequest = new RangeScanCancelRequest(scanId, options, partition, this.core.context(), this.collectionIdentifier);
            this.core.send(cancelRequest);
            return Reactor.wrap(cancelRequest, cancelRequest.response(), true);
        }).onErrorResume(ignore -> Mono.empty()).then();
    }
}

