/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.annotation.ThreadSafe;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the change event sources of a connector on a background executor: first the
 * {@link SnapshotChangeEventSource} obtained from the {@link ChangeEventSourceFactory}, and then,
 * if the snapshot completed and the coordinator is still running, the
 * {@link StreamingChangeEventSource} starting from the offset produced by the snapshot.
 *
 * <p>Failures raised by either source are handed to the configured {@link ErrorHandler}.
 */
@ThreadSafe
public class ChangeEventSourceCoordinator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class);

    /** Upper bound for each wait on executor termination in {@link #stop()}. */
    private static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90L);

    private final OffsetContext previousOffset;
    private final ErrorHandler errorHandler;
    private final ChangeEventSourceFactory changeEventSourceFactory;
    private final ExecutorService executor;

    /** Flag exposed to the sources through the context; cleared by {@link #stop()}. */
    private volatile boolean running;

    /** Set by the executor thread once streaming begins; {@code null} until then. */
    private volatile StreamingChangeEventSource streamingSource;

    /**
     * Creates a coordinator; no work is started until {@link #start()} is called.
     *
     * @param previousOffset offset passed to the factory when creating the snapshot source
     * @param errorHandler receives any exception thrown by the change event sources
     * @param connectorType connector class, passed to {@code Threads.newSingleThreadExecutor}
     * @param logicalName logical name, passed to {@code Threads.newSingleThreadExecutor}
     * @param changeEventSourceFactory factory for the snapshot and streaming sources
     */
    public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType, String logicalName, ChangeEventSourceFactory changeEventSourceFactory) {
        this.previousOffset = previousOffset;
        this.errorHandler = errorHandler;
        this.changeEventSourceFactory = changeEventSourceFactory;
        this.executor = Threads.newSingleThreadExecutor(connectorType, logicalName, "change-event-source-coordinator");
    }

    /**
     * Marks the coordinator as running and submits the snapshot-then-streaming task to the executor.
     * The call returns immediately; the sources execute on the executor's thread.
     */
    public synchronized void start() {
        running = true;
        executor.submit(() -> {
            try {
                ChangeEventSourceContextImpl context = new ChangeEventSourceContextImpl();

                SnapshotChangeEventSource snapshotSource = changeEventSourceFactory.getSnapshotChangeEventSource(previousOffset);
                SnapshotResult snapshotResult = snapshotSource.execute(context);

                // Streaming only follows a completed snapshot, and only if stop() was not requested meanwhile.
                if (running && snapshotResult.getStatus() == SnapshotResult.SnapshotResultStatus.COMPLETED) {
                    streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset());
                    streamingSource.execute(context);
                }
            }
            catch (InterruptedException e) {
                // Catching InterruptedException leaves the interrupt flag cleared; restore it so the
                // interruption stays visible to the code that owns this thread.
                Thread.currentThread().interrupt();
                LOGGER.warn("Coordinator was interrupted", e);
            }
            catch (Exception e) {
                errorHandler.setProducerThrowable(e);
            }
        });
    }

    /**
     * Forwards the given offset to the streaming source, if streaming has started.
     * Does nothing while no streaming source exists yet.
     *
     * @param offset the offset to commit
     */
    public void commitOffset(Map<String, ?> offset) {
        // Read the volatile field once so the null check and the call use the same reference.
        StreamingChangeEventSource source = streamingSource;
        if (source != null) {
            source.commitOffset(offset);
        }
    }

    /**
     * Signals the sources to stop and shuts the executor down, waiting up to
     * {@link #SHUTDOWN_WAIT_TIMEOUT} for the running task to finish. If it has not finished by then,
     * the executor is shut down forcibly and waited on once more for the same duration.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void stop() throws InterruptedException {
        running = false;
        executor.shutdown();

        // Clears a pending interrupt on the calling thread; presumably so that the wait below is
        // not aborted right away by an interrupt raised before stop() was called.
        Thread.interrupted();

        boolean isShutdown = executor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (!isShutdown) {
            LOGGER.warn("Coordinator didn't stop in the expected time, shutting down executor now");
            executor.shutdownNow();
            executor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** Context handed to the sources; reports the coordinator's {@code running} flag. */
    private class ChangeEventSourceContextImpl
    implements ChangeEventSource.ChangeEventSourceContext {
        @Override
        public boolean isRunning() {
            return running;
        }
    }
}

