/*
 * Decompiled with CFR 0.152.
 */
package com.tc.objectserver.handler;

import com.tc.async.api.AbstractEventHandler;
import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.Sink;
import com.tc.async.api.Stage;
import com.tc.bytes.TCByteBuffer;
import com.tc.bytes.TCByteBufferFactory;
import com.tc.exception.ServerException;
import com.tc.l2.msg.ReplicationAckTuple;
import com.tc.l2.msg.ReplicationMessage;
import com.tc.l2.msg.ReplicationMessageAck;
import com.tc.l2.msg.ReplicationResultCode;
import com.tc.l2.msg.SyncReplicationActivity;
import com.tc.l2.state.ServerMode;
import com.tc.l2.state.StateManager;
import com.tc.net.ClientID;
import com.tc.net.NodeID;
import com.tc.net.ServerID;
import com.tc.net.groups.AbstractGroupMessage;
import com.tc.net.groups.GroupException;
import com.tc.net.groups.GroupManager;
import com.tc.net.utils.L2Utils;
import com.tc.object.ClientInstanceID;
import com.tc.object.EntityDescriptor;
import com.tc.object.EntityID;
import com.tc.object.FetchID;
import com.tc.object.session.SessionID;
import com.tc.object.tx.TransactionID;
import com.tc.objectserver.api.EntityManager;
import com.tc.objectserver.api.ManagedEntity;
import com.tc.objectserver.api.ResultCapture;
import com.tc.objectserver.api.ServerEntityAction;
import com.tc.objectserver.api.ServerEntityRequest;
import com.tc.objectserver.core.api.ServerConfigurationContext;
import com.tc.objectserver.entity.BarrierCompletion;
import com.tc.objectserver.entity.MessagePayload;
import com.tc.objectserver.entity.NoopResultCapture;
import com.tc.objectserver.entity.PassiveResultCapture;
import com.tc.objectserver.entity.PlatformEntity;
import com.tc.objectserver.entity.ResultCaptureImpl;
import com.tc.objectserver.handler.GroupMessageBatchContext;
import com.tc.objectserver.persistence.Persistor;
import com.tc.properties.TCPropertiesImpl;
import com.tc.tracing.Trace;
import com.tc.util.Assert;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.tripwire.Event;
import org.terracotta.tripwire.TripwireFactory;

public class ReplicatedTransactionHandler {
    private static final int DEFAULT_BATCH_LIMIT = 64;
    private static final int DEFAULT_INFLIGHT_MESSAGES = 1;
    private static final int maximumBatchSize = TCPropertiesImpl.getProperties().getInt("passive-active.batchsize", 64);
    private static final int idealMessagesInFlight = TCPropertiesImpl.getProperties().getInt("passive-active.inflight", 1);
    private static final Logger PLOGGER = LoggerFactory.getLogger(MessagePayload.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedTransactionHandler.class);
    private final EntityManager entityManager;
    private final Persistor persistor;
    private final GroupManager<AbstractGroupMessage> groupManager;
    private final StateManager stateManager;
    private final ManagedEntity platform;
    private final SyncState state = new SyncState();
    private ServerID cachedMessageAckFrom;
    private GroupMessageBatchContext<ReplicationMessageAck, ReplicationAckTuple> cachedBatchAck;
    private final Sink<Runnable> sentToActive;
    private volatile long currentSequence = 0L;
    private final EventHandler<ReplicationMessage> eventHorizon = new AbstractEventHandler<ReplicationMessage>(){

        public void handleEvent(ReplicationMessage message) throws EventHandlerException {
            try {
                ReplicatedTransactionHandler.this.currentSequence = message.getSequenceID();
                ReplicatedTransactionHandler.this.processMessage(message);
            }
            catch (Throwable t) {
                throw Assert.failure((Object)"Unexpected exception executing replicated message", (Throwable)t);
            }
        }

        protected void initialize(ConfigurationContext context) {
            ServerConfigurationContext scxt = (ServerConfigurationContext)context;
            scxt.getL2Coordinator().getReplicatedClusterStateManager().setCurrentState(scxt.getL2Coordinator().getStateManager().getCurrentMode().getState());
            if (ReplicatedTransactionHandler.this.stateManager.getCurrentMode() == ServerMode.UNINITIALIZED) {
                ReplicatedTransactionHandler.this.requestPassiveSync();
            }
        }

        public void destroy() {
            ServerEntityRequest req = new ServerEntityRequest(){

                @Override
                public ServerEntityAction getAction() {
                    return ServerEntityAction.FAILOVER_FLUSH;
                }

                @Override
                public ClientID getNodeID() {
                    return ClientID.NULL_ID;
                }

                @Override
                public TransactionID getTransaction() {
                    return TransactionID.NULL_ID;
                }

                @Override
                public TransactionID getOldestTransactionOnClient() {
                    return TransactionID.NULL_ID;
                }

                @Override
                public ClientInstanceID getClientInstance() {
                    return ClientInstanceID.NULL_ID;
                }

                @Override
                public boolean requiresReceived() {
                    return false;
                }

                @Override
                public Set<SessionID> replicateTo(Set<SessionID> passives) {
                    return Collections.emptySet();
                }
            };
            for (ManagedEntity me : ReplicatedTransactionHandler.this.entityManager.getAll()) {
                BarrierCompletion latch = new BarrierCompletion();
                me.clearQueue();
                me.addRequestMessage(req, MessagePayload.emptyPayload(), new ResultCaptureImpl(null, result -> latch.complete(), null, exception -> latch.failure((ServerException)exception)));
                latch.waitForCompletion();
            }
            BarrierCompletion latch = new BarrierCompletion();
            ReplicatedTransactionHandler.this.platform.addRequestMessage(req, MessagePayload.emptyPayload(), new ResultCaptureImpl(null, result -> latch.complete(), null, exception -> latch.failure((ServerException)exception)));
        }
    };

