/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.proxy;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Empty;
import com.google.protobuf.Int32Value;
import com.oracle.coherence.cdi.Scope;
import com.oracle.coherence.cdi.SerializerProducer;
import com.oracle.coherence.grpc.AddIndexRequest;
import com.oracle.coherence.grpc.AggregateRequest;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.ClearRequest;
import com.oracle.coherence.grpc.ContainsEntryRequest;
import com.oracle.coherence.grpc.ContainsKeyRequest;
import com.oracle.coherence.grpc.ContainsValueRequest;
import com.oracle.coherence.grpc.DestroyRequest;
import com.oracle.coherence.grpc.Entry;
import com.oracle.coherence.grpc.EntryResult;
import com.oracle.coherence.grpc.EntrySetRequest;
import com.oracle.coherence.grpc.GetAllRequest;
import com.oracle.coherence.grpc.GetRequest;
import com.oracle.coherence.grpc.InvokeAllRequest;
import com.oracle.coherence.grpc.InvokeRequest;
import com.oracle.coherence.grpc.IsEmptyRequest;
import com.oracle.coherence.grpc.KeySetRequest;
import com.oracle.coherence.grpc.MapListenerRequest;
import com.oracle.coherence.grpc.MapListenerResponse;
import com.oracle.coherence.grpc.OptionalValue;
import com.oracle.coherence.grpc.PageRequest;
import com.oracle.coherence.grpc.PutAllRequest;
import com.oracle.coherence.grpc.PutIfAbsentRequest;
import com.oracle.coherence.grpc.PutRequest;
import com.oracle.coherence.grpc.RemoveIndexRequest;
import com.oracle.coherence.grpc.RemoveMappingRequest;
import com.oracle.coherence.grpc.RemoveRequest;
import com.oracle.coherence.grpc.ReplaceMappingRequest;
import com.oracle.coherence.grpc.ReplaceRequest;
import com.oracle.coherence.grpc.SizeRequest;
import com.oracle.coherence.grpc.TruncateRequest;
import com.oracle.coherence.grpc.ValuesRequest;
import com.oracle.coherence.grpc.proxy.CacheRequestHolder;
import com.oracle.coherence.grpc.proxy.DaemonPoolExecutor;
import com.oracle.coherence.grpc.proxy.ErrorsHelper;
import com.oracle.coherence.grpc.proxy.MapListenerProxy;
import com.oracle.coherence.grpc.proxy.NamedCacheClient;
import com.oracle.coherence.grpc.proxy.PagedQueryHelper;
import com.tangosol.internal.util.collection.ConvertingNamedCache;
import com.tangosol.internal.util.processor.BinaryProcessors;
import com.tangosol.io.DefaultSerializer;
import com.tangosol.io.Serializer;
import com.tangosol.io.pof.ConfigurablePofContext;
import com.tangosol.net.AsyncNamedCache;
import com.tangosol.net.AsyncNamedMap;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheService;
import com.tangosol.net.Cluster;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.cache.NearCache;
import com.tangosol.util.Aggregators;
import com.tangosol.util.Base;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.NullImplementation;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.extractor.IdentityExtractor;
import com.tangosol.util.filter.AlwaysFilter;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.helidon.config.Config;
import io.helidon.grpc.core.ResponseHelper;
import io.helidon.microprofile.grpc.core.Bidirectional;
import io.helidon.microprofile.grpc.core.Grpc;
import io.helidon.microprofile.grpc.core.ServerStreaming;
import io.helidon.microprofile.grpc.core.Unary;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
import org.eclipse.microprofile.metrics.annotation.Metered;

