/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.cache.impl;

import com.hazelcast.cache.HazelcastCacheManager;
import com.hazelcast.cache.impl.CacheEventData;
import com.hazelcast.cache.impl.CacheEventListenerAdaptor;
import com.hazelcast.cache.impl.CacheProxyUtil;
import com.hazelcast.cache.impl.CacheSyncListenerCompleter;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.cache.impl.AbstractClientCacheProxyBase;
import com.hazelcast.client.cache.impl.CallbackAwareClientDelegatingFuture;
import com.hazelcast.client.cache.impl.HazelcastClientCacheManager;
import com.hazelcast.client.cache.impl.OneShotExecutionCallback;
import com.hazelcast.client.impl.clientside.ClientMessageDecoder;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.CacheAddEntryListenerCodec;
import com.hazelcast.client.impl.protocol.codec.CacheClearCodec;
import com.hazelcast.client.impl.protocol.codec.CacheGetAndRemoveCodec;
import com.hazelcast.client.impl.protocol.codec.CacheGetAndReplaceCodec;
import com.hazelcast.client.impl.protocol.codec.CachePutCodec;
import com.hazelcast.client.impl.protocol.codec.CachePutIfAbsentCodec;
import com.hazelcast.client.impl.protocol.codec.CacheRemoveAllCodec;
import com.hazelcast.client.impl.protocol.codec.CacheRemoveAllKeysCodec;
import com.hazelcast.client.impl.protocol.codec.CacheRemoveCodec;
import com.hazelcast.client.impl.protocol.codec.CacheReplaceCodec;
import com.hazelcast.client.impl.protocol.codec.CacheSetExpiryPolicyCodec;
import com.hazelcast.client.spi.ClientContext;
import com.hazelcast.client.spi.ClientListenerService;
import com.hazelcast.client.spi.ClientPartitionService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.util.ClientDelegatingFuture;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.util.ExceptionUtil;
import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;

