/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskSyncContext;
import java.util.HashSet;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static utility that folds the task states carried by an incoming
 * {@link TaskSyncEvent} into an existing {@link TaskSyncContext}.
 *
 * <p>The class holds no state and cannot be instantiated.
 */
public class SyncEventMerger {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventMerger.class);

    private SyncEventMerger() {
    }

    /**
     * Merges the task states of {@code inSync} into {@code context}.
     *
     * <p>An incoming task state is accepted when it does not describe the context's own
     * task (same task uid) and the context either has no state for that uid or holds one
     * with a strictly smaller state timestamp. When at least one state is accepted, the
     * returned context contains the untouched existing states plus the accepted incoming
     * ones (as an unmodifiable map), and its created timestamp is the maximum of the
     * context's created timestamp and the event's message timestamp. Otherwise the
     * context is rebuilt from its builder without modification.
     *
     * @param context the current sync context to merge into
     * @param inSync the received sync event whose task states are candidates for merging
     * @return a context built from {@code context.toBuilder()}, with the merged task
     *         states applied when any incoming state was accepted
     */
    public static TaskSyncContext merge(TaskSyncContext context, TaskSyncEvent inSync) {
        Map<String, TaskState> newTaskStatesMap = inSync.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", context, newTaskStatesMap);
        TaskSyncContext.TaskSyncContextBuilder builder = context.toBuilder();

        HashSet<String> updatedStatesUids = collectUpdatedStateUids(context, newTaskStatesMap);
        if (updatedStatesUids.isEmpty()) {
            LOGGER.debug("merge: final state is not changed");
            return builder.build();
        }

        // Existing entries that are NOT being replaced by an accepted incoming state.
        Stream<Map.Entry<String, TaskState>> oldStatesStream = context.getTaskStates().entrySet().stream()
                .filter(e -> !updatedStatesUids.contains(e.getKey()));
        // Incoming entries that were accepted. Selection is by map key.
        // NOTE(review): assumes each map key equals the task uid of its value - confirm;
        // a key accepted here that also survives in oldStatesStream would make
        // toUnmodifiableMap throw on the duplicate.
        Stream<Map.Entry<String, TaskState>> updatedStatesStream = newTaskStatesMap.entrySet().stream()
                .filter(e -> updatedStatesUids.contains(e.getKey()));
        Map<String, TaskState> mergedTaskStates = Stream.concat(oldStatesStream, updatedStatesStream)
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

        builder.taskStates(mergedTaskStates)
                .createdTimestamp(Long.max(context.getCreatedTimestamp(), inSync.getMessageTimestamp()));
        TaskSyncContext result = builder.build();
        LoggerUtils.debug(LOGGER, "merge: final state {}, \nUpdated uids: {}, epoch: {}", result, updatedStatesUids, result.getRebalanceGenerationId());
        return result;
    }

    /**
     * Collects the task uids of incoming states that should replace or extend the
     * states currently held by {@code context}.
     */
    private static HashSet<String> collectUpdatedStateUids(TaskSyncContext context, Map<String, TaskState> incomingStates) {
        HashSet<String> updatedUids = new HashSet<>();
        for (TaskState incoming : incomingStates.values()) {
            // States describing the context's own task are never taken from the event.
            if (incoming.getTaskUid().equals(context.getTaskUid())) {
                continue;
            }
            TaskState current = context.getTaskStates().get(incoming.getTaskUid());
            // Keep the existing state when it is at least as recent as the incoming one.
            if (current != null && incoming.getStateTimestamp() <= current.getStateTimestamp()) {
                continue;
            }
            updatedUids.add(incoming.getTaskUid());
        }
        return updatedUids;
    }
}

