/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.core.PartitionAware;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.partition.PartitionServiceImpl;
import com.hazelcast.partition.ReplicaErrorLogger;
import com.hazelcast.spi.BackupAwareOperation;
import com.hazelcast.spi.BackupCompletionCallback;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.Notifier;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationAccessor;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.PartitionAwareOperation;
import com.hazelcast.spi.ReadonlyOperation;
import com.hazelcast.spi.ResponseHandler;
import com.hazelcast.spi.UrgentSystemOperation;
import com.hazelcast.spi.WaitSupport;
import com.hazelcast.spi.annotation.PrivateApi;
import com.hazelcast.spi.exception.CallTimeoutException;
import com.hazelcast.spi.exception.CallerNotMemberException;
import com.hazelcast.spi.exception.PartitionMigratingException;
import com.hazelcast.spi.exception.WrongTargetException;
import com.hazelcast.spi.impl.Backup;
import com.hazelcast.spi.impl.BackupResponse;
import com.hazelcast.spi.impl.BasicInvocationBuilder;
import com.hazelcast.spi.impl.BasicOperationProcessor;
import com.hazelcast.spi.impl.BasicOperationScheduler;
import com.hazelcast.spi.impl.BasicPartitionInvocation;
import com.hazelcast.spi.impl.BasicTargetInvocation;
import com.hazelcast.spi.impl.InternalOperationService;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.NormalResponse;
import com.hazelcast.spi.impl.PartitionIteratingOperation;
import com.hazelcast.spi.impl.RemoteCall;
import com.hazelcast.spi.impl.Response;
import com.hazelcast.spi.impl.ResponseHandlerFactory;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ManagedExecutorService;
import com.hazelcast.util.executor.SingleExecutorThreadFactory;
import com.hazelcast.util.scheduler.EntryTaskScheduler;
import com.hazelcast.util.scheduler.EntryTaskSchedulerFactory;
import com.hazelcast.util.scheduler.ScheduleType;
import com.hazelcast.util.scheduler.ScheduledEntry;
import com.hazelcast.util.scheduler.ScheduledEntryProcessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class BasicOperationService
implements InternalOperationService {
    // Incremented once at the start of every processOperation call.
    private final AtomicLong executedOperationsCount = new AtomicLong();
    private final NodeEngineImpl nodeEngine;
    private final Node node;
    private final ILogger logger;
    // Source of call ids; newCallId never hands out the value 0.
    private final AtomicLong callIdGen = new AtomicLong(0L);
    // Calls registered through registerRemoteCall, keyed by call id; entries are removed when a
    // NormalResponse arrives, when the target member leaves, or on shutdown.
    final ConcurrentMap<Long, RemoteCall> remoteCalls;
    // Single-threaded pool (core = max = 1) that runs ResponseProcessor tasks from responseWorkQueue.
    private final ExecutorService responseExecutor;
    // Read from the OPERATION_CALL_TIMEOUT_MILLIS group property in the constructor.
    private final long defaultCallTimeout;
    // Keys of operations between beforeCallExecution and afterCallExecution (key and value are the same object).
    private final Map<RemoteCallKey, RemoteCallKey> executingCalls;
    // Backup completion callbacks keyed by call id; see registerBackupCall / notifyBackupCall.
    final ConcurrentMap<Long, BackupCompletionCallback> backupCalls;
    // Configured OPERATION_THREAD_COUNT when positive, otherwise twice the available processors.
    private final int operationThreadCount;
    // Used by scheduleBackup for backups whose replica address is not known yet.
    private final EntryTaskScheduler<Object, ScheduledBackup> backupScheduler;
    // Work queue backing responseExecutor; its size is reported by getResponseQueueSize.
    private final BlockingQueue<Runnable> responseWorkQueue = new LinkedBlockingQueue<Runnable>();
    private final ExecutionService executionService;
    // Scheduler that receives both packets (receive) and local operations (executeOperation).
    private final BasicOperationScheduler executor;

    /**
     * Builds the operation service for the given node engine.
     *
     * <p>Reads the call timeout and operation thread count from the node's group properties,
     * sizes the bookkeeping maps from the number of available processors, registers the
     * {@code "hz:async"} executor, and creates the single-threaded response executor, the
     * backup scheduler and the operation scheduler.
     *
     * @param nodeEngine the node engine this service belongs to
     */
    BasicOperationService(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        this.node = nodeEngine.getNode();
        this.logger = node.getLogger(OperationService.class);
        this.defaultCallTimeout = node.getGroupProperties().OPERATION_CALL_TIMEOUT_MILLIS.getLong();

        final int processors = Runtime.getRuntime().availableProcessors();
        // Machines with 8+ cores get a concurrency level proportional to the core count.
        final int mapConcurrency = processors >= 8 ? processors * 4 : 16;
        this.remoteCalls = new ConcurrentHashMap<Long, RemoteCall>(1000, 0.75f, mapConcurrency);

        final int configuredThreads = node.getGroupProperties().OPERATION_THREAD_COUNT.getInteger();
        if (configuredThreads > 0) {
            this.operationThreadCount = configuredThreads;
        } else {
            this.operationThreadCount = processors * 2;
        }

        this.executionService = nodeEngine.getExecutionService();
        executionService.register("hz:async", processors * 10, processors * 100000);

        SingleExecutorThreadFactory responseThreadFactory = new SingleExecutorThreadFactory(
                node.threadGroup, node.getConfigClassLoader(), node.getThreadNamePrefix("response"));
        this.responseExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, responseWorkQueue, responseThreadFactory);

        this.executingCalls = new ConcurrentHashMap<RemoteCallKey, RemoteCallKey>(1000, 0.75f, mapConcurrency);
        this.backupCalls = new ConcurrentHashMap<Long, BackupCompletionCallback>(1000, 0.75f, mapConcurrency);
        this.backupScheduler = EntryTaskSchedulerFactory.newScheduler(
                executionService.getDefaultScheduledExecutor(), new ScheduledBackupProcessor(), ScheduleType.SCHEDULE_IF_NEW);
        this.executor = new BasicOperationScheduler(node, executionService, operationThreadCount, new BasicOperationProcessorImpl());
    }

    /**
     * @return the number of operation threads chosen in the constructor
     */
    @Override
    public int getOperationThreadCount() {
        return operationThreadCount;
    }

    /**
     * @return the current number of entries in the executing-calls map
     */
    @Override
    public int getRunningOperationsCount() {
        return executingCalls.size();
    }

    /**
     * @return how many times an operation has been handed to {@code processOperation}
     */
    @Override
    public long getExecutedOperationCount() {
        return executedOperationsCount.get();
    }

    /**
     * @return the number of currently registered remote calls
     */
    @Override
    public int getRemoteOperationsCount() {
        return remoteCalls.size();
    }

    /**
     * @return the number of tasks waiting in the response executor's work queue
     */
    @Override
    public int getResponseQueueSize() {
        return responseWorkQueue.size();
    }

    /**
     * @return the queue size reported by the operation scheduler
     */
    @Override
    public int getOperationExecutorQueueSize() {
        return executor.getOperationExecutorQueueSize();
    }

    /**
     * Creates an invocation builder that addresses a partition.
     *
     * @param serviceName name of the service the operation belongs to
     * @param op          the operation to invoke
     * @param partitionId target partition id; must not be negative
     * @return a new builder bound to this service's node engine
     * @throws IllegalArgumentException if {@code partitionId} is negative
     */
    @Override
    public InvocationBuilder createInvocationBuilder(String serviceName, Operation op, int partitionId) {
        if (partitionId >= 0) {
            return new BasicInvocationBuilder(nodeEngine, serviceName, op, partitionId);
        }
        throw new IllegalArgumentException("Partition id cannot be negative!");
    }

    /**
     * Creates an invocation builder that addresses a specific member.
     *
     * @param serviceName name of the service the operation belongs to
     * @param op          the operation to invoke
     * @param target      address of the target member; must not be {@code null}
     * @return a new builder bound to this service's node engine
     * @throws IllegalArgumentException if {@code target} is {@code null}
     */
    @Override
    public InvocationBuilder createInvocationBuilder(String serviceName, Operation op, Address target) {
        if (target != null) {
            return new BasicInvocationBuilder(nodeEngine, serviceName, op, target);
        }
        throw new IllegalArgumentException("Target cannot be null!");
    }

    /**
     * Dispatches an incoming packet.
     *
     * <p>Packets with header bit {@code 1} set are handed to the response executor; all other
     * packets go to the operation scheduler together with their partition id and urgency.
     * A {@link RejectedExecutionException} is rethrown only while the node engine is still
     * active; otherwise it is dropped silently.
     *
     * @param packet the packet to dispatch
     */
    @Override
    @PrivateApi
    public void receive(Packet packet) {
        try {
            if (packet.isHeaderSet(1)) {
                responseExecutor.execute(new ResponseProcessor(packet));
                return;
            }
            int partitionId = packet.getPartitionId();
            boolean urgent = packet.isUrgent();
            executor.execute(packet, partitionId, urgent);
        } catch (RejectedExecutionException rejected) {
            // Rejections are expected once the engine is no longer active; only surface them otherwise.
            if (nodeEngine.isActive()) {
                throw rejected;
            }
        }
    }

    /**
     * @return the operation's partition id when it is a {@code PartitionAwareOperation},
     *         otherwise {@code -1}
     */
    private int getPartitionIdForExecution(Operation op) {
        if (op instanceof PartitionAwareOperation) {
            return op.getPartitionId();
        }
        return -1;
    }

    /**
     * Runs the operation synchronously on the calling thread.
     *
     * @param op the operation to run
     * @throws IllegalThreadStateException if the scheduler does not allow the current thread
     *                                     to run this operation
     */
    @Override
    public void runOperation(Operation op) {
        if (isAllowedToRunInCurrentThread(op)) {
            processOperation(op);
            return;
        }
        throw new IllegalThreadStateException("Operation: " + op + " cannot be run in current thread! -> " + Thread.currentThread());
    }

    /**
     * Asks the operation scheduler whether the current thread may run the given operation,
     * based on the partition id used for its execution.
     */
    boolean isAllowedToRunInCurrentThread(Operation op) {
        int partitionId = getPartitionIdForExecution(op);
        return executor.isAllowedToRunInCurrentThread(partitionId);
    }

    /**
     * Asks the operation scheduler whether the current thread may invoke the given operation,
     * based on the partition id used for its execution.
     */
    boolean isInvocationAllowedFromCurrentThread(Operation op) {
        int partitionId = getPartitionIdForExecution(op);
        return executor.isInvocationAllowedFromCurrentThread(partitionId);
    }

    /**
     * Executes the operation asynchronously.
     *
     * <p>Operations without an executor name go to the operation scheduler. Operations that
     * name an executor are wrapped in a {@code LocalOperationProcessor} and submitted to that
     * executor.
     *
     * @param op the operation to execute
     * @throws IllegalStateException if the named executor does not exist, or the operation is
     *                               a {@code PartitionAware} or {@code UrgentSystemOperation}
     */
    @Override
    public void executeOperation(Operation op) {
        String executorName = op.getExecutorName();
        if (executorName == null) {
            int partitionId = getPartitionIdForExecution(op);
            boolean urgent = op.isUrgent();
            executor.execute(op, partitionId, urgent);
            return;
        }

        ManagedExecutorService customExecutor = executionService.getExecutor(executorName);
        if (customExecutor == null) {
            throw new IllegalStateException("Could not found executor with name: " + executorName);
        }
        // NOTE(review): the check is against PartitionAware while the message (and the rest of
        // this class) talks about PartitionAwareOperation -- confirm which type is intended.
        if (op instanceof PartitionAware) {
            throw new IllegalStateException("PartitionAwareOperation " + op
                    + " can't be executed on a custom executor with name: " + executorName);
        }
        if (op instanceof UrgentSystemOperation) {
            throw new IllegalStateException("UrgentSystemOperation " + op
                    + " can't be executed on a custom executor with name: " + executorName);
        }
        customExecutor.execute(new LocalOperationProcessor(op));
    }

    /**
     * Invokes the operation on the given partition using fixed invocation settings.
     *
     * @param serviceName name of the service the operation belongs to
     * @param op          the operation to invoke
     * @param partitionId the target partition id
     * @return the future returned by the invocation
     */
    @Override
    public <E> InternalCompletableFuture<E> invokeOnPartition(String serviceName, Operation op, int partitionId) {
        // NOTE(review): the literals presumably are replica index, try count, try pause (ms),
        // call timeout, callback, executor name and result deserialization -- confirm against
        // the BasicPartitionInvocation constructor.
        BasicPartitionInvocation invocation = new BasicPartitionInvocation(
                nodeEngine, serviceName, op, partitionId, 0, 250, 500L, -1L, null, null, true);
        return invocation.invoke();
    }

    /**
     * Invokes the operation on the given member using fixed invocation settings.
     *
     * @param serviceName name of the service the operation belongs to
     * @param op          the operation to invoke
     * @param target      address of the target member
     * @return the future returned by the invocation
     */
    @Override
    public <E> InternalCompletableFuture<E> invokeOnTarget(String serviceName, Operation op, Address target) {
        // NOTE(review): the literals presumably are try count, try pause (ms), call timeout,
        // callback, executor name and result deserialization -- confirm against the
        // BasicTargetInvocation constructor.
        BasicTargetInvocation invocation = new BasicTargetInvocation(
                nodeEngine, serviceName, op, target, 250, 500L, -1L, null, null, true);
        return invocation.invoke();
    }

    /**
     * Deserializes the operation carried by a packet, attaches caller, connection and a remote
     * response handler to it, and then runs it.
     *
     * <p>Non-join operations whose caller is not a known cluster member are answered with a
     * {@code CallerNotMemberException}. Operations that name an executor are submitted to it;
     * all others are processed on the calling thread. Any failure is logged at severe level
     * and not propagated.
     *
     * @param packet the packet holding the serialized operation
     */
    private void processPacket(Packet packet) {
        Connection conn = packet.getConn();
        try {
            Address caller = conn.getEndPoint();
            Data payload = packet.getData();
            Operation op = (Operation) nodeEngine.toObject(payload);
            op.setNodeEngine(nodeEngine);
            OperationAccessor.setCallerAddress(op, caller);
            OperationAccessor.setConnection(op, conn);
            ResponseHandlerFactory.setRemoteResponseHandler(nodeEngine, op);

            if (!OperationAccessor.isJoinOperation(op) && node.clusterService.getMember(op.getCallerAddress()) == null) {
                CallerNotMemberException notMember = new CallerNotMemberException(
                        op.getCallerAddress(), op.getPartitionId(), op.getClass().getName(), op.getServiceName());
                handleOperationError(op, notMember);
                return;
            }

            String executorName = op.getExecutorName();
            if (executorName == null) {
                processOperation(op);
                return;
            }
            ManagedExecutorService customExecutor = executionService.getExecutor(executorName);
            if (customExecutor == null) {
                throw new IllegalStateException("Could not found executor with name: " + executorName);
            }
            customExecutor.execute(new LocalOperationProcessor(op));
        } catch (Throwable failure) {
            logger.severe(failure);
        }
    }

    /**
     * Runs a single operation through its full lifecycle on the calling thread.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>count the operation as executed;</li>
     *   <li>if the call has already timed out, answer with a {@code CallTimeoutException};</li>
     *   <li>record the call as executing;</li>
     *   <li>for a {@code PartitionAwareOperation}, validate the partition id, reject it while
     *       the partition is migrating (when the operation must be retried during migration)
     *       and reject it when this node is not the expected replica;</li>
     *   <li>call {@code beforeRun()}; park the operation if it is a {@code WaitSupport} that
     *       should wait;</li>
     *   <li>call {@code run()}, send backups, send the response, call {@code afterRun()} and
     *       notify waiters if the operation is a {@code Notifier}.</li>
     * </ol>
     *
     * <p>Every step after the counter increment runs inside one try block, so that
     * <em>any</em> failure -- including the validation failures and {@code beforeRun()} -- is
     * routed to {@link #handleOperationError(Operation, Throwable)} (which reports it to the
     * caller) and the executing-call record is always released in the {@code finally} clause.
     *
     * @param op the operation to process
     */
    private void processOperation(Operation op) {
        this.executedOperationsCount.incrementAndGet();
        RemoteCallKey callKey = null;
        try {
            if (this.isCallTimedOut(op)) {
                CallTimeoutException response = new CallTimeoutException(op.getClass().getName(), op.getInvocationTime(), op.getCallTimeout());
                op.getResponseHandler().sendResponse(response);
                return;
            }

            callKey = this.beforeCallExecution(op);

            int partitionId = op.getPartitionId();
            if (op instanceof PartitionAwareOperation) {
                if (partitionId < 0) {
                    throw new IllegalArgumentException("Partition id cannot be negative! -> " + partitionId);
                }
                InternalPartition internalPartition = this.nodeEngine.getPartitionService().getPartition(partitionId);
                if (BasicOperationService.retryDuringMigration(op) && internalPartition.isMigrating()) {
                    throw new PartitionMigratingException(this.node.getThisAddress(), partitionId, op.getClass().getName(), op.getServiceName());
                }
                Address owner = internalPartition.getReplicaAddress(op.getReplicaIndex());
                if (op.validatesTarget() && !this.node.getThisAddress().equals(owner)) {
                    throw new WrongTargetException(this.node.getThisAddress(), owner, partitionId, op.getReplicaIndex(), op.getClass().getName(), op.getServiceName());
                }
            }

            OperationAccessor.setStartTime(op, Clock.currentTimeMillis());
            op.beforeRun();

            if (op instanceof WaitSupport) {
                WaitSupport waitSupport = (WaitSupport) op;
                if (waitSupport.shouldWait()) {
                    // The operation is parked; it is not run now.
                    this.nodeEngine.waitNotifyService.await(waitSupport);
                    return;
                }
            }

            op.run();

            boolean returnsResponse = op.returnsResponse();
            Object response = null;
            if (op instanceof BackupAwareOperation) {
                BackupAwareOperation backupAwareOp = (BackupAwareOperation) op;
                int syncBackupCount = 0;
                if (backupAwareOp.shouldBackup()) {
                    syncBackupCount = this.sendBackups(backupAwareOp);
                }
                if (returnsResponse) {
                    // Wrap the result so the sync backup count travels with the response.
                    response = new NormalResponse(op.getResponse(), op.getCallId(), syncBackupCount, op.isUrgent());
                }
            }

            if (returnsResponse) {
                if (response == null) {
                    response = op.getResponse();
                }
                ResponseHandler responseHandler = op.getResponseHandler();
                if (responseHandler == null) {
                    throw new IllegalStateException("ResponseHandler should not be null!");
                }
                responseHandler.sendResponse(response);
            }

            op.afterRun();

            if (op instanceof Notifier) {
                Notifier notifier = (Notifier) op;
                if (notifier.shouldNotify()) {
                    this.nodeEngine.waitNotifyService.notify(notifier);
                }
            }
        } catch (Throwable e) {
            this.handleOperationError(op, e);
        } finally {
            // Always release the executing-call record, whatever path was taken above.
            this.afterCallExecution(op, callKey);
        }
    }

    /**
     * @return {@code true} unless the operation is a {@code ReadonlyOperation} or a migration
     *         operation
     */
    private static boolean retryDuringMigration(Operation op) {
        if (op instanceof ReadonlyOperation) {
            return false;
        }
        return !OperationAccessor.isMigrationOperation(op);
    }

    /**
     * Tells whether the operation's call deadline has already passed.
     *
     * <p>Only operations that return a response and carry a non-zero call id can time out.
     * The deadline is invocation time plus call timeout; a deadline that is not positive or
     * equals {@code Long.MAX_VALUE} is treated as "never expires".
     *
     * @param op the operation to check
     * @return {@code true} if the deadline lies before the current cluster time
     */
    @Override
    @PrivateApi
    public boolean isCallTimedOut(Operation op) {
        if (!op.returnsResponse() || op.getCallId() == 0L) {
            return false;
        }
        long callTimeout = op.getCallTimeout();
        long invocationTime = op.getInvocationTime();
        long deadline = invocationTime + callTimeout;
        if (deadline <= 0L || deadline == Long.MAX_VALUE) {
            return false;
        }
        return deadline < nodeEngine.getClusterTime();
    }

    /**
     * Records the operation as executing.
     *
     * <p>Only operations with a non-zero call id that return a response are tracked. If a
     * record with an equal key was already present, a warning is logged and the record is
     * replaced.
     *
     * @param op the operation about to run
     * @return the key under which the operation was recorded, or {@code null} if it is not tracked
     */
    private RemoteCallKey beforeCallExecution(Operation op) {
        if (op.getCallId() == 0L || !op.returnsResponse()) {
            return null;
        }
        RemoteCallKey callKey = new RemoteCallKey(op);
        RemoteCallKey previous = executingCalls.put(callKey, callKey);
        if (previous != null) {
            logger.warning("Duplicate Call record! -> " + callKey + " / " + previous + " == " + op.getClass().getName());
        }
        return callKey;
    }

    /**
     * Removes the executing-call record created by {@code beforeCallExecution}.
     *
     * <p>Nothing happens when {@code callKey} is {@code null}, the call id is zero or the
     * operation returns no response. A missing record is logged at severe level.
     *
     * @param op      the operation that finished
     * @param callKey the key returned by {@code beforeCallExecution}, possibly {@code null}
     */
    private void afterCallExecution(Operation op, RemoteCallKey callKey) {
        if (callKey == null || op.getCallId() == 0L || !op.returnsResponse()) {
            return;
        }
        if (executingCalls.remove(callKey) == null) {
            logger.severe("No Call record has been found: -> " + callKey + " == " + op.getClass().getName());
        }
    }

    /**
     * Creates and dispatches the backup operations for a backup-aware operation.
     *
     * <p>The requested sync and async backup counts are capped by
     * {@code min(memberGroupsSize - 1, 6)}. When the operation returns no response, all
     * backups are treated as async. For each replica index a fresh backup operation is
     * wrapped in a {@code Backup}; it is sent directly when the replica address is known and
     * handed to the backup scheduler otherwise.
     *
     * @param backupAwareOp the operation whose backups are to be sent
     * @return the number of sync backups
     * @throws IllegalArgumentException if the operation supplies a {@code null} backup operation
     * @throws IllegalStateException    if a backup replica address equals this node's address
     */
    private int sendBackups(BackupAwareOperation backupAwareOp) throws Exception {
        Operation op = (Operation) backupAwareOp;
        boolean returnsResponse = op.returnsResponse();
        PartitionServiceImpl partitionService = (PartitionServiceImpl) nodeEngine.getPartitionService();
        int maxBackups = Math.min(partitionService.getMemberGroupsSize() - 1, 6);

        int syncBackupCount = 0;
        if (backupAwareOp.getSyncBackupCount() > 0) {
            syncBackupCount = Math.min(maxBackups, backupAwareOp.getSyncBackupCount());
        }
        int asyncBackupCount = 0;
        if (backupAwareOp.getAsyncBackupCount() > 0 && maxBackups > syncBackupCount) {
            asyncBackupCount = Math.min(maxBackups - syncBackupCount, backupAwareOp.getAsyncBackupCount());
        }
        if (!returnsResponse) {
            // Nobody waits for a response, so no backup needs to be acknowledged synchronously.
            asyncBackupCount += syncBackupCount;
            syncBackupCount = 0;
        }

        int totalBackupCount = syncBackupCount + asyncBackupCount;
        if (totalBackupCount <= 0) {
            return syncBackupCount;
        }

        String serviceName = op.getServiceName();
        int partitionId = op.getPartitionId();
        long[] replicaVersions = partitionService.incrementPartitionReplicaVersions(partitionId, totalBackupCount);
        PartitionServiceImpl.InternalPartitionImpl partition = partitionService.getPartition(partitionId);

        for (int replicaIndex = 1; replicaIndex <= totalBackupCount; replicaIndex++) {
            Operation backupOp = backupAwareOp.getBackupOperation();
            if (backupOp == null) {
                throw new IllegalArgumentException("Backup operation should not be null!");
            }
            backupOp.setPartitionId(partitionId).setReplicaIndex(replicaIndex).setServiceName(serviceName);

            boolean sync = replicaIndex <= syncBackupCount;
            Backup backup = new Backup(backupOp, op.getCallerAddress(), replicaVersions, sync);
            backup.setPartitionId(partitionId).setReplicaIndex(replicaIndex).setServiceName(serviceName)
                    .setCallerUuid(nodeEngine.getLocalMember().getUuid());
            OperationAccessor.setCallId(backup, op.getCallId());

            Address target = partition.getReplicaAddress(replicaIndex);
            if (target == null) {
                // Replica not assigned yet: retry later through the backup scheduler.
                scheduleBackup(op, backup, partitionId, replicaIndex);
            } else {
                if (target.equals(node.getThisAddress())) {
                    throw new IllegalStateException("Normally shouldn't happen! Owner node and backup node are the same! " + partition);
                }
                send((Operation) backup, target);
            }
        }
        return syncBackupCount;
    }

    /**
     * Hands a backup to the backup scheduler with a 500 ms delay, keyed by the originating
     * operation's call key.
     *
     * @param op           the operation the backup belongs to
     * @param backup       the backup to schedule
     * @param partitionId  partition the backup targets
     * @param replicaIndex replica index the backup targets
     */
    private void scheduleBackup(Operation op, Backup backup, int partitionId, int replicaIndex) {
        RemoteCallKey scheduleKey = new RemoteCallKey(op);
        if (logger.isFinestEnabled()) {
            logger.finest("Scheduling -> " + backup);
        }
        ScheduledBackup scheduled = new ScheduledBackup(backup, partitionId, replicaIndex);
        backupScheduler.schedule(500L, scheduleKey, scheduled);
    }

    /**
     * Reports a failure that occurred while handling an operation.
     *
     * <p>Out-of-memory errors are forwarded to the {@code OutOfMemoryErrorDispatcher}. The
     * operation logs the error itself. If the operation returns a response and has a
     * response handler, the error is sent back while the node is active; on an inactive node
     * only local handlers receive a {@code HazelcastInstanceNotActiveException}. Failures
     * while sending are logged as warnings.
     *
     * @param op the operation that failed
     * @param e  the failure
     */
    private void handleOperationError(Operation op, Throwable e) {
        if (e instanceof OutOfMemoryError) {
            OutOfMemoryErrorDispatcher.onOutOfMemory((OutOfMemoryError) e);
        }
        op.logError(e);

        ResponseHandler responseHandler = op.getResponseHandler();
        if (!op.returnsResponse() || responseHandler == null) {
            return;
        }
        try {
            if (node.isActive()) {
                responseHandler.sendResponse(e);
            } else if (responseHandler.isLocal()) {
                responseHandler.sendResponse(new HazelcastInstanceNotActiveException());
            }
        } catch (Throwable sendFailure) {
            logger.warning("While sending op error...", sendFailure);
        }
    }

    /**
     * Invokes operations created by the factory on every partition listed in the partition
     * service's member-to-partitions map.
     *
     * @param serviceName      name of the service the operations belong to
     * @param operationFactory factory creating the operations
     * @return results keyed by partition id
     * @throws Exception if the invocation fails
     */
    @Override
    public Map<Integer, Object> invokeOnAllPartitions(String serviceName, OperationFactory operationFactory) throws Exception {
        Map<Address, List<Integer>> partitionsByMember = nodeEngine.getPartitionService().getMemberPartitionsMap();
        return invokeOnPartitions(serviceName, operationFactory, partitionsByMember);
    }

    /**
     * Invokes operations created by the factory on the given partitions.
     *
     * <p>The partitions are first grouped by their current owner address, then the grouped
     * map is passed to the member-based overload.
     *
     * @param serviceName      name of the service the operations belong to
     * @param operationFactory factory creating the operations
     * @param partitions       ids of the partitions to invoke on
     * @return results keyed by partition id
     * @throws Exception if the invocation fails
     */
    @Override
    public Map<Integer, Object> invokeOnPartitions(String serviceName, OperationFactory operationFactory, Collection<Integer> partitions) throws Exception {
        Map<Address, List<Integer>> partitionsByOwner = new HashMap<Address, List<Integer>>(3);
        for (int partitionId : partitions) {
            Address owner = nodeEngine.getPartitionService().getPartitionOwner(partitionId);
            List<Integer> owned = partitionsByOwner.get(owner);
            if (owned == null) {
                owned = new ArrayList<Integer>();
                partitionsByOwner.put(owner, owned);
            }
            owned.add(partitionId);
        }
        return invokeOnPartitions(serviceName, operationFactory, partitionsByOwner);
    }

    /**
     * Invokes operations on partitions that are already grouped by owning member.
     *
     * <p>One {@code PartitionIteratingOperation} is sent per member (10 tries, 300 ms pause).
     * If a member's invocation fails, the failure is recorded for each of its partitions and
     * those partitions are then retried individually with a fresh operation from the factory.
     *
     * @param serviceName      name of the service the operations belong to
     * @param operationFactory factory creating the operations
     * @param memberPartitions partition ids keyed by owning member address
     * @return results keyed by partition id
     * @throws IllegalThreadStateException if called from a partition thread
     * @throws Exception                   if an individual retry fails
     */
    private Map<Integer, Object> invokeOnPartitions(String serviceName, OperationFactory operationFactory, Map<Address, List<Integer>> memberPartitions) throws Exception {
        Thread caller = Thread.currentThread();
        if (caller instanceof BasicOperationScheduler.PartitionThread) {
            throw new IllegalThreadStateException(caller + " cannot make invocation on multiple partitions!");
        }

        // Fire one invocation per member before waiting on any of them.
        Map<Address, InternalCompletableFuture> pendingByMember = new HashMap<Address, InternalCompletableFuture>(memberPartitions.size());
        for (Map.Entry<Address, List<Integer>> entry : memberPartitions.entrySet()) {
            Address member = entry.getKey();
            List<Integer> ownedPartitions = entry.getValue();
            PartitionIteratingOperation iteratingOp = new PartitionIteratingOperation(ownedPartitions, operationFactory);
            InternalCompletableFuture pending = createInvocationBuilder(serviceName, (Operation) iteratingOp, member)
                    .setTryCount(10).setTryPauseMillis(300L).invoke();
            pendingByMember.put(member, pending);
        }

        // Collect per-member results; a failed member marks all of its partitions with the error.
        Map<Integer, Object> partitionResults = new HashMap<Integer, Object>(nodeEngine.getPartitionService().getPartitionCount());
        for (Map.Entry<Address, InternalCompletableFuture> pending : pendingByMember.entrySet()) {
            try {
                Object raw = ((Future) pending.getValue()).get();
                PartitionIteratingOperation.PartitionResponse memberResult =
                        (PartitionIteratingOperation.PartitionResponse) nodeEngine.toObject(raw);
                partitionResults.putAll(memberResult.asMap());
            } catch (Throwable t) {
                if (logger.isFinestEnabled()) {
                    logger.finest(t);
                } else {
                    logger.warning(t.getMessage());
                }
                for (Integer partition : memberPartitions.get(pending.getKey())) {
                    partitionResults.put(partition, t);
                }
            }
        }

        List<Integer> failedPartitions = new LinkedList<Integer>();
        for (Map.Entry<Integer, Object> outcome : partitionResults.entrySet()) {
            int partitionId = outcome.getKey();
            if (outcome.getValue() instanceof Throwable) {
                failedPartitions.add(partitionId);
            }
        }

        // Retry each failed partition individually: start them all, then wait for them all.
        for (Integer failedPartition : failedPartitions) {
            InternalCompletableFuture retry = createInvocationBuilder(serviceName, operationFactory.createOperation(), failedPartition).invoke();
            partitionResults.put(failedPartition, retry);
        }
        for (Integer failedPartition : failedPartitions) {
            Future retry = (Future) partitionResults.get(failedPartition);
            Object retryResult = retry.get();
            partitionResults.put(failedPartition, retryResult);
        }
        return partitionResults;
    }

    /**
     * Sends an operation to another member.
     *
     * @param op     the operation to send
     * @param target address of the receiving member; must be non-null and not this node
     * @return the result of handing the packet to the node engine
     * @throws IllegalArgumentException if {@code target} is {@code null} or is this node
     */
    @Override
    public boolean send(Operation op, Address target) {
        if (target == null) {
            throw new IllegalArgumentException("Target is required!");
        }
        boolean targetIsLocal = nodeEngine.getThisAddress().equals(target);
        if (targetIsLocal) {
            throw new IllegalArgumentException("Target is this node! -> " + target + ", op: " + op);
        }
        Connection connection = node.getConnectionManager().getOrConnect(target);
        return send(op, connection);
    }

    /**
     * Sends a response to another member.
     *
     * <p>The packet gets header bits {@code 0} and {@code 1}; bit {@code 1} is what
     * {@code receive} checks to route a packet to the response executor. Urgent responses
     * additionally get header bit {@code 4}.
     *
     * @param response the response to send
     * @param target   address of the receiving member; must be non-null and not this node
     * @return the result of handing the packet to the node engine
     * @throws IllegalArgumentException if {@code target} is {@code null} or is this node
     */
    @Override
    public boolean send(Response response, Address target) {
        if (target == null) {
            throw new IllegalArgumentException("Target is required!");
        }
        boolean targetIsLocal = nodeEngine.getThisAddress().equals(target);
        if (targetIsLocal) {
            throw new IllegalArgumentException("Target is this node! -> " + target + ", response: " + response);
        }
        Data payload = nodeEngine.toData(response);
        Packet packet = new Packet(payload, nodeEngine.getSerializationContext());
        packet.setHeader(0);
        packet.setHeader(1);
        if (response.isUrgent()) {
            packet.setHeader(4);
        }
        Connection connection = node.getConnectionManager().getOrConnect(target);
        return nodeEngine.send(packet, connection);
    }

    /**
     * Serializes an operation into a packet and sends it over the given connection.
     *
     * <p>The packet carries the operation's execution partition id and header bit {@code 0};
     * an {@code UrgentSystemOperation} additionally gets header bit {@code 4}.
     *
     * @param op         the operation to send
     * @param connection the connection to send over
     * @return the result of handing the packet to the node engine
     */
    private boolean send(Operation op, Connection connection) {
        Data payload = nodeEngine.toData(op);
        int partitionId = getPartitionIdForExecution(op);
        Packet packet = new Packet(payload, partitionId, nodeEngine.getSerializationContext());
        packet.setHeader(0);
        boolean urgent = op instanceof UrgentSystemOperation;
        if (urgent) {
            packet.setHeader(4);
        }
        return nodeEngine.send(packet, connection);
    }

    /**
     * Registers a remote call under a freshly generated call id.
     *
     * @param call the call to register
     * @return the call id assigned to the call
     */
    @PrivateApi
    long registerRemoteCall(RemoteCall call) {
        final long assignedId = newCallId();
        remoteCalls.put(assignedId, call);
        return assignedId;
    }

    /**
     * Produces the next call id from the id generator, skipping {@code 0}.
     *
     * @return a non-zero call id
     */
    @PrivateApi
    long newCallId() {
        long candidate;
        do {
            candidate = callIdGen.incrementAndGet();
        } while (candidate == 0L);
        return candidate;
    }

    /**
     * Removes the remote call registered under the given call id.
     *
     * @param callId id of the call to remove
     * @return the removed call, or {@code null} if none was registered
     */
    @PrivateApi
    RemoteCall deregisterRemoteCall(long callId) {
        RemoteCall removed = remoteCalls.remove(callId);
        return removed;
    }

    /**
     * Registers a backup completion callback for a call id, replacing any existing one.
     * A warning is logged when a callback was already registered for that id.
     *
     * @param callId                   id of the call awaiting backups
     * @param backupCompletionCallback callback to signal when a backup completes
     */
    @PrivateApi
    void registerBackupCall(long callId, BackupCompletionCallback backupCompletionCallback) {
        BackupCompletionCallback replaced = backupCalls.put(callId, backupCompletionCallback);
        if (replaced == null) {
            return;
        }
        logger.warning("Already registered a backup record for call[" + callId + "]!");
    }

    /**
     * Removes the backup completion callback registered for the given call id, if any.
     *
     * @param callId id of the call whose callback is removed
     */
    @PrivateApi
    void deregisterBackupCall(long callId) {
        backupCalls.remove(callId);
    }

    /**
     * @return the default call timeout read from the group properties at construction time
     */
    @PrivateApi
    long getDefaultCallTimeout() {
        return defaultCallTimeout;
    }

    /**
     * Tells whether an operation with the given caller and call id is currently recorded as
     * executing on this node.
     *
     * @param callerAddress   address of the caller
     * @param callerUuid      UUID of the caller
     * @param operationCallId call id of the operation
     * @return {@code true} if a matching executing-call record exists
     */
    @PrivateApi
    boolean isOperationExecuting(Address callerAddress, String callerUuid, long operationCallId) {
        RemoteCallKey probe = new RemoteCallKey(callerAddress, callerUuid, operationCallId);
        return executingCalls.containsKey(probe);
    }

    /**
     * Schedules, with a delay of 1111 ms, a task that fails every registered remote call
     * targeting the departed member with a {@code MemberLeftException}.
     *
     * @param member the member that left
     */
    @Override
    public void onMemberLeft(final MemberImpl member) {
        Runnable failCallsToDepartedMember = new Runnable() {
            @Override
            public void run() {
                for (Iterator<RemoteCall> calls = remoteCalls.values().iterator(); calls.hasNext(); ) {
                    RemoteCall call = calls.next();
                    if (call.isCallTarget(member)) {
                        // Deregister first, then wake the caller with the failure.
                        calls.remove();
                        call.offerResponse(new MemberLeftException(member));
                    }
                }
            }
        };
        nodeEngine.getExecutionService().schedule(failCallsToDepartedMember, 1111L, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts the service down.
     *
     * <p>Stops the response executor, answers every registered remote call with a shared
     * {@code HazelcastInstanceNotActiveException}, clears the call registries, cancels all
     * scheduled backups and finally shuts down the operation scheduler.
     */
    @Override
    public void shutdown() {
        logger.finest("Stopping operation threads...");
        responseExecutor.shutdown();

        final HazelcastInstanceNotActiveException notActive = new HazelcastInstanceNotActiveException();
        for (RemoteCall pendingCall : remoteCalls.values()) {
            pendingCall.offerResponse(notActive);
        }
        remoteCalls.clear();

        backupCalls.clear();
        backupScheduler.cancelAll();
        executor.shutdown();
    }

    /**
     * Signals one completed backup to the callback registered for the given call id.
     * Does nothing when no callback is registered; exceptions are passed to
     * {@code ReplicaErrorLogger} instead of being propagated.
     *
     * @param callId id of the call whose backup completed
     */
    @Override
    public void notifyBackupCall(long callId) {
        try {
            BackupCompletionCallback callback = backupCalls.get(callId);
            if (callback == null) {
                return;
            }
            callback.signalOneBackupComplete();
        } catch (Exception e) {
            ReplicaErrorLogger.log(e, logger);
        }
    }

    /**
     * Identifies an executing call by caller and call id.
     *
     * <p>Equality and hash code are based on the caller UUID and the call id only; the caller
     * address and the creation time are carried for diagnostics ({@link #toString()}).
     */
    private static class RemoteCallKey {
        // Creation timestamp, only reported by toString().
        private final long time = Clock.currentTimeMillis();
        private final Address callerAddress;
        private final String callerUuid;
        private final long callId;

        /**
         * @throws IllegalArgumentException if the caller UUID or the caller address is {@code null}
         */
        private RemoteCallKey(Address callerAddress, String callerUuid, long callId) {
            if (callerUuid == null) {
                throw new IllegalArgumentException("Caller UUID is required!");
            }
            if (callerAddress == null) {
                throw new IllegalArgumentException("Caller address is required!");
            }
            this.callerAddress = callerAddress;
            this.callerUuid = callerUuid;
            this.callId = callId;
        }

        /**
         * Builds the key from the operation's caller UUID, caller address and call id.
         *
         * @throws IllegalArgumentException if the operation has no caller UUID or caller address
         */
        private RemoteCallKey(Operation op) {
            String uuid = op.getCallerUuid();
            if (uuid == null) {
                throw new IllegalArgumentException("Caller UUID is required! -> " + op);
            }
            Address address = op.getCallerAddress();
            if (address == null) {
                throw new IllegalArgumentException("Caller address is required! -> " + op);
            }
            this.callerUuid = uuid;
            this.callerAddress = address;
            this.callId = op.getCallId();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            RemoteCallKey other = (RemoteCallKey) o;
            return callId == other.callId && callerUuid.equals(other.callerUuid);
        }

        @Override
        public int hashCode() {
            int callIdHash = (int) (callId ^ (callId >>> 32));
            return 31 * callerUuid.hashCode() + callIdHash;
        }

        @Override
        public String toString() {
            return "RemoteCallKey"
                    + "{callerAddress=" + callerAddress
                    + ", callerUuid=" + callerUuid
                    + ", callId=" + callId
                    + ", time=" + time
                    + '}';
        }
    }

    /**
     * Task that deserializes a response packet and hands the response to the
     * matching remote call or backup call of the enclosing service.
     */
    private class ResponseProcessor
    implements Runnable {
        final Packet packet;

        public ResponseProcessor(Packet packet) {
            this.packet = packet;
        }

        /**
         * Deregisters the remote call with the response's call id and offers the response to it.
         *
         * @param response the response to deliver
         * @throws HazelcastException if no call is registered under the response's call id
         */
        private void notifyRemoteCall(NormalResponse response) {
            RemoteCall call = BasicOperationService.this.deregisterRemoteCall(response.getCallId());
            if (call != null) {
                call.offerResponse(response);
                return;
            }
            throw new HazelcastException("No call for response:" + response);
        }

        /**
         * Converts the packet data to a {@code Response} and dispatches it by type.
         * Any failure, including an unrecognized response type, is logged at severe
         * level and not propagated.
         */
        @Override
        public void run() {
            try {
                Response response = (Response)BasicOperationService.this.nodeEngine.toObject(this.packet.getData());
                if (response instanceof NormalResponse) {
                    this.notifyRemoteCall((NormalResponse)response);
                } else if (response instanceof BackupResponse) {
                    BasicOperationService.this.notifyBackupCall(response.getCallId());
                } else {
                    throw new IllegalStateException("Unrecognized response type: " + response);
                }
            }
            catch (Throwable e) {
                BasicOperationService.this.logger.severe("While processing response...", e);
            }
        }
    }

    /**
     * Runnable wrapper that passes a single operation to
     * {@code processOperation} of the enclosing service when run.
     */
    private class LocalOperationProcessor
    implements Runnable {
        private final Operation op;

        /**
         * @param operation the operation to process when this task runs
         */
        private LocalOperationProcessor(Operation operation) {
            this.op = operation;
        }

        /** Processes the wrapped operation through the enclosing service. */
        @Override
        public void run() {
            Operation operation = this.op;
            BasicOperationService.this.processOperation(operation);
        }
    }

    /**
     * {@link BasicOperationProcessor} that dispatches a task by its runtime type:
     * operations and packets go to the enclosing service, runnables are run directly.
     */
    public class BasicOperationProcessorImpl
    implements BasicOperationProcessor {
        /**
         * Processes one task.
         *
         * @param o the task; an {@code Operation}, a {@code Packet} or a {@code Runnable}
         * @throws IllegalArgumentException if {@code o} is {@code null} or of any other type
         */
        @Override
        public void process(Object o) {
            if (o == null) {
                throw new IllegalArgumentException();
            }
            // The checks are ordered: Operation first, then Packet, then Runnable.
            if (o instanceof Operation) {
                BasicOperationService.this.processOperation((Operation)o);
                return;
            }
            if (o instanceof Packet) {
                BasicOperationService.this.processPacket((Packet)o);
                return;
            }
            if (o instanceof Runnable) {
                ((Runnable)o).run();
                return;
            }
            throw new IllegalArgumentException("Unrecognized task:" + o);
        }
    }

    /**
     * A backup operation waiting to be sent to a given replica of a partition,
     * together with a count of unsuccessful send attempts.
     */
    private class ScheduledBackup {
        final Backup backup;
        final int partitionId;
        final int replicaIndex;
        volatile int retries = 0;

        private ScheduledBackup(Backup backup, int partitionId, int replicaIndex) {
            this.backup = backup;
            this.partitionId = partitionId;
            this.replicaIndex = replicaIndex;
        }

        /**
         * Tries to send the backup to the current address of the target replica.
         * If that address is unknown or is this node's own address, nothing is sent
         * and the retry counter is incremented instead.
         *
         * @return {@code true} if the backup was sent or the retry counter has reached 10;
         *         {@code false} if the caller should try again later
         */
        public boolean backup() {
            PartitionService partitionService = BasicOperationService.this.nodeEngine.getPartitionService();
            InternalPartition partition = partitionService.getPartition(this.partitionId);
            Address target = partition.getReplicaAddress(this.replicaIndex);
            if (target == null || target.equals(BasicOperationService.this.node.getThisAddress())) {
                // No usable target: count the attempt and report "done" once the limit is hit.
                int attempts = this.retries + 1;
                this.retries = attempts;
                return attempts >= 10;
            }
            BasicOperationService.this.send((Operation)this.backup, target);
            return true;
        }

        /**
         * @return a textual form listing the backup, partition id and replica index
         */
        @Override
        public String toString() {
            return "ScheduledBackup{"
                    + "backup=" + this.backup
                    + ", partitionId=" + this.partitionId
                    + ", replicaIndex=" + this.replicaIndex
                    + '}';
        }
    }

    /**
     * Scheduler callback that attempts each due {@code ScheduledBackup} and puts
     * the ones that are not finished back on the scheduler.
     */
    private class ScheduledBackupProcessor
    implements ScheduledEntryProcessor<Object, ScheduledBackup> {
        private ScheduledBackupProcessor() {
        }

        /**
         * Calls {@code backup()} on every entry's value. An entry whose call returns
         * {@code false} is scheduled again under the same key, with a delay equal to the
         * entry's scheduled delay multiplied by the backup's current retry count.
         *
         * @param scheduler        the scheduler used for re-scheduling
         * @param scheduledEntries the entries that are due
         */
        @Override
        public void process(EntryTaskScheduler<Object, ScheduledBackup> scheduler, Collection<ScheduledEntry<Object, ScheduledBackup>> scheduledEntries) {
            for (ScheduledEntry<Object, ScheduledBackup> entry : scheduledEntries) {
                ScheduledBackup pending = entry.getValue();
                if (!pending.backup()) {
                    int attempts = pending.retries;
                    if (BasicOperationService.this.logger.isFinestEnabled()) {
                        BasicOperationService.this.logger.finest("Re-scheduling[" + attempts + "] -> " + pending);
                    }
                    long nextDelayMillis = entry.getScheduledDelayMillis() * (long)attempts;
                    scheduler.schedule(nextDelayMillis, entry.getKey(), pending);
                }
            }
        }
    }
}

