/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.client.common.v1;

import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Empty;
import com.google.protobuf.Int32Value;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.grpc.MaybeByteString;
import com.oracle.coherence.grpc.SafeStreamObserver;
import com.oracle.coherence.grpc.client.common.AsyncNamedCacheClient;
import com.oracle.coherence.grpc.client.common.BaseNamedCacheClientChannel;
import com.oracle.coherence.grpc.client.common.FutureStreamObserver;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.client.common.NamedCacheClientChannel;
import com.oracle.coherence.grpc.client.common.StreamStreamObserver;
import com.oracle.coherence.grpc.messages.cache.v1.EnsureCacheRequest;
import com.oracle.coherence.grpc.messages.cache.v1.ExecuteRequest;
import com.oracle.coherence.grpc.messages.cache.v1.IndexRequest;
import com.oracle.coherence.grpc.messages.cache.v1.KeyOrFilter;
import com.oracle.coherence.grpc.messages.cache.v1.KeysOrFilter;
import com.oracle.coherence.grpc.messages.cache.v1.MapEventMessage;
import com.oracle.coherence.grpc.messages.cache.v1.MapListenerRequest;
import com.oracle.coherence.grpc.messages.cache.v1.NamedCacheRequest;
import com.oracle.coherence.grpc.messages.cache.v1.NamedCacheRequestType;
import com.oracle.coherence.grpc.messages.cache.v1.NamedCacheResponse;
import com.oracle.coherence.grpc.messages.cache.v1.PutAllRequest;
import com.oracle.coherence.grpc.messages.cache.v1.PutRequest;
import com.oracle.coherence.grpc.messages.cache.v1.QueryRequest;
import com.oracle.coherence.grpc.messages.cache.v1.ReplaceMappingRequest;
import com.oracle.coherence.grpc.messages.cache.v1.ResponseType;
import com.oracle.coherence.grpc.messages.common.v1.BinaryKeyAndValue;
import com.oracle.coherence.grpc.messages.common.v1.CollectionOfBytesValues;
import com.oracle.coherence.grpc.messages.common.v1.OptionalValue;
import com.tangosol.net.cache.CacheEvent;
import com.tangosol.util.SimpleMapEntry;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
 * A named cache client channel that talks to the server using
 * {@link NamedCacheRequest} / {@link NamedCacheResponse} messages.
 * <p>
 * On construction the channel sends an {@code EnsureCache} request and keeps
 * the cache identifier from the response; every later request is tagged with
 * that identifier.
 */