@Grpc(name="coherence.NamedCacheService")
@ApplicationScoped
public class NamedCacheService
implements NamedCacheClient {
    protected static final String CONFIG_PREFIX = "coherence.named_cache_service";
    protected static final String CONFIG_USE_DAEMON_POOL = "use_daemon_pool";
    protected static final String CONFIG_TRANSFER_THRESHOLD = "transfer_threshold";
    protected static final Void VOID = null;
    public static final String SERVICE_NAME = "coherence.NamedCacheService";
    protected final Function<String, ConfigurableCacheFactory> cacheFactorySupplier;
    protected final SerializerProducer serializerProducer;
    protected final Executor executor;
    protected long transferThreshold = 524288L;

    @Inject
    public NamedCacheService(Cluster cluster, BeanManager beanManager, SerializerProducer serializer, Config config) {
        this(cluster, new CdiCacheFactorySupplier(beanManager), serializer, config);
    }

    public NamedCacheService(Cluster cluster, Function<String, ConfigurableCacheFactory> fn, SerializerProducer serializer, Config config) {
        this.serializerProducer = serializer;
        this.cacheFactorySupplier = fn;
        Config serviceConfig = config.get(CONFIG_PREFIX);
        serviceConfig.get(CONFIG_TRANSFER_THRESHOLD).asLong().ifPresent(this::setTransferThreshold);
        if (((Boolean)serviceConfig.get(CONFIG_USE_DAEMON_POOL).asBoolean().orElse((Object)true)).booleanValue()) {
            DaemonPoolExecutor pool = DaemonPoolExecutor.builder(serviceConfig).name(SERVICE_NAME).registry(() -> ((Cluster)cluster).getManagement()).build();
            pool.start();
            this.executor = pool;
        } else {
            this.executor = ForkJoinPool.commonPool();
        }
    }

    public static NamedCacheService create() {
        return NamedCacheService.builder().build();
    }

    public static Builder builder() {
        return NamedCacheService.builder(Config.empty());
    }

    public static Builder builder(Config config) {
        return new Builder(config);
    }

    long getTransferThreshold() {
        return this.transferThreshold;
    }

    void setTransferThreshold(long lSize) {
        this.transferThreshold = lSize;
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Empty> addIndex(AddIndexRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(this::addIndex, this.executor);
    }

    protected Empty addIndex(CacheRequestHolder<AddIndexRequest, Void> holder) {
        AddIndexRequest request = holder.getRequest();
        NamedCache<Binary, Binary> cache = holder.getCache();
        Serializer serializer = holder.getSerializer();
        ValueExtractor<?, ?> extractor = this.ensureValueExtractor(request.getExtractor(), serializer);
        Comparator comparator = (Comparator)BinaryHelper.fromByteString((ByteString)request.getComparator(), (Serializer)serializer);
        cache.addIndex(extractor, request.getSorted(), comparator);
        return BinaryHelper.EMPTY;
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BytesValue> aggregate(AggregateRequest request) {
        ByteString processorBytes = request.getAggregator();
        if (processorBytes.isEmpty()) {
            CompletableFuture<BytesValue> future = new CompletableFuture<BytesValue>();
            future.completeExceptionally((Throwable)Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized entry aggregator").asRuntimeException());
            return future;
        }
        try {
            if (request.getKeysCount() != 0) {
                return this.aggregateWithKeys(request);
            }
            return this.aggregateWithFilter(request);
        }
        catch (Throwable t) {
            throw ErrorsHelper.ensureStatusRuntimeException(t);
        }
    }

    protected CompletionStage<BytesValue> aggregateWithFilter(AggregateRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::aggregateWithFilter, this.executor).handleAsync(this::handleError, this.executor);
    }

    protected CompletionStage<BytesValue> aggregateWithFilter(CacheRequestHolder<AggregateRequest, Void> holder) {
        AggregateRequest request = holder.getRequest();
        ByteString filterBytes = request.getFilter();
        Filter filter = filterBytes.isEmpty() ? Filters.always() : (Filter)BinaryHelper.fromByteString((ByteString)filterBytes, (Serializer)holder.getSerializer());
        ByteString processorBytes = request.getAggregator();
        InvocableMap.EntryAggregator aggregator = (InvocableMap.EntryAggregator)BinaryHelper.fromByteString((ByteString)processorBytes, (Serializer)holder.getSerializer());
        return holder.runAsync(holder.getAsyncCache().aggregate(filter, aggregator)).thenApplyAsync(h -> BinaryHelper.toBytesValue(h.getResult(), (Serializer)h.getSerializer()), this.executor);
    }

    protected CompletionStage<BytesValue> aggregateWithKeys(AggregateRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::aggregateWithKeys, this.executor).handleAsync(this::handleError, this.executor);
    }

    protected CompletionStage<BytesValue> aggregateWithKeys(CacheRequestHolder<AggregateRequest, Void> holder) {
        AggregateRequest request = holder.getRequest();
        List keys = request.getKeysList().stream().map(holder::convertKeyDown).collect(Collectors.toList());
        InvocableMap.EntryAggregator aggregator = (InvocableMap.EntryAggregator)BinaryHelper.fromByteString((ByteString)request.getAggregator(), (Serializer)holder.getSerializer());
        return holder.runAsync(holder.getAsyncCache().aggregate(keys, aggregator)).thenApplyAsync(h -> BinaryHelper.toBytesValue(h.getResult(), (Serializer)h.getSerializer()), this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Empty> clear(ClearRequest request) {
        return this.getAsyncCache(request.getScope(), request.getCache()).thenComposeAsync(cache -> cache.invokeAll((Filter)AlwaysFilter.INSTANCE(), (InvocableMap.EntryProcessor)BinaryProcessors.BinarySyntheticRemoveBlindProcessor.INSTANCE), this.executor).thenApplyAsync(this::empty, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BoolValue> containsEntry(ContainsEntryRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::containsEntry, this.executor).thenApplyAsync(h -> this.toBoolValue((Binary)h.getResult(), h.getCacheSerializer()), this.executor);
    }

    protected CompletionStage<CacheRequestHolder<ContainsEntryRequest, Binary>> containsEntry(CacheRequestHolder<ContainsEntryRequest, Void> holder) {
        ContainsEntryRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        Binary value = holder.convertDown(request.getValue());
        InvocableMap.EntryProcessor<Binary, Binary, Binary> processor = this.castProcessor((InvocableMap.EntryProcessor<Binary, Binary, ?>)new BinaryProcessors.BinaryContainsValueProcessor(value));
        return holder.runAsync(holder.getAsyncCache().invoke((Object)key, processor));
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BoolValue> containsKey(ContainsKeyRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::containsKey, this.executor).thenApplyAsync(h -> BoolValue.of((boolean)((Boolean)h.getDeserializedResult())), this.executor);
    }

    protected CompletionStage<CacheRequestHolder<ContainsKeyRequest, Boolean>> containsKey(CacheRequestHolder<ContainsKeyRequest, Void> holder) {
        ContainsKeyRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        return holder.runAsync(holder.getAsyncCache().containsKey((Object)key));
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BoolValue> containsValue(ContainsValueRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::containsValue, this.executor).thenApplyAsync(h -> BoolValue.of(((Integer)h.getResult() > 0 ? 1 : 0) != 0), this.executor);
    }

    protected CompletionStage<CacheRequestHolder<ContainsValueRequest, Integer>> containsValue(CacheRequestHolder<ContainsValueRequest, Void> holder) {
        ContainsValueRequest request = holder.getRequest();
        Object value = BinaryHelper.fromByteString((ByteString)request.getValue(), (Serializer)holder.getSerializer());
        Filter filter = Filters.equal((ValueExtractor)IdentityExtractor.INSTANCE(), (Object)value);
        return holder.runAsync(holder.getAsyncCache().aggregate(filter, (InvocableMap.EntryAggregator)Aggregators.count()));
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Empty> destroy(DestroyRequest request) {
        return this.getAsyncCache(request.getScope(), request.getCache()).thenApplyAsync(cache -> this.execute(() -> cache.getNamedCache().destroy()), this.executor);
    }

    @Override
    @ServerStreaming
    @Metered
    public void entrySet(EntrySetRequest request, StreamObserver<Entry> observer) {
        this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(h -> this.entrySet((CacheRequestHolder<EntrySetRequest, Void>)h, observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
    }

    protected Void entrySet(CacheRequestHolder<EntrySetRequest, Void> holder, StreamObserver<Entry> observer) {
        try {
            EntrySetRequest request = holder.getRequest();
            Serializer serializer = holder.getSerializer();
            Filter filter = this.ensureFilter(request.getFilter(), serializer);
            Comparator comparator = this.deserializeComparator(request.getComparator(), serializer);
            holder.runAsync(holder.getAsyncCache().entrySet(filter, comparator)).handleAsync((h, err) -> this.handleSetOfEntries((CacheRequestHolder<?, Set<Map.Entry<Binary, Binary>>>)h, (Throwable)err, observer, false), this.executor);
        }
        catch (Throwable t) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(t));
        }
        return VOID;
    }

    @Override
    @Bidirectional
    @Metered
    public StreamObserver<MapListenerRequest> events(StreamObserver<MapListenerResponse> observer) {
        return new MapListenerProxy(this, observer);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<OptionalValue> get(GetRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::get, this.executor).thenApplyAsync(h -> h.toOptionalValue((Binary)h.getDeserializedResult()), this.executor);
    }

    protected CompletionStage<CacheRequestHolder<GetRequest, Binary>> get(CacheRequestHolder<GetRequest, Void> holder) {
        Binary key = holder.convertKeyDown(holder.getRequest().getKey());
        InvocableMap.EntryProcessor processor = BinaryProcessors.get();
        return holder.runAsync(holder.getAsyncCache().invoke((Object)key, processor));
    }

    @Override
    @ServerStreaming
    @Metered
    public void getAll(GetAllRequest request, StreamObserver<Entry> observer) {
        if (request.getKeyList().isEmpty()) {
            observer.onCompleted();
        } else {
            this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(h -> this.getAll((CacheRequestHolder<GetAllRequest, Void>)h, observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
        }
    }

    protected Void getAll(CacheRequestHolder<GetAllRequest, Void> holder, StreamObserver<Entry> observer) {
        holder.runAsync(this.convertKeys(holder)).thenComposeAsync(h -> h.runAsync(h.getAsyncCache().invokeAll((Collection)h.getResult(), BinaryProcessors.get())), this.executor).handleAsync((h, err) -> this.handleMapOfEntries((CacheRequestHolder<?, Map<Binary, Binary>>)h, (Throwable)err, observer, true), this.executor);
        return VOID;
    }

    protected CompletionStage<List<Binary>> convertKeys(CacheRequestHolder<GetAllRequest, Void> holder) {
        return CompletableFuture.supplyAsync(() -> {
            GetAllRequest request = (GetAllRequest)holder.getRequest();
            return request.getKeyList().stream().map(holder::convertKeyDown).collect(Collectors.toList());
        }, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BytesValue> invoke(InvokeRequest request) {
        ByteString processorBytes = request.getProcessor();
        if (processorBytes.isEmpty()) {
            CompletableFuture<BytesValue> future = new CompletableFuture<BytesValue>();
            future.completeExceptionally((Throwable)Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized entry processor").asRuntimeException());
            return future;
        }
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::invoke, this.executor).thenApplyAsync(h -> BinaryHelper.toBytesValue((Binary)h.convertUp((Binary)h.getResult())), this.executor);
    }

    protected CompletionStage<CacheRequestHolder<InvokeRequest, Binary>> invoke(CacheRequestHolder<InvokeRequest, Void> holder) {
        InvokeRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        InvocableMap.EntryProcessor processor = (InvocableMap.EntryProcessor)BinaryHelper.fromByteString((ByteString)request.getProcessor(), (Serializer)holder.getSerializer());
        return holder.runAsync(holder.getAsyncCache().invoke((Object)key, processor));
    }

    @Override
    @ServerStreaming
    @Metered
    public void invokeAll(InvokeAllRequest request, StreamObserver<Entry> observer) {
        ByteString processorBytes = request.getProcessor();
        if (processorBytes.isEmpty()) {
            observer.onError((Throwable)Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized entry processor").asRuntimeException());
        } else {
            try {
                CompletionStage<Void> future = request.getKeysCount() != 0 ? this.invokeAllWithKeys(request, observer) : this.invokeAllWithFilter(request, observer);
                future.handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
            }
            catch (Throwable t) {
                observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(t));
            }
        }
    }

    protected CompletionStage<Void> invokeAllWithFilter(InvokeAllRequest request, StreamObserver<Entry> observer) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> this.invokeAllWithFilter((CacheRequestHolder<InvokeAllRequest, Void>)h, observer), this.executor);
    }

    protected CompletionStage<Void> invokeAllWithFilter(CacheRequestHolder<InvokeAllRequest, Void> holder, StreamObserver<Entry> observer) {
        InvokeAllRequest request = holder.getRequest();
        ByteString filterBytes = request.getFilter();
        Filter filter = filterBytes.isEmpty() ? Filters.always() : (Filter)BinaryHelper.fromByteString((ByteString)filterBytes, (Serializer)holder.getSerializer());
        ByteString processorBytes = request.getProcessor();
        InvocableMap.EntryProcessor processor = (InvocableMap.EntryProcessor)BinaryHelper.fromByteString((ByteString)processorBytes, (Serializer)holder.getSerializer());
        return holder.runAsync(holder.getAsyncCache().invokeAll(filter, processor)).handleAsync((h, err) -> this.handleMapOfEntries((CacheRequestHolder<?, Map<Binary, Binary>>)h, (Throwable)err, observer, false), this.executor);
    }

    protected CompletionStage<Void> invokeAllWithKeys(InvokeAllRequest request, StreamObserver<Entry> observer) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> this.invokeAllWithKeys((CacheRequestHolder<InvokeAllRequest, Void>)h, observer), this.executor);
    }

    protected CompletionStage<Void> invokeAllWithKeys(CacheRequestHolder<InvokeAllRequest, Void> holder, StreamObserver<Entry> observer) {
        InvokeAllRequest request = holder.getRequest();
        List keys = request.getKeysList().stream().map(holder::convertKeyDown).collect(Collectors.toList());
        InvocableMap.EntryProcessor processor = (InvocableMap.EntryProcessor)BinaryHelper.fromByteString((ByteString)request.getProcessor(), (Serializer)holder.getSerializer());
        return holder.runAsync(holder.getAsyncCache().invokeAll(keys, processor)).handleAsync((h, err) -> this.handleMapOfEntries((CacheRequestHolder<?, Map<Binary, Binary>>)h, (Throwable)err, observer, false), this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BoolValue> isEmpty(IsEmptyRequest request) {
        return this.getAsyncCache(request.getScope(), request.getCache()).thenComposeAsync(AsyncNamedMap::isEmpty, this.executor).thenApplyAsync(BoolValue::of, this.executor);
    }

    @Override
    @ServerStreaming
    @Metered
    public void keySet(KeySetRequest request, StreamObserver<BytesValue> observer) {
        this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(h -> this.keySet((CacheRequestHolder<KeySetRequest, Void>)h, observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
    }

    protected Void keySet(CacheRequestHolder<KeySetRequest, Void> holder, StreamObserver<BytesValue> observer) {
        try {
            KeySetRequest request = holder.getRequest();
            Serializer serializer = holder.getSerializer();
            Filter filter = this.ensureFilter(request.getFilter(), serializer);
            holder.runAsync(holder.getAsyncCache().keySet(filter)).handleAsync((h, err) -> this.handleStream((CacheRequestHolder<?, ? extends Iterable<Binary>>)h, (Throwable)err, observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
        }
        catch (Throwable t) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(t));
        }
        return VOID;
    }

    @Override
    @ServerStreaming
    @Metered
    public void nextKeySetPage(PageRequest request, StreamObserver<BytesValue> observer) {
        this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(h -> PagedQueryHelper.keysPagedQuery(h, this.getTransferThreshold()), this.executor).handleAsync((stream, err) -> this.handleStream((Stream)stream, (Throwable)err, (StreamObserver)observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer));
    }

    @Override
    @ServerStreaming
    @Metered
    public void nextEntrySetPage(PageRequest request, StreamObserver<EntryResult> observer) {
        this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(h -> PagedQueryHelper.entryPagedQuery(h, this.getTransferThreshold()), this.executor).handleAsync((stream, err) -> this.handleStream((Stream)stream, (Throwable)err, (StreamObserver)observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer));
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BytesValue> put(PutRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::put, this.executor);
    }

    protected CompletionStage<BytesValue> put(CacheRequestHolder<PutRequest, Void> holder) {
        PutRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        Binary value = holder.convertDown(request.getValue());
        return holder.getAsyncCache().invoke((Object)key, BinaryProcessors.put((Binary)value, (long)request.getTtl())).thenApplyAsync(holder::deserializeToBytesValue, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Empty> putAll(PutAllRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::putAll, this.executor);
    }

    protected CompletionStage<Empty> putAll(CacheRequestHolder<PutAllRequest, Void> holder) {
        PutAllRequest request = holder.getRequest();
        if (request.getEntryCount() == 0) {
            return CompletableFuture.completedFuture(BinaryHelper.EMPTY);
        }
        HashMap<Binary, Binary> map = new HashMap<Binary, Binary>();
        for (Entry entry : request.getEntryList()) {
            Binary key = holder.convertKeyDown(entry.getKey());
            Binary value = holder.convertDown(entry.getValue());
            map.put(key, value);
        }
        if (holder.getCache().getCacheService() instanceof PartitionedService) {
            return this.partitionedPutAll(holder, map);
        }
        return this.plainPutAll(holder.getAsyncCache(), map);
    }

    protected CompletionStage<Empty> partitionedPutAll(CacheRequestHolder<PutAllRequest, Void> holder, Map<Binary, Binary> map) {
        try {
            HashMap<Member, Map> mapByOwner = new HashMap<Member, Map>();
            PartitionedService service = (PartitionedService)holder.getCache().getCacheService();
            for (Map.Entry<Binary, Binary> entry : map.entrySet()) {
                Binary key = entry.getKey();
                Member member = service.getKeyOwner((Object)key);
                Map mapForMember2 = mapByOwner.computeIfAbsent(member, m -> new HashMap());
                mapForMember2.put(key, entry.getValue());
            }
            AsyncNamedCache<Binary, Binary> cache = holder.getAsyncCache();
            CompletableFuture[] futures = (CompletableFuture[])mapByOwner.values().stream().map(mapForMember -> this.plainPutAll(cache, (Map<Binary, Binary>)mapForMember)).map(CompletionStage::toCompletableFuture).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures).thenApply(v -> BinaryHelper.EMPTY);
        }
        catch (Throwable t) {
            CompletableFuture<Empty> future = new CompletableFuture<Empty>();
            future.completeExceptionally(t);
            return future;
        }
    }

    protected CompletionStage<Empty> plainPutAll(AsyncNamedCache<Binary, Binary> cache, Map<Binary, Binary> map) {
        return cache.invokeAll(map.keySet(), BinaryProcessors.putAll(map)).thenApplyAsync(v -> BinaryHelper.EMPTY, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BytesValue> putIfAbsent(PutIfAbsentRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::putIfAbsent, this.executor);
    }

    protected CompletableFuture<BytesValue> putIfAbsent(CacheRequestHolder<PutIfAbsentRequest, Void> holder) {
        PutIfAbsentRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        Binary value = holder.convertDown(() -> ((PutIfAbsentRequest)request).getValue());
        return holder.getAsyncCache().invoke((Object)key, BinaryProcessors.putIfAbsent((Binary)value, (long)request.getTtl())).thenApplyAsync(holder::deserializeToBytesValue, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BytesValue> remove(RemoveRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> h.runAsync(this.remove((CacheRequestHolder<RemoveRequest, Void>)h)), this.executor).thenApplyAsync(h -> h.toBytesValue((Binary)h.getResult()), this.executor);
    }

    protected CompletableFuture<Binary> remove(CacheRequestHolder<RemoveRequest, Void> holder) {
        RemoveRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        return holder.getAsyncCache().invoke((Object)key, (InvocableMap.EntryProcessor)BinaryProcessors.BinaryRemoveProcessor.INSTANCE).thenApplyAsync(holder::fromCacheBinary, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Empty> removeIndex(RemoveIndexRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(this::removeIndex, this.executor);
    }

    protected Empty removeIndex(CacheRequestHolder<RemoveIndexRequest, Void> holder) {
        RemoveIndexRequest request = holder.getRequest();
        NamedCache<Binary, Binary> cache = holder.getCache();
        ValueExtractor<?, ?> extractor = this.ensureValueExtractor(request.getExtractor(), holder.getSerializer());
        cache.removeIndex(extractor);
        return BinaryHelper.EMPTY;
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BoolValue> removeMapping(RemoveMappingRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(this::removeMapping, this.executor).thenApplyAsync(h -> BoolValue.of((boolean)((Boolean)h.getDeserializedResult())), this.executor);
    }

    protected CompletionStage<CacheRequestHolder<RemoveMappingRequest, Boolean>> removeMapping(CacheRequestHolder<RemoveMappingRequest, Void> holder) {
        RemoveMappingRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        Object value = BinaryHelper.fromByteString((ByteString)request.getValue(), (Serializer)holder.getSerializer());
        AsyncNamedCache<Binary, Binary> cache = holder.getAsyncCache();
        return holder.runAsync(cache.remove((Object)key, value));
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BytesValue> replace(ReplaceRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> h.runAsync(this.replace((CacheRequestHolder<ReplaceRequest, Void>)h)), this.executor).thenApplyAsync(h -> h.toBytesValue((Binary)h.getResult()), this.executor);
    }

    protected CompletableFuture<Binary> replace(CacheRequestHolder<ReplaceRequest, Void> holder) {
        ReplaceRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        Binary value = holder.convertDown(request.getValue());
        return holder.getAsyncCache().invoke((Object)key, this.castProcessor((InvocableMap.EntryProcessor<Binary, Binary, ?>)new BinaryProcessors.BinaryReplaceProcessor(value))).thenApplyAsync(holder::fromCacheBinary, this.executor);
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<BoolValue> replaceMapping(ReplaceMappingRequest request) {
        return this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> h.runAsync(this.replaceMapping((CacheRequestHolder<ReplaceMappingRequest, Void>)h)), this.executor).thenApplyAsync(h -> this.toBoolValue((Binary)h.getResult(), h.getCacheSerializer()), this.executor);
    }

    protected CompletableFuture<Binary> replaceMapping(CacheRequestHolder<ReplaceMappingRequest, Void> holder) {
        ReplaceMappingRequest request = holder.getRequest();
        Binary key = holder.convertKeyDown(request.getKey());
        Binary prevValue = holder.convertDown(request.getPreviousValue());
        Binary newValue = holder.convertDown(request.getNewValue());
        return holder.getAsyncCache().invoke((Object)key, this.castProcessor((InvocableMap.EntryProcessor<Binary, Binary, ?>)new BinaryProcessors.BinaryReplaceMappingProcessor(prevValue, newValue)));
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Int32Value> size(SizeRequest request) {
        CompletionStage<Int32Value> s = this.getAsyncCache(request.getScope(), request.getCache()).thenComposeAsync(AsyncNamedMap::size, this.executor).thenApplyAsync(Int32Value::of, this.executor);
        s.handle((sz, err) -> null);
        return s;
    }

    @Override
    @Unary
    @Metered
    public CompletionStage<Empty> truncate(TruncateRequest request) {
        return this.getAsyncCache(request.getScope(), request.getCache()).thenApplyAsync(cache -> this.execute(() -> cache.getNamedCache().truncate()), this.executor);
    }

    @Override
    @ServerStreaming
    @Metered
    public void values(ValuesRequest request, StreamObserver<BytesValue> observer) {
        this.createHolder(request, request.getScope(), request.getCache(), request.getFormat()).thenApplyAsync(h -> this.values((CacheRequestHolder<ValuesRequest, Void>)h, observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
    }

    protected Void values(CacheRequestHolder<ValuesRequest, Void> holder, StreamObserver<BytesValue> observer) {
        try {
            ValuesRequest request = holder.getRequest();
            Serializer serializer = holder.getSerializer();
            Filter filter = this.ensureFilter(request.getFilter(), serializer);
            Comparator comparator = this.deserializeComparator(request.getComparator(), serializer);
            holder.runAsync(holder.getAsyncCache().values(filter, comparator)).handleAsync((h, err) -> this.handleStream((CacheRequestHolder<?, ? extends Iterable<Binary>>)h, (Throwable)err, observer), this.executor).handleAsync((v, err) -> this.handleError((Throwable)err, (StreamObserver)observer), this.executor);
        }
        catch (Throwable t) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(t));
        }
        return VOID;
    }

    protected <V> Empty empty(V value) {
        return BinaryHelper.EMPTY;
    }

    protected Empty execute(Runnable task) {
        task.run();
        return BinaryHelper.EMPTY;
    }

    protected <T> T execute(Callable<T> task) {
        try {
            return task.call();
        }
        catch (Throwable t) {
            throw ErrorsHelper.ensureStatusRuntimeException(t);
        }
    }

    protected Void handleMapOfEntries(CacheRequestHolder<?, Map<Binary, Binary>> holder, Throwable err, StreamObserver<Entry> observer, boolean fDeserialize) {
        if (err == null) {
            this.handleStreamOfEntries(holder, holder.getResult().entrySet().stream(), observer, fDeserialize);
        } else {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(err));
        }
        return VOID;
    }

    protected Void handleSetOfEntries(CacheRequestHolder<?, Set<Map.Entry<Binary, Binary>>> holder, Throwable err, StreamObserver<Entry> observer, boolean fDeserialize) {
        if (err == null) {
            this.handleStreamOfEntries(holder, holder.getResult().stream(), observer, fDeserialize);
        } else {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(err));
        }
        return VOID;
    }

    protected Void handleStreamOfEntries(CacheRequestHolder<?, ?> holder, Stream<Map.Entry<Binary, Binary>> entries, StreamObserver<Entry> observer, boolean fDeserialize) {
        try {
            entries.forEach(entry -> {
                Binary binValue = (Binary)entry.getValue();
                if (fDeserialize) {
                    binValue = (Binary)holder.fromCacheBinary((Binary)entry.getValue());
                }
                observer.onNext((Object)holder.toEntry((Binary)entry.getKey(), binValue));
            });
            observer.onCompleted();
        }
        catch (Throwable thrown) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(thrown));
        }
        return VOID;
    }

    protected Void handleStream(CacheRequestHolder<?, ? extends Iterable<Binary>> holder, Throwable err, StreamObserver<BytesValue> observer) {
        if (err == null) {
            try {
                Iterable<Binary> iterable = holder.getResult();
                Stream<BytesValue> stream = StreamSupport.stream(iterable.spliterator(), false).map(bin -> BinaryHelper.toBytesValue((Binary)holder.convertUp((Binary)bin)));
                ResponseHelper.stream(observer, stream);
            }
            catch (Throwable t) {
                observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(t));
            }
        } else {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(err));
        }
        return VOID;
    }

    protected <Resp> Void handleStream(Stream<Resp> stream, Throwable err, StreamObserver<Resp> observer) {
        if (err == null) {
            try {
                ResponseHelper.stream(observer, stream);
            }
            catch (Throwable t) {
                observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(t));
            }
        } else {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(err));
        }
        return VOID;
    }

    protected <Resp> Void handleError(Throwable err, StreamObserver<Resp> observer) {
        if (err != null) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException(err));
        }
        return VOID;
    }

    protected <Resp> Resp handleError(Resp response, Throwable err) {
        if (err != null) {
            throw ErrorsHelper.ensureStatusRuntimeException(err);
        }
        return response;
    }

    protected BoolValue toBoolValue(Binary binary, Serializer serializer) {
        return BoolValue.of((boolean)((Boolean)BinaryHelper.fromBinary((Binary)binary, (Serializer)serializer)));
    }

    ValueExtractor<?, ?> ensureValueExtractor(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized ValueExtractor").asRuntimeException();
        }
        return (ValueExtractor)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    <T> Filter<T> ensureFilter(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            return Filters.always();
        }
        return (Filter)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    <T> Filter<T> getFilter(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            return null;
        }
        return (Filter)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    <T> Comparator<T> deserializeComparator(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            return null;
        }
        return (Comparator)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    protected CompletionStage<AsyncNamedCache<Binary, Binary>> getAsyncCache(String scope, String cacheName) {
        return CompletableFuture.supplyAsync(() -> this.getPassThroughCache(scope, cacheName).async(), this.executor);
    }

    protected NamedCache<Binary, Binary> getPassThroughCache(String scope, String cacheName) {
        return this.getCache(scope, cacheName, true);
    }

    protected NamedCache<Binary, Binary> getCache(String scope, String cacheName, boolean passThru) {
        CacheService service;
        if (cacheName == null || cacheName.trim().length() == 0) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, cache name cannot be null or empty").asRuntimeException();
        }
        ClassLoader loader = passThru ? NullImplementation.getClassLoader() : Base.getContextClassLoader();
        ConfigurableCacheFactory ccf = this.cacheFactorySupplier.apply(scope);
        NamedCache cache = ccf.ensureCache(cacheName, loader);
        boolean near = cache instanceof NearCache;
        if (near && (service = cache.getCacheService()) instanceof DistributedCacheService && ((DistributedCacheService)service).isLocalStorageEnabled()) {
            cache = ((NearCache)cache).getBackCache();
        }
        if (near) {
            return new ConvertingNamedCache(cache, NullImplementation.getConverter(), ExternalizableHelper.CONVERTER_STRIP_INTDECO, NullImplementation.getConverter(), NullImplementation.getConverter());
        }
        return cache;
    }

    protected InvocableMap.EntryProcessor<Binary, Binary, Binary> castProcessor(InvocableMap.EntryProcessor<Binary, Binary, ?> ep) {
        return ep;
    }

    <Req> CompletionStage<CacheRequestHolder<Req, Void>> createHolder(Req request, String sScope, String sCacheName, String format) {
        return CompletableFuture.supplyAsync(() -> this.supplyHolderInternal(request, sScope, sCacheName, format), this.executor);
    }

    <Req> CacheRequestHolder<Req, Void> supplyHolderInternal(Req request, String sScope, String sCacheName, String format) {
        Serializer serializerRequest;
        if (request == null) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, the request cannot be null").asRuntimeException();
        }
        if (sCacheName == null || sCacheName.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, cache name cannot be null or empty").asRuntimeException();
        }
        NamedCache<Binary, Binary> c = this.getCache(sScope, sCacheName, true);
        NamedCache<Binary, Binary> nonPassThrough = this.getCache(sScope, sCacheName, false);
        AsyncNamedCache cache = c.async();
        CacheService cacheService = cache.getNamedCache().getCacheService();
        Serializer serializerCache = cacheService.getSerializer();
        String cacheFormat = serializerCache.getName();
        if ((cacheFormat == null || cacheFormat.isEmpty()) && serializerCache instanceof DefaultSerializer) {
            cacheFormat = "java";
        } else if ((cacheFormat == null || cacheFormat.isEmpty()) && serializerCache instanceof ConfigurablePofContext) {
            cacheFormat = "pof";
        }
        if (format == null || format.trim().isEmpty() || format.equals(cacheFormat)) {
            serializerRequest = serializerCache;
        } else {
            ClassLoader loader = cacheService.getContextClassLoader();
            serializerRequest = this.serializerProducer.getNamedSerializer(format, loader);
        }
        if (serializerRequest == null) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request format, cannot find serializer with name '" + format + "'").asRuntimeException();
        }
        return new CacheRequestHolder(request, (AsyncNamedCache<Binary, Binary>)cache, () -> nonPassThrough, format, serializerRequest, this.executor);
    }

    public static class Builder
    implements io.helidon.common.Builder<NamedCacheService> {
        protected Function<String, ConfigurableCacheFactory> m_cacheFactorySupplier;
        protected Cluster m_cluster;
        protected SerializerProducer m_serializerProducer;
        protected Config m_config;

        protected Builder(Config config) {
            this.m_config = config == null ? Config.empty() : config;
            this.m_cacheFactorySupplier = new DefaultCacheFactorySupplier();
        }

        public NamedCacheService build() {
            return new NamedCacheService(this.ensureCluster(), this.m_cacheFactorySupplier, this.ensureSerializerProducer(), this.m_config);
        }

        public Builder configurableCacheFactorySupplier(Function<String, ConfigurableCacheFactory> fn) {
            this.m_cacheFactorySupplier = fn != null ? fn : new DefaultCacheFactorySupplier();
            return this;
        }

        public Builder configurableCacheFactories(ConfigurableCacheFactory ... ccfs) {
            this.m_cacheFactorySupplier = new FixedCacheFactorySupplier(ccfs);
            return this;
        }

        public Builder serializerProducer(SerializerProducer producer) {
            this.m_serializerProducer = producer;
            return this;
        }

        protected SerializerProducer ensureSerializerProducer() {
            if (this.m_serializerProducer == null) {
                return SerializerProducer.create();
            }
            return this.m_serializerProducer;
        }

        protected Cluster ensureCluster() {
            if (this.m_cluster == null) {
                this.m_cluster = CacheFactory.ensureCluster();
            }
            return this.m_cluster;
        }
    }

    public static class FixedCacheFactorySupplier
    implements Function<String, ConfigurableCacheFactory> {
        private final Map<String, ConfigurableCacheFactory> f_mapCCF;

        public FixedCacheFactorySupplier(ConfigurableCacheFactory ... ccfs) {
            HashMap<String, ConfigurableCacheFactory> map = new HashMap<String, ConfigurableCacheFactory>();
            for (ConfigurableCacheFactory ccf : ccfs) {
                map.put(ccf.getScopeName(), ccf);
            }
            this.f_mapCCF = map;
        }

        @Override
        public ConfigurableCacheFactory apply(String scope) {
            ConfigurableCacheFactory ccf;
            if (scope == null) {
                scope = "";
            }
            if ((ccf = this.f_mapCCF.get(scope)) != null) {
                return ccf;
            }
            if ("".equals(scope)) {
                return CacheFactory.getCacheFactoryBuilder().getConfigurableCacheFactory(Base.getContextClassLoader());
            }
            throw Status.INVALID_ARGUMENT.withDescription("cannot locate ConfigurableCacheFactory with scope name " + scope).asRuntimeException();
        }
    }

    public static class DefaultCacheFactorySupplier
    implements Function<String, ConfigurableCacheFactory> {
        @Override
        public ConfigurableCacheFactory apply(String scope) {
            if (scope == null || "".equals(scope)) {
                return CacheFactory.getCacheFactoryBuilder().getConfigurableCacheFactory(Base.getContextClassLoader());
            }
            throw Status.INVALID_ARGUMENT.withDescription("cannot locate ConfigurableCacheFactory with scope name " + scope + " from CDI").asRuntimeException();
        }
    }

    public static class CdiCacheFactorySupplier
    implements Function<String, ConfigurableCacheFactory> {
        private final BeanManager f_beanManager;

        public CdiCacheFactorySupplier(BeanManager beanManager) {
            this.f_beanManager = beanManager;
        }

        @Override
        public ConfigurableCacheFactory apply(String scope) {
            Instance instance;
            if (scope == null) {
                scope = "";
            }
            if ((instance = this.f_beanManager.createInstance().select(ConfigurableCacheFactory.class, new Annotation[]{Scope.Literal.of((String)scope)})).isResolvable()) {
                return (ConfigurableCacheFactory)instance.get();
            }
            throw Status.INVALID_ARGUMENT.withDescription("cannot locate ConfigurableCacheFactory with scope name " + scope + " from CDI").asRuntimeException();
        }
    }
}

