/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskSyncContext;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that fold an incoming {@link TaskSyncEvent} into a {@link TaskSyncContext}.
 *
 * <p>Every method leaves {@code currentContext} untouched and returns a context produced by
 * {@code currentContext.toBuilder()}; when a message is ignored, the returned context is built
 * from that builder without any modification.
 */
public class SyncEventMerger {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventMerger.class);

    private SyncEventMerger() {
    }

    /**
     * Merges an incremental sync event sent by another task.
     *
     * <p>The sender's entry in the task-state map is replaced only when the sender is already
     * present in the current context and the incoming state carries a strictly newer state
     * timestamp. The message is ignored when it does not contain the sender's own state, when
     * the sender is this task, or when the sender is not known to the current context.
     *
     * @param currentContext the context held by this task
     * @param newMessage the received incremental sync event
     * @return the merged context, or an unmodified rebuild of {@code currentContext}
     */
    public static TaskSyncContext mergeIncrementalTaskSyncEvent(TaskSyncContext currentContext, TaskSyncEvent newMessage) {
        Map<String, TaskState> newTaskStatesMap = newMessage.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", currentContext, newTaskStatesMap);

        TaskSyncContext.TaskSyncContextBuilder builder = currentContext.toBuilder();

        TaskState newTask = newTaskStatesMap.get(newMessage.getTaskUid());
        if (newTask == null) {
            LOGGER.warn("The rebalance answer {} did not contain the task's UID: {}", newMessage, newMessage.getTaskUid());
            return builder.build();
        }

        // Only the local copy of *other* tasks' states is updated here.
        if (newTask.getTaskUid().equals(currentContext.getTaskUid())) {
            return builder.build();
        }

        long oldPartitions = currentContext.getNumPartitions() + currentContext.getNumSharedPartitions();

        TaskState currentTask = currentContext.getTaskStates().get(newMessage.getTaskUid());
        if (currentTask == null) {
            LOGGER.debug("Task {}, The task's UID: {} not contained in current task states map",
                    currentContext.getTaskUid(), newMessage.getTaskUid());
            return builder.build();
        }

        if (newTask.getStateTimestamp() <= currentTask.getStateTimestamp()) {
            LOGGER.debug("merge: final state is not changed");
            return builder.build();
        }

        TaskSyncContext result = replaceSenderState(currentContext, builder, newMessage, newTask);
        long newPartitions = result.getNumPartitions() + result.getNumSharedPartitions();
        if (newPartitions != oldPartitions) {
            // The format string has seven placeholders; the sender UID fills the second one,
            // matching the "<sender>: <detail>" layout used by the other merge methods.
            LOGGER.debug(
                    "Task {}, processed incremental answer {}: {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}",
                    currentContext.getTaskUid(),
                    newMessage.getTaskUid(),
                    newMessage,
                    newPartitions,
                    result.getNumPartitions(),
                    result.getNumSharedPartitions(),
                    oldPartitions);
        }
        return result;
    }

    /**
     * Merges a rebalance answer sent by another task.
     *
     * <p>Unlike {@link #mergeIncrementalTaskSyncEvent}, a sender that is not yet present in the
     * current context is added. A known sender is replaced only when the incoming state carries
     * a strictly newer state timestamp. The message is ignored when it does not contain the
     * sender's own state or when the sender is this task.
     *
     * @param currentContext the context held by this task
     * @param newMessage the received rebalance answer
     * @return the merged context, or an unmodified rebuild of {@code currentContext}
     */
    public static TaskSyncContext mergeRebalanceAnswer(TaskSyncContext currentContext, TaskSyncEvent newMessage) {
        Map<String, TaskState> newTaskStatesMap = newMessage.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", currentContext, newTaskStatesMap);

        TaskSyncContext.TaskSyncContextBuilder builder = currentContext.toBuilder();

        TaskState newTask = newTaskStatesMap.get(newMessage.getTaskUid());
        if (newTask == null) {
            LOGGER.warn("Task {}, The rebalance answer {} did not contain the task's UID: {}",
                    currentContext.getTaskUid(), newMessage, newMessage.getTaskUid());
            return builder.build();
        }

        // Only the local copy of *other* tasks' states is updated here.
        if (newTask.getTaskUid().equals(currentContext.getTaskUid())) {
            return builder.build();
        }

        long oldPartitions = currentContext.getNumPartitions() + currentContext.getNumSharedPartitions();

        TaskState currentTask = currentContext.getTaskStates().get(newMessage.getTaskUid());
        boolean senderIsStale = currentTask != null
                && newTask.getStateTimestamp() <= currentTask.getStateTimestamp();
        if (senderIsStale) {
            LOGGER.info("Task {}, Skipping rebalance answer from task {} for rebalance generation id {}",
                    currentContext.getTaskUid(), newMessage.getTaskUid(), newMessage.getRebalanceGenerationId());
            LOGGER.debug("merge: final state is not changed");
            return builder.build();
        }

        TaskSyncContext result = replaceSenderState(currentContext, builder, newMessage, newTask);
        LOGGER.info(
                "Task {}, Processed rebalance answer from task {} for rebalance generation id {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}",
                currentContext.getTaskUid(),
                newMessage.getTaskUid(),
                newMessage.getRebalanceGenerationId(),
                result.getNumPartitions() + result.getNumSharedPartitions(),
                result.getNumPartitions(),
                result.getNumSharedPartitions(),
                oldPartitions);
        return result;
    }

    /**
     * Merges an epoch update sent by another task.
     *
     * <p>The update is applied only while this task's rebalance state is
     * {@code START_INITIAL_SYNC} or {@code NEW_EPOCH_STARTED}, and only when the sender is not
     * this task. When applied:
     * <ul>
     *   <li>the epoch offset holder is advanced with the message's epoch offset;</li>
     *   <li>in {@code START_INITIAL_SYNC}, the message's rebalance generation ID is adopted for
     *       both the context and the current task state;</li>
     *   <li>task states whose key is absent from the message are dropped;</li>
     *   <li>for every other task in the message, the incoming state wins when the task is
     *       unknown locally or the incoming state timestamp is strictly newer.</li>
     * </ul>
     *
     * @param currentContext the context held by this task
     * @param newMessage the received epoch update
     * @return the merged context, or an unmodified rebuild of {@code currentContext}
     */
    public static TaskSyncContext mergeEpochUpdate(TaskSyncContext currentContext, TaskSyncEvent newMessage) {
        Map<String, TaskState> newTaskStatesMap = newMessage.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", currentContext, newTaskStatesMap);

        TaskSyncContext.TaskSyncContextBuilder builder = currentContext.toBuilder();

        // Identity comparison on the enum is null-safe, unlike calling equals() on the getter result.
        boolean startInitialSync = currentContext.getRebalanceState() == RebalanceState.START_INITIAL_SYNC;
        boolean newEpochStarted = currentContext.getRebalanceState() == RebalanceState.NEW_EPOCH_STARTED;
        if (!startInitialSync && !newEpochStarted) {
            return builder.build();
        }

        if (newMessage.getTaskUid().equals(currentContext.getTaskUid())) {
            return builder.build();
        }

        long oldPartitions = currentContext.getNumPartitions() + currentContext.getNumSharedPartitions();

        // NOTE(review): the epoch offset holder is set here and again in the final builder chain
        // below with an identical expression. Both calls are kept because it is not visible from
        // this file whether nextOffset() is free of side effects - confirm and drop one.
        builder.epochOffsetHolder(currentContext.getEpochOffsetHolder().nextOffset(newMessage.getEpochOffset()));

        TaskState.TaskStateBuilder currentTaskBuilder = currentContext.getCurrentTaskState().toBuilder();
        if (startInitialSync) {
            LOGGER.info("Task {}, updating the rebalance generation ID from the leader epoch update {}: {}",
                    currentContext.getTaskUid(), newMessage.getTaskUid(), newMessage.getRebalanceGenerationId());
            builder.rebalanceGenerationId(newMessage.getRebalanceGenerationId());
            currentTaskBuilder.rebalanceGenerationId(newMessage.getRebalanceGenerationId());
        }

        // Keep only the locally known tasks that the message still mentions.
        Set<String> updateEpochTaskUids = newTaskStatesMap.keySet().stream().collect(Collectors.toSet());
        Map<String, TaskState> filteredTaskStates = new HashMap<>();
        for (Map.Entry<String, TaskState> currentTaskState : currentContext.getTaskStates().entrySet()) {
            if (updateEpochTaskUids.contains(currentTaskState.getKey())) {
                filteredTaskStates.put(currentTaskState.getKey(), currentTaskState.getValue());
            }
            else {
                LOGGER.info("Task {}, removing task state {} since it is not included in the UPDATE_EPOCH message {}",
                        currentContext.getTaskUid(), currentTaskState.getKey(), updateEpochTaskUids);
            }
        }

        // Collect the UIDs whose incoming state should replace (or extend) the local view.
        Set<String> updatedStatesUids = new HashSet<>();
        for (TaskState newTaskState : newTaskStatesMap.values()) {
            String incomingUid = newTaskState.getTaskUid();
            if (incomingUid.equals(currentContext.getTaskUid())) {
                continue;
            }
            TaskState knownState = filteredTaskStates.get(incomingUid);
            if (knownState != null && newTaskState.getStateTimestamp() <= knownState.getStateTimestamp()) {
                continue;
            }
            updatedStatesUids.add(incomingUid);
        }

        Stream<Map.Entry<String, TaskState>> oldStatesStream = filteredTaskStates.entrySet().stream()
                .filter(e -> !updatedStatesUids.contains(e.getKey()));
        Stream<Map.Entry<String, TaskState>> updatedStatesStream = newTaskStatesMap.entrySet().stream()
                .filter(e -> updatedStatesUids.contains(e.getKey()));
        Map<String, TaskState> mergedTaskStates = Stream.concat(oldStatesStream, updatedStatesStream)
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

        TaskSyncContext result = builder
                .taskStates(mergedTaskStates)
                .createdTimestamp(Long.max(currentContext.getCreatedTimestamp(), newMessage.getMessageTimestamp()))
                .currentTaskState(currentTaskBuilder.build())
                .epochOffsetHolder(currentContext.getEpochOffsetHolder().nextOffset(newMessage.getEpochOffset()))
                .build();

        int numPartitions = result.getNumPartitions();
        int numSharedPartitions = result.getNumSharedPartitions();
        LOGGER.debug(
                "Task {}, updating the epoch offset from the leader's UPDATE_EPOCH message {}: {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}",
                currentContext.getTaskUid(),
                newMessage.getTaskUid(),
                newMessage.getEpochOffset(),
                numPartitions + numSharedPartitions,
                numPartitions,
                numSharedPartitions,
                oldPartitions);
        return result;
    }

    /**
     * Merges a new-epoch message sent by another task.
     *
     * <p>A message sent by this task itself is ignored. Otherwise the task-state map is replaced
     * by the message's states minus this task's own entry, the created timestamp is set to the
     * message timestamp, and the message's rebalance generation ID and epoch offset are adopted.
     * Unless this task is in {@code START_INITIAL_SYNC}, the rebalance state additionally moves
     * to {@code NEW_EPOCH_STARTED} when the message includes this task and its rebalance
     * generation ID is not older than the last one received; otherwise a warning is logged and
     * the rebalance state is left as is.
     *
     * @param currentContext the context held by this task
     * @param inSync the received new-epoch message
     * @return the merged context, or an unmodified rebuild of {@code currentContext}
     */
    public static TaskSyncContext mergeNewEpoch(TaskSyncContext currentContext, TaskSyncEvent inSync) {
        TaskSyncContext.TaskSyncContextBuilder builder = currentContext.toBuilder();

        if (inSync.getTaskUid().equals(currentContext.getTaskUid())) {
            return builder.build();
        }

        // Remember whether a duplication was already present, so the post-merge check below
        // only reports duplications that appear after applying this message.
        boolean foundDuplication = currentContext.checkDuplication(false, "NEW_EPOCH");

        long oldPartitions = currentContext.getNumPartitions() + currentContext.getNumSharedPartitions();

        Map<String, TaskState> newTaskStates = new HashMap<>(inSync.getTaskStates());
        newTaskStates.remove(currentContext.getTaskUid());

        boolean startInitialSync = currentContext.getRebalanceState() == RebalanceState.START_INITIAL_SYNC;
        if (!startInitialSync) {
            Set<String> allNewEpochTasks = inSync.getTaskStates().values().stream()
                    .map(TaskState::getTaskUid)
                    .collect(Collectors.toSet());

            if (!allNewEpochTasks.contains(currentContext.getTaskUid())) {
                LOGGER.warn(
                        "Task {} - Received new epoch message , but leader {} did not include the task in the new epoch message with rebalance ID {} with tasks {}, probably just initialized, throw exception",
                        currentContext.getTaskUid(),
                        inSync.getTaskUid(),
                        inSync.getRebalanceGenerationId(),
                        inSync.getTaskStates().keySet().stream().collect(Collectors.toList()));
            }
            else if (inSync.getRebalanceGenerationId() < currentContext.getReceivedRebalanceGenerationId()) {
                LOGGER.warn(
                        "Task {} - Received new epoch message from {} , but the new epoch message had rebalance generation ID {} while the latest rebalance generation ID is {}",
                        currentContext.getTaskUid(),
                        inSync.getTaskUid(),
                        inSync.getRebalanceGenerationId(),
                        currentContext.getReceivedRebalanceGenerationId());
            }
            else {
                LOGGER.info("Task {}, updating the rebalance state to NEW_EPOCH_STARTED {}: {}",
                        currentContext.getTaskUid(), inSync.getTaskUid(), inSync.getRebalanceGenerationId());
                builder.rebalanceState(RebalanceState.NEW_EPOCH_STARTED);
            }
        }

        TaskState.TaskStateBuilder currentTaskBuilder = currentContext.getCurrentTaskState().toBuilder();
        LOGGER.info("Task {}, updating the rebalance generation ID and epoch offset from the leader new epoch {}: {}, {}",
                currentContext.getTaskUid(), inSync.getTaskUid(), inSync.getRebalanceGenerationId(), inSync.getEpochOffset());
        currentTaskBuilder.rebalanceGenerationId(inSync.getRebalanceGenerationId());

        TaskSyncContext result = builder
                .createdTimestamp(inSync.getMessageTimestamp())
                .rebalanceGenerationId(inSync.getRebalanceGenerationId())
                .epochOffsetHolder(currentContext.getEpochOffsetHolder().nextOffset(inSync.getEpochOffset()))
                .taskStates(newTaskStates)
                .currentTaskState(currentTaskBuilder.build())
                .build();

        LOGGER.info(
                "Task {}, processed new epoch message {}: {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}",
                currentContext.getTaskUid(),
                inSync.getTaskUid(),
                inSync.getEpochOffset(),
                result.getNumPartitions() + result.getNumSharedPartitions(),
                result.getNumPartitions(),
                result.getNumSharedPartitions(),
                oldPartitions);

        if (!foundDuplication && result.checkDuplication(true, "NEW_EPOCH")) {
            LOGGER.debug("Task {}, duplication exists after processing new epoch, old context {}", result.getTaskUid(), currentContext);
            LOGGER.debug("Task {}, duplication exists after processing new epoch, new message {}", result.getTaskUid(), inSync);
            LOGGER.debug("Task {}, duplication exists after processing new epoch, resulting context {}", result.getTaskUid(), result);
        }
        return result;
    }

    /**
     * Builds a context in which the sender's task state is set to {@code newTask} and the
     * created timestamp is the later of the current one and the message timestamp.
     */
    private static TaskSyncContext replaceSenderState(TaskSyncContext currentContext,
                                                      TaskSyncContext.TaskSyncContextBuilder builder,
                                                      TaskSyncEvent newMessage,
                                                      TaskState newTask) {
        Map<String, TaskState> taskStates = new HashMap<>(currentContext.getTaskStates());
        taskStates.put(newMessage.getTaskUid(), newTask);
        return builder
                .taskStates(taskStates)
                .createdTimestamp(Long.max(currentContext.getCreatedTimestamp(), newMessage.getMessageTimestamp()))
                .build();
    }
}

