/*
 * Decompiled with CFR 0.152.
 */
package com.tc.objectserver.entity;

import com.tc.async.api.Sink;
import com.tc.l2.msg.ReplicationMessage;
import com.tc.l2.msg.SyncReplicationActivity;
import com.tc.net.ServerID;
import com.tc.net.groups.AbstractGroupMessage;
import com.tc.net.groups.GroupException;
import com.tc.net.groups.GroupManager;
import com.tc.object.FetchID;
import com.tc.object.session.SessionID;
import com.tc.objectserver.entity.MessagePayload;
import com.tc.objectserver.handler.GroupMessageBatchContext;
import com.tc.objectserver.handler.ReplicationSendingAction;
import com.tc.properties.TCPropertiesImpl;
import com.tc.util.Assert;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.tripwire.Event;
import org.terracotta.tripwire.TripwireFactory;

public class ReplicationSender {
    private static final int DEFAULT_BATCH_LIMIT = 1024;
    private static final int DEFAULT_INFLIGHT_MESSAGES = 1;
    private static int maximumBatchSize = TCPropertiesImpl.getProperties().getInt("active-passive.batchsize", 1024);
    private static int idealMessagesInFlight = TCPropertiesImpl.getProperties().getInt("active-passive.inflight", 1);
    private final GroupManager<AbstractGroupMessage> group;
    private final Map<SessionID, SyncState> filtering = new ConcurrentHashMap<SessionID, SyncState>();
    private static final Logger logger = LoggerFactory.getLogger(ReplicationSender.class);
    private static final Logger PLOGGER = LoggerFactory.getLogger(MessagePayload.class);
    private static final boolean debugLogging = logger.isDebugEnabled();
    private static final boolean debugMessaging = PLOGGER.isDebugEnabled();
    private final Sink<ReplicationSendingAction> outgoing;

    public ReplicationSender(Sink<ReplicationSendingAction> outgoing, GroupManager<AbstractGroupMessage> group) {
        this.group = group;
        this.outgoing = outgoing;
    }

    public void removePassive(SessionID dest) {
        this.filtering.remove(dest);
    }

    public boolean addPassive(ServerID node, SessionID session, Integer execution, SyncReplicationActivity activity) {
        Event event = TripwireFactory.createPrimeEvent((String)node.getName(), (byte[])node.getUID(), (long)session.toLong(), (long)activity.getSequenceID());
        SyncState state = this.createAndRegisterSyncState(node, session, execution);
        event.commit();
        return state.attemptToSend(activity);
    }

    public void replicateMessage(SessionID session, SyncReplicationActivity activity, Consumer<Boolean> sentCallback) {
        Optional<SyncState> syncing;
        if (debugLogging) {
            logger.debug("WIRE:" + activity);
        }
        if (debugMessaging) {
            PLOGGER.debug("SENDING:" + activity.getDebugID());
        }
        if ((syncing = this.getSyncState(session, activity)).isPresent()) {
            this.outgoing.addToSink((Object)new ReplicationSendingAction(syncing.get().executionLane, () -> {
                Optional<Boolean> didSend = syncing.map(state -> state.attemptToSend(activity));
                if (sentCallback != null) {
                    sentCallback.accept(didSend.orElse(false));
                }
            }));
        } else {
            logger.info("ignoring replication message no session {} for activity {}", (Object)session, (Object)activity);
            if (sentCallback != null) {
                sentCallback.accept(false);
            }
        }
    }

    private SyncState createAndRegisterSyncState(ServerID node, SessionID session, int lane) {
        Assert.assertTrue((!node.isNull() ? 1 : 0) != 0);
        Assert.assertTrue((!this.filtering.containsKey(session) ? 1 : 0) != 0);
        SyncState state = new SyncState(node, session, lane);
        this.filtering.put(session, state);
        return state;
    }

    private Optional<SyncState> getSyncState(SessionID session, SyncReplicationActivity activity) {
        SyncState state = this.filtering.get(session);
        if (null == state || !state.isSameSession(session)) {
            this.dropActivityForDisconnectedServer(session, activity);
            return Optional.empty();
        }
        return Optional.of(state);
    }