public class NamedCacheClientChannel_V1
extends BaseNamedCacheClientChannel {
    /** Guards the one-time installation of the event observer. */
    private final Lock f_lock = new ReentrantLock();

    /** The cache identifier returned by the EnsureCache response. */
    private final int f_nCacheId;

    /** The registered event observer, or {@code null} if none has been set yet. */
    private EventObserver m_eventObserver;

    /**
     * Create the channel and ensure the cache named by the dependencies.
     *
     * @param dependencies  the client dependencies; supplies the cache name
     * @param connection    the connection used to send the EnsureCache request
     */
    public NamedCacheClientChannel_V1(AsyncNamedCacheClient.Dependencies dependencies, GrpcConnection connection) {
        super(dependencies, connection);
        EnsureCacheRequest ensureCache = EnsureCacheRequest.newBuilder()
                .setCache(dependencies.getName())
                .build();
        NamedCacheRequest request = NamedCacheRequest.newBuilder()
                .setType(NamedCacheRequestType.EnsureCache)
                .setMessage(Any.pack(ensureCache))
                .build();
        NamedCacheResponse response = (NamedCacheResponse) connection.send(request);
        this.f_nCacheId = response.getCacheId();
    }

    /**
     * Send an Index request that adds an index.
     *
     * @param extractor   the serialized extractor
     * @param fOrdered    value for the request's {@code sorted} field
     * @param comparator  the serialized comparator; {@code null} is sent as empty bytes
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> addIndex(ByteString extractor, boolean fOrdered, ByteString comparator) {
        IndexRequest request = IndexRequest.newBuilder()
                .setAdd(true)
                .setExtractor(extractor)
                .setSorted(fOrdered)
                .setComparator(Objects.requireNonNullElse(comparator, ByteString.EMPTY))
                .build();
        return this.poll(NamedCacheRequestType.Index, request).thenApply(r -> VOID);
    }

    /**
     * Send a MapListener subscribe request for a single key.
     *
     * @param key           the serialized key
     * @param fLite         value for the request's {@code lite} field
     * @param fPriming      value for the request's {@code priming} field
     * @param fSynchronous  value for the request's {@code synchronous} field
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> addMapListener(ByteString key, boolean fLite, boolean fPriming, boolean fSynchronous) {
        MapListenerRequest request = MapListenerRequest.newBuilder()
                .setKeyOrFilter(KeyOrFilter.newBuilder().setKey(key).build())
                .setLite(fLite)
                .setPriming(fPriming)
                .setSubscribe(true)
                .setSynchronous(fSynchronous)
                .build();
        return this.poll(NamedCacheRequestType.MapListener, request).thenApply(r -> VOID);
    }

    /**
     * Send a MapListener subscribe request for a filter.
     *
     * @param filterBytes   the serialized filter
     * @param nFilterId     the filter identifier
     * @param fLite         value for the request's {@code lite} field
     * @param triggerBytes  the serialized trigger; only set on the request when non-null
     * @param fSynchronous  value for the request's {@code synchronous} field
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> addMapListener(ByteString filterBytes, long nFilterId, boolean fLite, ByteString triggerBytes, boolean fSynchronous) {
        MapListenerRequest.Builder builder = MapListenerRequest.newBuilder()
                .setKeyOrFilter(KeyOrFilter.newBuilder().setFilter(filterBytes).build())
                .setFilterId(nFilterId)
                .setLite(fLite)
                .setSynchronous(fSynchronous)
                .setSubscribe(true);
        if (triggerBytes != null) {
            builder.setTrigger(triggerBytes);
        }
        return this.poll(NamedCacheRequestType.MapListener, builder.build()).thenApply(r -> VOID);
    }

    /**
     * Send an Aggregate request targeting a set of keys.
     *
     * @param collKeys    the serialized keys
     * @param aggregator  the serialized aggregator
     * @param nDeadline   not used by this implementation
     *
     * @return a future holding the response payload unpacked as a {@link BytesValue}
     */
    @Override
    public CompletableFuture<BytesValue> aggregate(List<ByteString> collKeys, ByteString aggregator, long nDeadline) {
        ExecuteRequest request = keysExecuteRequest(collKeys, aggregator);
        return this.poll(NamedCacheRequestType.Aggregate, request).thenApply(this::unpackBytes);
    }

    /**
     * Send an Aggregate request targeting a filter.
     *
     * @param filter      the serialized filter
     * @param aggregator  the serialized aggregator
     * @param nDeadline   not used by this implementation
     *
     * @return a future holding the response payload unpacked as a {@link BytesValue}
     */
    @Override
    public CompletableFuture<BytesValue> aggregate(ByteString filter, ByteString aggregator, long nDeadline) {
        ExecuteRequest request = filterExecuteRequest(filter, aggregator);
        return this.poll(NamedCacheRequestType.Aggregate, request).thenApply(this::unpackBytes);
    }

    /**
     * Send a Clear request.
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> clear() {
        return this.poll(NamedCacheRequestType.Clear).thenApply(r -> VOID);
    }

    /**
     * Send a ContainsEntry request for the given key and value.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> containsEntry(ByteString key, ByteString value) {
        return this.poll(NamedCacheRequestType.ContainsEntry, this.toBinaryKeyAndValue(key, value))
                .thenApply(this::unpackBoolean);
    }

    /**
     * Send a ContainsKey request.
     *
     * @param key  the serialized key
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> containsKey(ByteString key) {
        return this.poll(NamedCacheRequestType.ContainsKey, this.toBinaryValue(key))
                .thenApply(this::unpackBoolean);
    }

    /**
     * Send a ContainsValue request.
     *
     * @param value  the serialized value
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> containsValue(ByteString value) {
        return this.poll(NamedCacheRequestType.ContainsValue, this.toBinaryValue(value))
                .thenApply(this::unpackBoolean);
    }

    /**
     * Send a Destroy request.
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> destroy() {
        return this.poll(NamedCacheRequestType.Destroy).thenApply(r -> VOID);
    }

    /**
     * Send a QueryEntries request with a filter and collect the streamed
     * key/value responses into a map.
     *
     * @param filter  the serialized filter
     *
     * @return the future owned by the collecting observer
     */
    @Override
    public CompletableFuture<Map<ByteString, ByteString>> entrySet(ByteString filter) {
        QueryRequest request = QueryRequest.newBuilder().setFilter(filter).build();
        MapStreamObserver observer = new MapStreamObserver();
        this.poll(NamedCacheRequestType.QueryEntries, request, observer);
        return observer.future();
    }

    /**
     * Send a QueryEntries request with a filter and comparator and collect the
     * streamed key/value responses into a map.
     *
     * @param filter      the serialized filter
     * @param comparator  the serialized comparator
     *
     * @return the future owned by the collecting observer
     */
    @Override
    public CompletableFuture<Map<ByteString, ByteString>> entrySet(ByteString filter, ByteString comparator) {
        QueryRequest request = QueryRequest.newBuilder()
                .setFilter(filter)
                .setComparator(comparator)
                .build();
        MapStreamObserver observer = new MapStreamObserver();
        this.poll(NamedCacheRequestType.QueryEntries, request, observer);
        return observer.future();
    }

    /**
     * Send a Get request.
     *
     * @param key  the serialized key
     *
     * @return a stage holding the value when the response's {@link OptionalValue}
     *         is flagged present, otherwise an empty {@link MaybeByteString}
     */
    @Override
    public CompletionStage<MaybeByteString> get(ByteString key) {
        return this.poll(NamedCacheRequestType.Get, this.toBinaryValue(key)).thenApply(response -> {
            OptionalValue optional = this.unpackMessage(response, OptionalValue.class);
            return optional.getPresent()
                    ? MaybeByteString.ofNullable(optional.getValue())
                    : MaybeByteString.empty();
        });
    }

    /**
     * Send a GetAll request and block until all responses have been received.
     *
     * @param keys  the serialized keys
     *
     * @return a stream of the returned key/value pairs
     */
    @Override
    public Stream<Map.Entry<ByteString, ByteString>> getAll(Iterable<ByteString> keys) {
        CollectionOfBytesValues values = CollectionOfBytesValues.newBuilder().addAllValues(keys).build();
        StreamStreamObserver<NamedCacheResponse> observer = new StreamStreamObserver<NamedCacheResponse>();
        this.poll(NamedCacheRequestType.GetAll, values, observer);
        try {
            return observer.future().get().stream().map(resp -> {
                BinaryKeyAndValue keyAndValue = this.unpackMessage(resp, BinaryKeyAndValue.class);
                return asMapEntry(keyAndValue.getKey(), keyAndValue.getValue());
            });
        }
        catch (InterruptedException e) {
            // restore the interrupt status so callers further up can still observe it
            Thread.currentThread().interrupt();
            throw Exceptions.ensureRuntimeException(e);
        }
        catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Send a PageOfEntries request and block until the page has been received.
     * <p>
     * The first element of the response list is unpacked as the cookie for the
     * next page; every following element is unpacked as a key/value pair.
     *
     * @param cookie  the paging cookie; {@code null} sends a default (empty) value
     *
     * @return the page; when no responses were received it has a {@code null}
     *         cookie and no entries
     */
    @Override
    public NamedCacheClientChannel.EntrySetPage getEntriesPage(ByteString cookie) {
        try {
            PageResultObserver observer = new PageResultObserver();
            this.poll(NamedCacheRequestType.PageOfEntries, cookieValue(cookie), observer);
            List<Any> list = observer.future().get();
            if (list.isEmpty()) {
                return new NamedCacheClientChannel.EntrySetPage(null, List.of());
            }
            Iterator<Any> iterator = list.iterator();
            BytesValue nextCookie = iterator.next().unpack(BytesValue.class);
            List<Map.Entry<ByteString, ByteString>> entries = new ArrayList<>();
            while (iterator.hasNext()) {
                BinaryKeyAndValue keyAndValue = iterator.next().unpack(BinaryKeyAndValue.class);
                entries.add(asMapEntry(keyAndValue.getKey(), keyAndValue.getValue()));
            }
            return new NamedCacheClientChannel.EntrySetPage(nextCookie.getValue(), entries);
        }
        catch (InterruptedException e) {
            // restore the interrupt status so callers further up can still observe it
            Thread.currentThread().interrupt();
            throw Exceptions.ensureRuntimeException(e);
        }
        catch (Exception e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Send a PageOfKeys request and block until the page has been received.
     *
     * @param cookie  the paging cookie; {@code null} sends a default (empty) value
     *
     * @return a stream of every response payload unpacked as a {@link BytesValue}
     */
    @Override
    public Stream<BytesValue> getKeysPage(ByteString cookie) {
        try {
            PageResultObserver observer = new PageResultObserver();
            this.poll(NamedCacheRequestType.PageOfKeys, cookieValue(cookie), observer);
            List<Any> list = observer.future().get();
            return list.stream().map(this::unpackBytes);
        }
        catch (InterruptedException e) {
            // restore the interrupt status so callers further up can still observe it
            Thread.currentThread().interrupt();
            throw Exceptions.ensureRuntimeException(e);
        }
        catch (Exception e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Send an Invoke request for a single key.
     *
     * @param key        the serialized key
     * @param processor  the serialized processor
     * @param nDeadline  not used by this implementation
     *
     * @return a stage holding the result returned for {@code key}, or an empty
     *         {@link BytesValue} when the responses contain no entry for it
     */
    @Override
    public CompletionStage<BytesValue> invoke(ByteString key, ByteString processor, long nDeadline) {
        ExecuteRequest request = ExecuteRequest.newBuilder()
                .setAgent(processor)
                .setKeys(KeysOrFilter.newBuilder().setKey(key).build())
                .build();
        MapStreamObserver observer = new MapStreamObserver();
        this.poll(NamedCacheRequestType.Invoke, request, observer);
        return observer.future().thenApply(map -> {
            ByteString bytes = map.get(key);
            return BytesValue.of(bytes == null ? ByteString.empty() : bytes);
        });
    }

    /**
     * Send an Invoke request for a set of keys and collect the results into a map.
     *
     * @param colKeys    the serialized keys
     * @param processor  the serialized processor
     * @param nDeadline  not used by this implementation
     *
     * @return the future owned by the collecting observer
     */
    @Override
    public CompletableFuture<Map<ByteString, ByteString>> invokeAll(Collection<ByteString> colKeys, ByteString processor, long nDeadline) {
        MapStreamObserver observer = new MapStreamObserver();
        this.poll(NamedCacheRequestType.Invoke, keysExecuteRequest(colKeys, processor), observer);
        return observer.future();
    }

    /**
     * Send an Invoke request for a set of keys, handing each result to the
     * callback as a map entry.
     *
     * @param colKeys    the serialized keys
     * @param processor  the serialized processor
     * @param callback   receives each key/value result
     *
     * @return a future derived from the forwarding observer's future
     */
    @Override
    public CompletableFuture<Void> invokeAll(Collection<ByteString> colKeys, ByteString processor, Consumer<Map.Entry<ByteString, ByteString>> callback) {
        ExecuteRequest request = keysExecuteRequest(colKeys, processor);
        BiConsumer<ByteString, ByteString> consumer = (k, v) -> callback.accept(asMapEntry(k, v));
        ForwardingStreamObserver observer = new ForwardingStreamObserver(consumer);
        this.poll(NamedCacheRequestType.Invoke, request, observer);
        return observer.future().thenApply(r -> VOID);
    }

    /**
     * Send an Invoke request for a set of keys, handing each result key and
     * value to the callback.
     *
     * @param colKeys    the serialized keys
     * @param processor  the serialized processor
     * @param callback   receives each result key and value
     *
     * @return a future derived from the forwarding observer's future
     */
    @Override
    public CompletableFuture<Void> invokeAll(Collection<ByteString> colKeys, ByteString processor, BiConsumer<ByteString, ByteString> callback) {
        ForwardingStreamObserver observer = new ForwardingStreamObserver(callback);
        this.poll(NamedCacheRequestType.Invoke, keysExecuteRequest(colKeys, processor), observer);
        return observer.future().thenApply(r -> VOID);
    }

    /**
     * Send an Invoke request for a filter and collect the results into a map.
     *
     * @param filter     the serialized filter
     * @param processor  the serialized processor
     *
     * @return the future owned by the collecting observer
     */
    @Override
    public CompletableFuture<Map<ByteString, ByteString>> invokeAll(ByteString filter, ByteString processor) {
        MapStreamObserver observer = new MapStreamObserver();
        this.poll(NamedCacheRequestType.Invoke, filterExecuteRequest(filter, processor), observer);
        return observer.future();
    }

    /**
     * Send an Invoke request for a filter, handing each result to the callback
     * as a map entry.
     *
     * @param filter     the serialized filter
     * @param processor  the serialized processor
     * @param callback   receives each key/value result
     *
     * @return a future derived from the forwarding observer's future
     */
    @Override
    public CompletableFuture<Void> invokeAll(ByteString filter, ByteString processor, Consumer<Map.Entry<ByteString, ByteString>> callback) {
        ExecuteRequest request = filterExecuteRequest(filter, processor);
        BiConsumer<ByteString, ByteString> consumer = (k, v) -> callback.accept(asMapEntry(k, v));
        ForwardingStreamObserver observer = new ForwardingStreamObserver(consumer);
        this.poll(NamedCacheRequestType.Invoke, request, observer);
        return observer.future().thenApply(r -> VOID);
    }

    /**
     * Send an Invoke request for a filter, handing each result key and value
     * to the callback.
     *
     * @param filter     the serialized filter
     * @param processor  the serialized processor
     * @param callback   receives each result key and value
     *
     * @return a future derived from the forwarding observer's future
     */
    @Override
    public CompletableFuture<Void> invokeAll(ByteString filter, ByteString processor, BiConsumer<ByteString, ByteString> callback) {
        ForwardingStreamObserver observer = new ForwardingStreamObserver(callback);
        this.poll(NamedCacheRequestType.Invoke, filterExecuteRequest(filter, processor), observer);
        return observer.future().thenApply(r -> VOID);
    }

    /**
     * Send an IsEmpty request.
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> isEmpty() {
        return this.poll(NamedCacheRequestType.IsEmpty).thenApply(this::unpackBoolean);
    }

    /**
     * Send an IsReady request.
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> isReady() {
        return this.poll(NamedCacheRequestType.IsReady).thenApply(this::unpackBoolean);
    }

    /**
     * Send a PutAll request containing every entry of the map.
     *
     * @param map  the serialized keys and values
     * @param ttl  value for the request's {@code ttl} field
     *
     * @return a future holding the default {@link Empty} instance once the response arrives
     */
    @Override
    public CompletableFuture<Empty> putAll(Map<ByteString, ByteString> map, long ttl) {
        List<BinaryKeyAndValue> list = map.entrySet().stream().map(this::toBinaryKeyAndValue).toList();
        PutAllRequest request = PutAllRequest.newBuilder().addAllEntries(list).setTtl(ttl).build();
        return this.poll(NamedCacheRequestType.PutAll, request).thenApply(r -> Empty.getDefaultInstance());
    }

    /**
     * Send a Put request.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     * @param ttl    value for the request's {@code ttl} field
     *
     * @return a stage holding the response payload unpacked as a {@link BytesValue}
     */
    @Override
    public CompletionStage<BytesValue> put(ByteString key, ByteString value, long ttl) {
        return this.poll(NamedCacheRequestType.Put, this.putRequest(key, value, ttl))
                .thenApply(this::unpackBytes);
    }

    /**
     * Send a PutIfAbsent request (built with a ttl of zero).
     *
     * @param key    the serialized key
     * @param value  the serialized value
     *
     * @return a stage holding the response payload unpacked as a {@link BytesValue}
     */
    @Override
    public CompletionStage<BytesValue> putIfAbsent(ByteString key, ByteString value) {
        return this.poll(NamedCacheRequestType.PutIfAbsent, this.putRequest(key, value))
                .thenApply(this::unpackBytes);
    }

    /**
     * Send a Remove request.
     *
     * @param key  the serialized key
     *
     * @return a stage holding the response payload unpacked as a {@link BytesValue}
     */
    @Override
    public CompletionStage<BytesValue> remove(ByteString key) {
        return this.poll(NamedCacheRequestType.Remove, this.toBinaryValue(key))
                .thenApply(this::unpackBytes);
    }

    /**
     * Send a RemoveMapping request for the given key and value.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> remove(ByteString key, ByteString value) {
        return this.poll(NamedCacheRequestType.RemoveMapping, this.toBinaryKeyAndValue(key, value))
                .thenApply(this::unpackBoolean);
    }

    /**
     * Send an Index request that removes an index.
     *
     * @param extractor  the serialized extractor
     *
     * @return a stage holding the default {@link Empty} instance once the response arrives
     */
    @Override
    public CompletionStage<Empty> removeIndex(ByteString extractor) {
        IndexRequest request = IndexRequest.newBuilder().setAdd(false).setExtractor(extractor).build();
        return this.poll(NamedCacheRequestType.Index, request).thenApply(r -> Empty.getDefaultInstance());
    }

    /**
     * Send a MapListener unsubscribe request for a single key.
     *
     * @param key       the serialized key
     * @param fPriming  value for the request's {@code priming} field
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> removeMapListener(ByteString key, boolean fPriming) {
        MapListenerRequest request = MapListenerRequest.newBuilder()
                .setKeyOrFilter(KeyOrFilter.newBuilder().setKey(key).build())
                .setPriming(fPriming)
                .setSubscribe(false)
                .build();
        return this.poll(NamedCacheRequestType.MapListener, request).thenApply(r -> VOID);
    }

    /**
     * Send a MapListener unsubscribe request for a filter.
     *
     * @param filterBytes   the serialized filter
     * @param nFilterId     the filter identifier
     * @param triggerBytes  the serialized trigger; only set on the request when non-null
     *
     * @return a future completed when the response arrives
     */
    @Override
    public CompletableFuture<Void> removeMapListener(ByteString filterBytes, long nFilterId, ByteString triggerBytes) {
        MapListenerRequest.Builder builder = MapListenerRequest.newBuilder()
                .setKeyOrFilter(KeyOrFilter.newBuilder().setFilter(filterBytes).build())
                .setFilterId(nFilterId)
                .setSubscribe(false);
        if (triggerBytes != null) {
            builder.setTrigger(triggerBytes);
        }
        return this.poll(NamedCacheRequestType.MapListener, builder.build()).thenApply(r -> VOID);
    }

    /**
     * Send a Replace request for the given key and value.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     *
     * @return a stage holding the response payload unpacked as a {@link BytesValue}
     */
    @Override
    public CompletionStage<BytesValue> replace(ByteString key, ByteString value) {
        return this.poll(NamedCacheRequestType.Replace, this.toBinaryKeyAndValue(key, value))
                .thenApply(this::unpackBytes);
    }

    /**
     * Send a ReplaceMapping request.
     *
     * @param key       the serialized key
     * @param oldValue  the serialized previous value
     * @param newValue  the serialized new value
     *
     * @return a stage holding the response payload unpacked as a {@link BoolValue}
     */
    @Override
    public CompletionStage<BoolValue> replaceMapping(ByteString key, ByteString oldValue, ByteString newValue) {
        ReplaceMappingRequest request = ReplaceMappingRequest.newBuilder()
                .setKey(key)
                .setPreviousValue(oldValue)
                .setNewValue(newValue)
                .build();
        return this.poll(NamedCacheRequestType.ReplaceMapping, request).thenApply(this::unpackBoolean);
    }

    /**
     * Install the event dispatcher by registering an {@link EventObserver} on
     * the connection for responses whose cache id matches this channel's.
     *
     * @param dispatcher  the dispatcher to receive events
     *
     * @throws IllegalStateException if a dispatcher has already been set
     */
    @Override
    public void setEventDispatcher(NamedCacheClientChannel.EventDispatcher dispatcher) {
        this.f_lock.lock();
        try {
            if (this.m_eventObserver != null) {
                throw new IllegalStateException("Event dispatcher is already set");
            }
            EventObserver observer = new EventObserver(dispatcher);
            this.f_connection.addResponseObserver(new GrpcConnection.Listener<NamedCacheResponse>(observer, m -> m.getCacheId() == this.getCacheId()));
            // record the observer only once registration succeeded, so a failed
            // registration does not leave the channel claiming a dispatcher is set
            this.m_eventObserver = observer;
        }
        finally {
            this.f_lock.unlock();
        }
    }

    /**
     * Send a Size request.
     *
     * @return a stage holding the response payload unpacked as an {@link Int32Value}
     */
    @Override
    public CompletionStage<Int32Value> size() {
        return this.poll(NamedCacheRequestType.Size).thenApply(this::unpackInteger);
    }

    /**
     * Send a Truncate request.
     *
     * @return a stage holding the default {@link Empty} instance once the response arrives
     */
    @Override
    public CompletionStage<Empty> truncate() {
        return this.poll(NamedCacheRequestType.Truncate).thenApply(r -> Empty.getDefaultInstance());
    }

    /**
     * @return the cache identifier obtained from the EnsureCache response
     */
    protected int getCacheId() {
        return this.f_nCacheId;
    }

    /**
     * Wrap bytes in a {@link BytesValue}.
     *
     * @param bytes  the bytes to wrap
     *
     * @return the wrapping message
     */
    protected BytesValue toBinaryValue(ByteString bytes) {
        return BytesValue.newBuilder().setValue(bytes).build();
    }

    /**
     * Convert a map entry to a {@link BinaryKeyAndValue}.
     *
     * @param entry  the entry supplying key and value
     *
     * @return the key/value message
     */
    protected BinaryKeyAndValue toBinaryKeyAndValue(Map.Entry<ByteString, ByteString> entry) {
        return this.toBinaryKeyAndValue(entry.getKey(), entry.getValue());
    }

    /**
     * Build a {@link BinaryKeyAndValue} from a key and a value.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     *
     * @return the key/value message
     */
    protected BinaryKeyAndValue toBinaryKeyAndValue(ByteString key, ByteString value) {
        return BinaryKeyAndValue.newBuilder().setKey(key).setValue(value).build();
    }

    /**
     * Build a {@link PutRequest} with a ttl of zero.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     *
     * @return the put request
     */
    protected PutRequest putRequest(ByteString key, ByteString value) {
        return this.putRequest(key, value, 0L);
    }

    /**
     * Build a {@link PutRequest}.
     *
     * @param key    the serialized key
     * @param value  the serialized value
     * @param ttl    value for the request's {@code ttl} field
     *
     * @return the put request
     */
    protected PutRequest putRequest(ByteString key, ByteString value, long ttl) {
        return PutRequest.newBuilder().setKey(key).setValue(value).setTtl(ttl).build();
    }

    /**
     * Unpack the response payload as a {@link BoolValue}.
     *
     * @param response  the response to unpack
     *
     * @return the unpacked value
     */
    protected BoolValue unpackBoolean(NamedCacheResponse response) {
        return this.unpackMessage(response, BoolValue.class);
    }

    /**
     * Unpack the response payload as an {@link Int32Value}.
     *
     * @param response  the response to unpack
     *
     * @return the unpacked value
     */
    protected Int32Value unpackInteger(NamedCacheResponse response) {
        return this.unpackMessage(response, Int32Value.class);
    }

    /**
     * Unpack the response payload as a {@link BytesValue}.
     *
     * @param response  the response to unpack
     *
     * @return the unpacked value
     */
    protected BytesValue unpackBytes(NamedCacheResponse response) {
        return this.unpackMessage(response, BytesValue.class);
    }

    /**
     * Unpack an {@link Any} as a {@link BytesValue}.
     *
     * @param any  the packed message
     *
     * @return the unpacked value
     *
     * @throws RuntimeException if the payload cannot be unpacked as a {@link BytesValue}
     */
    protected BytesValue unpackBytes(Any any) {
        try {
            return any.unpack(BytesValue.class);
        }
        catch (InvalidProtocolBufferException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Unpack the response payload as the requested message type.
     *
     * @param response  the response to unpack
     * @param type      the expected message class
     * @param <T>       the expected message type
     *
     * @return the unpacked message
     *
     * @throws RuntimeException if the payload cannot be unpacked as {@code type}
     */
    protected <T extends Message> T unpackMessage(NamedCacheResponse response, Class<T> type) {
        try {
            return response.getMessage().unpack(type);
        }
        catch (InvalidProtocolBufferException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Send a request of the given type with an {@link Empty} payload.
     *
     * @param type  the request type
     *
     * @return the future returned by the connection
     */
    protected CompletableFuture<NamedCacheResponse> poll(NamedCacheRequestType type) {
        return this.poll(type, null);
    }

    /**
     * Send a request of the given type carrying the given payload.
     *
     * @param type     the request type
     * @param message  the payload; when {@code null} an {@link Empty} is packed instead
     *
     * @return the future returned by the connection
     */
    protected CompletableFuture<NamedCacheResponse> poll(NamedCacheRequestType type, Message message) {
        Message payload = message != null ? message : Empty.getDefaultInstance();
        NamedCacheRequest request = NamedCacheRequest.newBuilder()
                .setCacheId(this.f_nCacheId)
                .setType(type)
                .setMessage(Any.pack(payload))
                .build();
        return this.f_connection.poll(request);
    }

    /**
     * Send a request whose responses are delivered to an observer.
     * <p>
     * Unlike {@link #poll(NamedCacheRequestType, Message)}, a {@code null}
     * payload leaves the request's message field unset rather than packing an
     * {@link Empty}.
     *
     * @param type      the request type
     * @param message   the payload, or {@code null} for none
     * @param observer  receives the responses; wrapped with
     *                  {@link SafeStreamObserver#ensureSafeObserver} before use
     */
    protected void poll(NamedCacheRequestType type, Message message, StreamObserver<NamedCacheResponse> observer) {
        NamedCacheRequest.Builder builder = NamedCacheRequest.newBuilder()
                .setCacheId(this.f_nCacheId)
                .setType(type);
        if (message != null) {
            builder.setMessage(Any.pack(message));
        }
        this.f_connection.poll(builder.build(), SafeStreamObserver.ensureSafeObserver(observer));
    }

    /**
     * Build an {@link ExecuteRequest} that targets a collection of keys.
     */
    private static ExecuteRequest keysExecuteRequest(Iterable<ByteString> keys, ByteString agent) {
        CollectionOfBytesValues binaryKeys = CollectionOfBytesValues.newBuilder().addAllValues(keys).build();
        return ExecuteRequest.newBuilder()
                .setAgent(agent)
                .setKeys(KeysOrFilter.newBuilder().setKeys(binaryKeys).build())
                .build();
    }

    /**
     * Build an {@link ExecuteRequest} that targets a filter.
     */
    private static ExecuteRequest filterExecuteRequest(ByteString filter, ByteString agent) {
        return ExecuteRequest.newBuilder()
                .setAgent(agent)
                .setKeys(KeysOrFilter.newBuilder().setFilter(filter).build())
                .build();
    }

    /**
     * Wrap a paging cookie, using the default {@link BytesValue} for {@code null}.
     */
    private static BytesValue cookieValue(ByteString cookie) {
        return cookie == null ? BytesValue.getDefaultInstance() : BytesValue.of(cookie);
    }

    /**
     * Create a map entry for a serialized key and value.
     */
    private static Map.Entry<ByteString, ByteString> asMapEntry(ByteString key, ByteString value) {
        return new SimpleMapEntry<ByteString, ByteString>(key, value);
    }

    /**
     * An observer that unpacks each response as a {@link BinaryKeyAndValue}
     * and accumulates the pairs in a {@link HashMap}.
     */
    protected static class MapStreamObserver
    extends FutureStreamObserver<NamedCacheResponse, Map<ByteString, ByteString>> {
        public MapStreamObserver() {
            super(new CompletableFuture<>(), new HashMap<>(), MapStreamObserver::onNext);
        }

        /**
         * Add the key and value carried by the response to the map.
         *
         * @param response  the response holding a packed {@link BinaryKeyAndValue}
         * @param map       the accumulating map
         *
         * @return the same map instance
         */
        protected static Map<ByteString, ByteString> onNext(NamedCacheResponse response, Map<ByteString, ByteString> map) {
            try {
                BinaryKeyAndValue keyAndValue = response.getMessage().unpack(BinaryKeyAndValue.class);
                map.put(keyAndValue.getKey(), keyAndValue.getValue());
                return map;
            }
            catch (InvalidProtocolBufferException e) {
                throw Exceptions.ensureRuntimeException(e);
            }
        }
    }

    /**
     * An observer that accumulates the still-packed payload of every response
     * in an {@link ArrayList}.
     */
    protected static class PageResultObserver
    extends FutureStreamObserver<NamedCacheResponse, List<Any>> {
        public PageResultObserver() {
            super(new CompletableFuture<>(), new ArrayList<>(), PageResultObserver::onNext);
        }

        /**
         * Append the response payload to the list.
         *
         * @param response  the response
         * @param list      the accumulating list
         *
         * @return the same list instance
         */
        protected static List<Any> onNext(NamedCacheResponse response, List<Any> list) {
            list.add(response.getMessage());
            return list;
        }
    }

    /**
     * An observer that unpacks each response as a {@link BinaryKeyAndValue}
     * and passes the key and value to a consumer.
     */
    protected static class ForwardingStreamObserver
    extends FutureStreamObserver<NamedCacheResponse, BiConsumer<ByteString, ByteString>> {
        public ForwardingStreamObserver(BiConsumer<ByteString, ByteString> consumer) {
            super(new CompletableFuture<>(), consumer, ForwardingStreamObserver::onNext);
        }

        /**
         * Pass the key and value carried by the response to the consumer.
         *
         * @param response  the response holding a packed {@link BinaryKeyAndValue}
         * @param consumer  the consumer to call
         *
         * @return the same consumer instance
         */
        protected static BiConsumer<ByteString, ByteString> onNext(NamedCacheResponse response, BiConsumer<ByteString, ByteString> consumer) {
            try {
                BinaryKeyAndValue keyAndValue = response.getMessage().unpack(BinaryKeyAndValue.class);
                consumer.accept(keyAndValue.getKey(), keyAndValue.getValue());
                return consumer;
            }
            catch (InvalidProtocolBufferException e) {
                throw Exceptions.ensureRuntimeException(e);
            }
        }
    }

    /**
     * An observer that routes event responses to an event dispatcher.
     */
    protected class EventObserver
    implements StreamObserver<NamedCacheResponse> {
        /** The dispatcher receiving events; responses are ignored when it is {@code null}. */
        private final NamedCacheClientChannel.EventDispatcher m_dispatcher;

        public EventObserver(NamedCacheClientChannel.EventDispatcher dispatcher) {
            this.m_dispatcher = dispatcher;
        }

        /**
         * Route a response by type: map events are unpacked and dispatched,
         * destroyed and truncated notifications call the matching dispatcher
         * method, and any other type is logged as an error.
         *
         * @param response  the response received from the connection
         */
        @Override
        public void onNext(NamedCacheResponse response) {
            if (this.m_dispatcher == null) {
                return;
            }
            ResponseType type = response.getType();
            switch (type) {
                case MapEvent: {
                    this.dispatchMapEvent(response);
                    break;
                }
                case Destroyed: {
                    this.m_dispatcher.onDestroy();
                    break;
                }
                case Truncated: {
                    this.m_dispatcher.onTruncate();
                    break;
                }
                default: {
                    Logger.err("Event observer received unexpected NamedCacheResponse type: " + type);
                }
            }
        }

        /**
         * Unpack a {@link MapEventMessage} from the response and pass its
         * fields to the dispatcher.
         */
        private void dispatchMapEvent(NamedCacheResponse response) {
            MapEventMessage event = NamedCacheClientChannel_V1.this.unpackMessage(response, MapEventMessage.class);
            // the message's transformation state is mapped to the CacheEvent enum by name
            CacheEvent.TransformationState transformState =
                    CacheEvent.TransformationState.valueOf(event.getTransformationState().toString());
            this.m_dispatcher.dispatch(event.getFilterIdsList(), event.getId(), event.getKey(), event.getOldValue(), event.getNewValue(), event.getSynthetic(), event.getPriming(), transformState);
        }

        /**
         * Log the error; no other action is taken.
         *
         * @param t  the error reported by the connection
         */
        @Override
        public void onError(Throwable t) {
            Logger.err("Event observer received an error", t);
        }

        /**
         * No action is taken on completion.
         */
        @Override
        public void onCompleted() {
        }
    }
}

