/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.proxy.common.v0;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Empty;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.SafeStreamObserver;
import com.oracle.coherence.grpc.messages.cache.v0.AddIndexRequest;
import com.oracle.coherence.grpc.messages.cache.v0.AggregateRequest;
import com.oracle.coherence.grpc.messages.cache.v0.ClearRequest;
import com.oracle.coherence.grpc.messages.cache.v0.DestroyRequest;
import com.oracle.coherence.grpc.messages.cache.v0.GetAllRequest;
import com.oracle.coherence.grpc.messages.cache.v0.MapListenerRequest;
import com.oracle.coherence.grpc.messages.cache.v0.MapListenerResponse;
import com.oracle.coherence.grpc.messages.cache.v0.PutAllRequest;
import com.oracle.coherence.grpc.messages.cache.v0.RemoveIndexRequest;
import com.oracle.coherence.grpc.proxy.common.BaseGrpcServiceImpl;
import com.oracle.coherence.grpc.proxy.common.v0.MapListenerProxy;
import com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService;
import com.oracle.coherence.grpc.proxy.common.v0.ResponseHandlers;
import com.oracle.coherence.grpc.v0.CacheRequestHolder;
import com.tangosol.internal.util.processor.BinaryProcessors;
import com.tangosol.io.Serializer;
import com.tangosol.net.AsyncNamedCache;
import com.tangosol.net.CacheService;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.Service;
import com.tangosol.util.Binary;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.filter.AlwaysFilter;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public abstract class BaseNamedCacheServiceImpl
extends BaseGrpcServiceImpl
implements NamedCacheService {
    public static final String MBEAN_NAME = "type=GrpcNamedCacheProxy";
    public static final String INVALID_REQUEST_MESSAGE = "invalid request, the request cannot be null";
    public static final String MISSING_PROCESSOR_MESSAGE = "the request does not contain a serialized entry processor";
    public static final String MISSING_EXTRACTOR_MESSAGE = "the request does not contain a serialized ValueExtractor";
    public static final String MISSING_AGGREGATOR_MESSAGE = "the request does not contain a serialized ValueExtractor";
    private final long m_nEventsHeartbeat;
    protected volatile boolean m_fClosed = false;
    protected final Lock f_lock = new ReentrantLock();

    public BaseNamedCacheServiceImpl(NamedCacheService.Dependencies dependencies) {
        super(dependencies, MBEAN_NAME, "GrpcNamedCacheProxy");
        this.m_nEventsHeartbeat = dependencies.getEventsHeartbeat();
    }

    @Override
    public void close() {
        this.f_lock.lock();
        try {
            this.m_fClosed = true;
        }
        finally {
            this.f_lock.unlock();
        }
        this.f_listCloseable.forEach(closeable -> {
            try {
                closeable.close();
            }
            catch (Exception e) {
                Logger.err((Throwable)e);
            }
        });
        this.f_listCloseable.clear();
    }

    protected Empty addIndex(CacheRequestHolder<AddIndexRequest, Void> holder) {
        AddIndexRequest request = (AddIndexRequest)holder.getRequest();
        NamedCache cache = holder.getCache();
        Serializer serializer = holder.getSerializer();
        ValueExtractor<?, ?> extractor = this.ensureValueExtractor(request.getExtractor(), serializer);
        Comparator comparator = (Comparator)BinaryHelper.fromByteString((ByteString)request.getComparator(), (Serializer)serializer);
        cache.addIndex(extractor, request.getSorted(), comparator);
        return BinaryHelper.EMPTY;
    }

    protected CompletionStage<BytesValue> aggregateWithFilter(AggregateRequest request, Executor executor) {
        return this.createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> this.aggregateWithFilter((CacheRequestHolder<AggregateRequest, Void>)h, executor), executor).handleAsync(ResponseHandlers::handleError, executor);
    }

    protected CompletionStage<BytesValue> aggregateWithFilter(CacheRequestHolder<AggregateRequest, Void> holder, Executor executor) {
        AggregateRequest request = (AggregateRequest)holder.getRequest();
        ByteString filterBytes = request.getFilter();
        AlwaysFilter filter = filterBytes.isEmpty() ? Filters.always() : (Filter)BinaryHelper.fromByteString((ByteString)filterBytes, (Serializer)holder.getSerializer());
        ByteString processorBytes = request.getAggregator();
        InvocableMap.EntryAggregator aggregator = (InvocableMap.EntryAggregator)BinaryHelper.fromByteString((ByteString)processorBytes, (Serializer)holder.getSerializer());
        return holder.runAsync((CompletionStage)holder.getAsyncCache().aggregate((Filter)filter, aggregator)).thenApplyAsync(h -> BinaryHelper.toBytesValue((Object)h.getResult(), (Serializer)h.getSerializer()), executor);
    }

    protected CompletionStage<BytesValue> aggregateWithKeys(AggregateRequest request, Executor executor) {
        return this.createHolderAsync(request, request.getScope(), request.getCache(), request.getFormat()).thenComposeAsync(h -> this.aggregateWithKeys((CacheRequestHolder<AggregateRequest, Void>)h, executor), executor).handleAsync(ResponseHandlers::handleError, executor);
    }

    protected CompletionStage<BytesValue> aggregateWithKeys(CacheRequestHolder<AggregateRequest, Void> holder, Executor executor) {
        AggregateRequest request = (AggregateRequest)holder.getRequest();
        List keys = request.getKeysList().stream().map(arg_0 -> holder.convertKeyDown(arg_0)).collect(Collectors.toList());
        InvocableMap.EntryAggregator aggregator = (InvocableMap.EntryAggregator)BinaryHelper.fromByteString((ByteString)request.getAggregator(), (Serializer)holder.getSerializer());
        return holder.runAsync((CompletionStage)holder.getAsyncCache().aggregate(keys, aggregator)).thenApplyAsync(h -> BinaryHelper.toBytesValue((Object)h.getResult(), (Serializer)h.getSerializer()), executor);
    }

    @Override
    public void clear(ClearRequest request, StreamObserver<Empty> observer) {
        StreamObserver safeObserver = SafeStreamObserver.ensureSafeObserver(observer);
        try {
            NamedCache<Binary, Binary> cache = this.getPassThroughCache(request.getScope(), request.getCache());
            cache.clear();
            ResponseHandlers.handleUnary(Empty.getDefaultInstance(), null, safeObserver);
        }
        catch (Throwable t) {
            ResponseHandlers.handleUnary(Empty.getDefaultInstance(), t, safeObserver);
        }
    }

    @Override
    public void destroy(DestroyRequest request, StreamObserver<Empty> observer) {
        StreamObserver safeObserver = SafeStreamObserver.ensureSafeObserver(observer);
        String sCacheName = request.getCache();
        try {
            if (sCacheName == null || sCacheName.trim().isEmpty()) {
                throw Status.INVALID_ARGUMENT.withDescription("invalid request, cache name cannot be null or empty").asRuntimeException();
            }
            Logger.finer((String)("Destroying cache " + sCacheName));
            ConfigurableCacheFactory ccf = this.getCCF(request.getScope());
            NamedCache cache = ccf.ensureCache(sCacheName, null);
            ccf.destroyCache(cache);
            ResponseHandlers.handleUnary(Empty.getDefaultInstance(), null, safeObserver);
            Logger.info((String)("Destroyed cache " + sCacheName));
        }
        catch (Throwable t) {
            Logger.err((String)("Caught exception destroying cache \"" + sCacheName + "\""), (Throwable)t);
            ResponseHandlers.handleUnary(Empty.getDefaultInstance(), t, safeObserver);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public StreamObserver<MapListenerRequest> events(StreamObserver<MapListenerResponse> observer) {
        this.f_lock.lock();
        try {
            if (this.m_fClosed) {
                throw Status.UNAVAILABLE.asRuntimeException();
            }
            MapListenerProxy proxy = new MapListenerProxy(this, (StreamObserver<MapListenerResponse>)SafeStreamObserver.ensureSafeObserver(observer), this.m_nEventsHeartbeat);
            this.addCloseable(proxy);
            MapListenerProxy mapListenerProxy = proxy;
            return mapListenerProxy;
        }
        finally {
            this.f_lock.unlock();
        }
    }

    protected List<Binary> convertKeysToBinary(CacheRequestHolder<GetAllRequest, Void> holder) {
        GetAllRequest request = (GetAllRequest)holder.getRequest();
        return request.getKeyList().stream().map(arg_0 -> holder.convertKeyDown(arg_0)).collect(Collectors.toList());
    }

    protected CompletionStage<Empty> partitionedPutAll(CacheRequestHolder<PutAllRequest, Void> holder, Map<Binary, Binary> map) {
        try {
            HashMap<Member, Map> mapByOwner = new HashMap<Member, Map>();
            PartitionedService service = (PartitionedService)holder.getCache().getCacheService();
            for (Map.Entry<Binary, Binary> entry : map.entrySet()) {
                Binary key = entry.getKey();
                Member member = service.getKeyOwner((Object)key);
                Map mapForMember2 = mapByOwner.computeIfAbsent(member, m -> new HashMap());
                mapForMember2.put(key, entry.getValue());
            }
            AsyncNamedCache cache = holder.getAsyncCache();
            long cMillis = ((PutAllRequest)holder.getRequest()).getTtl();
            CompletableFuture[] futures = (CompletableFuture[])mapByOwner.values().stream().map(mapForMember -> this.plainPutAll((AsyncNamedCache<Binary, Binary>)cache, (Map<Binary, Binary>)mapForMember, cMillis)).map(CompletionStage::toCompletableFuture).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures).thenApply(v -> BinaryHelper.EMPTY);
        }
        catch (Throwable t) {
            CompletableFuture<Empty> future = new CompletableFuture<Empty>();
            future.completeExceptionally(t);
            return future;
        }
    }

    protected CompletionStage<Empty> plainPutAll(AsyncNamedCache<Binary, Binary> cache, Map<Binary, Binary> map, long cMillis) {
        return cache.invokeAll(map.keySet(), BinaryProcessors.putAll(map, (long)cMillis)).thenApplyAsync(v -> BinaryHelper.EMPTY, this.f_executor);
    }

    protected Empty removeIndex(CacheRequestHolder<RemoveIndexRequest, Void> holder) {
        RemoveIndexRequest request = (RemoveIndexRequest)holder.getRequest();
        NamedCache cache = holder.getCache();
        ValueExtractor<?, ?> extractor = this.ensureValueExtractor(request.getExtractor(), holder.getSerializer());
        cache.removeIndex(extractor);
        return BinaryHelper.EMPTY;
    }

    protected <V> Empty empty(V ignored) {
        return BinaryHelper.EMPTY;
    }

    protected Empty execute(Runnable task) {
        task.run();
        return BinaryHelper.EMPTY;
    }

    protected <T> T execute(Callable<T> task) {
        try {
            return task.call();
        }
        catch (Throwable t) {
            throw ErrorsHelper.ensureStatusRuntimeException((Throwable)t);
        }
    }

    protected BoolValue toBoolValue(Binary binary, Serializer serializer) {
        return BoolValue.of((boolean)((Boolean)BinaryHelper.fromBinary((Binary)binary, (Serializer)serializer)));
    }

    public ValueExtractor<?, ?> ensureValueExtractor(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized ValueExtractor").asRuntimeException();
        }
        return (ValueExtractor)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    @Override
    public <T> Filter<T> ensureFilter(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            return Filters.always();
        }
        return (Filter)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    @Override
    public <T> Filter<T> getFilter(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            return null;
        }
        return (Filter)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    public <T> Comparator<T> deserializeComparator(ByteString bytes, Serializer serializer) {
        if (bytes == null || bytes.isEmpty()) {
            return null;
        }
        return (Comparator)BinaryHelper.fromByteString((ByteString)bytes, (Serializer)serializer);
    }

    protected CompletionStage<AsyncNamedCache<Binary, Binary>> getAsyncCache(String scope, String cacheName) {
        return CompletableFuture.supplyAsync(() -> this.getPassThroughCache(scope, cacheName).async(), this.f_executor);
    }

    protected InvocableMap.EntryProcessor<Binary, Binary, Binary> castProcessor(InvocableMap.EntryProcessor<Binary, Binary, ?> ep) {
        return ep;
    }

    public <Req> CompletionStage<CacheRequestHolder<Req, Void>> createHolderAsync(Req request, String sScope, String sCacheName, String format) {
        return CompletableFuture.supplyAsync(() -> this.createRequestHolder(request, sScope, sCacheName, format), this.f_executor);
    }

    @Override
    public <Req> CacheRequestHolder<Req, Void> createRequestHolder(Req request, String sScope, String sCacheName, String format) {
        if (request == null) {
            throw Status.INVALID_ARGUMENT.withDescription(INVALID_REQUEST_MESSAGE).asRuntimeException();
        }
        if (sCacheName == null || sCacheName.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("invalid request, cache name cannot be null or empty").asRuntimeException();
        }
        NamedCache<Binary, Binary> c = this.getCache(sScope, sCacheName, true);
        NamedCache<Binary, Binary> nonPassThrough = this.getCache(sScope, sCacheName, false);
        AsyncNamedCache cache = c.async();
        CacheService cacheService = cache.getNamedCache().getCacheService();
        String cacheFormat = CacheRequestHolder.getCacheFormat((Service)cacheService);
        Serializer serializer = this.getSerializer(format, cacheFormat, () -> ((CacheService)cacheService).getSerializer(), () -> ((CacheService)cacheService).getContextClassLoader());
        return new CacheRequestHolder(request, cache, () -> nonPassThrough, format, serializer, this.f_executor);
    }
}

