/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.TaskStateChangeQueueUpdateMetricEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskStateChangeEventHandler;
import io.debezium.connector.spanner.task.TaskStateUtil;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.state.NewPartitionsEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers {@link TaskStateChangeEvent}s in a bounded queue and hands them, one at a time,
 * to a {@link TaskStateChangeEventHandler} on a dedicated handler thread.
 *
 * <p>Producers call {@link #processEvent(TaskStateChangeEvent)}; the handler thread is
 * controlled with {@link #startProcessing()} and {@link #stopProcessing()}. The two lifecycle
 * methods are mutually exclusive (they synchronize on this instance), so concurrent calls can
 * neither start two handler threads nor observe a half-updated thread reference.
 */
public class TaskStateChangeEventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateChangeEventProcessor.class);
    private final BlockingQueue<TaskStateChangeEvent> queue;
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskStateChangeEventHandler taskStateChangeEventHandler;
    private final Consumer<Throwable> errorHandler;
    private final MetricsEventPublisher metricsEventPublisher;
    /** The running handler thread, or {@code null} when processing is stopped. */
    private volatile Thread thread;

    /**
     * Creates a processor; no thread is started until {@link #startProcessing()} is called.
     *
     * @param queueCapacity capacity of the bounded event queue
     * @param taskSyncContextHolder holder that is awaited/locked around every handled event and
     *        queried for the current context
     * @param taskStateChangeEventHandler handler every dequeued event is passed to
     * @param errorHandler receives any exception that escapes the handler thread
     * @param metricsEventPublisher receives a queue-capacity metric after every enqueue/dequeue
     */
    public TaskStateChangeEventProcessor(int queueCapacity, TaskSyncContextHolder taskSyncContextHolder, TaskStateChangeEventHandler taskStateChangeEventHandler, Consumer<Throwable> errorHandler, MetricsEventPublisher metricsEventPublisher) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.errorHandler = errorHandler;
        this.taskStateChangeEventHandler = taskStateChangeEventHandler;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    /**
     * Builds (but does not start) the handler thread. Exceptions that escape the event loop are
     * forwarded to the configured error handler through the thread's uncaught-exception handler.
     *
     * @return a new, unstarted handler thread
     */
    private Thread createEventHandlerThread() {
        Thread handlerThread = new Thread(this::runEventLoop, "SpannerConnector-TaskStateChangeEventProcessor");
        handlerThread.setUncaughtExceptionHandler((t, e) -> this.errorHandler.accept(e));
        return handlerThread;
    }

    /**
     * Body of the handler thread: takes events from the queue and passes each one to the
     * handler while holding the context holder's lock. The loop ends once the thread is
     * interrupted, either while waiting on the queue or while handling an event.
     */
    private void runEventLoop() {
        while (!Thread.interrupted()) {
            TaskStateChangeEvent event;
            try {
                LOGGER.debug("createEventHandlerThread: Wait for sync event");
                event = this.queue.take();
                this.metricsEventPublisher.publishMetricEvent(new TaskStateChangeQueueUpdateMetricEvent(this.queue.remainingCapacity()));
                LoggerUtils.debug(LOGGER, "createEventHandlerThread: Received sync event of type: {}, event: {}", event.getClass().getSimpleName(), event);
            }
            catch (InterruptedException e) {
                // Restore the flag for anyone inspecting the thread and leave the loop.
                Thread.currentThread().interrupt();
                LOGGER.info("Task {}, interrupting the event handler thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                return;
            }
            this.taskSyncContextHolder.awaitNewEpoch();
            this.taskSyncContextHolder.lock();
            try {
                this.taskStateChangeEventHandler.processEvent(event);
            }
            catch (InterruptedException e) {
                LOGGER.info("Task {}, interrupting the event handler thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                // Re-set the flag so the loop condition observes it and terminates.
                Thread.currentThread().interrupt();
            }
            finally {
                // Always release the lock, even if the handler threw.
                this.taskSyncContextHolder.unlock();
            }
        }
    }

    /**
     * Starts the handler thread. Does nothing if a handler thread is already registered.
     *
     * <p>Synchronized so that two concurrent callers cannot both see "no thread" and start
     * two handler threads, and so that a concurrent {@link #stopProcessing()} cannot clear the
     * field between its assignment and the call to {@link Thread#start()}.
     */
    public synchronized void startProcessing() {
        if (this.thread != null) {
            return;
        }
        Thread handlerThread = this.createEventHandlerThread();
        this.thread = handlerThread;
        handlerThread.start();
    }

    /**
     * Stops the handler thread if one is registered: discards all queued events, interrupts the
     * thread and clears the reference so that {@link #startProcessing()} can start a new one.
     * Does nothing if processing is not running.
     *
     * <p>Synchronized, and operating on a local snapshot of the field, so that concurrent
     * start/stop calls cannot cause a {@link NullPointerException} on a re-read of the field.
     */
    public synchronized void stopProcessing() {
        Thread handlerThread = this.thread;
        if (handlerThread != null) {
            this.queue.clear();
            handlerThread.interrupt();
            this.thread = null;
        }
    }

    /**
     * Enqueues an event for the handler thread, blocking while the queue is full.
     *
     * <p>For a {@link NewPartitionsEvent}, partitions whose token is already known to the
     * current task sync context are dropped first; a new event carrying only the remaining
     * partitions is enqueued, or nothing at all if none remain. A queue-capacity metric is
     * published afterwards in every case.
     *
     * @param event the event to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *         space in the queue
     */
    public void processEvent(TaskStateChangeEvent event) throws InterruptedException {
        if (event instanceof NewPartitionsEvent) {
            NewPartitionsEvent newPartitionsEvent = (NewPartitionsEvent)event;
            List<Partition> filteredPartitions = this.removeAlreadyExistingPartitions(newPartitionsEvent.getPartitions());
            if (!filteredPartitions.isEmpty()) {
                this.queue.put(new NewPartitionsEvent(filteredPartitions));
            }
        } else {
            this.queue.put(event);
        }
        this.metricsEventPublisher.publishMetricEvent(new TaskStateChangeQueueUpdateMetricEvent(this.queue.remainingCapacity()));
    }

    /**
     * Returns the given partitions minus those whose token is contained in the token set
     * reported by {@link TaskStateUtil#allPartitionTokens} for the current context.
     *
     * @param partitions candidate partitions
     * @return a new list with the partitions whose token is not yet known, in input order
     */
    private List<Partition> removeAlreadyExistingPartitions(List<Partition> partitions) {
        Set<String> existingPartitions = TaskStateUtil.allPartitionTokens(this.taskSyncContextHolder.get());
        return partitions.stream().filter(p -> !existingPartitions.contains(p.getToken())).collect(Collectors.toList());
    }
}