    private void dropActivityForDisconnectedServer(SessionID session, SyncReplicationActivity activity) {
        if (logger.isDebugEnabled()) {
            logger.debug("ignoring: " + session + " no longer exists");
        }
    }

    boolean isSyncOccuring(SessionID origin) {
        SyncState state = this.filtering.get(origin);
        if (state != null) {
            return state.isSyncOccuring();
        }
        return false;
    }

    private class SyncState {
        private final Set<FetchID> liveFetch = new HashSet<FetchID>();
        private final Set<Integer> syncdID = new HashSet<Integer>();
        private FetchID syncingFetch = FetchID.NULL_ID;
        private int syncingConcurrency = -1;
        boolean begun = false;
        boolean complete = false;
        private SyncReplicationActivity.ActivityType lastSeen;
        private SyncReplicationActivity.ActivityType lastSent;
        private final GroupMessageBatchContext<ReplicationMessage, SyncReplicationActivity> batchContext;
        private final SessionID session;
        private final int executionLane;

        public SyncState(ServerID target, SessionID nodeToId, int lane) {
            this.session = nodeToId;
            this.executionLane = lane;
            this.batchContext = new GroupMessageBatchContext<ReplicationMessage, SyncReplicationActivity>(ReplicationMessage::createActivityContainer, ReplicationSender.this.group, target, maximumBatchSize, idealMessagesInFlight, node -> this.flushBatch());
        }

        private boolean isSameSession(SessionID session) {
            return this.session.equals((Object)session);
        }

        public boolean isSyncOccuring() {
            return this.begun && !this.complete;
        }

        public boolean hasSyncBegun() {
            return this.begun;
        }

        public boolean hasSyncFinished() {
            return this.complete;
        }

        public boolean attemptToSend(SyncReplicationActivity activity) {
            boolean shouldRemoveFromStream;
            boolean bl = shouldRemoveFromStream = !this.hasSyncFinished() && !this.shouldMessageBeReplicated(activity) && this.hasSyncBegun();
            if (!shouldRemoveFromStream) {
                this.validateSending(activity);
                if (debugLogging && activity.getActivityType() != SyncReplicationActivity.ActivityType.SYNC_BEGIN) {
                    logger.debug("SENDING:" + activity.getActivityType() + " " + activity.getEntityID() + " " + activity.getFetchID() + " " + activity.getSource() + " " + activity.getClientInstanceID() + " " + activity.getActivityID().id);
                }
                this.send(activity);
                return true;
            }
            if (debugLogging) {
                logger.debug("FILTERING:" + activity);
            }
            return false;
        }

        private boolean shouldMessageBeReplicated(SyncReplicationActivity activity) {
            switch (this.validateInput(activity)) {
                case SYNC_BEGIN: {
                    this.begun = true;
                    return true;
                }
                case SYNC_ENTITY_BEGIN: {
                    if (this.liveFetch.contains(activity.getFetchID())) {
                        return false;
                    }
                    this.syncingFetch = activity.getFetchID();
                    this.syncdID.clear();
                    this.syncdID.add(0);
                    this.syncdID.add(Integer.MIN_VALUE);
                    this.syncingConcurrency = 0;
                    return true;
                }
                case SYNC_ENTITY_CONCURRENCY_BEGIN: {
                    if (this.syncingFetch.equals((Object)activity.getFetchID())) {
                        Assert.assertEquals((int)this.syncingConcurrency, (int)0);
                        this.syncingConcurrency = activity.getConcurrency();
                        return true;
                    }
                    return false;
                }
                case SYNC_ENTITY_CONCURRENCY_PAYLOAD: {
                    return this.syncingFetch.equals((Object)activity.getFetchID());
                }
                case SYNC_ENTITY_CONCURRENCY_END: {
                    if (this.syncingFetch.equals((Object)activity.getFetchID())) {
                        this.syncdID.add(this.syncingConcurrency);
                        this.syncingConcurrency = 0;
                        return true;
                    }
                    return false;
                }
                case SYNC_ENTITY_END: {
                    if (this.syncingFetch.equals((Object)activity.getFetchID())) {
                        this.liveFetch.add(this.syncingFetch);
                        this.syncingFetch = FetchID.NULL_ID;
                        return true;
                    }
                    return false;
                }
                case SYNC_END: {
                    this.complete = true;
                    this.liveFetch.clear();
                    this.syncdID.clear();
                    this.syncingFetch = FetchID.NULL_ID;
                    return true;
                }
                case CREATE_ENTITY: {
                    if (this.begun) {
                        this.liveFetch.add(activity.getFetchID());
                    }
                }
                case RECONFIGURE_ENTITY: 
                case FETCH_ENTITY: 
                case RELEASE_ENTITY: 
                case DISCONNECT_CLIENT: 
                case DESTROY_ENTITY: {
                    return this.begun;
                }
                case INVOKE_ACTION: {
                    if (this.liveFetch.contains(activity.getFetchID())) {
                        return true;
                    }
                    if (this.syncingFetch.equals((Object)activity.getFetchID())) {
                        int concurrencyKey = activity.getConcurrency();
                        if (this.syncingConcurrency == concurrencyKey) {
                            return true;
                        }
                        return this.syncdID.contains(concurrencyKey);
                    }
                    return false;
                }
                case LOCAL_ENTITY_GC: 
                case FLUSH_LOCAL_PIPELINE: 
                case ORDERING_PLACEHOLDER: {
                    return false;
                }
                case SYNC_START: {
                    return true;
                }
            }
            throw new AssertionError((Object)("unknown replication activity:" + activity));
        }