    public long getCurrentSequence() {
        return this.currentSequence;
    }

    public ReplicatedTransactionHandler(StateManager state, Stage<Runnable> sendToActive, Persistor persistor, EntityManager manager, GroupManager<AbstractGroupMessage> groupManager) {
        this.stateManager = state;
        this.sentToActive = sendToActive.getSink();
        this.entityManager = manager;
        this.persistor = persistor;
        this.groupManager = groupManager;
        try {
            this.platform = this.entityManager.getEntity(EntityDescriptor.createDescriptorForLifecycle((EntityID)PlatformEntity.PLATFORM_ID, (long)PlatformEntity.VERSION)).get();
        }
        catch (ServerException ee) {
            throw new RuntimeException(ee);
        }
    }

    public EventHandler<ReplicationMessage> getEventHandler() {
        return this.eventHorizon;
    }

    private void processMessage(ReplicationMessage rep) throws ServerException {
        if (PLOGGER.isDebugEnabled()) {
            PLOGGER.debug("RECEIVED:" + rep.getDebugId());
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("BATCH:" + rep.getSequenceID());
        }
        ServerID activeSender = rep.messageFrom();
        for (SyncReplicationActivity activity : rep.getActivities()) {
            EntityID eid = null;
            if (activity.getActivityType() != SyncReplicationActivity.ActivityType.SYNC_BEGIN) {
                Optional<ManagedEntity> opt = this.entityManager.getEntity(EntityDescriptor.createDescriptorForInvoke((FetchID)activity.getFetchID(), (ClientInstanceID)activity.getClientInstanceID()));
                eid = opt.map(ManagedEntity::getID).orElse(activity.getEntityID());
                Long fid = opt.map(ManagedEntity::getConsumerID).orElse(activity.getFetchID().toLong());
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("RECEIVING:" + eid + ":" + fid + " " + activity.getActivityType() + " " + activity.getActivityID().id);
                }
            }
            if (activity.isSyncActivity()) {
                if (SyncReplicationActivity.ActivityType.SYNC_BEGIN == activity.getActivityType()) {
                    this.syncBeginEntityListReceived(activeSender, activity);
                    continue;
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Sync:" + eid + " " + activity.getActivityType());
                }
                this.syncActivityReceived(activeSender, activity);
                continue;
            }
            if (this.state.ignore(activity)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Ignoring:" + eid + " " + activity.getActivityType());
                }
                this.acknowledge(activeSender, activity, ReplicationResultCode.NONE);
                continue;
            }
            if (this.state.defer(activeSender, activity)) {
                if (!LOGGER.isDebugEnabled()) continue;
                LOGGER.debug("Deferring:" + eid + " " + activity.getActivityType());
                continue;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Applying:" + eid + " " + activity.getActivityType());
            }
            this.replicatedActivityReceived(activeSender, activity);
        }
    }

    private void syncBeginEntityListReceived(ServerID activeSender, SyncReplicationActivity activity) throws ServerException {
        this.ackReceived(activeSender, activity, null);
        this.beforeSyncAction(activity);
        SyncReplicationActivity.EntityCreationTuple[] entityTuples = activity.getEntitiesToCreateForSync();
        Assert.assertNotNull((Object)entityTuples);
        for (SyncReplicationActivity.EntityCreationTuple tuple : entityTuples) {
            EntityID eid = tuple.id;
            long version = tuple.version;
            long consumerID = tuple.consumerID;
            byte[] config = tuple.configPayload;
            boolean canDelete = tuple.canDelete;
            if (!this.entityManager.getEntity(EntityDescriptor.createDescriptorForLifecycle((EntityID)eid, (long)version)).isPresent()) {
                ManagedEntity entity = this.entityManager.createEntity(eid, version, consumerID);
                Assert.assertTrue((entity.canDelete() == canDelete ? 1 : 0) != 0);
                this.persistor.getEntityPersistor().entityCreatedNoJournal(eid, version, consumerID, canDelete, config);
                continue;
            }
            Assert.fail((String)"this entity should not be here");
        }
        this.afterSyncAction(activity);
        this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS);
    }

    private void replicatedActivityReceived(ServerID activeSender, SyncReplicationActivity activity) throws ServerException {
        Trace trace = new Trace(String.valueOf(activity.getActivityID().id), "Replication");
        trace.start();
        ClientID sourceNodeID = activity.getSource();
        TransactionID transactionID = activity.getTransactionID();
        TransactionID oldestTransactionOnClient = activity.getOldestTransactionOnClient();
        Future<Void> tmpFuture = null;
        if (sourceNodeID != null && !sourceNodeID.isNull() && transactionID.isValid()) {
            Assert.assertTrue((boolean)oldestTransactionOnClient.isValid());
            tmpFuture = this.persistor.getTransactionOrderPersistor().updateWithNewMessage(sourceNodeID, transactionID, oldestTransactionOnClient);
        }
        Future<Void> transactionOrderPersistenceFuture = tmpFuture;
        TCByteBuffer extendedData = activity.getExtendedData();
        ServerEntityRequest request = this.activityToLocalRequest(activity);
        if (request.getAction() == ServerEntityAction.CREATE_ENTITY) {
            this.persistor.getEntityPersistor().setNextConsumerID(activity.getFetchID().toLong());
            try {
                ManagedEntity temp = this.entityManager.createEntity(activity.getEntityID(), activity.getVersion(), activity.getFetchID().toLong());
                boolean canDelete = temp.canDelete();
                Assert.assertTrue((Object)(temp.getConsumerID() + " == " + activity.getFetchID().toLong()), (temp.getConsumerID() == activity.getFetchID().toLong() ? 1 : 0) != 0);
                temp.addRequestMessage(request, MessagePayload.rawDataOnly(extendedData), this.createCapture(() -> this.ackReceived(activeSender, activity, transactionOrderPersistenceFuture), result -> {
                    if (!sourceNodeID.isNull()) {
                        this.persistor.getEntityPersistor().entityCreated(sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), activity.getEntityID(), activity.getVersion(), activity.getFetchID().toLong(), true, TCByteBufferFactory.unwrap((TCByteBuffer)extendedData));
                    } else {
                        this.persistor.getEntityPersistor().entityCreatedNoJournal(activity.getEntityID(), activity.getVersion(), activity.getFetchID().toLong(), canDelete, TCByteBufferFactory.unwrap((TCByteBuffer)extendedData));
                    }
                    this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS);
                }, exception -> {
                    this.persistor.getEntityPersistor().entityCreateFailed(activity.getEntityID(), sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), (ServerException)((Object)exception));
                    LOGGER.debug("create fail:" + temp.getID());
                    this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL);
                }));
            }
            catch (ServerException ee) {
                this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL);
                this.persistor.getEntityPersistor().entityCreateFailed(activity.getEntityID(), sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), ee);
            }
        } else {
            Assert.assertFalse((Object)activity.getActivityType(), (boolean)activity.getFetchID().isNull());
            EntityDescriptor desp = EntityDescriptor.createDescriptorForInvoke((FetchID)activity.getFetchID(), (ClientInstanceID)ClientInstanceID.NULL_ID);
            Optional<ManagedEntity> entity = this.entityManager.getEntity(desp);
            if (entity.isPresent()) {
                ManagedEntity entityInstance = entity.get();
                MessagePayload payload = MessagePayload.syncPayloadNormal(extendedData, activity.getConcurrency());
                if (null != request.getAction()) {
                    switch (request.getAction()) {
                        case RECONFIGURE_ENTITY: {
                            entityInstance.addRequestMessage(request, payload, this.createCapture(() -> this.ackReceived(activeSender, activity, transactionOrderPersistenceFuture), result -> {
                                this.persistor.getEntityPersistor().entityReconfigureSucceeded(sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), entityInstance.getID(), entityInstance.getVersion(), payload.getRawPayload());
                                this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS);
                            }, exception -> {
                                this.persistor.getEntityPersistor().entityReconfigureFailed(sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), (ServerException)((Object)exception));
                                this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL);
                            }));
                            break;
                        }
                        case DESTROY_ENTITY: {
                            entityInstance.addRequestMessage(request, payload, this.createCapture(() -> this.ackReceived(activeSender, activity, transactionOrderPersistenceFuture), result -> {
                                this.persistor.getEntityPersistor().entityDestroyed(sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), entityInstance.getID());
                                this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS);
                            }, exception -> {
                                this.persistor.getEntityPersistor().entityDestroyFailed(sourceNodeID, transactionID.toLong(), oldestTransactionOnClient.toLong(), (ServerException)((Object)exception));
                                LOGGER.debug("destroy fail:" + entityInstance.getID());
                                this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL);
                            }));
                            break;
                        }
                        case FETCH_ENTITY: 
                        case RELEASE_ENTITY: {
                            entityInstance.addRequestMessage(request, payload, this.createCapture(() -> this.ackReceived(activeSender, activity, transactionOrderPersistenceFuture), result -> this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS), exception -> {
                                LOGGER.debug("fetch/release fail:" + entityInstance.getID());
                                this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL);
                            }));
                            break;
                        }
                        case MANAGED_ENTITY_GC: {
                            if (entityInstance.isRemoveable()) {
                                LOGGER.debug("removing " + entityInstance.getID());
                                this.entityManager.removeDestroyed(activity.getFetchID());
                                break;
                            }
                        }
                        case FAILOVER_FLUSH: {
                            entityInstance.addRequestMessage(request, payload, new NoopResultCapture());
                            break;
                        }
                        case ORDER_PLACEHOLDER_ONLY: {
                            this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS);
                            break;
                        }
                        default: {
                            entityInstance.addRequestMessage(request, payload, this.createCapture(() -> this.ackReceived(activeSender, activity, transactionOrderPersistenceFuture), result -> this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS), exception -> this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL)));
                        }
                    }
                }
            } else {
                this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL);
            }
        }
        trace.end();
    }

    private ResultCapture createCapture(Runnable received, Consumer<byte[]> completed, Consumer<ServerException> failure) {
        return new PassiveResultCapture(received, completed, failure);
    }

    private void establishNewPassive(SyncReplicationActivity.ActivityID sequence) {
        Event event = TripwireFactory.createPrimeEvent((String)this.groupManager.getLocalNodeID().getName(), (byte[])this.groupManager.getLocalNodeID().getUID(), (long)SessionID.NULL_ID.toLong(), (long)sequence.id);
        this.entityManager.resetReferences();
        event.commit();
    }

    private void requestPassiveSync() {
        NodeID node = this.stateManager.getActiveNodeID();
        Assert.assertTrue((boolean)this.entityManager.getAll().stream().allMatch(e -> e.getID().equals((Object)PlatformEntity.PLATFORM_ID)));
        this.moveToPassiveUnitialized(node);
        try {
            LOGGER.info("Requesting Passive Sync from " + node);
            this.groupManager.sendTo(node, (AbstractGroupMessage)ReplicationMessageAck.createSyncRequestMessage());
        }
        catch (GroupException ge) {
            LOGGER.warn("can't request passive sync", (Throwable)ge);
        }
    }

    private void syncActivityReceived(ServerID activeSender, SyncReplicationActivity activity) {
        MessagePayload payload;
        Trace trace = new Trace(String.valueOf(activity.getActivityID().id), "Sync");
        trace.start();
        SyncReplicationActivity.ActivityType thisActivityType = activity.getActivityType();
        FetchID fetch = activity.getFetchID();
        EntityDescriptor descriptor = EntityDescriptor.createDescriptorForInvoke((FetchID)fetch, (ClientInstanceID)ClientInstanceID.NULL_ID);
        Assert.assertTrue((SyncReplicationActivity.ActivityType.SYNC_BEGIN != thisActivityType ? 1 : 0) != 0);
        this.beforeSyncAction(activity);
        if (SyncReplicationActivity.ActivityType.SYNC_ENTITY_BEGIN == thisActivityType && !fetch.isNull()) {
            try {
                Assert.assertTrue((boolean)this.entityManager.getEntity(descriptor).isPresent());
                int referenceCount = activity.getReferenceCount();
                payload = MessagePayload.syncPayloadCreation(activity.getExtendedData(), referenceCount);
                BasicServerEntityRequest request = new BasicServerEntityRequest(ServerEntityAction.RECEIVE_SYNC_CREATE_ENTITY, activity.getSource(), activity.getClientInstanceID(), activity.getTransactionID(), activity.getOldestTransactionOnClient());
                this.entityManager.getEntity(descriptor).get().addRequestMessage(request, payload, this.createCapture(null, result -> {}, exception -> this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL)));
            }
            catch (ServerException exception2) {
                LOGGER.warn("entity has already been created", (Throwable)exception2);
            }
        }
        try {
            Optional<ManagedEntity> entity = this.entityManager.getEntity(descriptor);
            if (entity.isPresent()) {
                payload = null;
                if (SyncReplicationActivity.ActivityType.SYNC_ENTITY_BEGIN == thisActivityType) {
                    payload = MessagePayload.emptyPayload();
                } else if (SyncReplicationActivity.ActivityType.SYNC_BEGIN == thisActivityType) {
                    payload = MessagePayload.emptyPayload();
                } else {
                    int concurrencyKey = activity.getConcurrency();
                    payload = MessagePayload.syncPayloadNormal(activity.getExtendedData(), concurrencyKey);
                }
                entity.get().addRequestMessage(this.activityToLocalRequest(activity), payload, this.createCapture(null, result -> this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS), exception -> this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL)));
            } else {
                Assert.assertFalse((SyncReplicationActivity.ActivityType.SYNC_ENTITY_BEGIN == thisActivityType ? 1 : 0) != 0);
                Assert.assertFalse((SyncReplicationActivity.ActivityType.ORDERING_PLACEHOLDER == thisActivityType ? 1 : 0) != 0);
                if (!fetch.isNull()) {
                    throw new AssertionError();
                }
                payload = MessagePayload.syncPayloadNormal(activity.getExtendedData(), activity.getConcurrency());
                this.platform.addRequestMessage(this.activityToLocalRequest(activity), payload, this.createCapture(null, result -> {
                    if (SyncReplicationActivity.ActivityType.SYNC_END == thisActivityType) {
                        try {
                            this.persistor.getEntityPersistor().layer(new ObjectInputStream(new ByteArrayInputStream(payload.getRawPayload())));
                        }
                        catch (IOException ioe) {
                            throw new RuntimeException(ioe);
                        }
                        this.moveToPassiveStandBy();
                    }
                    this.acknowledge(activeSender, activity, ReplicationResultCode.SUCCESS);
                }, exception -> this.acknowledge(activeSender, activity, ReplicationResultCode.FAIL)));
            }
        }
        catch (ServerException ee) {
            throw new RuntimeException(ee);
        }
        finally {
            this.afterSyncAction(activity);
        }
        trace.end();
    }

    private void start() {
        this.state.start();
    }

    private void start(FetchID fetch) {
        this.state.startEntity(fetch);
    }

    private void start(FetchID fetch, int concurrency) {
        this.state.startConcurrency(fetch, concurrency);
    }

    private void finish() {
        this.scheduleDeferred(this.state.finish());
    }

    private void finish(FetchID fetch) {
        this.state.endEntity(fetch);
    }

    private void finish(FetchID fetch, int concurrency) {
        this.scheduleDeferred(this.state.endConcurrency(fetch, concurrency));
    }

    private void scheduleDeferred(Deque<DeferredContainer> deferred) {
        if (deferred != null) {
            while (!deferred.isEmpty()) {
                DeferredContainer r = deferred.pop();
                try {
                    this.replicatedActivityReceived(r.activeSender, r.activity);
                }
                catch (ServerException ee) {
                    throw new RuntimeException(ee);
                }
            }
        }
    }

    private void moveToPassiveUnitialized(NodeID connectedTo) {
        if (!this.stateManager.isActiveCoordinator()) {
            this.stateManager.moveToPassiveSyncing(connectedTo);
        }
    }

    private void moveToPassiveStandBy() {
        if (!this.stateManager.isActiveCoordinator()) {
            this.stateManager.moveToPassiveStandbyState();
        }
    }

    private ServerEntityRequest activityToLocalRequest(SyncReplicationActivity activity) {
        SyncReplicationActivity.ActivityType activityType = activity.getActivityType();
        ClientID source = activity.getSource();
        ClientInstanceID instance = activity.getClientInstanceID();
        TransactionID transactionID = activity.getTransactionID();
        TransactionID oldestTransactionID = activity.getOldestTransactionOnClient();
        Assert.assertTrue((SyncReplicationActivity.ActivityType.SYNC_BEGIN != activityType ? 1 : 0) != 0);
        return new BasicServerEntityRequest(ReplicatedTransactionHandler.decodeReplicationType(activityType), source, instance, transactionID, oldestTransactionID);
    }

    private void beforeSyncAction(SyncReplicationActivity activity) {
        switch (activity.getActivityType()) {
            case SYNC_START: {
                this.establishNewPassive(activity.getActivityID());
                break;
            }
            case SYNC_BEGIN: {
                this.start();
                break;
            }
            case SYNC_ENTITY_BEGIN: {
                this.start(activity.getFetchID());
                break;
            }
            case SYNC_ENTITY_CONCURRENCY_BEGIN: {
                this.start(activity.getFetchID(), activity.getConcurrency());
                break;
            }
        }
    }

    private void afterSyncAction(SyncReplicationActivity activity) {
        switch (activity.getActivityType()) {
            case SYNC_END: {
                this.finish();
                break;
            }
            case SYNC_ENTITY_END: {
                this.finish(activity.getFetchID());
                break;
            }
            case SYNC_ENTITY_CONCURRENCY_END: {
                this.finish(activity.getFetchID(), activity.getConcurrency());
                break;
            }
        }
    }

    private void ackReceived(ServerID activeSender, SyncReplicationActivity activity, Future<Void> future) {
        if (!activeSender.equals((Object)ServerID.NULL_ID)) {
            if (future != null) {
                try {
                    future.get();
                }
                catch (InterruptedException ie) {
                    L2Utils.handleInterrupted(LOGGER, ie);
                }
                catch (ExecutionException e) {
                    throw new RuntimeException("Caught exception while persisting transaction order", e);
                }
            }
            this.prepareAckForSend(activeSender, activity.getActivityID(), ReplicationResultCode.RECEIVED);
        }
    }

    private void acknowledge(ServerID activeSender, SyncReplicationActivity activity, ReplicationResultCode code) {
        if (!activeSender.equals((Object)ServerID.NULL_ID)) {
            LOGGER.debug("{} acking {} as {}", new Object[]{activity.getTransactionID(), activity.getActivityID().id, code});
            this.prepareAckForSend(activeSender, activity.getActivityID(), code);
        }
    }

    private ReplicationMessageAck createAckMessage(ReplicationAckTuple initialActivity) {
        ReplicationMessageAck message = ReplicationMessageAck.createBatchAck();
        message.addToBatch(initialActivity);
        return message;
    }

    private synchronized void prepareAckForSend(ServerID sender, SyncReplicationActivity.ActivityID respondTo, ReplicationResultCode code) {
        boolean didCreate;
        if (!sender.equals((Object)this.cachedMessageAckFrom)) {
            this.cachedMessageAckFrom = sender;
            this.cachedBatchAck = new GroupMessageBatchContext<ReplicationMessageAck, ReplicationAckTuple>(this::createAckMessage, this.groupManager, this.cachedMessageAckFrom, maximumBatchSize, idealMessagesInFlight, node -> this.sendToActive());
        }
        if (didCreate = this.cachedBatchAck.batchMessage(new ReplicationAckTuple(respondTo, code))) {
            this.sendToActive();
        }
    }

    private void sendToActive() {
        if (!this.stateManager.isActiveCoordinator()) {
            this.sentToActive.addToSink(() -> {
                try {
                    this.cachedBatchAck.flushBatch();
                }
                catch (GroupException groupException) {
                    // empty catch block
                }
            });
        }
    }

    private static ServerEntityAction decodeReplicationType(SyncReplicationActivity.ActivityType networkType) {
        switch (networkType) {
            case SYNC_BEGIN: {
                throw Assert.failure((Object)"Shouldn't decode this type into an internal action");
            }
            case SYNC_START: 
            case SYNC_END: 
            case ORDERING_PLACEHOLDER: {
                return ServerEntityAction.ORDER_PLACEHOLDER_ONLY;
            }
            case LOCAL_ENTITY_GC: {
                return ServerEntityAction.MANAGED_ENTITY_GC;
            }
            case FLUSH_LOCAL_PIPELINE: {
                return ServerEntityAction.LOCAL_FLUSH;
            }
            case CREATE_ENTITY: {
                return ServerEntityAction.CREATE_ENTITY;
            }
            case RECONFIGURE_ENTITY: {
                return ServerEntityAction.RECONFIGURE_ENTITY;
            }
            case INVOKE_ACTION: {
                return ServerEntityAction.INVOKE_ACTION;
            }
            case DESTROY_ENTITY: {
                return ServerEntityAction.DESTROY_ENTITY;
            }
            case FETCH_ENTITY: {
                return ServerEntityAction.FETCH_ENTITY;
            }
            case RELEASE_ENTITY: {
                return ServerEntityAction.RELEASE_ENTITY;
            }
            case SYNC_ENTITY_BEGIN: {
                return ServerEntityAction.RECEIVE_SYNC_ENTITY_START_SYNCING;
            }
            case SYNC_ENTITY_CONCURRENCY_BEGIN: {
                return ServerEntityAction.RECEIVE_SYNC_ENTITY_KEY_START;
            }
            case SYNC_ENTITY_CONCURRENCY_PAYLOAD: {
                return ServerEntityAction.RECEIVE_SYNC_PAYLOAD;
            }
            case SYNC_ENTITY_CONCURRENCY_END: {
                return ServerEntityAction.RECEIVE_SYNC_ENTITY_KEY_END;
            }
            case SYNC_ENTITY_END: {
                return ServerEntityAction.RECEIVE_SYNC_ENTITY_END;
            }
            case DISCONNECT_CLIENT: {
                return ServerEntityAction.DISCONNECT_CLIENT;
            }
        }
        throw new AssertionError((Object)("bad replication type: " + networkType));
    }

    public static class SedaToken {
    }

    private static class DeferredContainer {
        public final ServerID activeSender;
        public final SyncReplicationActivity activity;

        public DeferredContainer(ServerID activeSender, SyncReplicationActivity activity) {
            this.activeSender = activeSender;
            this.activity = activity;
        }
    }

    public static class BasicServerEntityRequest
    implements ServerEntityRequest {
        private final ServerEntityAction action;
        private final ClientID source;
        private final ClientInstanceID instance;
        private final TransactionID transaction;
        private final TransactionID oldest;

        public BasicServerEntityRequest(ServerEntityAction action, ClientID source, ClientInstanceID instance, TransactionID transaction, TransactionID oldest) {
            this.action = action;
            this.source = source;
            this.instance = instance;
            this.transaction = transaction;
            this.oldest = oldest;
        }

        @Override
        public ServerEntityAction getAction() {
            return this.action;
        }

        @Override
        public ClientID getNodeID() {
            return this.source;
        }

        @Override
        public TransactionID getTransaction() {
            return this.transaction;
        }

        @Override
        public TransactionID getOldestTransactionOnClient() {
            return this.oldest;
        }

        @Override
        public ClientInstanceID getClientInstance() {
            return this.instance;
        }

        @Override
        public boolean requiresReceived() {
            return false;
        }

        @Override
        public Set<SessionID> replicateTo(Set<SessionID> passives) {
            return Collections.emptySet();
        }

        public String toString() {
            return "BasicServerEntityRequest{action=" + (Object)((Object)this.action) + ", source=" + this.source + ", instance=" + this.instance + ", transaction=" + this.transaction + ", oldest=" + this.oldest + '}';
        }
    }

    private class SyncState {
        private LinkedList<DeferredContainer> defer = new LinkedList();
        private final Set<FetchID> syncdFetches = new HashSet<FetchID>();
        private final Set<Integer> syncdKeys = new HashSet<Integer>();
        private FetchID syncingFetch = FetchID.NULL_ID;
        private int currentKey = -1;
        private boolean finished = false;
        private boolean started = false;

        private SyncState() {
        }

        private void start() {
            this.started = true;
        }

        private void startEntity(FetchID fetch) {
            this.assertStarted(null);
            Assert.assertTrue((boolean)this.syncingFetch.isNull());
            this.syncingFetch = fetch;
            this.syncdKeys.add(0);
            this.syncdKeys.add(Integer.MIN_VALUE);
            LOGGER.debug("Starting " + fetch);
        }

        private void endEntity(FetchID fetch) {
            this.assertStarted(null);
            Assert.assertEquals((Object)this.syncingFetch, (Object)fetch);
            this.syncdFetches.add(fetch);
            this.syncdKeys.clear();
            this.syncingFetch = FetchID.NULL_ID;
            LOGGER.debug("Ending " + fetch);
        }

        private void startConcurrency(FetchID fetch, int concurrency) {
            this.assertStarted(null);
            Assert.assertEquals((Object)this.syncingFetch, (Object)fetch);
            this.currentKey = concurrency;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Starting " + fetch + "/" + this.currentKey);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Deque<DeferredContainer> endConcurrency(FetchID fetch, int concurrency) {
            this.assertStarted(null);
            try {
                Assert.assertEquals((Object)this.syncingFetch, (Object)fetch);
                Assert.assertEquals((int)this.currentKey, (int)concurrency);
                this.syncdKeys.add(concurrency);
                this.currentKey = -1;
                LinkedList<DeferredContainer> linkedList = this.defer;
                return linkedList;
            }
            finally {
                this.defer = new LinkedList();
            }
        }

        private Deque<DeferredContainer> finish() {
            this.assertStarted(null);
            this.syncdFetches.clear();
            this.finished = true;
            return this.defer;
        }

        private boolean ignore(SyncReplicationActivity activity) {
            if (!this.started) {
                return true;
            }
            if (this.finished) {
                return false;
            }
            return false;
        }

        private boolean defer(ServerID activeSender, SyncReplicationActivity activity) {
            this.assertStarted(activity);
            if (this.finished) {
                return false;
            }
            FetchID fetch = activity.getFetchID();
            if (this.syncdFetches.contains(fetch)) {
                return false;
            }
            SyncReplicationActivity.ActivityType activityType = activity.getActivityType();
            if (fetch.equals((Object)this.syncingFetch)) {
                int concurrencyKey = activity.getConcurrency();
                if (this.syncdKeys.contains(concurrencyKey)) {
                    return false;
                }
                if (SyncReplicationActivity.ActivityType.CREATE_ENTITY == activityType) {
                    return true;
                }
                if (SyncReplicationActivity.ActivityType.ORDERING_PLACEHOLDER == activityType) {
                    return false;
                }
                if (SyncReplicationActivity.ActivityType.DESTROY_ENTITY == activityType) {
                    return false;
                }
                if (this.currentKey == concurrencyKey) {
                    this.defer.add(new DeferredContainer(activeSender, activity));
                    return true;
                }
                if (concurrencyKey == Integer.MIN_VALUE) {
                    this.defer.add(new DeferredContainer(activeSender, activity));
                    return true;
                }
            }
            return false;
        }

        private void assertStarted(SyncReplicationActivity activity) {
            Assert.assertTrue((Object)activity, (boolean)this.started);
        }
    }
}

