/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.batch;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Context;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.kv.MultiObserveViaCasRequest;
import com.couchbase.client.core.msg.kv.ObserveViaCasResponse;
import com.couchbase.client.core.node.KeyValueLocator;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.batch.BatchErrorContext;
import com.couchbase.client.java.batch.BatchHelperFailureException;
import com.couchbase.client.java.cnc.evnts.BatchHelperExistsCompletedEvent;
import com.couchbase.client.java.kv.GetResult;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Stability.Uncommitted
public class ReactiveBatchHelper {
    private static final Predicate<ObserveViaCasResponse.ObserveStatus> PMGET_PREDICATE = s -> s == ObserveViaCasResponse.ObserveStatus.FOUND_PERSISTED || s == ObserveViaCasResponse.ObserveStatus.FOUND_NOT_PERSISTED;

    @Stability.Uncommitted
    public static Mono<Map<String, GetResult>> getIfExists(Collection collection, java.util.Collection<String> ids) {
        return Mono.defer(() -> ReactiveBatchHelper.existsBytes(collection, ids).flatMap(e -> {
            String key = new String((byte[])e, StandardCharsets.UTF_8);
            return collection.reactive().get(key).map(r -> Tuples.of((Object)key, (Object)r));
        }).collectMap(Tuple2::getT1, Tuple2::getT2));
    }

    @Stability.Uncommitted
    public static Flux<String> exists(Collection collection, java.util.Collection<String> ids) {
        return Flux.defer(() -> ReactiveBatchHelper.existsBytes(collection, ids).map(i -> new String((byte[])i, StandardCharsets.UTF_8)));
    }

    private static Flux<byte[]> existsBytes(Collection collection, java.util.Collection<String> ids) {
        Core core = collection.core();
        CoreEnvironment env = core.context().environment();
        BucketConfig config = core.clusterConfig().bucketConfig(collection.bucketName());
        if (core.configurationProvider().bucketConfigLoadInProgress() || config == null) {
            return Mono.delay((Duration)Duration.ofMillis(100L), (Scheduler)env.scheduler()).flatMapMany(ign -> ReactiveBatchHelper.existsBytes(collection, ids));
        }
        long start = System.nanoTime();
        if (!(config instanceof CouchbaseBucketConfig)) {
            throw new IllegalStateException("Only couchbase (and ephemeral) buckets are supported at this point!");
        }
        HashMap nodeEntries = new HashMap(config.nodes().size());
        for (NodeInfo node : config.nodes()) {
            nodeEntries.put(node.id(), new HashMap(ids.size() / config.nodes().size()));
        }
        CouchbaseBucketConfig cbc = (CouchbaseBucketConfig)config;
        CollectionIdentifier ci = new CollectionIdentifier(collection.bucketName(), Optional.of(collection.scopeName()), Optional.of(collection.name()));
        for (String id : ids) {
            byte[] encodedId = id.getBytes(StandardCharsets.UTF_8);
            int partitionId = KeyValueLocator.partitionForKey((byte[])encodedId, (int)cbc.numberOfPartitions());
            short nodeId = cbc.nodeIndexForActive(partitionId, false);
            NodeInfo nodeInfo = cbc.nodeAtIndex((int)nodeId);
            ((Map)nodeEntries.get(nodeInfo.id())).put(encodedId, (short)partitionId);
        }
        ArrayList<Mono> responses = new ArrayList<Mono>(nodeEntries.size());
        ArrayList<MultiObserveViaCasRequest> requests = new ArrayList<MultiObserveViaCasRequest>(nodeEntries.size());
        for (Map.Entry node : nodeEntries.entrySet()) {
            if (((Map)node.getValue()).isEmpty()) continue;
            MultiObserveViaCasRequest request = new MultiObserveViaCasRequest(env.timeoutConfig().kvTimeout(), core.context(), env.retryStrategy(), ci, (NodeIdentifier)node.getKey(), (Map)node.getValue(), PMGET_PREDICATE);
            core.send((Request)request);
            requests.add(request);
            responses.add(Reactor.wrap((Request)request, (CompletableFuture)request.response(), (boolean)true));
        }
        return env.publishOnUserScheduler(Flux.merge(responses).flatMap(response -> Flux.fromIterable(response.observed().keySet())).onErrorMap(throwable -> {
            BatchErrorContext ctx = new BatchErrorContext(Collections.unmodifiableList(requests));
            return new BatchHelperFailureException("Failed to perform BatchHelper bulk operation", (Throwable)throwable, ctx);
        }).doOnComplete(() -> core.context().environment().eventBus().publish((Event)new BatchHelperExistsCompletedEvent(Duration.ofNanos(System.nanoTime() - start), (Context)new BatchErrorContext(Collections.unmodifiableList(requests))))));
    }
}

