/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task;

import io.debezium.DebeziumException;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskSyncContext;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncEventMerger {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventMerger.class);

    private SyncEventMerger() {
    }

    public static TaskSyncContext merge(TaskSyncContext context, TaskSyncEvent inSync) {
        boolean foundDuplication = false;
        if (inSync.getMessageType() != MessageTypeEnum.REGULAR && context.checkDuplication(false, inSync.getMessageType().toString())) {
            foundDuplication = true;
        }
        Map<String, TaskState> newTaskStatesMap = inSync.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", context, newTaskStatesMap);
        TaskSyncContext.TaskSyncContextBuilder builder = context.toBuilder();
        HashSet<String> updatedStatesUids = new HashSet<String>();
        for (TaskState inTaskState : newTaskStatesMap.values()) {
            TaskState currentTaskState;
            if (inTaskState.getTaskUid().equals(context.getTaskUid()) || (currentTaskState = context.getTaskStates().get(inTaskState.getTaskUid())) != null && inTaskState.getStateTimestamp() <= currentTaskState.getStateTimestamp()) continue;
            updatedStatesUids.add(inTaskState.getTaskUid());
        }
        if (inSync.getMessageType() == MessageTypeEnum.UPDATE_EPOCH && !inSync.getTaskUid().equals(context.getTaskUid())) {
            LOGGER.info("Task {}, updating the epoch offset from the leader's UPDATE_EPOCH message {}: {}", new Object[]{context.getTaskUid(), inSync.getTaskUid(), inSync.getEpochOffset()});
            builder.epochOffsetHolder(context.getEpochOffsetHolder().nextOffset(inSync.getEpochOffset()));
        }
        if (!updatedStatesUids.isEmpty()) {
            Stream<Map.Entry> oldStatesStream = context.getTaskStates().entrySet().stream().filter(e -> !updatedStatesUids.contains(e.getKey()));
            Stream<Map.Entry> updatedStatesStream = newTaskStatesMap.entrySet().stream().filter(e -> updatedStatesUids.contains(e.getKey()));
            Map<String, TaskState> mergedTaskStates = Stream.concat(oldStatesStream, updatedStatesStream).collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
            builder.taskStates(mergedTaskStates).createdTimestamp(Long.max(context.getCreatedTimestamp(), inSync.getMessageTimestamp()));
            TaskSyncContext result = builder.build();
            LoggerUtils.debug(LOGGER, "merge: final state {}, \nUpdated uids: {}, epoch: {}", result, updatedStatesUids, result.getRebalanceGenerationId());
            if (inSync.getMessageType() != MessageTypeEnum.REGULAR && !foundDuplication && result.checkDuplication(true, inSync.getMessageType().toString())) {
                LOGGER.info("Task {} found duplication after processing {}", (Object)context.getTaskUid(), (Object)inSync);
                LOGGER.info("Task {} final message {}", (Object)context.getTaskUid(), (Object)result);
            }
            return result;
        }
        LOGGER.debug("merge: final state is not changed");
        return builder.build();
    }

    public static TaskSyncContext mergeNewEpoch(TaskSyncContext currentContext, TaskSyncEvent inSync) {
        boolean start_initial_sync;
        TaskSyncContext.TaskSyncContextBuilder builder = currentContext.toBuilder();
        Set allNewEpochTasks = inSync.getTaskStates().values().stream().map(taskState -> taskState.getTaskUid()).collect(Collectors.toSet());
        boolean bl = start_initial_sync = currentContext.getRebalanceState() == RebalanceState.START_INITIAL_SYNC;
        if (!start_initial_sync && !allNewEpochTasks.contains(currentContext.getTaskUid())) {
            LOGGER.warn("Task {} - Received new epoch message , but leader did not include the task in the new epoch message, throwing exception", (Object)currentContext.getTaskUid());
            throw new DebeziumException("New epoch message does not contain task state " + currentContext.getTaskUid());
        }
        boolean foundDuplication = false;
        if (currentContext.checkDuplication(false, "NEW_EPOCH")) {
            foundDuplication = true;
        }
        if (inSync.getTaskUid().equals(currentContext.getTaskUid())) {
            return builder.build();
        }
        HashMap<String, TaskState> newTaskStates = new HashMap<String, TaskState>(inSync.getTaskStates());
        newTaskStates.remove(currentContext.getTaskUid());
        TaskState.TaskStateBuilder currentTaskBuilder = currentContext.getCurrentTaskState().toBuilder();
        if (RebalanceState.START_INITIAL_SYNC.equals((Object)currentContext.getRebalanceState())) {
            LOGGER.info("Task {}, updating the rebalance generation ID from the leader new epoch {}: {}", new Object[]{currentContext.getTaskUid(), inSync.getTaskUid(), inSync.getRebalanceGenerationId()});
            builder.rebalanceGenerationId(inSync.getRebalanceGenerationId());
            currentTaskBuilder.rebalanceGenerationId(inSync.getRebalanceGenerationId());
        } else {
            builder.rebalanceState(RebalanceState.NEW_EPOCH_STARTED);
        }
        LOGGER.info("Task {}, updating the epoch offset from the leader new epoch {}: {}", new Object[]{currentContext.getTaskUid(), inSync.getTaskUid(), inSync.getEpochOffset()});
        builder.createdTimestamp(inSync.getMessageTimestamp()).epochOffsetHolder(currentContext.getEpochOffsetHolder().nextOffset(inSync.getEpochOffset())).taskStates(newTaskStates).currentTaskState(currentTaskBuilder.build());
        TaskSyncContext result = builder.build();
        if (!foundDuplication && result.checkDuplication(true, "NEW_EPOCH")) {
            LOGGER.warn("Task {}, duplication exists after processing new epoch, old context {}", (Object)result.getTaskUid(), (Object)currentContext);
            LOGGER.warn("Task {}, duplication exists after processing new epoch, new message {}", (Object)result.getTaskUid(), (Object)inSync);
            LOGGER.warn("Task {}, duplication exists after processing new epoch, resulting context {}", (Object)result.getTaskUid(), (Object)result);
        }
        return result;
    }
}