        public SyncReplicationActivity.ActivityType validateInput(SyncReplicationActivity activity) {
            SyncReplicationActivity.ActivityType type = activity.getActivityType();
            if (activity.isSyncActivity()) {
                this.lastSeen = this.validate(type, this.lastSeen);
            }
            return type;
        }

        public void validateSending(SyncReplicationActivity activity) {
            if (activity.isSyncActivity()) {
                this.lastSent = this.validate(activity.getActivityType(), this.lastSent);
            }
        }

        private SyncReplicationActivity.ActivityType validate(SyncReplicationActivity.ActivityType type, SyncReplicationActivity.ActivityType compare) {
            switch (type) {
                case SYNC_BEGIN: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_START).contains(compare));
                    break;
                }
                case SYNC_ENTITY_BEGIN: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_BEGIN, SyncReplicationActivity.ActivityType.SYNC_ENTITY_END).contains(compare));
                    break;
                }
                case SYNC_ENTITY_CONCURRENCY_BEGIN: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_PAYLOAD, SyncReplicationActivity.ActivityType.SYNC_ENTITY_BEGIN, SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_END).contains(compare));
                    break;
                }
                case SYNC_ENTITY_CONCURRENCY_PAYLOAD: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_ENTITY_BEGIN, SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_BEGIN, SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_END, SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_PAYLOAD).contains(compare));
                    break;
                }
                case SYNC_ENTITY_CONCURRENCY_END: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_BEGIN, SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_PAYLOAD).contains(compare));
                    break;
                }
                case SYNC_ENTITY_END: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_ENTITY_BEGIN, SyncReplicationActivity.ActivityType.SYNC_ENTITY_CONCURRENCY_END).contains(compare));
                    break;
                }
                case SYNC_END: {
                    Assert.assertTrue((Object)(type + " " + compare), (boolean)EnumSet.of(SyncReplicationActivity.ActivityType.SYNC_ENTITY_END, SyncReplicationActivity.ActivityType.SYNC_BEGIN).contains(compare));
                    break;
                }
                case SYNC_START: {
                    break;
                }
                default: {
                    throw new AssertionError((Object)"unexpected message type");
                }
            }
            return type;
        }

        private boolean send(SyncReplicationActivity activity) {
            if (this.batchContext.batchMessage(activity)) {
                this.flushBatch();
            }
            return true;
        }

        private void flushBatch() {
            ReplicationSender.this.outgoing.addToSink((Object)new ReplicationSendingAction(this.executionLane, () -> {
                try {
                    this.batchContext.flushBatch();
                }
                catch (GroupException ge) {
                    logger.warn("error sending message to passive ", (Throwable)ge);
                }
            }));
        }
    }
}

