/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.actions.SignalActionProvider;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DatabaseSchema;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class);
    public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90L);
    protected final Offsets<P, O> previousOffsets;
    protected final ErrorHandler errorHandler;
    protected final ChangeEventSourceFactory<P, O> changeEventSourceFactory;
    protected final ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory;
    protected final SnapshotterService snapshotterService;
    protected final ExecutorService executor;
    private final ExecutorService blockingSnapshotExecutor;
    protected final EventDispatcher<P, ?> eventDispatcher;
    protected final DatabaseSchema<?> schema;
    protected final SignalProcessor<P, O> signalProcessor;
    protected final NotificationService<P, O> notificationService;
    protected final CommonConnectorConfig connectorConfig;
    private volatile boolean running;
    private volatile boolean paused;
    private volatile boolean streaming;
    protected volatile StreamingChangeEventSource<P, O> streamingSource;
    protected final ReentrantLock commitOffsetLock = new ReentrantLock();
    protected SnapshotChangeEventSourceMetrics<P> snapshotMetrics;
    protected StreamingChangeEventSourceMetrics<P> streamingMetrics;
    private ChangeEventSource.ChangeEventSourceContext context;
    private SnapshotChangeEventSource<P, O> snapshotSource;
    private AtomicReference<LoggingContext.PreviousContext> previousLogContext;
    private CdcSourceTaskContext taskContext;

    /**
     * Creates a coordinator from its collaborators and sets up the two single-threaded
     * executors it uses: one named "change-event-source-coordinator" and one named
     * "blocking-snapshot". Both are created through {@link Threads#newSingleThreadExecutor}
     * using the connector type and the logical name taken from {@code connectorConfig}.
     *
     * @param previousOffsets offsets handed to the change event sources when they are executed
     * @param errorHandler receives throwables raised by the coordinator's background work
     * @param connectorType connector class used when creating the executors
     * @param connectorConfig connector configuration; must not be {@code null}
     * @param changeEventSourceFactory factory for snapshot and streaming sources
     * @param changeEventSourceMetricsFactory factory for snapshot and streaming metrics
     * @param eventDispatcher dispatcher whose event listener is switched between the metrics
     * @param schema database schema checked after a snapshot
     * @param signalProcessor signal processor, may be {@code null}
     * @param notificationService notification service, may be {@code null}
     * @param snapshotterService snapshotter service, may be {@code null}
     */
    public ChangeEventSourceCoordinator(
            Offsets<P, O> previousOffsets,
            ErrorHandler errorHandler,
            Class<? extends SourceConnector> connectorType,
            CommonConnectorConfig connectorConfig,
            ChangeEventSourceFactory<P, O> changeEventSourceFactory,
            ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory,
            EventDispatcher<P, ?> eventDispatcher,
            DatabaseSchema<?> schema,
            SignalProcessor<P, O> signalProcessor,
            NotificationService<P, O> notificationService,
            SnapshotterService snapshotterService) {
        // State and error reporting.
        this.previousOffsets = previousOffsets;
        this.errorHandler = errorHandler;

        // Factories and services.
        this.changeEventSourceFactory = changeEventSourceFactory;
        this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory;
        this.snapshotterService = snapshotterService;

        // Worker threads: one for the main snapshot/streaming flow, one for blocking snapshots.
        this.executor = Threads.newSingleThreadExecutor(
                connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator");
        this.blockingSnapshotExecutor = Threads.newSingleThreadExecutor(
                connectorType, connectorConfig.getLogicalName(), "blocking-snapshot");

        // Remaining collaborators.
        this.eventDispatcher = eventDispatcher;
        this.schema = schema;
        this.signalProcessor = signalProcessor;
        this.notificationService = notificationService;
        this.connectorConfig = connectorConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the coordinator.
     *
     * <p>Stores the task context, obtains the snapshot and streaming metrics from the metrics
     * factory, marks the coordinator as running and submits the snapshot/streaming work to the
     * coordinator executor, so the work itself runs on the executor thread rather than on the
     * caller's thread.
     *
     * <p>Inside the submitted task an {@link InterruptedException} restores the interrupt flag
     * and is logged; any other throwable is passed to the error handler. In all cases the
     * task finishes by calling {@code streamingConnected(false)}.
     *
     * @param taskContext task context used to configure the logging context
     * @param changeEventQueueMetrics queue metrics passed to the metrics factory
     * @param metadataProvider event metadata provider passed to the metrics factory
     */
    public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) {
        // Diamond instead of a raw type: keeps the assignment checked.
        this.previousLogContext = new AtomicReference<>();
        try {
            this.taskContext = taskContext;
            this.snapshotMetrics = this.changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
            this.streamingMetrics = this.changeEventSourceMetricsFactory.getStreamingMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
            this.running = true;
            this.executor.submit(() -> {
                try {
                    this.previousLogContext.set(taskContext.configureLoggingContext("snapshot"));
                    this.snapshotMetrics.register();
                    this.streamingMetrics.register();
                    LOGGER.info("Metrics registered");
                    this.context = new ChangeEventSourceContextImpl();
                    LOGGER.info("Context created");
                    this.snapshotSource = this.changeEventSourceFactory.getSnapshotChangeEventSource(this.snapshotMetrics, this.notificationService);
                    this.executeChangeEventSources(taskContext, this.snapshotSource, this.previousOffsets, this.previousLogContext, this.context);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Change event source executor was interrupted", e);
                }
                catch (Throwable e) {
                    this.errorHandler.setProducerThrowable(e);
                }
                finally {
                    this.streamingConnected(false);
                }
            });
        }
        finally {
            // Read the reference once so the null check and the restore() call see the same value.
            final LoggingContext.PreviousContext previous = this.previousLogContext.get();
            if (previous != null) {
                previous.restore();
            }
        }
    }

    /**
     * Discovers every {@link SignalActionProvider} through {@link ServiceLoader}, registers the
     * actions they create on the given signal processor and then starts the processor.
     *
     * <p>All providers are loaded into a list before any of them is asked to create actions.
     * The per-provider action maps are merged with {@code Collectors.toMap}, which throws
     * {@link IllegalStateException} if two providers contribute the same key.
     *
     * @param signalProcessor processor that receives the actions and is started afterwards
     * @param dispatcher dispatcher passed to each provider
     * @param changeEventSourceCoordinator coordinator passed to each provider
     * @param connectorConfig connector configuration passed to each provider
     */
    protected void registerSignalActionsAndStartProcessor(SignalProcessor<P, O> signalProcessor, EventDispatcher<P, ? extends DataCollectionId> dispatcher, ChangeEventSourceCoordinator<P, ?> changeEventSourceCoordinator, CommonConnectorConfig connectorConfig) {
        // Typed list (not a raw List) so the lambdas below see SignalActionProvider, not Object.
        final List<SignalActionProvider> actionProviders = StreamSupport
                .stream(ServiceLoader.load(SignalActionProvider.class).spliterator(), false)
                .collect(Collectors.toList());

        actionProviders.stream()
                .map(provider -> provider.createActions(dispatcher, changeEventSourceCoordinator, connectorConfig))
                .flatMap(actions -> actions.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
                .forEach(signalProcessor::registerSignalAction);

        signalProcessor.start();
    }

    /**
     * Returns the signal processor when the supplied offsets are {@code null} or hold exactly
     * one entry; otherwise returns an empty {@link Optional}.
     *
     * @param previousOffset offsets to inspect, may be {@code null}
     * @return the (possibly absent) signal processor, or empty when the offsets do not hold
     *         exactly one entry
     */
    public Optional<SignalProcessor<P, O>> getSignalProcessor(Offsets<P, O> previousOffset) {
        final boolean notExactlyOneOffset = previousOffset != null && previousOffset.getOffsets().size() != 1;
        if (notExactlyOneOffset) {
            return Optional.empty();
        }
        return Optional.ofNullable(this.signalProcessor);
    }

    /**
     * Runs the snapshot for the only partition/offset pair and, if the coordinator is still
     * running and the snapshot was completed or skipped, continues with streaming.
     *
     * <p>The logging context is switched to "snapshot" before the snapshot and to "streaming"
     * before streaming starts. When the snapshot actually completed (as opposed to being
     * skipped), {@link #delayStreamingIfNeeded} is invoked first.
     *
     * @param taskContext task context used to configure the logging context
     * @param snapshotSource snapshot source to execute
     * @param previousOffsets offsets providing the single partition and offset
     * @param previousLogContext holder updated with each newly configured logging context
     * @param context context passed to the change event sources
     * @throws InterruptedException if the snapshot, the streaming delay or streaming is interrupted
     */
    protected void executeChangeEventSources(CdcSourceTaskContext taskContext, SnapshotChangeEventSource<P, O> snapshotSource, Offsets<P, O> previousOffsets, AtomicReference<LoggingContext.PreviousContext> previousLogContext, ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        final P partition = previousOffsets.getTheOnlyPartition();
        final O previousOffset = previousOffsets.getTheOnlyOffset();

        previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition));

        // Parameterized (not raw) so getOffset() keeps its O type for streamEvents/setContext.
        final SnapshotResult<O> snapshotResult = this.doSnapshot(snapshotSource, context, partition, previousOffset);
        this.getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset()));
        LOGGER.debug("Snapshot result {}", snapshotResult);

        if (this.running && snapshotResult.isCompletedOrSkipped()) {
            if (snapshotResult.isCompleted()) {
                this.delayStreamingIfNeeded(context);
            }
            previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
            this.streamEvents(context, partition, snapshotResult.getOffset());
        }
    }

    /**
     * Waits for the configured streaming delay before streaming starts.
     *
     * <p>Returns immediately when a snapshotter service is present and its snapshotter reports
     * that it should not stream, or when the configured delay is zero or negative. Otherwise
     * the method loops until the timer expires, logging the remaining seconds and pausing on
     * each iteration.
     *
     * @param context context polled on every iteration to see whether the coordinator still runs
     * @throws InterruptedException if the context stops running during the wait, or if the
     *         pause itself is interrupted
     */
    protected void delayStreamingIfNeeded(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        final boolean streamingDisabled = this.snapshotterService != null
                && !this.snapshotterService.getSnapshotter().shouldStream();
        if (streamingDisabled) {
            return;
        }

        final Duration streamingDelay = this.connectorConfig.getStreamingDelay();
        final boolean nothingToWaitFor = streamingDelay.isZero() || streamingDelay.isNegative();
        if (!nothingToWaitFor) {
            final Threads.Timer delayTimer = Threads.timer(Clock.SYSTEM, streamingDelay);
            final Metronome pacer = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
            while (!delayTimer.expired()) {
                if (!context.isRunning()) {
                    throw new InterruptedException("Interrupted while awaiting streaming delay");
                }
                LOGGER.info("The connector will wait for {}s before initiating streaming", delayTimer.remaining().getSeconds());
                pacer.pause();
            }
        }
    }

    /**
     * Schedules a blocking snapshot on the dedicated blocking-snapshot executor.
     *
     * <p>The submitted task marks the coordinator as paused, waits until streaming reports
     * that it is paused, runs the snapshot and finally switches the event listener back to the
     * streaming metrics and resumes streaming. Any exception raised while creating or running
     * the snapshot task is logged as a warning; streaming is resumed regardless, so a failed
     * snapshot request does not leave streaming paused.
     *
     * <p>If the task is interrupted while waiting for streaming to pause or while resuming it,
     * the interrupt flag is restored and a {@link DebeziumException} carrying the original
     * {@link InterruptedException} as its cause is thrown inside the task. The future returned
     * by the executor is not retained by this method.
     *
     * @param partition partition to snapshot
     * @param offsetContext offset context to start from; expected to be of the coordinator's
     *        offset type {@code O}
     * @param snapshotConfiguration configuration of the requested snapshot
     */
    public void doBlockingSnapshot(P partition, OffsetContext offsetContext, SnapshotConfiguration snapshotConfiguration) {
        this.blockingSnapshotExecutor.submit(() -> {
            this.previousLogContext.set(this.taskContext.configureLoggingContext("streaming", partition));
            this.paused = true;
            this.streaming = true;
            try {
                this.context.waitStreamingPaused();
                this.previousLogContext.set(this.taskContext.configureLoggingContext("snapshot"));
                LOGGER.info("Starting snapshot");
                try {
                    // The public signature accepts any OffsetContext; the sources need type O.
                    @SuppressWarnings("unchecked")
                    final O previousOffset = (O) offsetContext;
                    // Created inside the guarded region: a failure here must still reach the
                    // finally block, otherwise 'paused' would stay true and streaming would hang.
                    final SnapshottingTask snapshottingTask = this.snapshotSource.getBlockingSnapshottingTask(partition, previousOffset, snapshotConfiguration);
                    this.doSnapshot(this.snapshotSource, this.context, partition, previousOffset, snapshottingTask);
                }
                catch (Exception e) {
                    LOGGER.warn("Error while executing requested blocking snapshot.", e);
                }
                finally {
                    this.eventDispatcher.setEventListener(this.streamingMetrics);
                    this.resumeStreaming(partition);
                }
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the executor thread and preserve the cause.
                Thread.currentThread().interrupt();
                throw new DebeziumException("Blocking snapshot has been interrupted", e);
            }
        });
    }

    /**
     * Switches the logging context back to "streaming" for the given partition, clears the
     * paused flag and asks the context to resume streaming.
     *
     * @param partition partition used when configuring the logging context
     * @throws InterruptedException declared for callers; propagated from the context if raised
     */
    private void resumeStreaming(P partition) throws InterruptedException {
        final AtomicReference<LoggingContext.PreviousContext> logContextHolder = this.previousLogContext;
        final LoggingContext.PreviousContext streamingLogContext = this.taskContext.configureLoggingContext("streaming", partition);
        logContextHolder.set(streamingLogContext);

        // The flag must be cleared before waiters are signalled, since they re-check it.
        this.paused = false;
        this.context.resumeStreaming();
    }

    /**
     * Runs a snapshot using the snapshotting task that the snapshot source derives from the
     * partition and previous offset.
     *
     * @param snapshotSource snapshot source that provides the task and executes the snapshot
     * @param context context passed to the snapshot source
     * @param partition partition to snapshot
     * @param previousOffset offset to start from
     * @return the result reported by the snapshot source
     * @throws InterruptedException if the snapshot is interrupted
     */
    protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotSource, ChangeEventSource.ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException {
        return this.doSnapshot(
                snapshotSource,
                context,
                partition,
                previousOffset,
                snapshotSource.getSnapshottingTask(partition, previousOffset));
    }

    /**
     * Runs a snapshot with an explicitly supplied snapshotting task.
     *
     * <p>Catch-up streaming is given a chance to run first. If it did run, the connection
     * metric is reset and the streaming source reference is cleared while holding
     * {@code commitOffsetLock}. The event listener is then switched to the snapshot metrics and
     * the snapshot is executed. When the snapshot completed, or the schema reports complete
     * table information, the schema is asked to assure that it is non-empty.
     *
     * @param snapshotSource snapshot source to execute
     * @param context context passed to the snapshot source
     * @param partition partition to snapshot
     * @param previousOffset offset to start from
     * @param snapshottingTask task describing what to snapshot
     * @return the result reported by the snapshot source
     * @throws InterruptedException if catch-up streaming or the snapshot is interrupted
     */
    protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotSource, ChangeEventSource.ChangeEventSourceContext context, P partition, O previousOffset, SnapshottingTask snapshottingTask) throws InterruptedException {
        if (this.executeCatchUpStreaming(context, snapshotSource, partition, previousOffset).performedCatchUpStreaming) {
            this.streamingConnected(false);
            this.commitOffsetLock.lock();
            try {
                this.streamingSource = null;
            }
            finally {
                this.commitOffsetLock.unlock();
            }
        }

        this.eventDispatcher.setEventListener(this.snapshotMetrics);
        final SnapshotResult<O> outcome = snapshotSource.execute(context, partition, previousOffset, snapshottingTask);
        LOGGER.info("Snapshot ended with {}", outcome);

        final boolean completed = outcome.getStatus() == SnapshotResult.SnapshotResultStatus.COMPLETED;
        if (completed || this.schema.tableInformationComplete()) {
            this.schema.assureNonEmptySchema();
        }
        return outcome;
    }

    /**
     * Hook invoked before a snapshot is executed. This implementation performs no catch-up
     * streaming and always reports that none was performed.
     *
     * @param context context of the change event source (unused here)
     * @param snapshotSource snapshot source about to be executed (unused here)
     * @param partition partition being processed (unused here)
     * @param previousOffset offset being processed (unused here)
     * @return a result whose {@code performedCatchUpStreaming} flag is {@code false}
     * @throws InterruptedException never thrown by this implementation
     */
    protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSource.ChangeEventSourceContext context, SnapshotChangeEventSource<P, O> snapshotSource, P partition, O previousOffset) throws InterruptedException {
        final CatchUpStreamingResult nothingPerformed = new CatchUpStreamingResult(false);
        return nothingPerformed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Initializes streaming, registers the signal actions and starts the signal processor (if
     * one is available), and then executes the streaming source unless the snapshotter reports
     * that streaming should not happen.
     *
     * <p>Whatever the outcome, the streaming source is closed on exit if it is set.
     *
     * @param context context passed to the streaming source
     * @param partition partition to stream
     * @param offsetContext offset to start streaming from
     * @throws InterruptedException if initialization or streaming is interrupted
     */
    protected void streamEvents(ChangeEventSource.ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException {
        try {
            this.initStreamEvents(partition, offsetContext);
            this.getSignalProcessor(this.previousOffsets).ifPresent(
                    processor -> this.registerSignalActionsAndStartProcessor(processor, this.eventDispatcher, this, this.connectorConfig));

            final boolean streamingEnabled = this.snapshotterService == null
                    || this.snapshotterService.getSnapshotter().shouldStream();
            if (streamingEnabled) {
                LOGGER.info("Starting streaming");
                this.streamingSource.execute(context, partition, offsetContext);
                LOGGER.info("Finished streaming");
            }
            else {
                LOGGER.info("Streaming is disabled for snapshot mode {}", this.snapshotterService.getSnapshotter().name());
            }
        }
        finally {
            if (this.streamingSource != null) {
                this.streamingSource.close();
            }
        }
    }

    /**
     * Prepares streaming: obtains the streaming source from the factory, switches the event
     * listener to the streaming metrics, marks streaming as connected, initializes the source
     * with the given offset and hands the source's offset context to the signal processor (if
     * one is available). Finally the optional incremental snapshot source is obtained from the
     * factory, registered on the dispatcher and, when present, initialized.
     *
     * @param partition partition passed to the incremental snapshot source
     * @param offsetContext offset used to initialize the streaming and incremental snapshot sources
     * @throws InterruptedException if initialization is interrupted
     */
    protected void initStreamEvents(P partition, O offsetContext) throws InterruptedException {
        this.streamingSource = this.changeEventSourceFactory.getStreamingChangeEventSource();
        this.eventDispatcher.setEventListener(this.streamingMetrics);
        this.streamingConnected(true);
        this.streamingSource.init(offsetContext);
        this.getSignalProcessor(this.previousOffsets).ifPresent(s -> s.setContext(this.streamingSource.getOffsetContext()));

        // 'var' keeps the factory's parameterized Optional type (instead of a raw Optional) so
        // the lambda below is type-checked, without naming a type this file does not import.
        final var incrementalSnapshotChangeEventSource = this.changeEventSourceFactory.getIncrementalSnapshotChangeEventSource(offsetContext, this.snapshotMetrics, this.snapshotMetrics, this.notificationService);
        this.eventDispatcher.setIncrementalSnapshotChangeEventSource(incrementalSnapshotChangeEventSource);
        incrementalSnapshotChangeEventSource.ifPresent(x -> x.init(partition, offsetContext));
    }

    /**
     * Forwards an offset commit to the current streaming source.
     *
     * <p>Nothing happens when {@code offset} is {@code null}, when {@code commitOffsetLock} is
     * currently held, or when no streaming source is set. Any throwable raised while
     * committing is handed to the error handler instead of being propagated.
     *
     * @param partition source partition of the committed offset
     * @param offset offset to commit, may be {@code null}
     */
    public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
        try {
            if (offset == null || this.commitOffsetLock.isLocked()) {
                return;
            }
            // Read the volatile field once: another thread may null it between a null check and
            // the call, and the resulting NullPointerException would be reported as a producer
            // failure by the catch block below.
            final StreamingChangeEventSource<P, O> source = this.streamingSource;
            if (source != null) {
                source.commitOffset(partition, offset);
            }
        }
        catch (Throwable e) {
            this.errorHandler.setProducerThrowable(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the coordinator.
     *
     * <p>Both executors are shut down and given up to {@link #SHUTDOWN_WAIT_TIMEOUT} each to
     * terminate; an executor that does not terminate in time is shut down forcibly and awaited
     * once more. Afterwards the streaming source (if set), the signal processor (if available),
     * the notification service (if set), the event dispatcher and the service registry are
     * stopped or closed. The metrics are unregistered in all cases, provided they were created.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void stop() throws InterruptedException {
        this.running = false;
        try {
            // Clears a pending interrupt of the calling thread, presumably so that the
            // graceful-termination waits below are not aborted right away.
            Thread.interrupted();
            this.executor.shutdown();
            this.blockingSnapshotExecutor.shutdown();

            final long timeoutMillis = SHUTDOWN_WAIT_TIMEOUT.toMillis();
            final boolean isShutdown = this.executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            final boolean isBlockingSnapshotShutdown = this.blockingSnapshotExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);

            if (!isShutdown) {
                LOGGER.warn("Coordinator didn't stop in the expected time, shutting down executor now");
                Thread.interrupted();
                this.executor.shutdownNow();
                this.executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (!isBlockingSnapshotShutdown) {
                LOGGER.warn("Coordinator didn't stop in the expected time, shutting down blocking snapshot executor now");
                Thread.interrupted();
                this.blockingSnapshotExecutor.shutdownNow();
                this.blockingSnapshotExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }

            // Single read of the volatile field so the null check and close() use the same value.
            final StreamingChangeEventSource<P, O> source = this.streamingSource;
            if (source != null) {
                source.close();
            }

            final Optional<SignalProcessor<P, O>> processor = this.getSignalProcessor(this.previousOffsets);
            if (processor.isPresent()) {
                processor.get().stop();
            }
            if (this.notificationService != null) {
                this.notificationService.stop();
            }
            this.eventDispatcher.close();
            this.connectorConfig.getServiceRegistry().close();
        }
        finally {
            // The metrics are only assigned in start(); guard against stop() running without a
            // (successful) start, and make sure a failure in the first unregister() does not
            // prevent the second one.
            try {
                if (this.snapshotMetrics != null) {
                    this.snapshotMetrics.unregister();
                }
            }
            finally {
                if (this.streamingMetrics != null) {
                    this.streamingMetrics.unregister();
                }
            }
        }
    }

    /**
     * Returns the error handler this coordinator was constructed with.
     *
     * @return the error handler
     */
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    /**
     * Updates the "connected" streaming metric, but only when the metrics factory reports that
     * the connection metric is handled by the coordinator; otherwise does nothing.
     *
     * @param status the connection status to publish
     */
    protected void streamingConnected(boolean status) {
        final boolean handledHere = this.changeEventSourceMetricsFactory.connectionMetricHandledByCoordinator();
        if (!handledHere) {
            return;
        }
        this.streamingMetrics.connected(status);
        LOGGER.info("Connected metrics set to '{}'", status);
    }

    /**
     * Outcome of the catch-up streaming hook that runs before a snapshot.
     */
    protected class CatchUpStreamingResult {
        /** {@code true} when catch-up streaming was actually performed. */
        public boolean performedCatchUpStreaming;

        /**
         * Creates a result with the given flag.
         *
         * @param performedCatchUpStreaming whether catch-up streaming was performed
         */
        public CatchUpStreamingResult(boolean performedCatchUpStreaming) {
            super();
            this.performedCatchUpStreaming = performedCatchUpStreaming;
        }
    }

    /**
     * Context handed to the change event sources by the coordinator.
     *
     * <p>It exposes the coordinator's {@code running} and {@code paused} flags and provides the
     * hand-shake between streaming and a blocking snapshot. The hand-shake uses one lock with
     * two conditions: one signalled when streaming has paused, the other when streaming may
     * resume.
     */
    public class ChangeEventSourceContextImpl
    implements ChangeEventSource.ChangeEventSourceContext {
        /** Maximum time a snapshot-completion wait blocks before the heartbeat callback is run. */
        private static final Duration PAUSE_BETWEEN_HEARTBEAT_CALLBACKS = Duration.ofSeconds(1L);
        private final Lock lock = new ReentrantLock();
        /** Signalled by {@link #resumeStreaming()}. */
        private final Condition snapshotFinished = this.lock.newCondition();
        /** Signalled by {@link #streamingPaused()}. */
        private final Condition streamingPaused = this.lock.newCondition();

        /** @return the coordinator's current {@code paused} flag */
        @Override
        public boolean isPaused() {
            return paused;
        }

        /** @return the coordinator's current {@code running} flag */
        @Override
        public boolean isRunning() {
            return running;
        }

        /** Wakes up every thread waiting in {@link #waitSnapshotCompletion(Runnable)}. */
        @Override
        public void resumeStreaming() {
            lock.lock();
            try {
                snapshotFinished.signalAll();
                LOGGER.trace("Streaming will now resume.");
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Same as {@link #waitSnapshotCompletion(Runnable)} with a callback that does nothing.
         *
         * @throws InterruptedException if the wait is interrupted
         */
        @Override
        public void waitSnapshotCompletion() throws InterruptedException {
            final Runnable noOp = () -> {};
            waitSnapshotCompletion(noOp);
        }

        /**
         * Blocks while the coordinator is paused. Each wait lasts at most
         * {@code PAUSE_BETWEEN_HEARTBEAT_CALLBACKS}; when a wait times out without being
         * signalled, the callback is run before waiting again. Once the coordinator is no
         * longer paused, the {@code streaming} flag is set.
         *
         * @param heartbeatCallback invoked after every timed-out wait
         * @throws InterruptedException if the wait is interrupted
         */
        @Override
        public void waitSnapshotCompletion(Runnable heartbeatCallback) throws InterruptedException {
            lock.lock();
            try {
                while (paused) {
                    LOGGER.trace("Waiting for snapshot to be completed.");
                    final boolean signalled = snapshotFinished.await(PAUSE_BETWEEN_HEARTBEAT_CALLBACKS.toNanos(), TimeUnit.NANOSECONDS);
                    if (!signalled) {
                        heartbeatCallback.run();
                    }
                }
                streaming = true;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Clears the {@code streaming} flag and wakes up every thread waiting in
         * {@link #waitStreamingPaused()}.
         */
        @Override
        public void streamingPaused() {
            lock.lock();
            try {
                LOGGER.trace("Streaming paused. Blocking snapshot can now start.");
                streaming = false;
                streamingPaused.signalAll();
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Blocks until the {@code streaming} flag is cleared.
         *
         * @throws InterruptedException if the wait is interrupted
         */
        @Override
        public void waitStreamingPaused() throws InterruptedException {
            lock.lock();
            try {
                while (streaming) {
                    LOGGER.trace("Requested a blocking snapshot. Waiting for streaming to be paused.");
                    streamingPaused.await();
                }
            }
            finally {
                lock.unlock();
            }
        }
    }
}

