/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.cp.internal;

import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.config.cp.CPSubsystemConfig;
import com.hazelcast.config.cp.RaftAlgorithmConfig;
import com.hazelcast.cp.CPGroup;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.CPMember;
import com.hazelcast.cp.exception.CPGroupDestroyedException;
import com.hazelcast.cp.internal.CPGroupSummary;
import com.hazelcast.cp.internal.CPMemberInfo;
import com.hazelcast.cp.internal.MetadataRaftGroupManager;
import com.hazelcast.cp.internal.MetadataRaftGroupSnapshot;
import com.hazelcast.cp.internal.NodeEngineRaftIntegration;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.RaftGroupMembershipManager;
import com.hazelcast.cp.internal.RaftInvocationManager;
import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService;
import com.hazelcast.cp.internal.RaftNodeMetrics;
import com.hazelcast.cp.internal.UnsafeModePartitionState;
import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService;
import com.hazelcast.cp.internal.datastructures.spi.RaftRemoteService;
import com.hazelcast.cp.internal.exception.CannotRemoveCPMemberException;
import com.hazelcast.cp.internal.operation.ResetCPMemberOp;
import com.hazelcast.cp.internal.operation.unsafe.UnsafeStateReplicationOp;
import com.hazelcast.cp.internal.persistence.CPPersistenceService;
import com.hazelcast.cp.internal.raft.QueryPolicy;
import com.hazelcast.cp.internal.raft.SnapshotAwareService;
import com.hazelcast.cp.internal.raft.impl.RaftEndpoint;
import com.hazelcast.cp.internal.raft.impl.RaftNode;
import com.hazelcast.cp.internal.raft.impl.RaftNodeImpl;
import com.hazelcast.cp.internal.raft.impl.RaftNodeStatus;
import com.hazelcast.cp.internal.raft.impl.dto.AppendFailureResponse;
import com.hazelcast.cp.internal.raft.impl.dto.AppendRequest;
import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse;
import com.hazelcast.cp.internal.raft.impl.dto.InstallSnapshot;
import com.hazelcast.cp.internal.raft.impl.dto.PreVoteRequest;
import com.hazelcast.cp.internal.raft.impl.dto.PreVoteResponse;
import com.hazelcast.cp.internal.raft.impl.dto.TriggerLeaderElection;
import com.hazelcast.cp.internal.raft.impl.dto.VoteRequest;
import com.hazelcast.cp.internal.raft.impl.dto.VoteResponse;
import com.hazelcast.cp.internal.raft.impl.log.RaftLog;
import com.hazelcast.cp.internal.raft.impl.persistence.LogFileStructure;
import com.hazelcast.cp.internal.raft.impl.persistence.RaftStateStore;
import com.hazelcast.cp.internal.raft.impl.persistence.RestoredRaftState;
import com.hazelcast.cp.internal.raft.impl.state.RaftState;
import com.hazelcast.cp.internal.raftop.GetInitialRaftGroupMembersIfCurrentGroupMemberOp;
import com.hazelcast.cp.internal.raftop.metadata.AddCPMemberOp;
import com.hazelcast.cp.internal.raftop.metadata.ForceDestroyRaftGroupOp;
import com.hazelcast.cp.internal.raftop.metadata.GetActiveCPMembersOp;
import com.hazelcast.cp.internal.raftop.metadata.GetActiveRaftGroupByNameOp;
import com.hazelcast.cp.internal.raftop.metadata.GetActiveRaftGroupIdsOp;
import com.hazelcast.cp.internal.raftop.metadata.GetRaftGroupIdsOp;
import com.hazelcast.cp.internal.raftop.metadata.GetRaftGroupOp;
import com.hazelcast.cp.internal.raftop.metadata.RaftServicePreJoinOp;
import com.hazelcast.cp.internal.raftop.metadata.RemoveCPMemberOp;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.config.ConfigValidator;
import com.hazelcast.internal.diagnostics.MetricsPlugin;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.partition.MigrationAwareService;
import com.hazelcast.internal.partition.MigrationEndpoint;
import com.hazelcast.internal.partition.PartitionMigrationEvent;
import com.hazelcast.internal.partition.PartitionReplicationEvent;
import com.hazelcast.internal.services.GracefulShutdownAwareService;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MemberAttributeServiceEvent;
import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.internal.services.PreJoinAwareService;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.internal.util.executor.ManagedExecutorService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.exception.PartitionMigratingException;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationexecutor.impl.PartitionOperationThread;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.OperationServiceImpl;
import com.hazelcast.spi.impl.servicemanager.ServiceInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;

