/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task.leader;

import io.debezium.DebeziumException;
import io.debezium.connector.spanner.kafka.internal.KafkaConsumerAdminService;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskStateUtil;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.leader.LeaderService;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionRebalancer;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderAction.class);
    private static final Duration EPOCH_OFFSET_UPDATE_DURATION = Duration.ofSeconds(60L);
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final KafkaConsumerAdminService kafkaAdminService;
    private final LeaderService leaderService;
    private final TaskPartitionRebalancer taskPartitonRebalancer;
    private final TaskSyncPublisher taskSyncPublisher;
    private volatile Thread leaderThread;
    private Consumer<Throwable> errorHandler;
    private final Duration sleepInterval = Duration.ofMillis(100L);
    private final Clock clock;

    public LeaderAction(TaskSyncContextHolder taskSyncContextHolder, KafkaConsumerAdminService kafkaAdminService, LeaderService leaderService, TaskPartitionRebalancer taskPartitonRebalancer, TaskSyncPublisher taskSyncPublisher, Consumer<Throwable> errorHandler) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.kafkaAdminService = kafkaAdminService;
        this.leaderService = leaderService;
        this.taskPartitonRebalancer = taskPartitonRebalancer;
        this.taskSyncPublisher = taskSyncPublisher;
        this.errorHandler = errorHandler;
        this.clock = Clock.system();
    }

    private Thread createLeaderThread() {
        Thread thread = new Thread(() -> {
            LOGGER.info("performLeaderAction: Task {} start leader thread with rebalance generation ID {}", (Object)this.taskSyncContextHolder.get().getTaskUid(), (Object)this.taskSyncContextHolder.get().getRebalanceGenerationId());
            try {
                this.newEpoch();
            }
            catch (InterruptedException e) {
                LOGGER.info("performLeaderAction: Task {} stop leader thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                Thread.currentThread().interrupt();
                return;
            }
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(EPOCH_OFFSET_UPDATE_DURATION.toMillis());
                    if (this.taskSyncContextHolder.get().getRebalanceState() != RebalanceState.NEW_EPOCH_STARTED) continue;
                    this.publishEpochOffset();
                }
                catch (InterruptedException e) {
                    LOGGER.info("performLeaderAction: Task {} stop leader thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            LOGGER.info("performLeaderAction: Task {} stopped leader thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
        }, "SpannerConnector-LeaderAction");
        thread.setUncaughtExceptionHandler((t, ex) -> {
            LOGGER.error("Leader action execution error, task {}, ex {}", (Object)this.taskSyncContextHolder.get().getTaskUid(), (Object)ex.getStackTrace());
            this.errorHandler.accept(ex);
        });
        return thread;
    }

    private TaskSyncContext publishEpochOffset() throws InterruptedException {
        TaskSyncContext taskSyncContext = this.taskSyncContextHolder.updateAndGet(oldContext -> oldContext.toBuilder().epochOffsetHolder(oldContext.getEpochOffsetHolder().nextOffset(oldContext.getCurrentKafkaRecordOffset())).build());
        TaskSyncEvent taskSyncEvent = taskSyncContext.buildUpdateEpochTaskSyncEvent();
        this.taskSyncPublisher.send(taskSyncEvent);
        int numPartitions = taskSyncEvent.getNumPartitions();
        int numSharedPartitions = taskSyncEvent.getNumSharedPartitions();
        LOGGER.info("Task {} - Leader task has updated the epoch offset with rebalance generation ID: {} and epoch offset: {}, num partitions {}, num shared partitions {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), taskSyncContext.getRebalanceGenerationId(), taskSyncContext.getEpochOffsetHolder().getEpochOffset(), numPartitions, numSharedPartitions});
        return taskSyncContext;
    }

    public void start() {
        if (this.leaderThread != null) {
            this.stop();
        }
        this.leaderThread = this.createLeaderThread();
        this.leaderThread.start();
    }

    public synchronized void stop() {
        if (this.leaderThread == null) {
            return;
        }
        LOGGER.info("Task {}, trying to stop leader thread with rebalance generation ID {}", (Object)this.taskSyncContextHolder.get().getTaskUid(), (Object)this.taskSyncContextHolder.get().getRebalanceGenerationId());
        this.leaderThread.interrupt();
        LOGGER.info("Task {}, interrupted leader thread with rebalance generation ID {}", (Object)this.taskSyncContextHolder.get().getTaskUid(), (Object)this.taskSyncContextHolder.get().getRebalanceGenerationId());
        Metronome metronome = Metronome.sleeper((Duration)this.sleepInterval, (Clock)this.clock);
        while (!this.leaderThread.getState().equals((Object)Thread.State.TERMINATED)) {
            try {
                metronome.pause();
                LOGGER.info("Task {}, interrupting leader thread again with rebalance generation ID {}", (Object)this.taskSyncContextHolder.get().getTaskUid(), (Object)this.taskSyncContextHolder.get().getRebalanceGenerationId());
                this.leaderThread.interrupt();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Task {}, stopped leader thread with rebalance generation ID {}", (Object)this.taskSyncContextHolder.get().getTaskUid(), (Object)this.taskSyncContextHolder.get().getRebalanceGenerationId());
        this.leaderThread = null;
    }

    private void newEpoch() throws InterruptedException {
        LOGGER.info("performLeaderActions: new epoch initialization");
        boolean startFromScratch = this.leaderService.isStartFromScratch();
        Set<String> activeConsumers = this.kafkaAdminService.getActiveConsumerGroupMembers();
        LOGGER.info("performLeaderActions: consumers found {}", activeConsumers);
        Map<String, String> consumerToTaskMap = this.leaderService.awaitAllNewTaskStateUpdates(activeConsumers, this.taskSyncContextHolder.get().getRebalanceGenerationId());
        LOGGER.info("performLeaderActions: answers received {}", consumerToTaskMap);
        if (consumerToTaskMap.size() < activeConsumers.size()) {
            LOGGER.info("TaskUid {}, Expected active consumers {}, but only received consumers {}, not sending new epoch", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), activeConsumers, consumerToTaskMap});
            throw new DebeziumException("Task Uid " + this.taskSyncContextHolder.get().getTaskUid() + " Expected active consumers " + activeConsumers.toString() + " but only received consumers " + consumerToTaskMap.toString() + " not sending new epoch ");
        }
        TaskSyncContext staleContext = this.taskSyncContextHolder.get();
        boolean foundDuplication = false;
        if (staleContext.checkDuplication(false, "NEW EPOCH rebalance event, initial context")) {
            foundDuplication = true;
        }
        int numOldPartitions = this.taskSyncContextHolder.get().getNumPartitions();
        int numSharedOldPartitions = this.taskSyncContextHolder.get().getNumSharedPartitions();
        LOGGER.info("Task {} - before sending new epoch, total partitions {}, num partitions {}, num shared partitions {}", new Object[]{numOldPartitions + numSharedOldPartitions, numOldPartitions, numSharedOldPartitions});
        TaskSyncContext taskSyncContext = this.taskSyncContextHolder.updateAndGet(oldContext -> {
            TaskState leaderState = oldContext.getCurrentTaskState();
            Map<String, TaskState> currentTaskStates = oldContext.getAllTaskStates();
            Set taskUids = currentTaskStates.keySet().stream().collect(Collectors.toSet());
            LOGGER.info("Task {}, Current task states in old context: {}", (Object)oldContext.getTaskUid(), taskUids);
            Map<Boolean, Map<String, TaskState>> isSurvivedPartitionedTaskStates = TaskStateUtil.splitSurvivedAndObsoleteTaskStates(currentTaskStates, consumerToTaskMap.values());
            Map<String, TaskState> survivedTasks = isSurvivedPartitionedTaskStates.get(true);
            Map<String, TaskState> obsoleteTasks = isSurvivedPartitionedTaskStates.get(false);
            leaderState = this.taskPartitonRebalancer.rebalance(leaderState, survivedTasks, obsoleteTasks);
            return oldContext.toBuilder().currentTaskState(leaderState).rebalanceState(RebalanceState.NEW_EPOCH_STARTED).taskStates(TaskStateUtil.filterSurvivedTasksStates(oldContext.getTaskStates(), survivedTasks.keySet())).epochOffsetHolder(oldContext.getEpochOffsetHolder().nextOffset(oldContext.getCurrentKafkaRecordOffset())).build();
        });
        if (!foundDuplication) {
            taskSyncContext.checkDuplication(true, "NEW EPOCH rebalance event, resulting context");
        }
        TaskSyncEvent taskSyncEvent = taskSyncContext.buildNewEpochTaskSyncEvent();
        int numPartitions = taskSyncEvent.getNumPartitions();
        int numSharedPartitions = taskSyncEvent.getNumSharedPartitions();
        Set taskUids = taskSyncEvent.getTaskStates().keySet().stream().collect(Collectors.toSet());
        LOGGER.info("Task {} - sent new epoch with rebalance generation ID {}, num tasks {}, total partitions {}, num owned partitions {}, num shared partitions {}, task Uids {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncContext.getRebalanceGenerationId(), taskSyncEvent.getTaskStates().size(), numPartitions + numSharedPartitions, numPartitions, numSharedPartitions, taskUids, numPartitions + numSharedPartitions});
        this.taskSyncPublisher.send(taskSyncEvent);
        if (startFromScratch) {
            this.leaderService.newParentPartition();
            LOGGER.info("performLeaderActions: newParentPartition");
        }
        LoggerUtils.debug(LOGGER, "performLeaderActions: new epoch {}", taskSyncEvent);
    }
}

