/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.service.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CoreKeyspace;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.kv.CoreKvResponseMetadata;
import com.couchbase.client.core.api.kv.CoreReadPreference;
import com.couchbase.client.core.api.kv.CoreSubdocGetCommand;
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.events.request.IndividualReplicaGetFailedEvent;
import com.couchbase.client.core.config.BucketCapabilities;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.error.CommonExceptions;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DefaultErrorUtil;
import com.couchbase.client.core.error.DocumentUnretrievableException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.context.AggregateErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.GetRequest;
import com.couchbase.client.core.msg.kv.GetResponse;
import com.couchbase.client.core.msg.kv.ReplicaGetRequest;
import com.couchbase.client.core.msg.kv.ReplicaSubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocGetResponse;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.service.kv.NodeIndexCalculator;
import com.couchbase.client.core.util.Validators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Stability.Internal
public class ReplicaHelper {
    private ReplicaHelper() {
        throw new AssertionError((Object)"not instantiable");
    }

    public static Flux<GetReplicaResponse> getAllReplicasReactive(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, CoreReadPreference readPreference) {
        Validators.notNullOrEmpty(documentId, "Id", () -> ReducedKeyValueErrorContext.create(documentId, collectionIdentifier));
        RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan("get_all_replicas", parentSpan);
        return Reactor.toMono(() -> ReplicaHelper.getAllReplicasRequests(core, collectionIdentifier, documentId, clientContext, retryStrategy, timeout, getAllSpan, readPreference)).flux().flatMap(Flux::fromStream).flatMap(request -> Reactor.wrap(request, ReplicaHelper.get(core, request), true).onErrorResume(t -> {
            core.environment().eventBus().publish(new IndividualReplicaGetFailedEvent(request.context()));
            return Mono.empty();
        }).map(response -> new GetReplicaResponse((GetResponse)response, request instanceof ReplicaGetRequest))).doFinally(signalType -> getAllSpan.end());
    }