public class RaftService
implements ManagedService,
SnapshotAwareService<MetadataRaftGroupSnapshot>,
GracefulShutdownAwareService,
MembershipAwareService,
PreJoinAwareService,
RaftNodeLifecycleAwareService,
MigrationAwareService,
DynamicMetricsProvider {
    public static final String SERVICE_NAME = "hz:core:raft";
    public static final String CP_SUBSYSTEM_EXECUTOR = "hz:cpSubsystem";
    static final String CP_SUBSYSTEM_MANAGEMENT_EXECUTOR = "hz:cpSubsystemManagement";
    private static final long REMOVE_MISSING_MEMBER_TASK_PERIOD_SECONDS = 1L;
    private static final int AWAIT_DISCOVERY_STEP_MILLIS = 10;
    private final ReadWriteLock nodeLock = new ReentrantReadWriteLock();
    @Probe
    private final ConcurrentMap<CPGroupId, RaftNode> nodes = new ConcurrentHashMap<CPGroupId, RaftNode>();
    private final ConcurrentMap<CPGroupId, RaftNodeMetrics> nodeMetrics = new ConcurrentHashMap<CPGroupId, RaftNodeMetrics>();
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    @Probe
    private final Set<CPGroupId> destroyedGroupIds = Collections.newSetFromMap(new ConcurrentHashMap());
    @Probe
    private final Set<CPGroupId> terminatedRaftNodeGroupIds = Collections.newSetFromMap(new ConcurrentHashMap());
    private final CPSubsystemConfig config;
    private final RaftInvocationManager invocationManager;
    private final MetadataRaftGroupManager metadataGroupManager;
    @Probe
    private final ConcurrentMap<CPMemberInfo, Long> missingMembers = new ConcurrentHashMap<CPMemberInfo, Long>();
    private final int metricsPeriod;
    private final boolean cpSubsystemEnabled;
    private final UnsafeModePartitionState[] unsafeModeStates;

    public RaftService(NodeEngine nodeEngine) {
        this.nodeEngine = (NodeEngineImpl)nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        CPSubsystemConfig cpSubsystemConfig = nodeEngine.getConfig().getCPSubsystemConfig();
        this.config = cpSubsystemConfig != null ? new CPSubsystemConfig(cpSubsystemConfig) : new CPSubsystemConfig();
        ConfigValidator.checkCPSubsystemConfig(this.config);
        this.cpSubsystemEnabled = this.config.getCPMemberCount() > 0;
        this.invocationManager = new RaftInvocationManager(nodeEngine, this);
        this.metadataGroupManager = new MetadataRaftGroupManager(this.nodeEngine, this, this.config);
        if (this.cpSubsystemEnabled) {
            this.unsafeModeStates = null;
        } else {
            this.unsafeModeStates = new UnsafeModePartitionState[nodeEngine.getPartitionService().getPartitionCount()];
            for (int i = 0; i < this.unsafeModeStates.length; ++i) {
                this.unsafeModeStates[i] = new UnsafeModePartitionState();
            }
        }
        MetricsRegistry metricsRegistry = this.nodeEngine.getMetricsRegistry();
        metricsRegistry.registerStaticMetrics(this, "raft");
        metricsRegistry.registerStaticMetrics(this.metadataGroupManager, "raft.metadata");
        metricsRegistry.registerDynamicMetricsProvider(this);
        this.metricsPeriod = nodeEngine.getProperties().getInteger(MetricsPlugin.PERIOD_SECONDS);
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        if (!this.metadataGroupManager.init()) {
            return;
        }
        if (this.config.getMissingCPMemberAutoRemovalSeconds() > 0) {
            ExecutionService executionService = nodeEngine.getExecutionService();
            executionService.scheduleWithRepetition(CP_SUBSYSTEM_MANAGEMENT_EXECUTOR, new AutoRemoveMissingCPMemberTask(), 1L, 1L, TimeUnit.SECONDS);
        }
        MetricsRegistry metricsRegistry = this.nodeEngine.getMetricsRegistry();
        metricsRegistry.scheduleAtFixedRate(new PublishNodeMetricsTask(), this.metricsPeriod, TimeUnit.SECONDS, ProbeLevel.INFO);
    }

    @Override
    public void reset() {
    }

    @Override
    public void shutdown(boolean terminate) {
        if (this.getCPPersistenceService().isEnabled()) {
            ArrayList<InternalCompletableFuture> futures = new ArrayList<InternalCompletableFuture>(this.nodes.size());
            for (RaftNode raftNode : this.nodes.values()) {
                futures.add(raftNode.forceSetTerminatedStatus());
            }
            for (Future future : futures) {
                try {
                    future.get();
                }
                catch (Exception e) {
                    this.logger.severe("Error while terminating RaftNode", e);
                }
            }
        }
    }

    @Override
    public MetadataRaftGroupSnapshot takeSnapshot(CPGroupId groupId, long commitIndex) {
        return this.metadataGroupManager.takeSnapshot(groupId, commitIndex);
    }

    @Override
    public void restoreSnapshot(CPGroupId groupId, long commitIndex, MetadataRaftGroupSnapshot snapshot) {
        this.metadataGroupManager.restoreSnapshot(groupId, commitIndex, snapshot);
    }

    public InternalCompletableFuture<Collection<CPGroupId>> getAllCPGroupIds() {
        return this.invocationManager.query(this.getMetadataGroupId(), new GetRaftGroupIdsOp(), QueryPolicy.LINEARIZABLE);
    }

    public InternalCompletableFuture<Collection<CPGroupId>> getCPGroupIds() {
        return this.invocationManager.query(this.getMetadataGroupId(), new GetActiveRaftGroupIdsOp(), QueryPolicy.LINEARIZABLE);
    }

    public InternalCompletableFuture<CPGroup> getCPGroup(CPGroupId groupId) {
        return this.invocationManager.query(this.getMetadataGroupId(), new GetRaftGroupOp(groupId), QueryPolicy.LINEARIZABLE);
    }

    public InternalCompletableFuture<CPGroup> getCPGroup(String name) {
        return this.invocationManager.query(this.getMetadataGroupId(), new GetActiveRaftGroupByNameOp(name), QueryPolicy.LINEARIZABLE);
    }

    InternalCompletableFuture<Void> resetCPSubsystem() {
        Preconditions.checkState(this.cpSubsystemEnabled, "CP Subsystem is not enabled!");
        final InternalCompletableFuture<Void> future = this.newCompletableFuture();
        ClusterService clusterService = this.nodeEngine.getClusterService();
        final Collection<Member> members = clusterService.getMembers(MemberSelectors.NON_LOCAL_MEMBER_SELECTOR);
        if (!clusterService.isMaster()) {
            return RaftService.complete(future, new IllegalStateException("Only master can reset CP Subsystem!"));
        }
        if (this.config.getCPMemberCount() > members.size() + 1) {
            return RaftService.complete(future, new IllegalStateException("Not enough cluster members to reset CP Subsystem! Required: " + this.config.getCPMemberCount() + ", available: " + (members.size() + 1)));
        }
        BiConsumer<Void, Throwable> callback = new BiConsumer<Void, Throwable>(){
            final AtomicInteger latch;
            volatile Throwable failure;
            {
                this.latch = new AtomicInteger(members.size());
            }

            @Override
            public void accept(Void aVoid, Throwable throwable) {
                if (throwable == null) {
                    if (this.latch.decrementAndGet() == 0) {
                        if (this.failure == null) {
                            future.complete(null);
                        } else {
                            RaftService.complete(future, this.failure);
                        }
                    }
                } else {
                    this.failure = throwable;
                    if (this.latch.decrementAndGet() == 0) {
                        RaftService.complete(future, throwable);
                    }
                }
            }
        };
        long seed = this.newSeed();
        this.logger.warning("Resetting CP Subsystem with groupId seed: " + seed);
        this.resetLocal(seed);
        OperationServiceImpl operationService = this.nodeEngine.getOperationService();
        for (Member member : members) {
            ResetCPMemberOp op = new ResetCPMemberOp(seed);
            operationService.invokeOnTarget(SERVICE_NAME, op, member.getAddress()).whenCompleteAsync(callback);
        }
        return future;
    }

    private long newSeed() {
        long seed;
        long currentSeed = this.metadataGroupManager.getGroupIdSeed();
        for (seed = Clock.currentTimeMillis(); seed <= currentSeed; ++seed) {
        }
        return seed;
    }

    public void resetLocal(long seed) {
        if (seed == 0L) {
            throw new IllegalArgumentException("Seed cannot be zero!");
        }
        if (seed == this.metadataGroupManager.getGroupIdSeed()) {
            this.logger.severe("Ignoring reset request. Current groupId seed is already equal to " + seed);
            return;
        }
        this.nodeLock.writeLock().lock();
        try {
            this.resetLocalRaftState();
            this.getCPPersistenceService().reset();
            this.metadataGroupManager.restart(seed);
            this.logger.info("Local CP state is reset with groupId seed: " + seed);
        }
        finally {
            this.nodeLock.writeLock().unlock();
        }
    }

    private void resetLocalRaftState() {
        ArrayList<InternalCompletableFuture> futures = new ArrayList<InternalCompletableFuture>(this.nodes.size());
        this.destroyedGroupIds.addAll(this.nodes.keySet());
        for (RaftNode node : this.nodes.values()) {
            InternalCompletableFuture f = node.forceSetTerminatedStatus();
            futures.add(f);
        }
        for (InternalCompletableFuture future : futures) {
            try {
                future.get();
            }
            catch (Exception e) {
                this.logger.warning(e);
            }
        }
        this.nodes.clear();
        for (ServiceInfo serviceInfo : this.nodeEngine.getServiceInfos(RaftRemoteService.class)) {
            if (!(serviceInfo.getService() instanceof RaftManagedService)) continue;
            ((RaftManagedService)serviceInfo.getService()).onCPSubsystemRestart();
        }
        this.nodeMetrics.clear();
        this.missingMembers.clear();
        this.invocationManager.reset();
    }

    public InternalCompletableFuture<Void> promoteToCPMember() {
        InternalCompletableFuture<Void> future = this.newCompletableFuture();
        if (!this.metadataGroupManager.isDiscoveryCompleted()) {
            return RaftService.complete(future, new IllegalStateException("CP Subsystem discovery is not completed yet!"));
        }
        if (this.nodeEngine.getLocalMember().isLiteMember()) {
            return RaftService.complete(future, new IllegalStateException("Lite members cannot be promoted to CP member!"));
        }
        if (this.getLocalCPMember() != null) {
            future.complete(null);
            return future;
        }
        MemberImpl localMember = this.nodeEngine.getLocalMember();
        CPMemberInfo member = new CPMemberInfo(UuidUtil.newUnsecureUUID(), localMember.getAddress());
        this.logger.info("Adding new CP member: " + member);
        this.invocationManager.invoke(this.getMetadataGroupId(), new AddCPMemberOp(member)).whenCompleteAsync((response, t) -> {
            if (t == null) {
                this.metadataGroupManager.initPromotedCPMember(member);
                future.complete(null);
            } else {
                RaftService.complete(future, t);
            }
        });
        return future;
    }

    private <T> InternalCompletableFuture<T> newCompletableFuture() {
        ManagedExecutorService executor = this.nodeEngine.getExecutionService().getExecutor("hz:system");
        return InternalCompletableFuture.withExecutor(executor);
    }

    public InternalCompletableFuture<Void> removeCPMember(UUID cpMemberUuid) {
        ClusterService clusterService = this.nodeEngine.getClusterService();
        InternalCompletableFuture<Void> future = this.newCompletableFuture();
        BiConsumer<Void, Throwable> removeMemberCallback = (response, t) -> {
            if (t == null) {
                future.complete(null);
            } else {
                if (t instanceof CannotRemoveCPMemberException) {
                    t = new IllegalStateException(t.getMessage());
                }
                RaftService.complete(future, t);
            }
        };
        this.invocationManager.invoke(this.getMetadataGroupId(), new GetActiveCPMembersOp()).whenCompleteAsync((cpMembers, t) -> {
            if (t == null) {
                CPMemberInfo cpMemberToRemove = null;
                for (CPMember cpMember : cpMembers) {
                    if (!cpMember.getUuid().equals(cpMemberUuid)) continue;
                    cpMemberToRemove = (CPMemberInfo)cpMember;
                    break;
                }
                if (cpMemberToRemove == null) {
                    RaftService.complete(future, new IllegalArgumentException("No CPMember found with uuid: " + cpMemberUuid));
                    return;
                }
                MemberImpl member = clusterService.getMember(cpMemberToRemove.getAddress());
                if (member != null) {
                    this.logger.warning("Only unreachable/crashed CP members should be removed. " + member + " is alive but " + cpMemberToRemove + " with the same address is being removed.");
                }
                this.invokeTriggerRemoveMember(cpMemberToRemove).whenCompleteAsync(removeMemberCallback);
            } else {
                RaftService.complete(future, t);
            }
        });
        return future;
    }

    public InternalCompletableFuture<Void> forceDestroyCPGroup(String groupName) {
        return this.invocationManager.invoke(this.getMetadataGroupId(), new ForceDestroyRaftGroupOp(groupName));
    }

    public InternalCompletableFuture<Collection<CPMember>> getCPMembers() {
        return this.invocationManager.query(this.getMetadataGroupId(), new GetActiveCPMembersOp(), QueryPolicy.LINEARIZABLE);
    }

    public boolean isDiscoveryCompleted() {
        return this.metadataGroupManager.isDiscoveryCompleted();
    }

    public boolean awaitUntilDiscoveryCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException {
        long sleepMillis;
        for (long timeoutMillis = timeUnit.toMillis(timeout); timeoutMillis > 0L && !this.metadataGroupManager.isDiscoveryCompleted(); timeoutMillis -= sleepMillis) {
            sleepMillis = Math.min(10L, timeoutMillis);
            Thread.sleep(sleepMillis);
        }
        return this.metadataGroupManager.isDiscoveryCompleted();
    }

    @Override
    public boolean onShutdown(long timeout, TimeUnit unit) {
        CPMemberInfo localMember = this.getLocalCPMember();
        if (localMember == null) {
            return true;
        }
        if (this.getCPPersistenceService().isEnabled()) {
            return true;
        }
        this.logger.fine("Triggering remove member procedure for " + localMember);
        if (this.ensureCPMemberRemoved(localMember, unit.toNanos(timeout))) {
            return true;
        }
        this.logger.fine("Remove member procedure NOT completed for " + localMember + " in " + unit.toMillis(timeout) + " ms.");
        return false;
    }

    private boolean ensureCPMemberRemoved(CPMemberInfo member, long remainingTimeNanos) {
        while (remainingTimeNanos > 0L) {
            long start = System.nanoTime();
            try {
                if (this.metadataGroupManager.getActiveMembers().size() == 1) {
                    this.logger.warning("I am one of the last 2 CP members...");
                    return true;
                }
                this.invokeTriggerRemoveMember(member).get();
                this.logger.fine(member + " is marked as being removed.");
                break;
            }
            catch (ExecutionException e) {
                if (!(e.getCause() instanceof CannotRemoveCPMemberException)) {
                    throw ExceptionUtil.rethrow(e);
                }
                if ((remainingTimeNanos -= System.nanoTime() - start) <= 0L) {
                    throw new IllegalStateException(e.getMessage());
                }
                try {
                    Thread.sleep(RaftGroupMembershipManager.MANAGEMENT_TASK_PERIOD_IN_MILLIS);
                }
                catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }
        return true;
    }

    @Override
    public Operation getPreJoinOperation() {
        if (!this.cpSubsystemEnabled) {
            return null;
        }
        boolean master = this.nodeEngine.getClusterService().isMaster();
        boolean discoveryCompleted = this.metadataGroupManager.isDiscoveryCompleted();
        RaftGroupId metadataGroupId = this.metadataGroupManager.getMetadataGroupId();
        return master ? new RaftServicePreJoinOp(discoveryCompleted, metadataGroupId) : null;
    }

    @Override
    public void memberAdded(MembershipServiceEvent event) {
        this.metadataGroupManager.broadcastActiveCPMembers();
        this.updateMissingMembers();
    }

    @Override
    public void memberRemoved(MembershipServiceEvent event) {
        this.updateMissingMembers();
    }

    @Override
    public void memberAttributeChanged(MemberAttributeServiceEvent event) {
    }

    void updateMissingMembers() {
        if (this.config.getMissingCPMemberAutoRemovalSeconds() == 0 || !this.metadataGroupManager.isDiscoveryCompleted() || !this.isStartCompleted() && this.getCPPersistenceService().getCPMetadataStore().containsLocalMemberFile()) {
            return;
        }
        Collection<CPMemberInfo> activeMembers = this.metadataGroupManager.getActiveMembers();
        this.missingMembers.keySet().retainAll(activeMembers);
        ClusterService clusterService = this.nodeEngine.getClusterService();
        for (CPMemberInfo cpMember : activeMembers) {
            if (clusterService.getMember(cpMember.getAddress()) == null) {
                if (this.missingMembers.putIfAbsent(cpMember, Clock.currentTimeMillis()) != null) continue;
                this.logger.warning(cpMember + " is not present in the cluster. It will be auto-removed after " + this.config.getMissingCPMemberAutoRemovalSeconds() + " seconds.");
                continue;
            }
            if (this.missingMembers.remove(cpMember) == null) continue;
            this.logger.info(cpMember + " is removed from the missing members list as it is in the cluster.");
        }
    }

    Collection<CPMemberInfo> getMissingMembers() {
        return Collections.unmodifiableSet(this.missingMembers.keySet());
    }

    public MetadataRaftGroupManager getMetadataGroupManager() {
        return this.metadataGroupManager;
    }

    public RaftInvocationManager getInvocationManager() {
        return this.invocationManager;
    }

    public void handlePreVoteRequest(CPGroupId groupId, PreVoteRequest request, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, request, target);
        if (node != null) {
            node.handlePreVoteRequest(request);
        }
    }

    public void handlePreVoteResponse(CPGroupId groupId, PreVoteResponse response, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, response, target);
        if (node != null) {
            node.handlePreVoteResponse(response);
        }
    }

    public void handleVoteRequest(CPGroupId groupId, VoteRequest request, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, request, target);
        if (node != null) {
            node.handleVoteRequest(request);
        }
    }

    public void handleVoteResponse(CPGroupId groupId, VoteResponse response, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, response, target);
        if (node != null) {
            node.handleVoteResponse(response);
        }
    }

    public void handleAppendEntries(CPGroupId groupId, AppendRequest request, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, request, target);
        if (node != null) {
            node.handleAppendRequest(request);
        }
    }

    public void handleAppendResponse(CPGroupId groupId, AppendSuccessResponse response, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, response, target);
        if (node != null) {
            node.handleAppendResponse(response);
        }
    }

    public void handleAppendResponse(CPGroupId groupId, AppendFailureResponse response, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, response, target);
        if (node != null) {
            node.handleAppendResponse(response);
        }
    }

    public void handleSnapshot(CPGroupId groupId, InstallSnapshot request, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, request, target);
        if (node != null) {
            node.handleInstallSnapshot(request);
        }
    }

    public void handleTriggerLeaderElection(CPGroupId groupId, TriggerLeaderElection request, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNodeIfTargetLocalCPMember(groupId, request, target);
        if (node != null) {
            node.handleTriggerLeaderElection(request);
        }
    }

    public Collection<RaftNode> getAllRaftNodes() {
        return new ArrayList<RaftNode>(this.nodes.values());
    }

    public RaftNode getRaftNode(CPGroupId groupId) {
        return (RaftNode)this.nodes.get(groupId);
    }

    public RaftNode getOrInitRaftNode(CPGroupId groupId) {
        RaftNode node = (RaftNode)this.nodes.get(groupId);
        if (node == null && this.isStartCompleted() && this.isDiscoveryCompleted() && !this.destroyedGroupIds.contains(groupId) && !this.terminatedRaftNodeGroupIds.contains(groupId)) {
            this.logger.fine("RaftNode[" + groupId + "] does not exist. Asking to the METADATA CP group...");
            this.nodeEngine.getExecutionService().execute(CP_SUBSYSTEM_EXECUTOR, new InitializeRaftNodeTask(groupId));
        }
        return node;
    }

    private RaftNode getOrInitRaftNodeIfTargetLocalCPMember(CPGroupId groupId, Object message, RaftEndpoint target) {
        RaftNode node = this.getOrInitRaftNode(groupId);
        if (node == null) {
            if (this.logger.isFineEnabled()) {
                this.logger.warning("RaftNode[" + groupId + "] does not exist to handle: " + message);
            }
            return null;
        }
        if (!target.equals(node.getLocalMember())) {
            if (this.logger.isFineEnabled()) {
                this.logger.warning("Won't handle " + message + ". We are not the expected target: " + target + ", local endpoint: " + node.getLocalMember());
            }
            return null;
        }
        return node;
    }

    boolean isStartCompleted() {
        return this.nodeEngine.getNode().getNodeExtension().isStartCompleted();
    }

    public boolean isRaftGroupDestroyed(CPGroupId groupId) {
        return this.destroyedGroupIds.contains(groupId);
    }

    public CPSubsystemConfig getConfig() {
        return this.config;
    }

    public CPMemberInfo getLocalCPMember() {
        return this.metadataGroupManager.getLocalCPMember();
    }

    public RaftEndpoint getLocalCPEndpoint() {
        CPMemberInfo localCPMember = this.getLocalCPMember();
        return localCPMember != null ? localCPMember.toRaftEndpoint() : null;
    }

    public void createRaftNode(CPGroupId groupId, Collection<RaftEndpoint> members) {
        this.createRaftNode(groupId, members, this.getLocalCPEndpoint());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void createRaftNode(CPGroupId groupId, Collection<RaftEndpoint> members, RaftEndpoint localCPMember) {
        assert (!(Thread.currentThread() instanceof PartitionOperationThread)) : "Cannot create RaftNode of " + groupId + " in a partition thread!";
        if (this.nodes.containsKey(groupId) || !this.isStartCompleted() || !this.hasSameSeed(groupId)) {
            return;
        }
        if (this.getLocalCPMember() == null) {
            this.logger.warning("Not creating Raft node for " + groupId + " because local CP member is not initialized yet.");
            return;
        }
        this.nodeLock.readLock().lock();
        try {
            if (this.destroyedGroupIds.contains(groupId)) {
                this.logger.warning("Not creating RaftNode[" + groupId + "] since the CP group is already destroyed.");
                return;
            }
            if (this.terminatedRaftNodeGroupIds.contains(groupId) && !this.nodeEngine.isRunning()) {
                this.logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already terminated.");
                return;
            }
            int partitionId = this.getCPGroupPartitionId(groupId);
            NodeEngineRaftIntegration integration = new NodeEngineRaftIntegration(this.nodeEngine, groupId, localCPMember, partitionId);
            RaftAlgorithmConfig raftAlgorithmConfig = this.config.getRaftAlgorithmConfig();
            CPPersistenceService persistenceService = this.getCPPersistenceService();
            RaftStateStore stateStore = persistenceService.createRaftStateStore((RaftGroupId)groupId, null);
            RaftNodeImpl node = RaftNodeImpl.newRaftNode(groupId, localCPMember, members, raftAlgorithmConfig, integration, stateStore);
            this.registerNodeMetrics(groupId);
            if (this.nodes.putIfAbsent(groupId, node) == null) {
                if (this.destroyedGroupIds.contains(groupId)) {
                    this.deregisterNodeMetrics(groupId);
                    this.nodes.remove(groupId, node);
                    this.logger.warning("Not creating RaftNode[" + groupId + "] since the CP group is already destroyed.");
                    return;
                }
                node.start();
                this.logger.info("RaftNode[" + groupId + "] is created with " + members);
            }
        }
        finally {
            this.nodeLock.readLock().unlock();
        }
    }

    CPPersistenceService getCPPersistenceService() {
        return this.nodeEngine.getNode().getNodeExtension().getCPPersistenceService();
    }

    public RaftNodeImpl restoreRaftNode(RaftGroupId groupId, RestoredRaftState restoredState, LogFileStructure logFileStructure) {
        int partitionId = this.getCPGroupPartitionId(groupId);
        NodeEngineRaftIntegration integration = new NodeEngineRaftIntegration(this.nodeEngine, groupId, restoredState.localEndpoint(), partitionId);
        RaftAlgorithmConfig raftAlgorithmConfig = this.config.getRaftAlgorithmConfig();
        RaftStateStore stateStore = this.getCPPersistenceService().createRaftStateStore(groupId, logFileStructure);
        RaftNodeImpl node = RaftNodeImpl.restoreRaftNode(groupId, restoredState, raftAlgorithmConfig, integration, stateStore);
        this.registerNodeMetrics(groupId);
        RaftNode prev = this.nodes.putIfAbsent(groupId, node);
        Preconditions.checkState(prev == null, "Could not restore " + groupId + " because its Raft node already exists!");
        node.start();
        this.logger.info("RaftNode[" + groupId + "] is restored.");
        return node;
    }

    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        MetricDescriptor rootDescriptor = descriptor.withPrefix("raft.group");
        for (Map.Entry entry : this.nodeMetrics.entrySet()) {
            CPGroupId groupId = (CPGroupId)entry.getKey();
            MetricDescriptor groupDescriptor = rootDescriptor.copy().withDiscriminator("groupId", String.valueOf(groupId.getId())).withTag("name", groupId.getName());
            context.collect(groupDescriptor, entry.getValue());
        }
    }

    private void registerNodeMetrics(CPGroupId groupId) {
        this.nodeMetrics.putIfAbsent(groupId, new RaftNodeMetrics());
    }

    private void deregisterNodeMetrics(CPGroupId groupId) {
        this.nodeMetrics.remove(groupId);
    }

    private boolean hasSameSeed(CPGroupId groupId) {
        return this.getMetadataGroupId().getSeed() == ((RaftGroupId)groupId).getSeed();
    }

    public boolean updateInvocationManagerMembers(long groupIdSeed, long membersCommitIndex, Collection<? extends CPMember> members) {
        return this.invocationManager.getRaftInvocationContext().setMembers(groupIdSeed, membersCommitIndex, members);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void terminateRaftNode(CPGroupId groupId, boolean groupDestroyed) {
        if (this.destroyedGroupIds.contains(groupId) || !this.hasSameSeed(groupId)) {
            return;
        }
        assert (!(Thread.currentThread() instanceof PartitionOperationThread)) : "Cannot terminate RaftNode of " + groupId + " in a partition thread!";
        this.nodeLock.readLock().lock();
        try {
            if (this.destroyedGroupIds.contains(groupId)) {
                return;
            }
            if (groupDestroyed) {
                this.destroyedGroupIds.add(groupId);
            }
            this.terminatedRaftNodeGroupIds.add(groupId);
            RaftNode node = (RaftNode)this.nodes.get(groupId);
            CPPersistenceService persistenceService = this.getCPPersistenceService();
            if (node != null) {
                this.destroyRaftNode(node, groupDestroyed);
                this.logger.info("RaftNode[" + groupId + "] is destroyed.");
            } else if (groupDestroyed && persistenceService.isEnabled()) {
                persistenceService.removeRaftStateStore((RaftGroupId)groupId);
                this.logger.info("RaftStateStore of RaftNode[" + groupId + "] is deleted.");
            }
        }
        finally {
            this.nodeLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stepDownRaftNode(CPGroupId groupId) {
        if (this.terminatedRaftNodeGroupIds.contains(groupId) || !this.hasSameSeed(groupId)) {
            return;
        }
        assert (!(Thread.currentThread() instanceof PartitionOperationThread)) : "Cannot step down RaftNode of " + groupId + " in a partition thread!";
        this.nodeLock.readLock().lock();
        try {
            if (this.terminatedRaftNodeGroupIds.contains(groupId)) {
                return;
            }
            CPPersistenceService persistenceService = this.getCPPersistenceService();
            RaftNode node = (RaftNode)this.nodes.get(groupId);
            if (node != null && node.getStatus() == RaftNodeStatus.STEPPED_DOWN) {
                this.terminatedRaftNodeGroupIds.add(groupId);
                this.destroyRaftNode(node, true);
                this.logger.fine("RaftNode[" + groupId + "] has stepped down.");
            } else if (node == null && persistenceService.isEnabled()) {
                persistenceService.removeRaftStateStore((RaftGroupId)groupId);
                this.logger.info("RaftStateStore of RaftNode[" + groupId + "] is deleted.");
            }
        }
        finally {
            this.nodeLock.readLock().unlock();
        }
    }

    private void destroyRaftNode(RaftNode node, boolean removeRaftStateStore) {
        RaftGroupId groupId = (RaftGroupId)node.getGroupId();
        node.forceSetTerminatedStatus().whenCompleteAsync((v, t) -> {
            this.nodes.remove(groupId, node);
            this.deregisterNodeMetrics(groupId);
            CPPersistenceService persistenceService = this.getCPPersistenceService();
            try {
                if (removeRaftStateStore && persistenceService.isEnabled()) {
                    persistenceService.removeRaftStateStore(groupId);
                    this.logger.info("RaftStateStore of RaftNode[" + groupId + "] is deleted.");
                }
            }
            catch (Exception e) {
                this.logger.severe("Deletion of RaftStateStore of RaftNode[" + groupId + "] failed.", e);
            }
        });
    }

    public RaftGroupId createRaftGroupForProxy(String name) {
        String groupName = RaftService.getGroupNameForProxy(name);
        if (this.cpSubsystemEnabled) {
            try {
                CPGroupSummary group = this.getGroupSummaryForProxy(groupName).joinInternal();
                if (group != null) {
                    return (RaftGroupId)group.id();
                }
                return (RaftGroupId)this.invocationManager.createRaftGroup(groupName).get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Could not create CP group: " + groupName);
            }
            catch (ExecutionException e) {
                throw new IllegalStateException("Could not create CP group: " + groupName);
            }
        }
        return this.createPartitionBasedRaftGroupId(name, groupName);
    }

    private RaftGroupId createPartitionBasedRaftGroupId(String name, String groupName) {
        if ("default".equals(groupName)) {
            groupName = name;
        }
        Object key = this.nodeEngine.getSerializationService().toData(groupName);
        int partitionId = this.nodeEngine.getPartitionService().getPartitionId((Data)key);
        return new RaftGroupId(groupName, 0L, partitionId);
    }

    public InternalCompletableFuture<CPGroupId> createRaftGroupForProxyAsync(String name) {
        String groupName = RaftService.getGroupNameForProxy(name);
        if (this.cpSubsystemEnabled) {
            InternalCompletableFuture<CPGroupId> future = this.newCompletableFuture();
            InternalCompletableFuture<CPGroupSummary> groupIdFuture = this.getGroupSummaryForProxy(groupName);
            groupIdFuture.whenCompleteAsync((response, throwable) -> {
                if (throwable == null) {
                    if (response != null) {
                        future.complete(response.id());
                    } else {
                        this.invocationManager.createRaftGroup(groupName).whenCompleteAsync((r, t) -> RaftService.complete(future, r, t));
                    }
                } else {
                    RaftService.complete(future, throwable);
                }
            });
            return future;
        }
        return InternalCompletableFuture.newCompletedFuture(this.createPartitionBasedRaftGroupId(name, groupName));
    }

    private InternalCompletableFuture<CPGroupSummary> getGroupSummaryForProxy(String groupName) {
        return this.invocationManager.query(this.getMetadataGroupId(), new GetActiveRaftGroupByNameOp(groupName), QueryPolicy.LINEARIZABLE);
    }

    private InternalCompletableFuture<Void> invokeTriggerRemoveMember(CPMemberInfo member) {
        return this.invocationManager.invoke(this.getMetadataGroupId(), new RemoveCPMemberOp(member));
    }

    private static <T> InternalCompletableFuture<T> complete(InternalCompletableFuture<T> future, Throwable t) {
        future.completeExceptionally(t);
        return future;
    }

    private static <T> void complete(InternalCompletableFuture<T> future, T value, Throwable t) {
        if (t == null) {
            future.complete(value);
        } else {
            future.completeExceptionally(t);
        }
    }

    public static String withoutDefaultGroupName(String name) {
        int i = (name = name.trim()).indexOf("@");
        if (i == -1) {
            return name;
        }
        Preconditions.checkTrue(name.indexOf("@", i + 1) == -1, "Custom group name must be specified at most once");
        String groupName = name.substring(i + 1).trim();
        if (groupName.equalsIgnoreCase("default")) {
            return name.substring(0, i);
        }
        return name;
    }

    public static String getGroupNameForProxy(String name) {
        int i = (name = name.trim()).indexOf("@");
        if (i == -1) {
            return "default";
        }
        Preconditions.checkTrue(i < name.length() - 1, "Custom CP group name cannot be empty string");
        Preconditions.checkTrue(name.indexOf("@", i + 1) == -1, "Custom group name must be specified at most once");
        String groupName = name.substring(i + 1).trim();
        Preconditions.checkTrue(groupName.length() > 0, "Custom CP group name cannot be empty string");
        Preconditions.checkFalse(groupName.equalsIgnoreCase("METADATA"), "CP data structures cannot run on the METADATA CP group!");
        return groupName.equalsIgnoreCase("default") ? "default" : groupName;
    }

    public static String getObjectNameForProxy(String name) {
        int i = name.indexOf("@");
        if (i == -1) {
            return name;
        }
        Preconditions.checkTrue(i < name.length() - 1, "Object name cannot be empty string");
        Preconditions.checkTrue(name.indexOf("@", i + 1) == -1, "Custom CP group name must be specified at most once");
        String objectName = name.substring(0, i).trim();
        Preconditions.checkTrue(objectName.length() > 0, "Object name cannot be empty string");
        return objectName;
    }

    public RaftGroupId getMetadataGroupId() {
        return this.metadataGroupManager.getMetadataGroupId();
    }

    public boolean isCpSubsystemEnabled() {
        return this.cpSubsystemEnabled;
    }

    public void handleActiveCPMembers(RaftGroupId receivedMetadataGroupId, long membersCommitIndex, Collection<CPMemberInfo> members) {
        if (!this.metadataGroupManager.isDiscoveryCompleted()) {
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Ignoring received active CP members: " + members + " since discovery is in progress.");
            }
            return;
        }
        Preconditions.checkNotNull(members);
        Preconditions.checkTrue(members.size() > 0, "Active CP members list cannot be empty");
        if (members.size() == 1) {
            this.logger.fine("There is one active CP member left: " + members);
            return;
        }
        CPMemberInfo localMember = this.getLocalCPMember();
        members = this.replaceLocalMemberIfAddressChanged(membersCommitIndex, members, localMember);
        if (this.updateInvocationManagerMembers(receivedMetadataGroupId.getSeed(), membersCommitIndex, members) && this.logger.isFineEnabled()) {
            this.logger.fine("Handled new active CP members list: " + members + ", members commit index: " + membersCommitIndex + ", METADATA group id seed: " + receivedMetadataGroupId.getSeed());
        }
        RaftGroupId metadataGroupId = this.getMetadataGroupId();
        if (receivedMetadataGroupId.getSeed() < metadataGroupId.getSeed() || metadataGroupId.equals(receivedMetadataGroupId)) {
            return;
        }
        if (!this.isStartCompleted()) {
            if (!metadataGroupId.equals(receivedMetadataGroupId)) {
                this.logger.severe("Restored METADATA groupId: " + metadataGroupId + " is different than received METADATA groupId: " + receivedMetadataGroupId + ". There must have been a CP Subsystem reset while this member was down...");
            }
            return;
        }
        if (this.getRaftNode(receivedMetadataGroupId) != null) {
            if (this.logger.isFineEnabled()) {
                this.logger.fine(localMember + " is already part of METADATA group but received active CP members!");
            }
            return;
        }
        if (!receivedMetadataGroupId.equals(metadataGroupId) && this.getRaftNode(metadataGroupId) != null) {
            this.logger.warning(localMember + " was part of " + metadataGroupId + ", but received active CP members for " + receivedMetadataGroupId + ".");
            return;
        }
        this.metadataGroupManager.handleMetadataGroupId(receivedMetadataGroupId);
    }

    private Collection<CPMemberInfo> replaceLocalMemberIfAddressChanged(long membersCommitIndex, Collection<CPMemberInfo> members, CPMemberInfo localMember) {
        if (localMember != null && !members.contains(localMember)) {
            CPMemberInfo otherMember = null;
            CPMemberInfo staleLocalMember = null;
            for (CPMemberInfo m : members) {
                if (m.getAddress().equals(localMember.getAddress()) && !m.getUuid().equals(localMember.getUuid())) {
                    otherMember = m;
                    continue;
                }
                if (m.getAddress().equals(localMember.getAddress()) || !m.getUuid().equals(localMember.getUuid())) continue;
                staleLocalMember = m;
            }
            if (otherMember != null || staleLocalMember != null) {
                members = new ArrayList<CPMemberInfo>(members);
                members.remove(otherMember);
                members.remove(staleLocalMember);
                if (this.logger.isFineEnabled()) {
                    this.logger.fine("Removing other member: " + otherMember + " in received CP members list: " + members + " and commit index: " + membersCommitIndex);
                }
            }
            if (staleLocalMember != null) {
                members.add(localMember);
                if (this.logger.isFineEnabled()) {
                    this.logger.fine("Replacing stale local member: " + staleLocalMember + " with: " + localMember + " in received CP members list: " + members + " and commit index: " + membersCommitIndex);
                }
            } else if (this.nodeEngine.getNode().isRunning()) {
                boolean missingAutoRemovalEnabled = this.config.getMissingCPMemberAutoRemovalSeconds() > 0;
                this.logger.severe("Local " + localMember + " is not part of received active CP members: " + members + ". It seems local member is removed from CP Subsystem. Auto removal of missing members is " + (missingAutoRemovalEnabled ? "enabled." : "disabled."));
            }
        }
        return members;
    }

    @Override
    public void onRaftNodeTerminated(CPGroupId groupId) {
        this.nodeEngine.getExecutionService().execute(CP_SUBSYSTEM_EXECUTOR, () -> this.terminateRaftNode(groupId, false));
    }

    @Override
    public void onRaftNodeSteppedDown(CPGroupId groupId) {
        this.nodeEngine.getExecutionService().execute(CP_SUBSYSTEM_EXECUTOR, () -> this.stepDownRaftNode(groupId));
    }

    public Collection<CPGroupId> getLeadedGroups() {
        ArrayList<CPGroupId> groupIds = new ArrayList<CPGroupId>();
        RaftEndpoint localEndpoint = this.getLocalCPEndpoint();
        for (RaftNode raftNode : this.nodes.values()) {
            RaftEndpoint leader;
            if ("METADATA".equals(raftNode.getGroupId().getName()) || (leader = raftNode.getLeader()) == null || !leader.equals(localEndpoint)) continue;
            groupIds.add(raftNode.getGroupId());
        }
        return groupIds;
    }

    public InternalCompletableFuture transferLeadership(CPGroupId groupId, CPMemberInfo destination) {
        RaftNode raftNode = this.getRaftNode(groupId);
        if (raftNode == null) {
            throw new IllegalStateException("RaftNode does not exist for group: " + groupId);
        }
        return raftNode.transferLeadership(destination.toRaftEndpoint());
    }

    public int getCPGroupPartitionId(CPGroupId groupId) {
        int partitionCount = this.nodeEngine.getPartitionService().getPartitionCount();
        return RaftService.getCPGroupPartitionId(groupId, partitionCount);
    }

    public static int getCPGroupPartitionId(CPGroupId groupId, int partitionCount) {
        assert (groupId.getId() >= 0L) : "Invalid groupId: " + groupId;
        return (int)(groupId.getId() % (long)partitionCount);
    }

    public long nextUnsafeModeCommitIndex(CPGroupId groupId) {
        assert (!this.cpSubsystemEnabled);
        int partitionId = this.getCPGroupPartitionId(groupId);
        UnsafeModePartitionState unsafeModeState = this.unsafeModeStates[partitionId];
        return unsafeModeState.nextCommitIndex();
    }

    public void registerUnsafeWaitingOperation(CPGroupId groupId, long commitIndex, Operation op) {
        assert (!this.cpSubsystemEnabled);
        int partitionId = this.getCPGroupPartitionId(groupId);
        UnsafeModePartitionState unsafeModeState = this.unsafeModeStates[partitionId];
        if (!unsafeModeState.registerWaitingOp(commitIndex, op)) {
            throw new IllegalArgumentException("Cannot register " + op + " with index " + commitIndex);
        }
    }

    public boolean completeFutures(CPGroupId groupId, Collection<Long> indices, Object result) {
        if (this.cpSubsystemEnabled) {
            RaftNodeImpl raftNode = (RaftNodeImpl)this.getRaftNode(groupId);
            if (raftNode == null) {
                return false;
            }
            for (Long index : indices) {
                raftNode.completeFuture(index, result);
            }
        } else {
            int partitionId = this.getCPGroupPartitionId(groupId);
            UnsafeModePartitionState unsafeModeState = this.unsafeModeStates[partitionId];
            for (Long index : indices) {
                Operation op = unsafeModeState.removeWaitingOp(index);
                if (op == null) continue;
                op.sendResponse(result);
            }
        }
        return true;
    }

    public boolean completeFutures(CPGroupId groupId, Collection<Map.Entry<Long, Object>> results) {
        if (this.cpSubsystemEnabled) {
            RaftNodeImpl raftNode = (RaftNodeImpl)this.getRaftNode(groupId);
            if (raftNode == null) {
                return false;
            }
            for (Map.Entry<Long, Object> result : results) {
                raftNode.completeFuture(result.getKey(), result.getValue());
            }
        } else {
            int partitionId = this.getCPGroupPartitionId(groupId);
            UnsafeModePartitionState unsafeModeState = this.unsafeModeStates[partitionId];
            for (Map.Entry<Long, Object> result : results) {
                Operation op = unsafeModeState.removeWaitingOp(result.getKey());
                if (op == null) continue;
                op.sendResponse(result.getValue());
            }
        }
        return true;
    }

    @Override
    public Operation prepareReplicationOperation(PartitionReplicationEvent event) {
        if (this.cpSubsystemEnabled) {
            return null;
        }
        if (event.getReplicaIndex() > this.getBackupCount()) {
            return null;
        }
        return new UnsafeStateReplicationOp(this.unsafeModeStates[event.getPartitionId()]);
    }

    @Override
    public void beforeMigration(PartitionMigrationEvent event) {
    }

    @Override
    public void commitMigration(PartitionMigrationEvent event) {
        if (this.cpSubsystemEnabled) {
            return;
        }
        if (event.getMigrationEndpoint() == MigrationEndpoint.SOURCE) {
            int thresholdReplicaIndex;
            UnsafeModePartitionState state = this.unsafeModeStates[event.getPartitionId()];
            if (event.getCurrentReplicaIndex() == 0) {
                PartitionMigratingException ex = new PartitionMigratingException("Partition " + event.getPartitionId() + " is migrating!");
                for (Operation op : state.getWaitingOps()) {
                    op.sendResponse(ex);
                }
            }
            if ((thresholdReplicaIndex = event.getNewReplicaIndex()) == -1 || thresholdReplicaIndex > this.getBackupCount()) {
                state.reset();
            }
        }
    }

    @Override
    public void rollbackMigration(PartitionMigrationEvent event) {
        int thresholdReplicaIndex;
        if (this.cpSubsystemEnabled) {
            return;
        }
        if (event.getMigrationEndpoint() == MigrationEndpoint.DESTINATION && ((thresholdReplicaIndex = event.getCurrentReplicaIndex()) == -1 || thresholdReplicaIndex > this.getBackupCount())) {
            this.unsafeModeStates[event.getPartitionId()].reset();
        }
    }

    private int getBackupCount() {
        return 1;
    }

    public void applyUnsafeModeState(int partitionId, UnsafeModePartitionState state) {
        assert (!this.cpSubsystemEnabled);
        this.unsafeModeStates[partitionId].apply(state);
    }

    private class PublishNodeMetricsTask
    implements Runnable {
        private PublishNodeMetricsTask() {
        }

        @Override
        public void run() {
            for (RaftNode node : RaftService.this.nodes.values()) {
                RaftNodeImpl raftNode = (RaftNodeImpl)node;
                RaftNodeMetrics metrics = (RaftNodeMetrics)RaftService.this.nodeMetrics.get(node.getGroupId());
                assert (metrics != null);
                raftNode.execute(() -> {
                    RaftState state = raftNode.state();
                    RaftLog log = state.log();
                    metrics.update(state.term(), state.commitIndex(), state.lastApplied(), log.lastLogOrSnapshotTerm(), log.snapshotIndex(), log.lastLogOrSnapshotIndex(), log.availableCapacity());
                });
            }
        }
    }

    private class AutoRemoveMissingCPMemberTask
    implements Runnable {
        private AutoRemoveMissingCPMemberTask() {
        }

        @Override
        public void run() {
            try {
                if (!RaftService.this.metadataGroupManager.isMetadataGroupLeader() || RaftService.this.metadataGroupManager.getMembershipChangeSchedule() != null) {
                    return;
                }
                for (Map.Entry e : RaftService.this.missingMembers.entrySet()) {
                    long missingTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - (Long)e.getValue());
                    if (missingTimeSeconds < (long)RaftService.this.config.getMissingCPMemberAutoRemovalSeconds()) continue;
                    CPMemberInfo missingMember = (CPMemberInfo)e.getKey();
                    RaftService.this.logger.warning("Removing " + missingMember + " since it is absent for " + missingTimeSeconds + " seconds.");
                    RaftService.this.removeCPMember(missingMember.getUuid()).get();
                    RaftService.this.logger.info("Auto-removal of " + missingMember + " is successful.");
                    return;
                }
            }
            catch (Exception e) {
                RaftService.this.logger.severe("RemoveMissingMembersTask failed", e);
            }
        }
    }

    private class InitializeRaftNodeTask
    implements Runnable {
        private final CPGroupId groupId;

        InitializeRaftNodeTask(CPGroupId groupId) {
            this.groupId = groupId;
        }

        @Override
        public void run() {
            this.queryInitialMembersFromMetadataRaftGroup();
        }

        private void queryInitialMembersFromMetadataRaftGroup() {
            GetRaftGroupOp op = new GetRaftGroupOp(this.groupId);
            InternalCompletableFuture f = RaftService.this.invocationManager.query(RaftService.this.getMetadataGroupId(), op, QueryPolicy.LEADER_LOCAL);
            f.whenCompleteAsync((group, throwable) -> {
                if (throwable == null) {
                    if (group != null) {
                        if (group.members().contains(RaftService.this.getLocalCPMember())) {
                            RaftService.this.createRaftNode(this.groupId, group.initialMembers());
                        } else {
                            this.queryInitialMembersFromTargetRaftGroup();
                        }
                    } else if (RaftService.this.logger.isFineEnabled()) {
                        RaftService.this.logger.fine("Cannot get initial members of " + this.groupId + " from the METADATA CP group");
                    }
                } else {
                    if (throwable instanceof CPGroupDestroyedException) {
                        CPGroupId destroyedGroupId = ((CPGroupDestroyedException)throwable).getGroupId();
                        RaftService.this.terminateRaftNode(destroyedGroupId, true);
                    }
                    if (RaftService.this.logger.isFineEnabled()) {
                        RaftService.this.logger.fine("Cannot get initial members of " + this.groupId + " from the METADATA CP group", (Throwable)throwable);
                    }
                }
            });
        }

        void queryInitialMembersFromTargetRaftGroup() {
            RaftEndpoint localEndpoint = RaftService.this.getLocalCPEndpoint();
            if (localEndpoint == null) {
                return;
            }
            GetInitialRaftGroupMembersIfCurrentGroupMemberOp op = new GetInitialRaftGroupMembersIfCurrentGroupMemberOp(localEndpoint);
            InternalCompletableFuture f = RaftService.this.invocationManager.query(this.groupId, op, QueryPolicy.LEADER_LOCAL);
            f.whenCompleteAsync((initialMembers, t) -> {
                if (t == null) {
                    RaftService.this.createRaftNode(this.groupId, (Collection<RaftEndpoint>)initialMembers);
                } else if (RaftService.this.logger.isFineEnabled()) {
                    RaftService.this.logger.fine("Cannot get initial members of " + this.groupId + " from the CP group itself", (Throwable)t);
                }
            });
        }
    }
}

