/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.proxy;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Empty;
import com.google.protobuf.Int32Value;
import com.oracle.coherence.grpc.AddIndexRequest;
import com.oracle.coherence.grpc.AggregateRequest;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.CacheRequestHolder;
import com.oracle.coherence.grpc.ClearRequest;
import com.oracle.coherence.grpc.ContainsEntryRequest;
import com.oracle.coherence.grpc.ContainsKeyRequest;
import com.oracle.coherence.grpc.ContainsValueRequest;
import com.oracle.coherence.grpc.DestroyRequest;
import com.oracle.coherence.grpc.Entry;
import com.oracle.coherence.grpc.EntryResult;
import com.oracle.coherence.grpc.EntrySetRequest;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.GetAllRequest;
import com.oracle.coherence.grpc.GetRequest;
import com.oracle.coherence.grpc.InvokeAllRequest;
import com.oracle.coherence.grpc.InvokeRequest;
import com.oracle.coherence.grpc.IsEmptyRequest;
import com.oracle.coherence.grpc.KeySetRequest;
import com.oracle.coherence.grpc.MapListenerRequest;
import com.oracle.coherence.grpc.MapListenerResponse;
import com.oracle.coherence.grpc.OptionalValue;
import com.oracle.coherence.grpc.PageRequest;
import com.oracle.coherence.grpc.PutAllRequest;
import com.oracle.coherence.grpc.PutIfAbsentRequest;
import com.oracle.coherence.grpc.PutRequest;
import com.oracle.coherence.grpc.RemoveIndexRequest;
import com.oracle.coherence.grpc.RemoveMappingRequest;
import com.oracle.coherence.grpc.RemoveRequest;
import com.oracle.coherence.grpc.ReplaceMappingRequest;
import com.oracle.coherence.grpc.ReplaceRequest;
import com.oracle.coherence.grpc.SafeStreamObserver;
import com.oracle.coherence.grpc.SizeRequest;
import com.oracle.coherence.grpc.TruncateRequest;
import com.oracle.coherence.grpc.ValuesRequest;
import com.oracle.coherence.grpc.proxy.ConfigurableCacheFactorySuppliers;
import com.oracle.coherence.grpc.proxy.DaemonPoolExecutor;
import com.oracle.coherence.grpc.proxy.GrpcProxyMetrics;
import com.oracle.coherence.grpc.proxy.GrpcServiceDependencies;
import com.oracle.coherence.grpc.proxy.MapListenerProxy;
import com.oracle.coherence.grpc.proxy.NamedCacheService;
import com.oracle.coherence.grpc.proxy.PagedQueryHelper;
import com.tangosol.internal.util.DefaultDaemonPoolDependencies;
import com.tangosol.internal.util.collection.ConvertingNamedCache;
import com.tangosol.internal.util.processor.BinaryProcessors;
import com.tangosol.io.NamedSerializerFactory;
import com.tangosol.io.Serializer;
import com.tangosol.net.AsyncNamedCache;
import com.tangosol.net.AsyncNamedMap;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheService;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.cache.NearCache;
import com.tangosol.net.management.Registry;
import com.tangosol.util.Aggregators;
import com.tangosol.util.Base;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.NullImplementation;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.extractor.IdentityExtractor;
import com.tangosol.util.filter.AlwaysFilter;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NamedCacheServiceImpl
implements NamedCacheService {
    /** Null-valued constant returned by the methods of this class whose return type is {@link Void}. */
    protected static final Void VOID = null;
    /** Default transfer threshold (524288); presumably a size in bytes, i.e. 512 KiB — TODO confirm the unit. */
    public static final long DEFAULT_TRANSFER_THRESHOLD = 524288L;
    /** Name passed to {@link GrpcProxyMetrics} when the metrics for this service are created. */
    public static final String MBEAN_NAME = "type=GrpcNamedCacheProxy";
    /** Function from a {@code String} (presumably a scope name — confirm against callers) to a {@link ConfigurableCacheFactory}. */
    protected final Function<String, ConfigurableCacheFactory> f_cacheFactorySupplier;
    /** Factory of named serializers, taken from the dependencies or {@link NamedSerializerFactory#DEFAULT}. */
    protected final NamedSerializerFactory f_serializerProducer;
    /** Executor passed to the asynchronous stages created by this service. */
    protected final Executor f_executor;
    /** Metrics for this service; returned by {@link #getMetrics()}. */
    protected final GrpcProxyMetrics f_metrics;
    /** Current transfer threshold, handed to {@link PagedQueryHelper} by the paged key/entry set queries. */
    protected long transferThreshold = 524288L;

    /**
     * Create a {@link NamedCacheServiceImpl} from the supplied dependencies.
     * <p>
     * Anything the dependencies do not provide falls back to a default: the executor
     * from {@code createDefaultExecutor()}, the cache factory supplier from
     * {@link ConfigurableCacheFactorySuppliers#DEFAULT}, the serializer factory from
     * {@link NamedSerializerFactory#DEFAULT} and the registry from the cluster's
     * management. The metrics MBean is registered before the constructor returns.
     *
     * @param dependencies  the dependencies used to configure this service
     */
    public NamedCacheServiceImpl(Dependencies dependencies) {
        f_executor             = dependencies.getExecutor().orElseGet(NamedCacheServiceImpl::createDefaultExecutor);
        f_cacheFactorySupplier = dependencies.getCacheFactorySupplier().orElse(ConfigurableCacheFactorySuppliers.DEFAULT);
        f_serializerProducer   = dependencies.getNamedSerializerFactory().orElse(NamedSerializerFactory.DEFAULT);
        dependencies.getTransferThreshold().ifPresent(this::setTransferThreshold);

        // pool management is only available when the executor is a DaemonPoolExecutor
        DaemonPoolExecutor.DaemonPoolManagement management = null;
        if (f_executor instanceof DaemonPoolExecutor) {
            management = ((DaemonPoolExecutor) f_executor).getManagement();
        }

        Registry registry = dependencies.getRegistry()
                .orElseGet(() -> CacheFactory.getCluster().getManagement());

        f_metrics = new GrpcProxyMetrics(MBEAN_NAME, management);
        f_metrics.registerMBean(registry);
    }

    /**
     * Factory method that creates a {@link NamedCacheServiceImpl} from the given dependencies.
     *
     * @param deps  the dependencies passed to the constructor
     *
     * @return a new {@link NamedCacheServiceImpl}
     */
    public static NamedCacheServiceImpl newInstance(Dependencies deps) {
        NamedCacheServiceImpl service = new NamedCacheServiceImpl(deps);
        return service;
    }

    /**
     * Factory method that creates a {@link NamedCacheServiceImpl} using a fresh
     * {@code DefaultDependencies} instance.
     *
     * @return a new {@link NamedCacheServiceImpl}
     */
    public static NamedCacheServiceImpl newInstance() {
        return new NamedCacheServiceImpl(new DefaultDependencies());
    }

    /**
     * Obtain the metrics for this service.
     *
     * @return the {@link GrpcProxyMetrics} created in the constructor
     */
    @Override
    public GrpcProxyMetrics getMetrics() {
        return f_metrics;
    }

    /**
     * Obtain the current transfer threshold.
     *
     * @return the value of the {@code transferThreshold} field
     */
    long getTransferThreshold() {
        return transferThreshold;
    }

    /**
     * Replace the current transfer threshold.
     *
     * @param lSize  the new transfer threshold
     */
    void setTransferThreshold(long lSize) {
        transferThreshold = lSize;
    }

    /**
     * Add an index to a cache.
     * <p>
     * A request holder is created asynchronously and then handed to
     * {@link #addIndex(CacheRequestHolder)} on this service's executor.
     *
     * @param request  the {@link AddIndexRequest} supplying the scope, cache name and format
     *
     * @return a {@link CompletionStage} yielding the {@link Empty} returned by the holder variant
     */
    @Override
    public CompletionStage<Empty> addIndex(AddIndexRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(this::addIndex, f_executor);
    }

    /**
     * Add an index to the cache referenced by the holder.
     * <p>
     * The extractor and comparator are obtained from the request using the holder's
     * serializer and passed, with the request's sorted flag, to the cache.
     *
     * @param holder  the holder containing the {@link AddIndexRequest}
     *
     * @return {@link BinaryHelper#EMPTY}
     */
    protected Empty addIndex(CacheRequestHolder<AddIndexRequest, Void> holder) {
        AddIndexRequest indexRequest = (AddIndexRequest) holder.getRequest();
        NamedCache target = holder.getCache();
        Serializer serializer = holder.getSerializer();

        ValueExtractor<?, ?> extractor = ensureValueExtractor(indexRequest.getExtractor(), serializer);
        Comparator ordering = (Comparator) BinaryHelper.fromByteString(
                (ByteString) indexRequest.getComparator(), serializer);

        target.addIndex(extractor, indexRequest.getSorted(), ordering);
        return BinaryHelper.EMPTY;
    }

    /**
     * Execute an aggregator against a cache.
     * <p>
     * A request without aggregator bytes yields a stage failed with
     * {@code INVALID_ARGUMENT}. Otherwise the request is routed to the key-based
     * variant when it carries keys and to the filter-based variant when it does not.
     *
     * @param request  the {@link AggregateRequest} to execute
     *
     * @return a {@link CompletionStage} yielding the serialized aggregation result
     */
    @Override
    public CompletionStage<BytesValue> aggregate(AggregateRequest request) {
        ByteString aggregatorBytes = request.getAggregator();
        if (aggregatorBytes.isEmpty()) {
            CompletableFuture<BytesValue> failed = new CompletableFuture<>();
            failed.completeExceptionally(Status.INVALID_ARGUMENT
                    .withDescription("the request does not contain a serialized entry aggregator")
                    .asRuntimeException());
            return failed;
        }

        try {
            return request.getKeysCount() == 0
                    ? aggregateWithFilter(request)
                    : aggregateWithKeys(request);
        }
        catch (Throwable t) {
            // anything thrown while building the stage is rethrown in status form
            throw ErrorsHelper.ensureStatusRuntimeException(t);
        }
    }

    /**
     * Execute a filter-based aggregation.
     * <p>
     * Creates the request holder, composes it with
     * {@link #aggregateWithFilter(CacheRequestHolder)} and routes the outcome
     * through {@code handleError}, all on this service's executor.
     *
     * @param request  the {@link AggregateRequest} to execute
     *
     * @return a {@link CompletionStage} yielding the serialized aggregation result
     */
    protected CompletionStage<BytesValue> aggregateWithFilter(AggregateRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::aggregateWithFilter, f_executor)
                .handleAsync(this::handleError, f_executor);
    }

    /**
     * Execute a filter-based aggregation for the request in the holder.
     * <p>
     * An empty filter in the request is treated as {@link Filters#always()}.
     *
     * @param holder  the holder containing the {@link AggregateRequest}
     *
     * @return a {@link CompletionStage} yielding the serialized aggregation result
     */
    protected CompletionStage<BytesValue> aggregateWithFilter(CacheRequestHolder<AggregateRequest, Void> holder) {
        AggregateRequest aggregateRequest = (AggregateRequest) holder.getRequest();
        ByteString filterBytes = aggregateRequest.getFilter();

        Filter filter;
        if (filterBytes.isEmpty()) {
            filter = Filters.always();
        }
        else {
            filter = (Filter) BinaryHelper.fromByteString(filterBytes, (Serializer) holder.getSerializer());
        }

        ByteString aggregatorBytes = aggregateRequest.getAggregator();
        InvocableMap.EntryAggregator aggregator = (InvocableMap.EntryAggregator) BinaryHelper.fromByteString(
                aggregatorBytes, (Serializer) holder.getSerializer());

        return holder.runAsync((CompletionStage) holder.getAsyncCache().aggregate(filter, aggregator))
                .thenApplyAsync(
                        done -> BinaryHelper.toBytesValue((Object) done.getResult(), (Serializer) done.getSerializer()),
                        f_executor);
    }

    /**
     * Execute a key-based aggregation.
     * <p>
     * Creates the request holder, composes it with
     * {@link #aggregateWithKeys(CacheRequestHolder)} and routes the outcome
     * through {@code handleError}, all on this service's executor.
     *
     * @param request  the {@link AggregateRequest} to execute
     *
     * @return a {@link CompletionStage} yielding the serialized aggregation result
     */
    protected CompletionStage<BytesValue> aggregateWithKeys(AggregateRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::aggregateWithKeys, f_executor)
                .handleAsync(this::handleError, f_executor);
    }

    /**
     * Execute a key-based aggregation for the request in the holder.
     * <p>
     * Every key in the request is passed through the holder's
     * {@code convertKeyDown} before the aggregator is invoked.
     *
     * @param holder  the holder containing the {@link AggregateRequest}
     *
     * @return a {@link CompletionStage} yielding the serialized aggregation result
     */
    protected CompletionStage<BytesValue> aggregateWithKeys(CacheRequestHolder<AggregateRequest, Void> holder) {
        AggregateRequest aggregateRequest = (AggregateRequest) holder.getRequest();

        List convertedKeys = aggregateRequest.getKeysList()
                .stream()
                .map(keyBytes -> holder.convertKeyDown(keyBytes))
                .collect(Collectors.toList());

        InvocableMap.EntryAggregator aggregator = (InvocableMap.EntryAggregator) BinaryHelper.fromByteString(
                (ByteString) aggregateRequest.getAggregator(), (Serializer) holder.getSerializer());

        return holder.runAsync((CompletionStage) holder.getAsyncCache().aggregate(convertedKeys, aggregator))
                .thenApplyAsync(
                        done -> BinaryHelper.toBytesValue((Object) done.getResult(), (Serializer) done.getSerializer()),
                        f_executor);
    }

    /**
     * Clear a cache.
     * <p>
     * Invokes {@code BinarySyntheticRemoveBlindProcessor} against every entry
     * (using the always filter) and maps the result through {@code empty}.
     *
     * @param request  the {@link ClearRequest} naming the scope and cache
     *
     * @return a {@link CompletionStage} yielding an {@link Empty}
     */
    @Override
    public CompletionStage<Empty> clear(ClearRequest request) {
        return getAsyncCache(request.getScope(), request.getCache())
                .thenComposeAsync(
                        async -> async.invokeAll(
                                (Filter) AlwaysFilter.INSTANCE(),
                                (InvocableMap.EntryProcessor) BinaryProcessors.BinarySyntheticRemoveBlindProcessor.INSTANCE),
                        f_executor)
                .thenApplyAsync(this::empty, f_executor);
    }

    /**
     * Determine whether a cache contains a given key/value mapping.
     *
     * @param request  the {@link ContainsEntryRequest} holding the key and value
     *
     * @return a {@link CompletionStage} yielding the {@link BoolValue} produced by
     *         {@code toBoolValue} from the processor result
     */
    @Override
    public CompletionStage<BoolValue> containsEntry(ContainsEntryRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::containsEntry, f_executor)
                .thenApplyAsync(
                        done -> toBoolValue((Binary) done.getResult(), done.getCacheSerializer()),
                        f_executor);
    }

    /**
     * Invoke a {@code BinaryContainsValueProcessor} for the request's value against
     * the request's key.
     *
     * @param holder  the holder containing the {@link ContainsEntryRequest}
     *
     * @return a {@link CompletionStage} yielding a holder with the processor's {@link Binary} result
     */
    protected CompletionStage<CacheRequestHolder<ContainsEntryRequest, Binary>> containsEntry(CacheRequestHolder<ContainsEntryRequest, Void> holder) {
        ContainsEntryRequest entryRequest = (ContainsEntryRequest) holder.getRequest();
        Binary binKey   = holder.convertKeyDown(entryRequest.getKey());
        Binary binValue = holder.convertDown(entryRequest.getValue());

        InvocableMap.EntryProcessor<Binary, Binary, Binary> containsProcessor = castProcessor(
                (InvocableMap.EntryProcessor<Binary, Binary, ?>) new BinaryProcessors.BinaryContainsValueProcessor(binValue));

        return holder.runAsync((CompletionStage) holder.getAsyncCache().invoke((Object) binKey, containsProcessor));
    }

    /**
     * Determine whether a cache contains a given key.
     *
     * @param request  the {@link ContainsKeyRequest} holding the key
     *
     * @return a {@link CompletionStage} yielding a {@link BoolValue} built from the
     *         holder's deserialized result
     */
    @Override
    public CompletionStage<BoolValue> containsKey(ContainsKeyRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::containsKey, f_executor)
                .thenApplyAsync(
                        done -> BoolValue.of((boolean) ((Boolean) done.getDeserializedResult())),
                        f_executor);
    }

    /**
     * Call {@code containsKey} on the holder's async cache with the converted request key.
     *
     * @param holder  the holder containing the {@link ContainsKeyRequest}
     *
     * @return a {@link CompletionStage} yielding a holder with the {@link Boolean} result
     */
    protected CompletionStage<CacheRequestHolder<ContainsKeyRequest, Boolean>> containsKey(CacheRequestHolder<ContainsKeyRequest, Void> holder) {
        ContainsKeyRequest keyRequest = (ContainsKeyRequest) holder.getRequest();
        Binary binKey = holder.convertKeyDown(keyRequest.getKey());
        return holder.runAsync((CompletionStage) holder.getAsyncCache().containsKey((Object) binKey));
    }

    /**
     * Determine whether a cache contains a given value.
     * <p>
     * The answer is {@code true} when the count produced by
     * {@link #containsValue(CacheRequestHolder)} is greater than zero.
     *
     * @param request  the {@link ContainsValueRequest} holding the value
     *
     * @return a {@link CompletionStage} yielding the {@link BoolValue} answer
     */
    @Override
    public CompletionStage<BoolValue> containsValue(ContainsValueRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::containsValue, f_executor)
                .thenApplyAsync(done -> BoolValue.of((Integer) done.getResult() > 0), f_executor);
    }

    /**
     * Count the entries whose value equals the request's deserialized value.
     * <p>
     * Uses an equality filter over the identity extractor together with the count
     * aggregator.
     *
     * @param holder  the holder containing the {@link ContainsValueRequest}
     *
     * @return a {@link CompletionStage} yielding a holder with the {@link Integer} count
     */
    protected CompletionStage<CacheRequestHolder<ContainsValueRequest, Integer>> containsValue(CacheRequestHolder<ContainsValueRequest, Void> holder) {
        ContainsValueRequest valueRequest = (ContainsValueRequest) holder.getRequest();
        Object target = BinaryHelper.fromByteString(
                (ByteString) valueRequest.getValue(), (Serializer) holder.getSerializer());
        Filter equalsTarget = Filters.equal((ValueExtractor) IdentityExtractor.INSTANCE(), (Object) target);

        return holder.runAsync((CompletionStage) holder.getAsyncCache()
                .aggregate(equalsTarget, (InvocableMap.EntryAggregator) Aggregators.count()));
    }

    /**
     * Destroy a cache.
     * <p>
     * The underlying named cache's {@code destroy()} is run through {@code execute}
     * on this service's executor.
     *
     * @param request  the {@link DestroyRequest} naming the scope and cache
     *
     * @return a {@link CompletionStage} yielding the result of {@code execute}
     */
    @Override
    public CompletionStage<Empty> destroy(DestroyRequest request) {
        return getAsyncCache(request.getScope(), request.getCache())
                .thenApplyAsync(async -> execute(() -> async.getNamedCache().destroy()), f_executor);
    }

    /**
     * Stream the entries of a cache to an observer.
     * <p>
     * The work is delegated to {@link #entrySet(CacheRequestHolder, StreamObserver)};
     * the outcome of that stage is then passed to {@code handleError} with the observer.
     *
     * @param request   the {@link EntrySetRequest} to execute
     * @param observer  the {@link StreamObserver} receiving the entries
     */
    @Override
    public void entrySet(EntrySetRequest request, StreamObserver<Entry> observer) {
        createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(
                        created -> entrySet((CacheRequestHolder<EntrySetRequest, Void>) created, observer),
                        f_executor)
                .handleAsync(
                        (ignored, error) -> handleError((Throwable) error, (StreamObserver) observer),
                        f_executor);
    }

    /**
     * Query the entry set for the request in the holder and hand the outcome to
     * {@code handleSetOfEntries}.
     * <p>
     * Anything thrown while preparing the query is reported to the observer via
     * {@code onError} after conversion by {@link ErrorsHelper}.
     *
     * @param holder    the holder containing the {@link EntrySetRequest}
     * @param observer  the {@link StreamObserver} receiving the entries
     *
     * @return always {@link #VOID}
     */
    protected Void entrySet(CacheRequestHolder<EntrySetRequest, Void> holder, StreamObserver<Entry> observer) {
        try {
            EntrySetRequest setRequest = (EntrySetRequest) holder.getRequest();
            Serializer serializer = holder.getSerializer();
            Filter filter = ensureFilter(setRequest.getFilter(), serializer);
            Comparator ordering = deserializeComparator(setRequest.getComparator(), serializer);

            holder.runAsync((CompletionStage) holder.getAsyncCache().entrySet(filter, ordering))
                    .handleAsync(
                            (done, error) -> handleSetOfEntries(
                                    (CacheRequestHolder<?, Set<Map.Entry<Binary, Binary>>>) done,
                                    (Throwable) error,
                                    observer,
                                    false),
                            f_executor);
        }
        catch (Throwable t) {
            observer.onError(ErrorsHelper.ensureStatusRuntimeException(t));
        }
        return VOID;
    }

    /**
     * Open a bidirectional map-listener channel.
     *
     * @param observer  the {@link StreamObserver} that receives {@link MapListenerResponse}s
     *
     * @return a new {@link MapListenerProxy} bound to this service and the observer
     */
    @Override
    public StreamObserver<MapListenerRequest> events(StreamObserver<MapListenerResponse> observer) {
        StreamObserver<MapListenerRequest> proxy = new MapListenerProxy(this, observer);
        return proxy;
    }

    /**
     * Get a value from a cache.
     *
     * @param request  the {@link GetRequest} holding the key
     *
     * @return a {@link CompletionStage} yielding the {@link OptionalValue} the holder
     *         builds from its deserialized result
     */
    @Override
    public CompletionStage<OptionalValue> get(GetRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::get, f_executor)
                .thenApplyAsync(
                        done -> done.toOptionalValue((Binary) done.getDeserializedResult()),
                        f_executor);
    }

    /**
     * Invoke the {@link BinaryProcessors#get()} processor against the converted request key.
     *
     * @param holder  the holder containing the {@link GetRequest}
     *
     * @return a {@link CompletionStage} yielding a holder with the {@link Binary} result
     */
    protected CompletionStage<CacheRequestHolder<GetRequest, Binary>> get(CacheRequestHolder<GetRequest, Void> holder) {
        GetRequest getRequest = (GetRequest) holder.getRequest();
        Binary binKey = holder.convertKeyDown(getRequest.getKey());
        InvocableMap.EntryProcessor getProcessor = BinaryProcessors.get();
        return holder.runAsync((CompletionStage) holder.getAsyncCache().invoke((Object) binKey, getProcessor));
    }

    /**
     * Stream the entries for a set of keys to an observer.
     * <p>
     * A request with no keys completes the observer immediately without touching
     * the cache.
     *
     * @param request   the {@link GetAllRequest} holding the keys
     * @param observer  the {@link StreamObserver} receiving the entries
     */
    @Override
    public void getAll(GetAllRequest request, StreamObserver<Entry> observer) {
        if (request.getKeyList().isEmpty()) {
            observer.onCompleted();
            return;
        }

        createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(
                        created -> getAll((CacheRequestHolder<GetAllRequest, Void>) created, observer),
                        f_executor)
                .handleAsync(
                        (ignored, error) -> handleError((Throwable) error, (StreamObserver) observer),
                        f_executor);
    }

    /**
     * Convert the request keys, invoke the {@link BinaryProcessors#get()} processor
     * against them and hand the outcome to {@code handleMapOfEntries}.
     *
     * @param holder    the holder containing the {@link GetAllRequest}
     * @param observer  the {@link StreamObserver} receiving the entries
     *
     * @return always {@link #VOID}
     */
    protected Void getAll(CacheRequestHolder<GetAllRequest, Void> holder, StreamObserver<Entry> observer) {
        holder.runAsync(convertKeys(holder))
                .thenComposeAsync(
                        withKeys -> withKeys.runAsync((CompletionStage) withKeys.getAsyncCache()
                                .invokeAll((Collection) withKeys.getResult(), BinaryProcessors.get())),
                        f_executor)
                .handleAsync(
                        (done, error) -> handleMapOfEntries(
                                (CacheRequestHolder<?, Map<Binary, Binary>>) done,
                                (Throwable) error,
                                observer,
                                true),
                        f_executor);
        return VOID;
    }

    /**
     * Asynchronously convert the keys of a {@link GetAllRequest} using the holder's
     * {@code convertKeyDown}.
     *
     * @param holder  the holder containing the {@link GetAllRequest}
     *
     * @return a {@link CompletionStage} yielding the list of converted keys
     */
    protected CompletionStage<List<Binary>> convertKeys(CacheRequestHolder<GetAllRequest, Void> holder) {
        return CompletableFuture.supplyAsync(() -> {
            GetAllRequest allRequest = (GetAllRequest) holder.getRequest();
            return allRequest.getKeyList()
                    .stream()
                    .map(keyBytes -> ((CacheRequestHolder) holder).convertKeyDown(keyBytes))
                    .collect(Collectors.toList());
        }, f_executor);
    }

    /**
     * Invoke an entry processor against a single key.
     * <p>
     * A request without processor bytes yields a stage failed with
     * {@code INVALID_ARGUMENT}.
     *
     * @param request  the {@link InvokeRequest} holding the key and processor
     *
     * @return a {@link CompletionStage} yielding the converted processor result
     */
    @Override
    public CompletionStage<BytesValue> invoke(InvokeRequest request) {
        ByteString processorBytes = request.getProcessor();
        if (processorBytes.isEmpty()) {
            CompletableFuture<BytesValue> failed = new CompletableFuture<>();
            failed.completeExceptionally(Status.INVALID_ARGUMENT
                    .withDescription("the request does not contain a serialized entry processor")
                    .asRuntimeException());
            return failed;
        }

        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::invoke, f_executor)
                .thenApplyAsync(
                        done -> BinaryHelper.toBytesValue((Binary) done.convertUp((Binary) done.getResult())),
                        f_executor);
    }

    /**
     * Deserialize the request's entry processor and invoke it against the converted key.
     *
     * @param holder  the holder containing the {@link InvokeRequest}
     *
     * @return a {@link CompletionStage} yielding a holder with the {@link Binary} result
     */
    protected CompletionStage<CacheRequestHolder<InvokeRequest, Binary>> invoke(CacheRequestHolder<InvokeRequest, Void> holder) {
        InvokeRequest invokeRequest = (InvokeRequest) holder.getRequest();
        Binary binKey = holder.convertKeyDown(invokeRequest.getKey());
        InvocableMap.EntryProcessor entryProcessor = (InvocableMap.EntryProcessor) BinaryHelper.fromByteString(
                (ByteString) invokeRequest.getProcessor(), (Serializer) holder.getSerializer());

        return holder.runAsync((CompletionStage) holder.getAsyncCache().invoke((Object) binKey, entryProcessor));
    }

    /**
     * Invoke an entry processor against multiple entries, streaming the results.
     * <p>
     * A request without processor bytes is reported to the observer as
     * {@code INVALID_ARGUMENT}. Otherwise the key-based variant is used when the
     * request carries keys and the filter-based variant when it does not.
     *
     * @param request   the {@link InvokeAllRequest} to execute
     * @param observer  the {@link StreamObserver} receiving the result entries
     */
    @Override
    public void invokeAll(InvokeAllRequest request, StreamObserver<Entry> observer) {
        ByteString processorBytes = request.getProcessor();
        if (processorBytes.isEmpty()) {
            observer.onError(Status.INVALID_ARGUMENT
                    .withDescription("the request does not contain a serialized entry processor")
                    .asRuntimeException());
            return;
        }

        try {
            CompletionStage<Void> stage;
            if (request.getKeysCount() == 0) {
                stage = invokeAllWithFilter(request, observer);
            }
            else {
                stage = invokeAllWithKeys(request, observer);
            }
            stage.handleAsync(
                    (ignored, error) -> handleError((Throwable) error, (StreamObserver) observer),
                    f_executor);
        }
        catch (Throwable t) {
            observer.onError(ErrorsHelper.ensureStatusRuntimeException(t));
        }
    }

    /**
     * Create the request holder and run the filter-based {@code invokeAll} on this
     * service's executor.
     *
     * @param request   the {@link InvokeAllRequest} to execute
     * @param observer  the {@link StreamObserver} receiving the result entries
     *
     * @return the {@link CompletionStage} produced by
     *         {@link #invokeAllWithFilter(CacheRequestHolder, StreamObserver)}
     */
    protected CompletionStage<Void> invokeAllWithFilter(InvokeAllRequest request, StreamObserver<Entry> observer) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(
                        created -> invokeAllWithFilter((CacheRequestHolder<InvokeAllRequest, Void>) created, observer),
                        f_executor);
    }

    /**
     * Invoke the request's entry processor against the entries matching the
     * request's filter and hand the outcome to {@code handleMapOfEntries}.
     * <p>
     * An empty filter in the request is treated as {@link Filters#always()}.
     *
     * @param holder    the holder containing the {@link InvokeAllRequest}
     * @param observer  the {@link StreamObserver} receiving the result entries
     *
     * @return a {@link CompletionStage} yielding the result of {@code handleMapOfEntries}
     */
    protected CompletionStage<Void> invokeAllWithFilter(CacheRequestHolder<InvokeAllRequest, Void> holder, StreamObserver<Entry> observer) {
        InvokeAllRequest invokeRequest = (InvokeAllRequest) holder.getRequest();
        ByteString filterBytes = invokeRequest.getFilter();

        Filter filter;
        if (filterBytes.isEmpty()) {
            filter = Filters.always();
        }
        else {
            filter = (Filter) BinaryHelper.fromByteString(filterBytes, (Serializer) holder.getSerializer());
        }

        ByteString processorBytes = invokeRequest.getProcessor();
        InvocableMap.EntryProcessor entryProcessor = (InvocableMap.EntryProcessor) BinaryHelper.fromByteString(
                processorBytes, (Serializer) holder.getSerializer());

        return holder.runAsync((CompletionStage) holder.getAsyncCache().invokeAll(filter, entryProcessor))
                .handleAsync(
                        (done, error) -> handleMapOfEntries(
                                (CacheRequestHolder<?, Map<Binary, Binary>>) done,
                                (Throwable) error,
                                observer,
                                false),
                        f_executor);
    }

    /**
     * Create the request holder and run the key-based {@code invokeAll} on this
     * service's executor.
     *
     * @param request   the {@link InvokeAllRequest} to execute
     * @param observer  the {@link StreamObserver} receiving the result entries
     *
     * @return the {@link CompletionStage} produced by
     *         {@link #invokeAllWithKeys(CacheRequestHolder, StreamObserver)}
     */
    protected CompletionStage<Void> invokeAllWithKeys(InvokeAllRequest request, StreamObserver<Entry> observer) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(
                        created -> invokeAllWithKeys((CacheRequestHolder<InvokeAllRequest, Void>) created, observer),
                        f_executor);
    }

    /**
     * Invoke the request's entry processor against the request's converted keys and
     * hand the outcome to {@code handleMapOfEntries}.
     *
     * @param holder    the holder containing the {@link InvokeAllRequest}
     * @param observer  the {@link StreamObserver} receiving the result entries
     *
     * @return a {@link CompletionStage} yielding the result of {@code handleMapOfEntries}
     */
    protected CompletionStage<Void> invokeAllWithKeys(CacheRequestHolder<InvokeAllRequest, Void> holder, StreamObserver<Entry> observer) {
        InvokeAllRequest invokeRequest = (InvokeAllRequest) holder.getRequest();

        List convertedKeys = invokeRequest.getKeysList()
                .stream()
                .map(keyBytes -> holder.convertKeyDown(keyBytes))
                .collect(Collectors.toList());

        InvocableMap.EntryProcessor entryProcessor = (InvocableMap.EntryProcessor) BinaryHelper.fromByteString(
                (ByteString) invokeRequest.getProcessor(), (Serializer) holder.getSerializer());

        return holder.runAsync((CompletionStage) holder.getAsyncCache().invokeAll(convertedKeys, entryProcessor))
                .handleAsync(
                        (done, error) -> handleMapOfEntries(
                                (CacheRequestHolder<?, Map<Binary, Binary>>) done,
                                (Throwable) error,
                                observer,
                                false),
                        f_executor);
    }

    /**
     * Determine whether a cache is empty.
     *
     * @param request  the {@link IsEmptyRequest} naming the scope and cache
     *
     * @return a {@link CompletionStage} yielding the async cache's {@code isEmpty}
     *         result wrapped in a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> isEmpty(IsEmptyRequest request) {
        return getAsyncCache(request.getScope(), request.getCache())
                .thenComposeAsync(AsyncNamedMap::isEmpty, f_executor)
                .thenApplyAsync(BoolValue::of, f_executor);
    }

    /**
     * Stream the keys of a cache to an observer.
     * <p>
     * The work is delegated to {@link #keySet(CacheRequestHolder, StreamObserver)};
     * the outcome of that stage is then passed to {@code handleError} with the observer.
     *
     * @param request   the {@link KeySetRequest} to execute
     * @param observer  the {@link StreamObserver} receiving the keys
     */
    @Override
    public void keySet(KeySetRequest request, StreamObserver<BytesValue> observer) {
        createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(
                        created -> keySet((CacheRequestHolder<KeySetRequest, Void>) created, observer),
                        f_executor)
                .handleAsync(
                        (ignored, error) -> handleError((Throwable) error, (StreamObserver) observer),
                        f_executor);
    }

    /**
     * Query the key set for the request in the holder, pass the outcome to
     * {@code handleStream} and then to {@code handleError}.
     * <p>
     * Anything thrown while preparing the query is reported to the observer via
     * {@code onError} after conversion by {@link ErrorsHelper}.
     *
     * @param holder    the holder containing the {@link KeySetRequest}
     * @param observer  the {@link StreamObserver} receiving the keys
     *
     * @return always {@link #VOID}
     */
    protected Void keySet(CacheRequestHolder<KeySetRequest, Void> holder, StreamObserver<BytesValue> observer) {
        try {
            KeySetRequest setRequest = (KeySetRequest) holder.getRequest();
            Serializer serializer = holder.getSerializer();
            Filter filter = ensureFilter(setRequest.getFilter(), serializer);

            holder.runAsync((CompletionStage) holder.getAsyncCache().keySet(filter))
                    .handleAsync(
                            (done, error) -> handleStream(
                                    (CacheRequestHolder<?, ? extends Iterable<Binary>>) done,
                                    (Throwable) error,
                                    observer),
                            f_executor)
                    .handleAsync(
                            (ignored, error) -> handleError((Throwable) error, (StreamObserver) observer),
                            f_executor);
        }
        catch (Throwable t) {
            observer.onError(ErrorsHelper.ensureStatusRuntimeException(t));
        }
        return VOID;
    }

    /**
     * Stream the next page of a paged key set query to an observer.
     * <p>
     * The page is produced by {@link PagedQueryHelper#keysPagedQuery} using the
     * current transfer threshold, streamed through {@code handleStream} and any
     * failure is finally routed to {@code handleError}.
     * <p>
     * Every stage, including the final error handler, is given this service's
     * executor so that no part of the pipeline falls back to the JDK's default
     * asynchronous execution facility.
     *
     * @param request   the {@link PageRequest} identifying the cache and page
     * @param observer  the {@link StreamObserver} receiving the keys
     */
    @Override
    public void nextKeySetPage(PageRequest request, StreamObserver<BytesValue> observer) {
        createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(h -> PagedQueryHelper.keysPagedQuery(h, getTransferThreshold()), f_executor)
                .handleAsync(
                        (stream, err) -> handleStream((Stream) stream, (Throwable) err, (StreamObserver) observer),
                        f_executor)
                .handleAsync(
                        (v, err) -> handleError((Throwable) err, (StreamObserver) observer),
                        f_executor);
    }

    /**
     * Stream the next page of a paged entry set query to an observer.
     * <p>
     * The page is produced by {@link PagedQueryHelper#entryPagedQuery} using the
     * current transfer threshold, streamed through {@code handleStream} and any
     * failure is finally routed to {@code handleError}.
     * <p>
     * Every stage, including the final error handler, is given this service's
     * executor so that no part of the pipeline falls back to the JDK's default
     * asynchronous execution facility.
     *
     * @param request   the {@link PageRequest} identifying the cache and page
     * @param observer  the {@link StreamObserver} receiving the entry results
     */
    @Override
    public void nextEntrySetPage(PageRequest request, StreamObserver<EntryResult> observer) {
        createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(h -> PagedQueryHelper.entryPagedQuery(h, getTransferThreshold()), f_executor)
                .handleAsync(
                        (stream, err) -> handleStream((Stream) stream, (Throwable) err, (StreamObserver) observer),
                        f_executor)
                .handleAsync(
                        (v, err) -> handleError((Throwable) err, (StreamObserver) observer),
                        f_executor);
    }

    /**
     * Put a key/value mapping into a cache.
     *
     * @param request  the {@link PutRequest} holding the key, value and TTL
     *
     * @return the {@link CompletionStage} produced by {@link #put(CacheRequestHolder)}
     */
    @Override
    public CompletionStage<BytesValue> put(PutRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::put, f_executor);
    }

    /**
     * Invoke the binary put processor for the request's key, value and TTL, then
     * convert the processor result with the holder's {@code deserializeToBytesValue}.
     *
     * @param holder  the holder containing the {@link PutRequest}
     *
     * @return a {@link CompletionStage} yielding the converted processor result
     */
    protected CompletionStage<BytesValue> put(CacheRequestHolder<PutRequest, Void> holder) {
        PutRequest putRequest = (PutRequest) holder.getRequest();
        Binary binKey   = holder.convertKeyDown(putRequest.getKey());
        Binary binValue = holder.convertDown(putRequest.getValue());

        return holder.getAsyncCache()
                .invoke((Object) binKey, BinaryProcessors.put((Binary) binValue, (long) putRequest.getTtl()))
                .thenApplyAsync(result -> holder.deserializeToBytesValue(result), f_executor);
    }

    /**
     * Put multiple key/value mappings into a cache.
     *
     * @param request  the {@link PutAllRequest} holding the entries
     *
     * @return the {@link CompletionStage} produced by {@link #putAll(CacheRequestHolder)}
     */
    @Override
    public CompletionStage<Empty> putAll(PutAllRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::putAll, f_executor);
    }

    /**
     * Convert the request's entries and store them in the cache.
     * <p>
     * A request with no entries completes immediately. When the cache's service is
     * a {@link PartitionedService} the entries are written via
     * {@link #partitionedPutAll}; otherwise via {@link #plainPutAll}.
     *
     * @param holder  the holder containing the {@link PutAllRequest}
     *
     * @return a {@link CompletionStage} yielding {@link BinaryHelper#EMPTY}
     */
    protected CompletionStage<Empty> putAll(CacheRequestHolder<PutAllRequest, Void> holder) {
        PutAllRequest putRequest = (PutAllRequest) holder.getRequest();
        if (putRequest.getEntryCount() == 0) {
            return CompletableFuture.completedFuture(BinaryHelper.EMPTY);
        }

        HashMap<Binary, Binary> converted = new HashMap<>();
        for (Entry requestEntry : putRequest.getEntryList()) {
            Binary binKey   = holder.convertKeyDown(requestEntry.getKey());
            Binary binValue = holder.convertDown(requestEntry.getValue());
            converted.put(binKey, binValue);
        }

        boolean partitioned = holder.getCache().getCacheService() instanceof PartitionedService;
        if (!partitioned) {
            return plainPutAll((AsyncNamedCache<Binary, Binary>) holder.getAsyncCache(), converted);
        }
        return partitionedPutAll(holder, converted);
    }

    /**
     * Store entries in a partitioned cache, issuing one {@link #plainPutAll} call
     * per owning member.
     * <p>
     * Entries are grouped by the {@link Member} the service reports as the owner
     * of each key; the returned stage completes once every per-member call has
     * completed. Anything thrown while grouping or dispatching is returned as a
     * failed stage rather than thrown.
     *
     * @param holder  the holder containing the {@link PutAllRequest}
     * @param map     the converted entries to store
     *
     * @return a {@link CompletionStage} yielding {@link BinaryHelper#EMPTY}
     */
    protected CompletionStage<Empty> partitionedPutAll(CacheRequestHolder<PutAllRequest, Void> holder, Map<Binary, Binary> map) {
        try {
            HashMap<Member, Map> entriesByOwner = new HashMap<>();
            PartitionedService service = (PartitionedService) holder.getCache().getCacheService();

            for (Map.Entry<Binary, Binary> mapping : map.entrySet()) {
                Binary binKey = mapping.getKey();
                Member owner  = service.getKeyOwner((Object) binKey);
                Map ownerEntries = entriesByOwner.computeIfAbsent(owner, m -> new HashMap());
                ownerEntries.put(binKey, mapping.getValue());
            }

            AsyncNamedCache cache = holder.getAsyncCache();
            CompletableFuture[] pending = entriesByOwner.values()
                    .stream()
                    .map(ownerEntries -> plainPutAll(
                            (AsyncNamedCache<Binary, Binary>) cache, (Map<Binary, Binary>) ownerEntries))
                    .map(CompletionStage::toCompletableFuture)
                    .toArray(CompletableFuture[]::new);

            return CompletableFuture.allOf(pending).thenApply(ignored -> BinaryHelper.EMPTY);
        }
        catch (Throwable t) {
            CompletableFuture<Empty> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }

    /**
     * Store entries by invoking the binary put-all processor against the map's keys.
     *
     * @param cache  the {@link AsyncNamedCache} to update
     * @param map    the entries to store
     *
     * @return a {@link CompletionStage} yielding {@link BinaryHelper#EMPTY}
     */
    protected CompletionStage<Empty> plainPutAll(AsyncNamedCache<Binary, Binary> cache, Map<Binary, Binary> map) {
        return cache.invokeAll(map.keySet(), BinaryProcessors.putAll(map))
                .thenApplyAsync(ignored -> BinaryHelper.EMPTY, f_executor);
    }

    /**
     * Put a key/value mapping into a cache if the key is not already mapped.
     *
     * @param request  the {@link PutIfAbsentRequest} holding the key, value and TTL
     *
     * @return the {@link CompletionStage} produced by {@link #putIfAbsent(CacheRequestHolder)}
     */
    @Override
    public CompletionStage<BytesValue> putIfAbsent(PutIfAbsentRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::putIfAbsent, f_executor);
    }

    /**
     * Invoke the binary put-if-absent processor for the request's key, value and
     * TTL, then convert the processor result with the holder's
     * {@code deserializeToBytesValue}.
     *
     * @param holder  the holder containing the {@link PutIfAbsentRequest}
     *
     * @return a {@link CompletableFuture} yielding the converted processor result
     */
    protected CompletableFuture<BytesValue> putIfAbsent(CacheRequestHolder<PutIfAbsentRequest, Void> holder) {
        PutIfAbsentRequest putRequest = (PutIfAbsentRequest) holder.getRequest();
        Binary binKey   = holder.convertKeyDown(putRequest.getKey());
        // the value is handed over as a supplier, as in the original call
        Binary binValue = holder.convertDown(() -> putRequest.getValue());

        return holder.getAsyncCache()
                .invoke((Object) binKey, BinaryProcessors.putIfAbsent((Binary) binValue, (long) putRequest.getTtl()))
                .thenApplyAsync(result -> holder.deserializeToBytesValue(result), f_executor);
    }

    /**
     * Remove a key from a cache.
     *
     * @param request  the {@link RemoveRequest} holding the key
     *
     * @return a {@link CompletionStage} yielding the result of
     *         {@link #remove(CacheRequestHolder)} converted by the holder's {@code toBytesValue}
     */
    @Override
    public CompletionStage<BytesValue> remove(RemoveRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(
                        created -> created.runAsync(remove((CacheRequestHolder<RemoveRequest, Void>) created)),
                        f_executor)
                .thenApplyAsync(done -> done.toBytesValue((Binary) done.getResult()), f_executor);
    }

    /**
     * Invoke the binary remove processor against the converted request key and pass
     * the result through the holder's {@code fromCacheBinary}.
     *
     * @param holder  the holder containing the {@link RemoveRequest}
     *
     * @return a {@link CompletableFuture} yielding the converted processor result
     */
    protected CompletableFuture<Binary> remove(CacheRequestHolder<RemoveRequest, Void> holder) {
        RemoveRequest removeRequest = (RemoveRequest) holder.getRequest();
        Binary binKey = holder.convertKeyDown(removeRequest.getKey());

        return holder.getAsyncCache()
                .invoke((Object) binKey, (InvocableMap.EntryProcessor) BinaryProcessors.BinaryRemoveProcessor.INSTANCE)
                .thenApplyAsync(result -> holder.fromCacheBinary(result), f_executor);
    }

    /**
     * Remove an index from a cache.
     * <p>
     * A request holder is created asynchronously and then handed to
     * {@link #removeIndex(CacheRequestHolder)} on this service's executor.
     *
     * @param request  the {@link RemoveIndexRequest} supplying the scope, cache name and format
     *
     * @return a {@link CompletionStage} yielding the {@link Empty} returned by the holder variant
     */
    @Override
    public CompletionStage<Empty> removeIndex(RemoveIndexRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenApplyAsync(this::removeIndex, f_executor);
    }

    /**
     * Remove the index identified by the request's extractor from the holder's cache.
     *
     * @param holder  the holder containing the {@link RemoveIndexRequest}
     *
     * @return {@link BinaryHelper#EMPTY}
     */
    protected Empty removeIndex(CacheRequestHolder<RemoveIndexRequest, Void> holder) {
        RemoveIndexRequest indexRequest = (RemoveIndexRequest) holder.getRequest();
        NamedCache target = holder.getCache();
        ValueExtractor<?, ?> extractor = ensureValueExtractor(indexRequest.getExtractor(), holder.getSerializer());

        target.removeIndex(extractor);
        return BinaryHelper.EMPTY;
    }

    /**
     * Remove a key from a cache only if it is mapped to a given value.
     *
     * @param request  the {@link RemoveMappingRequest} holding the key and value
     *
     * @return a {@link CompletionStage} yielding a {@link BoolValue} built from the
     *         holder's deserialized result
     */
    @Override
    public CompletionStage<BoolValue> removeMapping(RemoveMappingRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(this::removeMapping, f_executor)
                .thenApplyAsync(
                        done -> BoolValue.of((boolean) ((Boolean) done.getDeserializedResult())),
                        f_executor);
    }

    /**
     * Call the async cache's two-argument {@code remove} with the converted key and
     * the deserialized request value.
     *
     * @param holder  the holder containing the {@link RemoveMappingRequest}
     *
     * @return a {@link CompletionStage} yielding a holder with the {@link Boolean} result
     */
    protected CompletionStage<CacheRequestHolder<RemoveMappingRequest, Boolean>> removeMapping(CacheRequestHolder<RemoveMappingRequest, Void> holder) {
        RemoveMappingRequest mappingRequest = (RemoveMappingRequest) holder.getRequest();
        Binary binKey = holder.convertKeyDown(mappingRequest.getKey());
        Object expected = BinaryHelper.fromByteString(
                (ByteString) mappingRequest.getValue(), (Serializer) holder.getSerializer());
        AsyncNamedCache async = holder.getAsyncCache();

        return holder.runAsync((CompletionStage) async.remove((Object) binKey, expected));
    }

    /**
     * Replace the value mapped to a key in a cache.
     *
     * @param request  the {@link ReplaceRequest} holding the key and new value
     *
     * @return a {@link CompletionStage} yielding the result of
     *         {@link #replace(CacheRequestHolder)} converted by the holder's {@code toBytesValue}
     */
    @Override
    public CompletionStage<BytesValue> replace(ReplaceRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(
                        created -> created.runAsync(replace((CacheRequestHolder<ReplaceRequest, Void>) created)),
                        f_executor)
                .thenApplyAsync(done -> done.toBytesValue((Binary) done.getResult()), f_executor);
    }

    /**
     * Invoke a {@code BinaryReplaceProcessor} for the request's value against the
     * converted key and pass the result through the holder's {@code fromCacheBinary}.
     *
     * @param holder  the holder containing the {@link ReplaceRequest}
     *
     * @return a {@link CompletableFuture} yielding the converted processor result
     */
    protected CompletableFuture<Binary> replace(CacheRequestHolder<ReplaceRequest, Void> holder) {
        ReplaceRequest replaceRequest = (ReplaceRequest) holder.getRequest();
        Binary binKey   = holder.convertKeyDown(replaceRequest.getKey());
        Binary binValue = holder.convertDown(replaceRequest.getValue());

        return holder.getAsyncCache()
                .invoke(
                        (Object) binKey,
                        castProcessor((InvocableMap.EntryProcessor<Binary, Binary, ?>) new BinaryProcessors.BinaryReplaceProcessor(binValue)))
                .thenApplyAsync(result -> holder.fromCacheBinary(result), f_executor);
    }

    /**
     * Replace the value mapped to a key only if it is currently mapped to a given value.
     *
     * @param request  the {@link ReplaceMappingRequest} holding the key, previous value and new value
     *
     * @return a {@link CompletionStage} yielding the {@link BoolValue} produced by
     *         {@code toBoolValue} from the processor result
     */
    @Override
    public CompletionStage<BoolValue> replaceMapping(ReplaceMappingRequest request) {
        return createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat())
                .thenComposeAsync(
                        created -> created.runAsync(
                                replaceMapping((CacheRequestHolder<ReplaceMappingRequest, Void>) created)),
                        f_executor)
                .thenApplyAsync(
                        done -> toBoolValue((Binary) done.getResult(), done.getCacheSerializer()),
                        f_executor);
    }

    /**
     * Invokes a {@code BinaryProcessors.BinaryReplaceMappingProcessor}, built
     * from the request's previous and new values, against the request's key.
     *
     * @param holder  the holder carrying the {@link ReplaceMappingRequest}
     *
     * @return the future returned by the cache's {@code invoke} call
     */
    protected CompletableFuture<Binary> replaceMapping(CacheRequestHolder<ReplaceMappingRequest, Void> holder) {
        ReplaceMappingRequest req = (ReplaceMappingRequest)holder.getRequest();
        Binary binKey = holder.convertKeyDown(req.getKey());
        Binary binPrevious = holder.convertDown(req.getPreviousValue());
        Binary binReplacement = holder.convertDown(req.getNewValue());
        InvocableMap.EntryProcessor<Binary, Binary, Binary> processor =
                this.castProcessor((InvocableMap.EntryProcessor<Binary, Binary, ?>)new BinaryProcessors.BinaryReplaceMappingProcessor(binPrevious, binReplacement));
        return holder.getAsyncCache().invoke((Object)binKey, processor);
    }

    /**
     * Handles a {@link SizeRequest} by asking the async cache for its size and
     * wrapping the result in an {@link Int32Value}.
     * <p>
     * The stage is returned as built; failures reach the caller through it.
     *
     * @param request  the {@link SizeRequest} naming the scope and cache
     *
     * @return a stage completing with the cache size as an {@link Int32Value}
     */
    @Override
    public CompletionStage<Int32Value> size(SizeRequest request) {
        return this.getAsyncCache(request.getScope(), request.getCache())
                .thenComposeAsync(AsyncNamedMap::size, this.f_executor)
                .thenApplyAsync(Int32Value::of, this.f_executor);
    }

    /**
     * Handles a {@link TruncateRequest} by calling {@code truncate()} on the
     * named cache behind the async cache for the requested scope and name.
     *
     * @param request  the {@link TruncateRequest} naming the scope and cache
     *
     * @return a stage completing with the {@link Empty} returned by {@code execute}
     */
    @Override
    public CompletionStage<Empty> truncate(TruncateRequest request) {
        CompletionStage<AsyncNamedCache<Binary, Binary>> cacheStage =
                this.getAsyncCache(request.getScope(), request.getCache());
        return cacheStage.thenApplyAsync(cache -> {
            // An explicit Runnable selects the execute(Runnable) overload.
            Runnable truncation = () -> cache.getNamedCache().truncate();
            return this.execute(truncation);
        }, this.f_executor);
    }

    /**
     * Handles a {@link ValuesRequest}, streaming the matching values to the
     * observer.
     * <p>
     * Any failure from creating the holder or starting the query is passed to
     * the observer through {@code handleError}.
     *
     * @param request   the {@link ValuesRequest} to execute
     * @param observer  the observer that receives the values or the error
     */
    @Override
    public void values(ValuesRequest request, StreamObserver<BytesValue> observer) {
        CompletionStage<CacheRequestHolder<ValuesRequest, Void>> holderStage =
                this.createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat());
        holderStage
                .thenApplyAsync(h -> this.values((CacheRequestHolder<ValuesRequest, Void>)h, observer), this.f_executor)
                .handleAsync((ignored, failure) -> this.handleError((Throwable)failure, (StreamObserver)observer), this.f_executor);
    }

    /**
     * Starts the values query for a {@link ValuesRequest} and arranges for the
     * results, or any failure, to be delivered to the observer.
     * <p>
     * The filter comes from {@code ensureFilter} and the comparator from
     * {@code deserializeComparator}, both using the holder's serializer. A
     * throwable raised while setting the query up is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}.
     *
     * @param holder    the holder carrying the {@link ValuesRequest}
     * @param observer  the observer that receives the values or the error
     *
     * @return always {@code VOID}
     */
    protected Void values(CacheRequestHolder<ValuesRequest, Void> holder, StreamObserver<BytesValue> observer) {
        try {
            ValuesRequest req = (ValuesRequest)holder.getRequest();
            Serializer ser = holder.getSerializer();
            Filter filter = this.ensureFilter(req.getFilter(), ser);
            Comparator ordering = this.deserializeComparator(req.getComparator(), ser);
            // Raw stage: the element type is only recovered by the casts in the handlers below.
            CompletionStage queryStage = holder.runAsync((CompletionStage)holder.getAsyncCache().values(filter, ordering));
            queryStage
                    .handleAsync((h, failure) -> this.handleStream((CacheRequestHolder<?, ? extends Iterable<Binary>>)h, (Throwable)failure, observer), this.f_executor)
                    .handleAsync((ignored, failure) -> this.handleError((Throwable)failure, (StreamObserver)observer), this.f_executor);
        }
        catch (Throwable failure) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)failure));
        }
        return VOID;
    }

    /**
     * Ignores its argument and returns {@code BinaryHelper.EMPTY}.
     *
     * @param value  the value to ignore
     * @param <V>    the type of the ignored value
     *
     * @return {@code BinaryHelper.EMPTY}
     */
    protected <V> Empty empty(V value) {
        return BinaryHelper.EMPTY;
    }

    /**
     * Runs the task on the calling thread and returns {@code BinaryHelper.EMPTY}.
     * <p>
     * Anything thrown by the task propagates to the caller unchanged; unlike
     * the {@link Callable} overload, no exception conversion is done here.
     *
     * @param task  the task to run
     *
     * @return {@code BinaryHelper.EMPTY}
     */
    protected Empty execute(Runnable task) {
        task.run();
        return BinaryHelper.EMPTY;
    }

    /**
     * Calls the task and returns its result, converting anything it throws
     * with {@code ErrorsHelper.ensureStatusRuntimeException}.
     *
     * @param task  the task to call
     * @param <T>   the task's result type
     *
     * @return the value returned by {@code task.call()}
     */
    protected <T> T execute(Callable<T> task) {
        T outcome;
        try {
            outcome = task.call();
        }
        catch (Throwable failure) {
            throw ErrorsHelper.ensureStatusRuntimeException((Throwable)failure);
        }
        return outcome;
    }

    /**
     * Completion handler for a request whose result is a map of binary entries.
     * <p>
     * With no error the map's entries are streamed to the observer through
     * {@code handleStreamOfEntries}; otherwise the error is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}.
     *
     * @param holder        the holder whose result is the map of entries
     * @param err           the failure, or {@code null} on success
     * @param observer      the observer that receives the entries or the error
     * @param fDeserialize  passed through to {@code handleStreamOfEntries}
     *
     * @return always {@code VOID}
     */
    protected Void handleMapOfEntries(CacheRequestHolder<?, Map<Binary, Binary>> holder, Throwable err, StreamObserver<Entry> observer, boolean fDeserialize) {
        if (err != null) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)err));
            return VOID;
        }
        Map results = (Map)holder.getResult();
        this.handleStreamOfEntries(holder, results.entrySet().stream(), observer, fDeserialize);
        return VOID;
    }

    /**
     * Completion handler for a request whose result is a set of binary entries.
     * <p>
     * With no error the set is streamed to the observer through
     * {@code handleStreamOfEntries}; otherwise the error is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}.
     *
     * @param holder        the holder whose result is the set of entries
     * @param err           the failure, or {@code null} on success
     * @param observer      the observer that receives the entries or the error
     * @param fDeserialize  passed through to {@code handleStreamOfEntries}
     *
     * @return always {@code VOID}
     */
    protected Void handleSetOfEntries(CacheRequestHolder<?, Set<Map.Entry<Binary, Binary>>> holder, Throwable err, StreamObserver<Entry> observer, boolean fDeserialize) {
        if (err != null) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)err));
            return VOID;
        }
        Set results = (Set)holder.getResult();
        this.handleStreamOfEntries(holder, results.stream(), observer, fDeserialize);
        return VOID;
    }

    /**
     * Sends each entry of the stream to the observer, then completes it.
     * <p>
     * Each entry is turned into a response with {@code holder.toEntry}. When
     * {@code fDeserialize} is set, the entry value first goes through
     * {@code holder.fromCacheBinary}. Anything thrown while streaming or
     * completing is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}.
     *
     * @param holder        the holder used to convert entries
     * @param entries       the binary entries to send
     * @param observer      the observer that receives the entries
     * @param fDeserialize  whether values pass through {@code fromCacheBinary} first
     */
    protected void handleStreamOfEntries(CacheRequestHolder<?, ?> holder, Stream<Map.Entry<Binary, Binary>> entries, StreamObserver<Entry> observer, boolean fDeserialize) {
        try {
            entries.forEach(entry -> {
                Binary rawValue = (Binary)entry.getValue();
                Binary outValue = fDeserialize ? (Binary)holder.fromCacheBinary(rawValue) : rawValue;
                observer.onNext((Object)holder.toEntry((Binary)entry.getKey(), outValue));
            });
            observer.onCompleted();
        }
        catch (Throwable failure) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)failure));
        }
    }

    /**
     * Completion handler for a request whose result is an iterable of binaries.
     * <p>
     * With no error, each binary is converted with {@code holder.convertUp},
     * wrapped with {@code BinaryHelper.toBytesValue} and streamed to the
     * observer. An incoming error, or anything thrown while building the
     * stream, is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}.
     *
     * @param holder    the holder whose result is the iterable of binaries
     * @param err       the failure, or {@code null} on success
     * @param observer  the observer that receives the values or the error
     *
     * @return always {@code VOID}
     */
    protected Void handleStream(CacheRequestHolder<?, ? extends Iterable<Binary>> holder, Throwable err, StreamObserver<BytesValue> observer) {
        if (err != null) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)err));
            return VOID;
        }
        try {
            Iterable results = (Iterable)holder.getResult();
            Stream<BytesValue> converted = StreamSupport.stream(results.spliterator(), false)
                    .map(bin -> BinaryHelper.toBytesValue((Binary)holder.convertUp(bin)));
            this.stream(observer, converted);
        }
        catch (Throwable failure) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)failure));
        }
        return VOID;
    }

    /**
     * Sends the elements of the stream to the observer by delegating to the
     * {@link Supplier}-based overload with a supplier that returns
     * {@code stream}.
     *
     * @param observer  the observer that receives the elements
     * @param stream    the stream of elements to send
     * @param <T>       the element type accepted by the observer
     */
    <T> void stream(StreamObserver<T> observer, Stream<? extends T> stream) {
        this.stream(observer, () -> stream);
    }

    /**
     * Sends every element of the supplied stream to the observer and then
     * completes it.
     * <p>
     * The observer is first wrapped with
     * {@code SafeStreamObserver.ensureSafeObserver}. A throwable raised while
     * obtaining or iterating the stream is passed as-is to {@code onError}
     * and {@code onCompleted} is not called.
     *
     * @param observer  the observer that receives the elements
     * @param supplier  the supplier of the stream to send
     * @param <T>       the element type accepted by the observer
     */
    <T> void stream(StreamObserver<T> observer, Supplier<Stream<? extends T>> supplier) {
        StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer);
        try {
            supplier.get().forEach(item -> ((StreamObserver)safe).onNext(item));
        }
        catch (Throwable failure) {
            safe.onError(failure);
            return;
        }
        // Only reached when iteration finished without throwing.
        safe.onCompleted();
    }

    /**
     * Completion handler that streams already-converted responses to the
     * observer.
     * <p>
     * With no error the stream is handed to {@code stream(observer, stream)}.
     * An incoming error, or anything thrown by that call, is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}.
     *
     * @param stream    the responses to send
     * @param err       the failure, or {@code null} on success
     * @param observer  the observer that receives the responses or the error
     * @param <Resp>    the response type
     *
     * @return always {@code VOID}
     */
    protected <Resp> Void handleStream(Stream<Resp> stream, Throwable err, StreamObserver<Resp> observer) {
        if (err != null) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)err));
            return VOID;
        }
        try {
            this.stream(observer, stream);
        }
        catch (Throwable failure) {
            observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)failure));
        }
        return VOID;
    }

    /**
     * Reports an error to the observer, if there is one.
     * <p>
     * A non-null error is converted with
     * {@code ErrorsHelper.ensureStatusRuntimeException} and sent to
     * {@code observer.onError}; a {@code null} error leaves the observer
     * untouched.
     *
     * @param err       the failure, or {@code null} when there is none
     * @param observer  the observer to notify
     * @param <Resp>    the observer's response type
     *
     * @return always {@code VOID}
     */
    protected <Resp> Void handleError(Throwable err, StreamObserver<Resp> observer) {
        if (err == null) {
            return VOID;
        }
        observer.onError((Throwable)ErrorsHelper.ensureStatusRuntimeException((Throwable)err));
        return VOID;
    }

    /**
     * Returns the response unless an error is present, in which case the
     * error is converted with {@code ErrorsHelper.ensureStatusRuntimeException}
     * and thrown.
     *
     * @param response  the response to return on success
     * @param err       the failure, or {@code null} on success
     * @param <Resp>    the response type
     *
     * @return {@code response} when {@code err} is {@code null}
     */
    protected <Resp> Resp handleError(Resp response, Throwable err) {
        if (err == null) {
            return response;
        }
        throw ErrorsHelper.ensureStatusRuntimeException((Throwable)err);
    }

    /**
     * Deserializes a binary as a {@link Boolean} with
     * {@code BinaryHelper.fromBinary} and wraps it in a {@link BoolValue}.
     *
     * @param binary      the binary holding the serialized boolean
     * @param serializer  the serializer used to read the binary
     *
     * @return the {@link BoolValue} for the deserialized boolean
     */
    protected BoolValue toBoolValue(Binary binary, Serializer serializer) {
        Boolean decoded = (Boolean)BinaryHelper.fromBinary((Binary)binary, (Serializer)serializer);
        return BoolValue.of((boolean)decoded);
    }

    /**
     * Deserializes a {@link ValueExtractor} from the given bytes, which must
     * be present.
     *
     * @param bytes       the serialized extractor
     * @param serializer  the serializer used to read the bytes
     *
     * @return the deserialized {@link ValueExtractor}
     *
     * @throws io.grpc.StatusRuntimeException with {@code INVALID_ARGUMENT}
     *         status when {@code bytes} is {@code null} or empty
     */
    public ValueExtractor<?, ?> ensureValueExtractor(ByteString bytes, Serializer serializer) {
        if (bytes != null && !bytes.isEmpty()) {
            return (ValueExtractor)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
        }
        throw Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized ValueExtractor").asRuntimeException();
    }

    /**
     * Deserializes a {@link Filter} from the given bytes, returning
     * {@code Filters.always()} when the bytes are {@code null} or empty.
     *
     * @param bytes       the serialized filter; may be {@code null} or empty
     * @param serializer  the serializer used to read the bytes
     * @param <T>         the type the filter applies to
     *
     * @return the deserialized filter, or {@code Filters.always()}
     */
    @Override
    public <T> Filter<T> ensureFilter(ByteString bytes, Serializer serializer) {
        if (bytes != null && !bytes.isEmpty()) {
            return (Filter)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
        }
        return Filters.always();
    }

    /**
     * Deserializes a {@link Filter} from the given bytes, returning
     * {@code null} when the bytes are {@code null} or empty.
     *
     * @param bytes       the serialized filter; may be {@code null} or empty
     * @param serializer  the serializer used to read the bytes
     * @param <T>         the type the filter applies to
     *
     * @return the deserialized filter, or {@code null}
     */
    @Override
    public <T> Filter<T> getFilter(ByteString bytes, Serializer serializer) {
        Filter filter = null;
        if (bytes != null && !bytes.isEmpty()) {
            filter = (Filter)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
        }
        return filter;
    }

    /**
     * Deserializes a {@link Comparator} from the given bytes, returning
     * {@code null} when the bytes are {@code null} or empty.
     *
     * @param bytes       the serialized comparator; may be {@code null} or empty
     * @param serializer  the serializer used to read the bytes
     * @param <T>         the type the comparator orders
     *
     * @return the deserialized comparator, or {@code null}
     */
    public <T> Comparator<T> deserializeComparator(ByteString bytes, Serializer serializer) {
        Comparator comparator = null;
        if (bytes != null && !bytes.isEmpty()) {
            comparator = (Comparator)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
        }
        return comparator;
    }

    /**
     * Obtains the pass-through cache for the scope and name on
     * {@code f_executor} and returns its {@code async()} view.
     *
     * @param scope      the scope passed to {@code getPassThroughCache}
     * @param cacheName  the name of the cache
     *
     * @return a stage completing with the {@link AsyncNamedCache}
     */
    protected CompletionStage<AsyncNamedCache<Binary, Binary>> getAsyncCache(String scope, String cacheName) {
        return CompletableFuture.supplyAsync(() -> this.getPassThroughCache(scope, cacheName).async(), this.f_executor);
    }

    /**
     * Obtains the cache for the scope and name in pass-through mode, that is
     * {@code getCache(scope, cacheName, true)}.
     *
     * @param scope      the scope passed to {@code getCache}
     * @param cacheName  the name of the cache
     *
     * @return the pass-through {@link NamedCache}
     */
    protected NamedCache<Binary, Binary> getPassThroughCache(String scope, String cacheName) {
        return this.getCache(scope, cacheName, true);
    }

    /**
     * Obtains the named cache from the cache factory supplied for the scope.
     * <p>
     * The cache is ensured with {@code NullImplementation.getClassLoader()}
     * when {@code passThru} is set, otherwise with the context class loader.
     * A {@link NearCache} is handled specially:
     * <ul>
     *   <li>if its service is a {@link DistributedCacheService} with local
     *       storage enabled, the near cache's back cache is returned;</li>
     *   <li>otherwise the near cache is wrapped in a
     *       {@link ConvertingNamedCache}.</li>
     * </ul>
     * Any other cache is returned as obtained.
     *
     * @param scope      the scope used to look up the cache factory
     * @param cacheName  the name of the cache
     * @param passThru   selects the class loader used to ensure the cache
     *
     * @return the cache to use for the request
     *
     * @throws io.grpc.StatusRuntimeException with {@code INVALID_ARGUMENT}
     *         status when the cache name is {@code null} or blank
     */
    protected NamedCache<Binary, Binary> getCache(String scope, String cacheName, boolean passThru) {
        if (cacheName == null || cacheName.trim().isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, cache name cannot be null or empty").asRuntimeException();
        }
        ClassLoader loader;
        if (passThru) {
            loader = NullImplementation.getClassLoader();
        } else {
            loader = Base.getContextClassLoader();
        }
        ConfigurableCacheFactory factory = this.f_cacheFactorySupplier.apply(scope);
        NamedCache cache = factory.ensureCache(cacheName, loader);
        if (!(cache instanceof NearCache)) {
            return cache;
        }
        CacheService service = cache.getCacheService();
        if (service instanceof DistributedCacheService && ((DistributedCacheService)service).isLocalStorageEnabled()) {
            return ((NearCache)cache).getBackCache();
        }
        return new ConvertingNamedCache(cache, NullImplementation.getConverter(), ExternalizableHelper.CONVERTER_STRIP_INTDECO, NullImplementation.getConverter(), NullImplementation.getConverter());
    }

    /**
     * Narrows an entry processor with an unknown result type to one declared
     * to return {@link Binary}.
     * <p>
     * This is a compile-time cast only; the same instance is returned. The
     * cast is unchecked, so the caller must only pass processors whose result
     * really is a {@link Binary}.
     *
     * @param ep  the processor to narrow
     *
     * @return {@code ep}, typed as returning {@link Binary}
     */
    @SuppressWarnings("unchecked")
    protected InvocableMap.EntryProcessor<Binary, Binary, Binary> castProcessor(InvocableMap.EntryProcessor<Binary, Binary, ?> ep) {
        // The explicit cast is required: a wildcard result type is not assignable to Binary.
        return (InvocableMap.EntryProcessor<Binary, Binary, Binary>)ep;
    }

    /**
     * Creates a {@link CacheRequestHolder} for the request on
     * {@code f_executor} by calling {@code createRequestHolder}.
     * <p>
     * An exception thrown by {@code createRequestHolder} completes the
     * returned stage exceptionally.
     *
     * @param request     the request to wrap
     * @param sScope      the scope of the cache
     * @param sCacheName  the name of the cache
     * @param format      the serialization format named by the request
     * @param <Req>       the request type
     *
     * @return a stage completing with the new holder
     */
    public <Req> CompletionStage<CacheRequestHolder<Req, Void>> createHolderAsync(Req request, String sScope, String sCacheName, String format) {
        return CompletableFuture.supplyAsync(() -> this.createRequestHolder(request, sScope, sCacheName, format), this.f_executor);
    }

    /**
     * Builds the {@link CacheRequestHolder} for a request.
     * <p>
     * The holder gets the async view of the pass-through cache, a supplier of
     * the non-pass-through cache, and the request serializer. The serializer
     * is the cache service's own when {@code format} is {@code null}, blank
     * or equal to the cache's format. Otherwise it is looked up by name from
     * {@code f_serializerProducer} using the service's context class loader.
     *
     * @param request     the request to wrap
     * @param sScope      the scope of the cache
     * @param sCacheName  the name of the cache
     * @param format      the serialization format named by the request
     * @param <Req>       the request type
     *
     * @return the new holder
     *
     * @throws io.grpc.StatusRuntimeException with {@code INVALID_ARGUMENT}
     *         status when the request is {@code null}, the cache name is
     *         {@code null} or empty, or no serializer is found for the format
     */
    @Override
    public <Req> CacheRequestHolder<Req, Void> createRequestHolder(Req request, String sScope, String sCacheName, String format) {
        if (request == null) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, the request cannot be null").asRuntimeException();
        }
        if (sCacheName == null || sCacheName.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, cache name cannot be null or empty").asRuntimeException();
        }
        NamedCache<Binary, Binary> passThrough = this.getCache(sScope, sCacheName, true);
        NamedCache<Binary, Binary> nonPassThrough = this.getCache(sScope, sCacheName, false);
        AsyncNamedCache asyncCache = passThrough.async();
        CacheService cacheService = asyncCache.getNamedCache().getCacheService();
        String cacheFormat = CacheRequestHolder.getCacheFormat((CacheService)cacheService);
        boolean useCacheSerializer = format == null || format.trim().isEmpty() || format.equals(cacheFormat);
        Serializer requestSerializer = useCacheSerializer
                ? cacheService.getSerializer()
                : this.f_serializerProducer.getNamedSerializer(format, cacheService.getContextClassLoader());
        if (requestSerializer == null) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request format, cannot find serializer with name '" + format + "'").asRuntimeException();
        }
        return new CacheRequestHolder(request, asyncCache, () -> nonPassThrough, format, requestSerializer, this.f_executor);
    }

    /**
     * Creates and starts a {@link DaemonPoolExecutor} named
     * {@code "GrpcNamedCacheProxy"}.
     * <p>
     * The pool dependencies are set to a minimum thread count of 1, a thread
     * count of 1 and a maximum of {@link Integer#MAX_VALUE}.
     *
     * @return the started executor
     */
    static Executor createDefaultExecutor() {
        DefaultDaemonPoolDependencies deps = new DefaultDaemonPoolDependencies();
        deps.setName("GrpcNamedCacheProxy");
        deps.setThreadCountMin(1);
        deps.setThreadCount(1);
        deps.setThreadCountMax(Integer.MAX_VALUE);
        DaemonPoolExecutor executor = DaemonPoolExecutor.newInstance(deps);
        executor.start();
        return executor;
    }

    /**
     * Default {@link Dependencies} implementation: a mutable holder for the
     * optional {@link ConfigurableCacheFactory} supplier, on top of
     * {@code GrpcServiceDependencies.DefaultDependencies}.
     */
    public static class DefaultDependencies
    extends GrpcServiceDependencies.DefaultDependencies
    implements Dependencies {
        /** The function supplying a cache factory for a scope name; may be {@code null}. */
        private Function<String, ConfigurableCacheFactory> m_ccfSupplier;

        /**
         * Creates dependencies with no cache factory supplier set.
         */
        public DefaultDependencies() {
        }

        /**
         * Creates dependencies from general gRPC service dependencies; the
         * cache factory supplier is left unset.
         *
         * @param deps  the dependencies passed to the superclass constructor
         */
        public DefaultDependencies(GrpcServiceDependencies deps) {
            super(deps);
        }

        /**
         * Creates dependencies from another {@link Dependencies}, also copying
         * its cache factory supplier when {@code deps} is not {@code null}.
         *
         * @param deps  the dependencies to copy; may be {@code null}
         */
        public DefaultDependencies(Dependencies deps) {
            super(deps);
            if (deps != null) {
                this.m_ccfSupplier = deps.getCacheFactorySupplier().orElse(null);
            }
        }

        /**
         * Returns the cache factory supplier, if one has been set.
         *
         * @return an {@link Optional} holding the supplier, or empty when unset
         */
        @Override
        public Optional<Function<String, ConfigurableCacheFactory>> getCacheFactorySupplier() {
            return Optional.ofNullable(this.m_ccfSupplier);
        }

        /**
         * Sets the cache factory supplier.
         *
         * @param ccfSupplier  the supplier to use; {@code null} clears it
         */
        public void setConfigurableCacheFactorySupplier(Function<String, ConfigurableCacheFactory> ccfSupplier) {
            this.m_ccfSupplier = ccfSupplier;
        }
    }

    /**
     * The dependencies of this service: the general
     * {@link GrpcServiceDependencies} plus an optional cache factory supplier.
     */
    public static interface Dependencies
    extends GrpcServiceDependencies {
        /**
         * Returns the optional function that supplies a
         * {@link ConfigurableCacheFactory} for a scope name.
         *
         * @return an {@link Optional} holding the supplier, or empty when none is configured
         */
        public Optional<Function<String, ConfigurableCacheFactory>> getCacheFactorySupplier();
    }
}