    public static Flux<CoreSubdocGetResult> lookupInAllReplicasReactive(Core core, CollectionIdentifier collectionIdentifier, String documentId, List<CoreSubdocGetCommand> commands, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, CoreReadPreference readPreference) {
        Validators.notNullOrEmpty(documentId, "Id", () -> ReducedKeyValueErrorContext.create(documentId, collectionIdentifier));
        CoreEnvironment env = core.context().environment();
        RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan("lookup_in_all_replicas", parentSpan);
        return Reactor.toMono(() -> ReplicaHelper.lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, getAllSpan, readPreference)).flux().flatMap(Flux::fromStream).flatMap(request -> Reactor.wrap(request, ReplicaHelper.get(core, request), true).onErrorResume(t -> {
            env.eventBus().publish(new IndividualReplicaGetFailedEvent(request.context()));
            return Mono.empty();
        }).map(response -> new CoreSubdocGetResult(CoreKeyspace.from(collectionIdentifier), documentId, CoreKvResponseMetadata.from(response.flexibleExtras()), Arrays.asList(response.values()), response.cas(), response.isDeleted(), request instanceof ReplicaSubdocGetRequest))).doFinally(signalType -> getAllSpan.end());
    }

    public static <R> CompletableFuture<List<CompletableFuture<R>>> getAllReplicasAsync(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, CoreReadPreference readPreference, Function<GetReplicaResponse, R> responseMapper) {
        RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan("lookup_in_all_replicas", parentSpan);
        return ((CompletableFuture)ReplicaHelper.getAllReplicasRequests(core, collectionIdentifier, documentId, clientContext, retryStrategy, timeout, getAllSpan, readPreference).thenApply(stream -> stream.map(request -> ((CompletableFuture)ReplicaHelper.get(core, request).thenApply(response -> new GetReplicaResponse((GetResponse)response, request instanceof ReplicaGetRequest))).thenApply(responseMapper)).collect(Collectors.toList()))).whenComplete((completableFutures, throwable) -> {
            AtomicInteger toComplete = new AtomicInteger(completableFutures.size());
            for (CompletableFuture cf : completableFutures) {
                cf.whenComplete((a, b) -> {
                    if (toComplete.decrementAndGet() == 0) {
                        getAllSpan.end();
                    }
                });
            }
        });
    }

    public static <R> CompletableFuture<List<CompletableFuture<R>>> lookupInAllReplicasAsync(Core core, CollectionIdentifier collectionIdentifier, String documentId, List<CoreSubdocGetCommand> commands, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, CoreReadPreference readPreference, Function<CoreSubdocGetResult, R> responseMapper) {
        RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan("get_all_replicas", parentSpan);
        return ((CompletableFuture)ReplicaHelper.lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, getAllSpan, readPreference).thenApply(stream -> stream.map(request -> ((CompletableFuture)ReplicaHelper.get(core, request).thenApply(response -> new CoreSubdocGetResult(CoreKeyspace.from(collectionIdentifier), documentId, CoreKvResponseMetadata.from(response.flexibleExtras()), Arrays.asList(response.values()), response.cas(), response.isDeleted(), request instanceof ReplicaSubdocGetRequest))).thenApply(responseMapper)).collect(Collectors.toList()))).whenComplete((completableFutures, throwable) -> {
            AtomicInteger toComplete = new AtomicInteger(completableFutures.size());
            for (CompletableFuture cf : completableFutures) {
                cf.whenComplete((a, b) -> {
                    if (toComplete.decrementAndGet() == 0) {
                        getAllSpan.end();
                    }
                });
            }
        });
    }

    public static <R> CompletableFuture<R> getAnyReplicaAsync(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, CoreReadPreference readPreference, Function<GetReplicaResponse, R> responseMapper) {
        RequestSpan getAnySpan = core.context().coreResources().requestTracer().requestSpan("get_any_replica", parentSpan);
        CompletableFuture<List<CompletableFuture<R>>> listOfFutures = ReplicaHelper.getAllReplicasAsync(core, collectionIdentifier, documentId, timeout, retryStrategy, clientContext, getAnySpan, readPreference, responseMapper);
        CompletableFuture<R> anyReplicaFuture = ReplicaHelper.aggregate(listOfFutures, responseMapper);
        return anyReplicaFuture.whenComplete((getReplicaResult, throwable) -> getAnySpan.end());
    }

    public static <R> CompletableFuture<R> lookupInAnyReplicaAsync(Core core, CollectionIdentifier collectionIdentifier, String documentId, List<CoreSubdocGetCommand> commands, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, CoreReadPreference readPreference, Function<CoreSubdocGetResult, R> responseMapper) {
        RequestSpan getAnySpan = core.context().coreResources().requestTracer().requestSpan("lookup_in_any_replica", parentSpan);
        CompletableFuture<List<CompletableFuture<R>>> listOfFutures = ReplicaHelper.lookupInAllReplicasAsync(core, collectionIdentifier, documentId, commands, timeout, retryStrategy, clientContext, getAnySpan, readPreference, responseMapper);
        CompletableFuture<R> anyReplicaFuture = ReplicaHelper.aggregate(listOfFutures, responseMapper);
        return anyReplicaFuture.whenComplete((lookupInReplicaResult, throwable) -> getAnySpan.end());
    }

    private static <R> CompletableFuture<R> aggregate(CompletableFuture<List<CompletableFuture<R>>> listOfFutures, Function<?, R> responseMapper) {
        CompletableFuture anyReplicaFuture = new CompletableFuture();
        listOfFutures.whenComplete((futures, throwable) -> {
            if (throwable != null) {
                anyReplicaFuture.completeExceptionally((Throwable)throwable);
            }
            AtomicBoolean successCompleted = new AtomicBoolean(false);
            AtomicInteger totalCompleted = new AtomicInteger(0);
            List nestedContexts = Collections.synchronizedList(new ArrayList());
            futures.forEach(individual -> individual.whenComplete((result, error) -> {
                try {
                    int completed = totalCompleted.incrementAndGet();
                    if (error != null && error instanceof CompletionException && error.getCause() instanceof CouchbaseException) {
                        nestedContexts.add(((CouchbaseException)error.getCause()).context());
                    }
                    if (result != null && successCompleted.compareAndSet(false, true)) {
                        anyReplicaFuture.complete(result);
                    }
                    if (!successCompleted.get() && completed == futures.size()) {
                        anyReplicaFuture.completeExceptionally(new DocumentUnretrievableException(new AggregateErrorContext(nestedContexts)));
                    }
                }
                catch (Throwable t) {
                    anyReplicaFuture.completeExceptionally(new RuntimeException(t.getClass().toString()));
                }
            }));
        });
        return anyReplicaFuture;
    }

    public static CompletableFuture<Stream<GetRequest>> getAllReplicasRequests(Core core, CollectionIdentifier collectionIdentifier, String documentId, Map<String, Object> clientContext, RetryStrategy retryStrategy, Duration timeout, RequestSpan parent, CoreReadPreference readPreference) {
        Validators.notNullOrEmpty(documentId, "Id");
        CoreContext coreContext = core.context();
        CoreEnvironment environment = coreContext.environment();
        BucketConfig config = core.clusterConfig().bucketConfig(collectionIdentifier.bucket());
        if (config instanceof CouchbaseBucketConfig) {
            CouchbaseBucketConfig topology = (CouchbaseBucketConfig)config;
            short s = topology.numberOfReplicas();
            ArrayList<GetRequest> requests = new ArrayList<GetRequest>(s + 1);
            NodeIndexCalculator allowedNodeIndexes = new NodeIndexCalculator(readPreference, topology, coreContext);
            if (allowedNodeIndexes.canUseNodeForActive(documentId)) {
                RequestSpan span = coreContext.coreResources().requestTracer().requestSpan("get", parent);
                GetRequest activeRequest = new GetRequest(documentId, timeout, coreContext, collectionIdentifier, retryStrategy, span);
                activeRequest.context().clientContext(clientContext);
                requests.add(activeRequest);
            }
            for (short replica = 1; replica <= s; replica = (short)((short)(replica + 1))) {
                if (!allowedNodeIndexes.canUseNodeForReplica(documentId, replica - 1)) continue;
                RequestSpan replicaSpan = coreContext.coreResources().requestTracer().requestSpan("get_replica", parent);
                ReplicaGetRequest replicaRequest = new ReplicaGetRequest(documentId, timeout, coreContext, collectionIdentifier, retryStrategy, replica, replicaSpan);
                replicaRequest.context().clientContext(clientContext);
                requests.add(replicaRequest);
            }
            if (requests.isEmpty()) {
                CompletableFuture<Stream<GetRequest>> future = new CompletableFuture<Stream<GetRequest>>();
                future.completeExceptionally(DocumentUnretrievableException.noReplicasSuitable());
                return future;
            }
            return CompletableFuture.completedFuture(requests.stream());
        }
        if (config == null) {
            Duration retryDelay = Duration.ofMillis(100L);
            CompletableFuture<Stream<GetRequest>> completableFuture = new CompletableFuture<Stream<GetRequest>>();
            coreContext.environment().timer().schedule(() -> ReplicaHelper.getAllReplicasRequests(core, collectionIdentifier, documentId, clientContext, retryStrategy, timeout.minus(retryDelay), parent, readPreference).whenComplete((getRequestStream, throwable) -> {
                if (throwable != null) {
                    future.completeExceptionally((Throwable)throwable);
                } else {
                    future.complete((Stream<GetRequest>)getRequestStream);
                }
            }), retryDelay);
            return completableFuture;
        }
        CompletableFuture<Stream<GetRequest>> future = new CompletableFuture<Stream<GetRequest>>();
        future.completeExceptionally(CommonExceptions.getFromReplicaNotCouchbaseBucket());
        return future;
    }

    public static CompletableFuture<Stream<SubdocGetRequest>> lookupInAllReplicasRequests(Core core, CollectionIdentifier collectionIdentifier, String documentId, List<CoreSubdocGetCommand> commands, Map<String, Object> clientContext, RetryStrategy retryStrategy, Duration timeout, RequestSpan parent, CoreReadPreference readPreference) {
        Validators.notNullOrEmpty(documentId, "Id");
        CoreContext coreContext = core.context();
        CoreEnvironment environment = coreContext.environment();
        BucketConfig config = core.clusterConfig().bucketConfig(collectionIdentifier.bucket());
        if (config instanceof CouchbaseBucketConfig) {
            CouchbaseBucketConfig topology = (CouchbaseBucketConfig)config;
            if (!config.bucketCapabilities().contains((Object)BucketCapabilities.SUBDOC_READ_REPLICA)) {
                return ReplicaHelper.failedFuture(FeatureNotAvailableException.subdocReadReplica());
            }
            short s = topology.numberOfReplicas();
            ArrayList<SubdocGetRequest> requests = new ArrayList<SubdocGetRequest>(s + 1);
            NodeIndexCalculator allowedNodeIndexes = new NodeIndexCalculator(readPreference, topology, coreContext);
            if (allowedNodeIndexes.canUseNodeForActive(documentId)) {
                RequestSpan span = coreContext.coreResources().requestTracer().requestSpan("lookup_in", parent);
                SubdocGetRequest activeRequest = SubdocGetRequest.create(timeout, coreContext, collectionIdentifier, retryStrategy, documentId, (byte)0, commands, span);
                activeRequest.context().clientContext(clientContext);
                requests.add(activeRequest);
            }
            for (short replica = 1; replica <= s; replica = (short)((short)(replica + 1))) {
                if (!allowedNodeIndexes.canUseNodeForReplica(documentId, replica - 1)) continue;
                RequestSpan replicaSpan = coreContext.coreResources().requestTracer().requestSpan("lookup_in_all_replicas", parent);
                ReplicaSubdocGetRequest replicaRequest = ReplicaSubdocGetRequest.create(timeout, coreContext, collectionIdentifier, retryStrategy, documentId, (byte)0, commands, replica, replicaSpan);
                replicaRequest.context().clientContext(clientContext);
                requests.add(replicaRequest);
            }
            if (requests.isEmpty()) {
                CompletableFuture<Stream<SubdocGetRequest>> future = new CompletableFuture<Stream<SubdocGetRequest>>();
                future.completeExceptionally(DocumentUnretrievableException.noReplicasSuitable());
                return future;
            }
            return CompletableFuture.completedFuture(requests.stream());
        }
        if (config == null) {
            Duration retryDelay = Duration.ofMillis(100L);
            CompletableFuture<Stream<SubdocGetRequest>> completableFuture = new CompletableFuture<Stream<SubdocGetRequest>>();
            coreContext.environment().timer().schedule(() -> ReplicaHelper.lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout.minus(retryDelay), parent, readPreference).whenComplete((getRequestStream, throwable) -> {
                if (throwable != null) {
                    future.completeExceptionally((Throwable)throwable);
                } else {
                    future.complete((Stream<SubdocGetRequest>)getRequestStream);
                }
            }), retryDelay);
            return completableFuture;
        }
        return ReplicaHelper.failedFuture(CommonExceptions.getFromReplicaNotCouchbaseBucket());
    }

    private static <T> CompletableFuture<T> failedFuture(Throwable t) {
        CompletableFuture future = new CompletableFuture();
        future.completeExceptionally(t);
        return future;
    }

    private static CompletableFuture<GetResponse> get(Core core, GetRequest request) {
        core.send(request);
        return ((CompletableFuture)request.response().thenApply(response -> {
            if (!response.status().success()) {
                throw DefaultErrorUtil.keyValueStatusToException(request, response);
            }
            return response;
        })).whenComplete((t, e) -> request.context().logicallyComplete((Throwable)e));
    }

    private static CompletableFuture<SubdocGetResponse> get(Core core, SubdocGetRequest request) {
        core.send(request);
        return ((CompletableFuture)request.response().thenApply(response -> {
            if (!response.status().success()) {
                throw DefaultErrorUtil.keyValueStatusToException(request, response);
            }
            return response;
        })).whenComplete((t, e) -> request.context().logicallyComplete((Throwable)e));
    }

    @Deprecated
    public static class GetReplicaResponse {
        private final GetResponse response;
        private final boolean fromReplica;

        public GetReplicaResponse(GetResponse response, boolean fromReplica) {
            this.response = Objects.requireNonNull(response);
            this.fromReplica = fromReplica;
        }

        public boolean isFromReplica() {
            return this.fromReplica;
        }

        public GetResponse getResponse() {
            return this.response;
        }
    }
}