abstract class AbstractClientInternalCacheProxy<K, V>
extends AbstractClientCacheProxyBase<K, V>
implements CacheSyncListenerCompleter {
    private static final long MAX_COMPLETION_LATCH_WAIT_TIME = TimeUnit.MINUTES.toMillis(5L);
    private static final long COMPLETION_LATCH_WAIT_TIME_STEP = TimeUnit.SECONDS.toMillis(1L);
    private static final ClientMessageDecoder GET_AND_REMOVE_RESPONSE_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)CacheGetAndRemoveCodec.decodeResponse((ClientMessage)clientMessage).response;
        }
    };
    private static final ClientMessageDecoder REMOVE_RESPONSE_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)Boolean.valueOf(CacheRemoveCodec.decodeResponse((ClientMessage)clientMessage).response);
        }
    };
    private static final ClientMessageDecoder REPLACE_RESPONSE_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)CacheReplaceCodec.decodeResponse((ClientMessage)clientMessage).response;
        }
    };
    private static final ClientMessageDecoder GET_AND_REPLACE_RESPONSE_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)CacheGetAndReplaceCodec.decodeResponse((ClientMessage)clientMessage).response;
        }
    };
    private static final ClientMessageDecoder PUT_RESPONSE_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)CachePutCodec.decodeResponse((ClientMessage)clientMessage).response;
        }
    };
    private static final ClientMessageDecoder PUT_IF_ABSENT_RESPONSE_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)Boolean.valueOf(CachePutIfAbsentCodec.decodeResponse((ClientMessage)clientMessage).response);
        }
    };
    private static final ClientMessageDecoder SET_EXPIRY_POLICY_DECODER = new ClientMessageDecoder(){

        @Override
        public <T> T decodeClientMessage(ClientMessage clientMessage) {
            return (T)Boolean.valueOf(CacheSetExpiryPolicyCodec.decodeResponse((ClientMessage)clientMessage).response);
        }
    };
    protected final AtomicReference<HazelcastClientCacheManager> cacheManagerRef = new AtomicReference();
    protected int partitionCount;
    private final ConcurrentMap<CacheEntryListenerConfiguration, String> asyncListenerRegistrations = new ConcurrentHashMap<CacheEntryListenerConfiguration, String>();
    private final ConcurrentMap<CacheEntryListenerConfiguration, String> syncListenerRegistrations = new ConcurrentHashMap<CacheEntryListenerConfiguration, String>();
    private final ConcurrentMap<String, Closeable> closeableListeners = new ConcurrentHashMap<String, Closeable>();
    private final ConcurrentMap<Integer, CountDownLatch> syncLocks = new ConcurrentHashMap<Integer, CountDownLatch>();

    AbstractClientInternalCacheProxy(CacheConfig<K, V> cacheConfig, ClientContext context) {
        super(cacheConfig, context);
    }

    @Override
    protected void onInitialize() {
        super.onInitialize();
        ClientPartitionService partitionService = this.getContext().getPartitionService();
        this.partitionCount = partitionService.getPartitionCount();
    }

    @Override
    public void setCacheManager(HazelcastCacheManager cacheManager) {
        assert (cacheManager instanceof HazelcastClientCacheManager);
        if (this.cacheManagerRef.get() == cacheManager) {
            return;
        }
        if (!this.cacheManagerRef.compareAndSet(null, (HazelcastClientCacheManager)cacheManager)) {
            if (this.cacheManagerRef.get() == cacheManager) {
                return;
            }
            throw new IllegalStateException("Cannot overwrite a Cache's CacheManager.");
        }
    }

    @Override
    public void resetCacheManager() {
        this.cacheManagerRef.set(null);
    }

    @Override
    protected void postDestroy() {
        CacheManager cacheManager = this.cacheManagerRef.get();
        if (cacheManager != null) {
            cacheManager.destroyCache(this.getName());
        }
        this.resetCacheManager();
    }

    @Override
    public void close() {
        if (this.statisticsEnabled) {
            this.statsHandler.clear();
        }
        super.close();
    }

    @Override
    protected void onDestroy() {
        if (this.statisticsEnabled) {
            this.statsHandler.clear();
        }
        super.onDestroy();
    }

    protected ClientInvocationFuture invoke(ClientMessage req, int partitionId, int completionId) {
        boolean completionOperation;
        boolean bl = completionOperation = completionId != -1;
        if (completionOperation) {
            this.registerCompletionLatch(completionId, 1);
        }
        try {
            ClientInvocation clientInvocation = new ClientInvocation(this.getClient(), req, this.name, partitionId);
            ClientInvocationFuture future = clientInvocation.invoke();
            if (completionOperation) {
                this.waitCompletionLatch(completionId, future);
            }
            return future;
        }
        catch (Throwable e) {
            if (e instanceof IllegalStateException) {
                this.close();
            }
            if (completionOperation) {
                this.deregisterCompletionLatch(completionId);
            }
            throw ExceptionUtil.rethrowAllowedTypeFirst(e, CacheException.class);
        }
    }

    protected ClientInvocationFuture invoke(ClientMessage req, Data keyData, int completionId) {
        int partitionId = this.getContext().getPartitionService().getPartitionId(keyData);
        return this.invoke(req, partitionId, completionId);
    }

    protected <T> T getSafely(Future<T> future) {
        try {
            return future.get();
        }
        catch (Throwable throwable) {
            throw ExceptionUtil.rethrow(throwable);
        }
    }

    protected <T> ICompletableFuture<T> getAndRemoveAsyncInternal(K key) {
        long startNanos = this.nowInNanosOrDefault();
        this.ensureOpen();
        CacheProxyUtil.validateNotNull(key);
        CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key);
        Data keyData = this.toData(key);
        ClientDelegatingFuture<T> delegatingFuture = this.getAndRemoveInternal(keyData, false);
        ExecutionCallback callback = !this.statisticsEnabled ? null : this.statsHandler.newOnRemoveCallback(true, startNanos);
        this.onGetAndRemoveAsyncInternal(key, keyData, delegatingFuture, callback);
        return delegatingFuture;
    }

    protected <T> ClientDelegatingFuture<T> getAndRemoveSyncInternal(K key) {
        this.ensureOpen();
        CacheProxyUtil.validateNotNull(key);
        CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key);
        Data keyData = this.toData(key);
        ClientDelegatingFuture<T> delegatingFuture = this.getAndRemoveInternal(keyData, true);
        this.onGetAndRemoveAsyncInternal(key, keyData, delegatingFuture, null);
        return delegatingFuture;
    }

    private <T> ClientDelegatingFuture<T> getAndRemoveInternal(Data keyData, boolean withCompletionEvent) {
        int completionId = withCompletionEvent ? this.nextCompletionId() : -1;
        ClientMessage request = CacheGetAndRemoveCodec.encodeRequest(this.nameWithPrefix, keyData, completionId);
        ClientInvocationFuture future = this.invoke(request, keyData, completionId);
        return this.newDelegatingFuture(future, GET_AND_REMOVE_RESPONSE_DECODER);
    }

    protected <T> void onGetAndRemoveAsyncInternal(K key, Data keyData, ClientDelegatingFuture<T> delegatingFuture, ExecutionCallback<T> callback) {
        AbstractClientInternalCacheProxy.addCallback(delegatingFuture, callback);
    }

    protected Object removeAsyncInternal(K key, V oldValue, boolean hasOldValue, boolean withCompletionEvent, boolean async) {
        long startNanos = this.nowInNanosOrDefault();
        this.ensureOpen();
        if (hasOldValue) {
            CacheProxyUtil.validateNotNull(key, oldValue);
            CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, oldValue);
        } else {
            CacheProxyUtil.validateNotNull(key);
            CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key);
        }
        Data keyData = this.toData(key);
        Data oldValueData = this.toData(oldValue);
        int completionId = withCompletionEvent ? this.nextCompletionId() : -1;
        ClientMessage request = CacheRemoveCodec.encodeRequest(this.nameWithPrefix, keyData, oldValueData, completionId);
        ClientInvocationFuture future = this.invoke(request, keyData, completionId);
        ClientDelegatingFuture delegatingFuture = this.newDelegatingFuture(future, REMOVE_RESPONSE_DECODER);
        if (async) {
            ExecutionCallback callback = !this.statisticsEnabled ? null : this.statsHandler.newOnRemoveCallback(false, startNanos);
            this.onRemoveAsyncInternal(key, keyData, delegatingFuture, callback);
            return delegatingFuture;
        }
        try {
            Object result = delegatingFuture.get();
            this.onRemoveSyncInternal(key, keyData);
            return result;
        }
        catch (Throwable t) {
            throw ExceptionUtil.rethrow(t);
        }
    }

    public void onRemoveSyncInternal(Object key, Data keyData) {
    }

    protected void onRemoveAsyncInternal(Object key, Data keyData, ClientDelegatingFuture future, ExecutionCallback callback) {
        AbstractClientInternalCacheProxy.addCallback(future, callback);
    }

    protected boolean replaceSyncInternal(K key, V oldValue, V newValue, ExpiryPolicy expiryPolicy, boolean hasOldValue) {
        long startNanos = this.nowInNanosOrDefault();
        ICompletableFuture future = this.replaceAsyncInternal(key, oldValue, newValue, expiryPolicy, hasOldValue, true, false);
        try {
            boolean replaced = (Boolean)future.get();
            if (this.statisticsEnabled) {
                this.statsHandler.onReplace(false, startNanos, replaced);
            }
            return replaced;
        }
        catch (Throwable e) {
            throw ExceptionUtil.rethrowAllowedTypeFirst(e, CacheException.class);
        }
    }

    protected <T> ICompletableFuture<T> replaceAsyncInternal(K key, V oldValue, V newValue, ExpiryPolicy expiryPolicy, boolean hasOldValue, boolean withCompletionEvent, boolean async) {
        long startNanos = this.nowInNanosOrDefault();
        this.ensureOpen();
        if (hasOldValue) {
            CacheProxyUtil.validateNotNull(key, oldValue, newValue);
            CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, oldValue, newValue);
        } else {
            CacheProxyUtil.validateNotNull(key, newValue);
            CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, newValue);
        }
        Data keyData = this.toData(key);
        Data oldValueData = this.toData(oldValue);
        Data newValueData = this.toData(newValue);
        Data expiryPolicyData = this.toData(expiryPolicy);
        int completionId = withCompletionEvent ? this.nextCompletionId() : -1;
        ClientMessage request = CacheReplaceCodec.encodeRequest(this.nameWithPrefix, keyData, oldValueData, newValueData, expiryPolicyData, completionId);
        ClientInvocationFuture future = this.invoke(request, keyData, completionId);
        ClientDelegatingFuture delegatingFuture = this.newDelegatingFuture(future, REPLACE_RESPONSE_DECODER);
        ExecutionCallback callback = async && this.statisticsEnabled ? this.statsHandler.newOnReplaceCallback(startNanos) : null;
        this.onReplaceInternalAsync(key, newValue, keyData, newValueData, delegatingFuture, callback);
        return delegatingFuture;
    }

    protected <T> void onReplaceInternalAsync(K key, V value, Data keyData, Data valueData, ClientDelegatingFuture<T> delegatingFuture, ExecutionCallback<T> callback) {
        AbstractClientInternalCacheProxy.addCallback(delegatingFuture, callback);
    }

    protected <T> ICompletableFuture<T> replaceAndGetAsyncInternal(K key, V oldValue, V newValue, ExpiryPolicy expiryPolicy, boolean hasOldValue, boolean withCompletionEvent, boolean async) {
        long startNanos = this.nowInNanosOrDefault();
        this.ensureOpen();
        if (hasOldValue) {
            CacheProxyUtil.validateNotNull(key, oldValue, newValue);
            CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, oldValue, newValue);
        } else {
            CacheProxyUtil.validateNotNull(key, newValue);
            CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, newValue);
        }
        Data keyData = this.toData(key);
        Data newValueData = this.toData(newValue);
        Data expiryPolicyData = this.toData(expiryPolicy);
        int completionId = withCompletionEvent ? this.nextCompletionId() : -1;
        ClientMessage request = CacheGetAndReplaceCodec.encodeRequest(this.nameWithPrefix, keyData, newValueData, expiryPolicyData, completionId);
        ClientInvocationFuture future = this.invoke(request, keyData, completionId);
        ClientDelegatingFuture delegatingFuture = this.newDelegatingFuture(future, GET_AND_REPLACE_RESPONSE_DECODER);
        ExecutionCallback callback = async && this.statisticsEnabled ? this.statsHandler.newOnReplaceCallback(startNanos) : null;
        this.onReplaceAndGetAsync(key, newValue, keyData, newValueData, delegatingFuture, callback);
        return delegatingFuture;
    }

    protected <T> void onReplaceAndGetAsync(K key, V value, Data keyData, Data valueData, ClientDelegatingFuture<T> delegatingFuture, ExecutionCallback<T> callback) {
        AbstractClientInternalCacheProxy.addCallback(delegatingFuture, callback);
    }

    protected static <T> void addCallback(ClientDelegatingFuture<T> delegatingFuture, ExecutionCallback<T> callback) {
        if (callback == null) {
            return;
        }
        delegatingFuture.andThen(callback);
    }

    private ClientInvocationFuture putInternal(Data keyData, Data valueData, Data expiryPolicyData, boolean isGet, boolean withCompletionEvent) {
        int completionId = withCompletionEvent ? this.nextCompletionId() : -1;
        ClientMessage request = CachePutCodec.encodeRequest(this.nameWithPrefix, keyData, valueData, expiryPolicyData, isGet, completionId);
        return this.invoke(request, keyData, completionId);
    }

    protected V putSyncInternal(K key, V value, ExpiryPolicy expiryPolicy, boolean isGet) {
        long startNanos = this.nowInNanosOrDefault();
        this.ensureOpen();
        CacheProxyUtil.validateNotNull(key, value);
        CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, value);
        Data keyData = this.toData(key);
        Data valueData = this.toData(value);
        Data expiryPolicyData = this.toData(expiryPolicy);
        try {
            ClientInvocationFuture invocationFuture = this.putInternal(keyData, valueData, expiryPolicyData, isGet, true);
            ClientDelegatingFuture delegatingFuture = this.newDelegatingFuture(invocationFuture, PUT_RESPONSE_DECODER);
            Object response = delegatingFuture.get();
            if (this.statisticsEnabled) {
                this.statsHandler.onPut(isGet, startNanos, response != null);
            }
            Object t = response;
            return (V)t;
        }
        catch (Throwable e) {
            throw ExceptionUtil.rethrowAllowedTypeFirst(e, CacheException.class);
        }
        finally {
            this.onPutSyncInternal(key, value, keyData, valueData);
        }
    }

    protected void onPutSyncInternal(K key, V value, Data keyData, Data valueData) {
    }

    protected ClientDelegatingFuture putAsyncInternal(K key, V value, ExpiryPolicy expiryPolicy, boolean isGet, boolean withCompletionEvent, OneShotExecutionCallback<V> callback) {
        this.ensureOpen();
        CacheProxyUtil.validateNotNull(key, value);
        CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, value);
        Data keyData = this.toData(key);
        Data valueData = this.toData(value);
        Data expiryPolicyData = this.toData(expiryPolicy);
        ClientInvocationFuture invocationFuture = this.putInternal(keyData, valueData, expiryPolicyData, isGet, withCompletionEvent);
        return this.wrapPutAsyncFuture(key, value, keyData, valueData, invocationFuture, callback);
    }

    protected ClientDelegatingFuture<V> wrapPutAsyncFuture(K key, V value, Data keyData, Data valueData, ClientInvocationFuture invocationFuture, OneShotExecutionCallback<V> callback) {
        if (callback == null) {
            return this.newDelegatingFuture(invocationFuture, PUT_RESPONSE_DECODER);
        }
        CallbackAwareClientDelegatingFuture<V> future = new CallbackAwareClientDelegatingFuture<V>(invocationFuture, this.getSerializationService(), PUT_RESPONSE_DECODER, callback);
        future.andThen(callback);
        return future;
    }

    protected OneShotExecutionCallback<V> newStatsCallbackOrNull(boolean isGet) {
        if (!this.statisticsEnabled) {
            return null;
        }
        return this.statsHandler.newOnPutCallback(isGet, System.nanoTime());
    }

    protected boolean setExpiryPolicyInternal(K key, ExpiryPolicy expiryPolicy) {
        this.ensureOpen();
        CacheProxyUtil.validateNotNull(key);
        CacheProxyUtil.validateNotNull(expiryPolicy);
        Data keyData = this.toData(key);
        Data expiryPolicyData = this.toData(expiryPolicy);
        List<Data> list = Collections.singletonList(keyData);
        ClientMessage request = CacheSetExpiryPolicyCodec.encodeRequest(this.nameWithPrefix, list, expiryPolicyData);
        ClientInvocationFuture future = this.invoke(request, keyData, -1);
        ClientDelegatingFuture delegatingFuture = this.newDelegatingFuture(future, SET_EXPIRY_POLICY_DECODER);
        try {
            return (Boolean)delegatingFuture.get();
        }
        catch (Throwable e) {
            throw ExceptionUtil.rethrowAllowedTypeFirst(e, CacheException.class);
        }
    }

    protected Object putIfAbsentInternal(K key, V value, ExpiryPolicy expiryPolicy, boolean withCompletionEvent, boolean async) {
        long startNanos = this.nowInNanosOrDefault();
        this.ensureOpen();
        CacheProxyUtil.validateNotNull(key, value);
        CacheProxyUtil.validateConfiguredTypes(this.cacheConfig, key, value);
        Data keyData = this.toData(key);
        Data valueData = this.toData(value);
        Data expiryPolicyData = this.toData(expiryPolicy);
        int completionId = withCompletionEvent ? this.nextCompletionId() : -1;
        ClientMessage request = CachePutIfAbsentCodec.encodeRequest(this.nameWithPrefix, keyData, valueData, expiryPolicyData, completionId);
        ClientInvocationFuture future = this.invoke(request, keyData, completionId);
        ClientDelegatingFuture<Boolean> delegatingFuture = this.newDelegatingFuture(future, PUT_IF_ABSENT_RESPONSE_DECODER);
        if (async) {
            ExecutionCallback<Boolean> callback = !this.statisticsEnabled ? null : this.statsHandler.newOnPutIfAbsentCallback(startNanos);
            this.onPutIfAbsentAsyncInternal(key, value, keyData, valueData, delegatingFuture, callback);
            return delegatingFuture;
        }
        try {
            Object response = delegatingFuture.get();
            if (this.statisticsEnabled) {
                this.statsHandler.onPutIfAbsent(startNanos, (Boolean)response);
            }
            this.onPutIfAbsentSyncInternal(key, value, keyData, valueData);
            return response;
        }
        catch (Throwable e) {
            throw ExceptionUtil.rethrowAllowedTypeFirst(e, CacheException.class);
        }
    }

    protected void onPutIfAbsentAsyncInternal(K key, V value, Data keyData, Data valueData, ClientDelegatingFuture<Boolean> delegatingFuture, ExecutionCallback<Boolean> callback) {
        AbstractClientInternalCacheProxy.addCallback(delegatingFuture, callback);
    }

    protected void onPutIfAbsentSyncInternal(K key, V value, Data keyData, Data valueData) {
    }

    protected void removeAllKeysInternal(Set<? extends K> keys, Collection<Data> dataKeys, long startNanos) {
        int partitionCount = this.getContext().getPartitionService().getPartitionCount();
        int completionId = this.nextCompletionId();
        this.registerCompletionLatch(completionId, partitionCount);
        ClientMessage request = CacheRemoveAllKeysCodec.encodeRequest(this.nameWithPrefix, dataKeys, completionId);
        try {
            this.invoke(request);
            this.waitCompletionLatch(completionId, null);
            if (this.statisticsEnabled) {
                this.statsHandler.onBatchRemove(startNanos, dataKeys.size());
            }
        }
        catch (Throwable t) {
            this.deregisterCompletionLatch(completionId);
            throw ExceptionUtil.rethrowAllowedTypeFirst(t, CacheException.class);
        }
    }

    protected void removeAllInternal() {
        int partitionCount = this.getContext().getPartitionService().getPartitionCount();
        int completionId = this.nextCompletionId();
        this.registerCompletionLatch(completionId, partitionCount);
        ClientMessage request = CacheRemoveAllCodec.encodeRequest(this.nameWithPrefix, completionId);
        try {
            this.invoke(request);
            this.waitCompletionLatch(completionId, null);
            if (this.statisticsEnabled) {
                this.statsHandler.getStatistics().setLastUpdateTime(System.currentTimeMillis());
            }
        }
        catch (Throwable t) {
            this.deregisterCompletionLatch(completionId);
            throw ExceptionUtil.rethrowAllowedTypeFirst(t, CacheException.class);
        }
    }

    protected void clearInternal() {
        ClientMessage request = CacheClearCodec.encodeRequest(this.nameWithPrefix);
        try {
            this.invoke(request);
            if (this.statisticsEnabled) {
                this.statsHandler.getStatistics().setLastUpdateTime(System.currentTimeMillis());
            }
        }
        catch (Throwable t) {
            throw ExceptionUtil.rethrowAllowedTypeFirst(t, CacheException.class);
        }
    }

    protected void addListenerLocally(String regId, CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration, CacheEventListenerAdaptor<K, V> adaptor) {
        if (cacheEntryListenerConfiguration.isSynchronous()) {
            this.syncListenerRegistrations.putIfAbsent(cacheEntryListenerConfiguration, regId);
        } else {
            this.asyncListenerRegistrations.putIfAbsent(cacheEntryListenerConfiguration, regId);
        }
        CacheEntryListener<K, V> entryListener = adaptor.getCacheEntryListener();
        if (entryListener instanceof Closeable) {
            this.closeableListeners.putIfAbsent(regId, (Closeable)entryListener);
        }
    }

    protected String removeListenerLocally(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
        ConcurrentMap<CacheEntryListenerConfiguration, String> regs = cacheEntryListenerConfiguration.isSynchronous() ? this.syncListenerRegistrations : this.asyncListenerRegistrations;
        String registrationId = (String)regs.remove(cacheEntryListenerConfiguration);
        if (registrationId != null) {
            Closeable closeable = (Closeable)this.closeableListeners.remove(registrationId);
            IOUtil.closeResource(closeable);
        }
        return registrationId;
    }

    protected String getListenerIdLocal(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
        ConcurrentMap<CacheEntryListenerConfiguration, String> regs = cacheEntryListenerConfiguration.isSynchronous() ? this.syncListenerRegistrations : this.asyncListenerRegistrations;
        return (String)regs.get(cacheEntryListenerConfiguration);
    }

    private void deregisterAllCacheEntryListener(Collection<String> listenerRegistrations) {
        ClientListenerService listenerService = this.getContext().getListenerService();
        for (String regId : listenerRegistrations) {
            listenerService.deregisterListener(regId);
        }
    }

    @Override
    protected void closeListeners() {
        this.deregisterAllCacheEntryListener(this.syncListenerRegistrations.values());
        this.deregisterAllCacheEntryListener(this.asyncListenerRegistrations.values());
        this.syncListenerRegistrations.clear();
        this.asyncListenerRegistrations.clear();
        this.notifyAndClearSyncListenerLatches();
        for (Closeable closeable : this.closeableListeners.values()) {
            IOUtil.closeResource(closeable);
        }
    }

    private void notifyAndClearSyncListenerLatches() {
        Collection latches = this.syncLocks.values();
        Iterator iterator = latches.iterator();
        while (iterator.hasNext()) {
            CountDownLatch latch = (CountDownLatch)iterator.next();
            iterator.remove();
            while (latch.getCount() > 0L) {
                latch.countDown();
            }
        }
    }

    @Override
    public void countDownCompletionLatch(int countDownLatchId) {
        if (countDownLatchId != -1) {
            CountDownLatch countDownLatch = (CountDownLatch)this.syncLocks.get(countDownLatchId);
            if (countDownLatch == null) {
                return;
            }
            countDownLatch.countDown();
            if (countDownLatch.getCount() == 0L) {
                this.deregisterCompletionLatch(countDownLatchId);
            }
        }
    }

    protected Integer registerCompletionLatch(Integer countDownLatchId, int count) {
        if (!this.syncListenerRegistrations.isEmpty()) {
            int size = this.syncListenerRegistrations.size();
            CountDownLatch countDownLatch = new CountDownLatch(count * size);
            this.syncLocks.put(countDownLatchId, countDownLatch);
            return countDownLatchId;
        }
        return -1;
    }

    protected void deregisterCompletionLatch(Integer countDownLatchId) {
        if (countDownLatchId != -1) {
            this.syncLocks.remove(countDownLatchId);
        }
    }

    protected void waitCompletionLatch(Integer countDownLatchId, ICompletableFuture future) throws ExecutionException {
        CountDownLatch countDownLatch;
        if (countDownLatchId != -1 && (countDownLatch = (CountDownLatch)this.syncLocks.get(countDownLatchId)) != null) {
            this.awaitLatch(countDownLatch, future);
        }
    }

    private void awaitLatch(CountDownLatch countDownLatch, ICompletableFuture future) throws ExecutionException {
        try {
            for (long currentTimeoutMs = MAX_COMPLETION_LATCH_WAIT_TIME; currentTimeoutMs > 0L && !countDownLatch.await(COMPLETION_LATCH_WAIT_TIME_STEP, TimeUnit.MILLISECONDS); currentTimeoutMs -= COMPLETION_LATCH_WAIT_TIME_STEP) {
                Object response;
                if (future != null && future.isDone() && (response = future.get()) instanceof Throwable) {
                    return;
                }
                if (!this.getContext().isActive()) {
                    throw new HazelcastClientNotActiveException("Client is not active.");
                }
                if (!this.isClosed()) continue;
                throw new IllegalStateException("Cache (" + this.nameWithPrefix + ") is closed!");
            }
            if (countDownLatch.getCount() > 0L) {
                this.logger.finest("Countdown latch wait timeout after " + MAX_COMPLETION_LATCH_WAIT_TIME + " milliseconds!");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            ExceptionUtil.sneakyThrow(e);
        }
    }

    protected EventHandler createHandler(CacheEventListenerAdaptor<K, V> adaptor) {
        return new CacheEventHandler(adaptor);
    }

    private final class CacheEventHandler
    extends CacheAddEntryListenerCodec.AbstractEventHandler
    implements EventHandler<ClientMessage> {
        private final CacheEventListenerAdaptor<K, V> adaptor;

        private CacheEventHandler(CacheEventListenerAdaptor<K, V> adaptor) {
            this.adaptor = adaptor;
        }

        @Override
        public void handleCacheEventV10(int type, Collection<CacheEventData> keys, int completionId) {
            this.adaptor.handle(type, keys, completionId);
        }

        @Override
        public void beforeListenerRegister() {
        }

        @Override
        public void onListenerRegister() {
        }
    }
}

