/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.stream;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.micrometer.core.instrument.Counter;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginConfigObserver;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientLifecycleListener;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientWrapper;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogEventListener;
import org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationClient;
import org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationEventProcessor;
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClient;
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the {@link StreamWorker} task for a single {@link StreamPartition} and
 * restarts it with a freshly created replication log client whenever the
 * database username or password in the source configuration changes.
 *
 * <p>The instance holds mutable state (the current executor, worker and
 * configuration) without any synchronization of its own.
 */
public class StreamWorkerTaskRefresher
implements PluginConfigObserver<RdsSourceConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamWorkerTaskRefresher.class);
    /** Name of the counter incremented each time a credential change is detected. */
    static final String CREDENTIALS_CHANGED = "credentialsChanged";
    /** Name of the counter incremented each time restarting the stream worker fails. */
    static final String TASK_REFRESH_ERRORS = "streamWorkerTaskRefreshErrors";
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final StreamPartition streamPartition;
    private final StreamCheckpointer streamCheckpointer;
    private final String s3Prefix;
    private final ReplicationLogClientFactory replicationLogClientFactory;
    private final Buffer<Record<Event>> buffer;
    private final Supplier<ExecutorService> executorServiceSupplier;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final Counter credentialsChangeCounter;
    private final Counter taskRefreshErrorsCounter;
    // Replaced on every credential refresh; the previous executor is shut down first.
    private ExecutorService executorService;
    // Configuration whose credentials the running worker was started with.
    private RdsSourceConfig currentSourceConfig;
    // Null until the first successful refreshTask call.
    private StreamWorker streamWorker;

    /**
     * Stores the collaborators, obtains the first executor from
     * {@code executorServiceSupplier} and registers the two counters on
     * {@code pluginMetrics}. No stream worker is started until
     * {@link #initialize(RdsSourceConfig)} is called.
     *
     * @param sourceCoordinator coordinator used to look up the global state partition
     * @param streamPartition partition the stream worker processes
     * @param streamCheckpointer checkpointer handed to the event listener/processor
     * @param s3Prefix prefix handed to the event listener/processor
     * @param replicationLogClientFactory factory for replication log clients
     * @param buffer buffer handed to the event listener/processor
     * @param executorServiceSupplier supplies a new executor for each (re)started worker
     * @param acknowledgementSetManager manager handed to the event listener/processor
     * @param pluginMetrics metrics registry used for the counters and handed to the worker
     */
    public StreamWorkerTaskRefresher(EnhancedSourceCoordinator sourceCoordinator, StreamPartition streamPartition, StreamCheckpointer streamCheckpointer, String s3Prefix, ReplicationLogClientFactory replicationLogClientFactory, Buffer<Record<Event>> buffer, Supplier<ExecutorService> executorServiceSupplier, AcknowledgementSetManager acknowledgementSetManager, PluginMetrics pluginMetrics) {
        this.sourceCoordinator = sourceCoordinator;
        this.streamPartition = streamPartition;
        this.streamCheckpointer = streamCheckpointer;
        this.s3Prefix = s3Prefix;
        this.buffer = buffer;
        this.executorServiceSupplier = executorServiceSupplier;
        this.executorService = executorServiceSupplier.get();
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.replicationLogClientFactory = replicationLogClientFactory;
        this.credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
        this.taskRefreshErrorsCounter = pluginMetrics.counter(TASK_REFRESH_ERRORS);
    }

    /**
     * Static factory that forwards all arguments to the constructor.
     *
     * @return a new refresher; see the constructor for the meaning of each argument
     */
    public static StreamWorkerTaskRefresher create(EnhancedSourceCoordinator sourceCoordinator, StreamPartition streamPartition, StreamCheckpointer streamCheckpointer, String s3Prefix, ReplicationLogClientFactory binlogClientFactory, Buffer<Record<Event>> buffer, Supplier<ExecutorService> executorServiceSupplier, AcknowledgementSetManager acknowledgementSetManager, PluginMetrics pluginMetrics) {
        return new StreamWorkerTaskRefresher(sourceCoordinator, streamPartition, streamCheckpointer, s3Prefix, binlogClientFactory, buffer, executorServiceSupplier, acknowledgementSetManager, pluginMetrics);
    }

    /**
     * Records {@code sourceConfig} as the current configuration and starts the
     * first stream worker.
     *
     * @param sourceConfig configuration to start the worker with
     */
    public void initialize(RdsSourceConfig sourceConfig) {
        this.currentSourceConfig = sourceConfig;
        this.refreshTask(sourceConfig);
    }

    /**
     * Restarts the stream worker when the username or password in
     * {@code sourceConfig} differs from the current configuration; otherwise
     * does nothing apart from a debug log.
     *
     * <p>Failures while restarting are logged and counted, not rethrown. In that
     * case the current configuration is left unchanged, so a later call with the
     * same new credentials is still treated as a change and retried.
     *
     * @param sourceConfig the updated configuration
     */
    public void update(RdsSourceConfig sourceConfig) {
        if (!this.basicAuthChanged(sourceConfig.getAuthenticationConfig())) {
            LOG.debug("Database credentials were not changed. Skipping...");
            return;
        }

        LOG.info("Database credentials were updated. Refreshing stream worker...");
        this.credentialsChangeCounter.increment();
        try {
            // The worker is absent if the initial start failed; still attempt a fresh start.
            if (this.streamWorker != null) {
                this.streamWorker.shutdown();
            }
            this.executorService.shutdownNow();
            this.executorService = this.executorServiceSupplier.get();
            this.replicationLogClientFactory.updateCredentials(sourceConfig);
            this.refreshTask(sourceConfig);
            // Only adopt the new configuration once the worker has been resubmitted.
            this.currentSourceConfig = sourceConfig;
        }
        catch (Exception e) {
            this.taskRefreshErrorsCounter.increment();
            LOG.error("Refreshing stream worker failed", e);
        }
    }

    /**
     * Shuts down the stream worker, if one was started, and then the executor.
     */
    public void shutdown() {
        if (this.streamWorker != null) {
            this.streamWorker.shutdown();
        }
        LOG.info("Stream worker stopped.");
        this.executorService.shutdownNow();
    }

    /**
     * Creates a new replication log client, wires the engine-specific event
     * handling onto it, and submits a new stream worker to the current executor.
     *
     * @param sourceConfig configuration used to select the engine and build the handlers
     */
    private void refreshTask(RdsSourceConfig sourceConfig) {
        DbTableMetadata dbTableMetadata = this.getDBTableMetadata(this.streamPartition);
        CascadingActionDetector cascadeActionDetector = new CascadingActionDetector(this.sourceCoordinator);
        ReplicationLogClient replicationLogClient = this.replicationLogClientFactory.create(this.streamPartition);
        if (sourceConfig.getEngine().isMySql()) {
            BinaryLogClient binaryLogClient = ((BinlogClientWrapper)replicationLogClient).getBinlogClient();
            binaryLogClient.registerEventListener((BinaryLogClient.EventListener)BinlogEventListener.create(this.streamPartition, this.buffer, sourceConfig, this.s3Prefix, this.pluginMetrics, binaryLogClient, this.streamCheckpointer, this.acknowledgementSetManager, dbTableMetadata, cascadeActionDetector));
            binaryLogClient.registerLifecycleListener((BinaryLogClient.LifecycleListener)new BinlogClientLifecycleListener());
        } else {
            LogicalReplicationClient logicalReplicationClient = (LogicalReplicationClient)replicationLogClient;
            logicalReplicationClient.setEventProcessor(LogicalReplicationEventProcessor.create(this.streamPartition, sourceConfig, this.buffer, this.s3Prefix, this.pluginMetrics, logicalReplicationClient, this.streamCheckpointer, this.acknowledgementSetManager));
        }
        // Capture the new worker in a local so the submitted task is bound to this
        // exact instance rather than to whatever the field holds when the task runs.
        final StreamWorker newStreamWorker = StreamWorker.create(this.sourceCoordinator, replicationLogClient, this.pluginMetrics);
        this.streamWorker = newStreamWorker;
        this.executorService.submit(() -> newStreamWorker.processStream(this.streamPartition));
    }

    /**
     * Tells whether the username or password differs between the current
     * configuration and {@code newAuthConfig}.
     *
     * @param newAuthConfig authentication settings from the updated configuration
     * @return {@code true} if either the username or the password is different
     */
    private boolean basicAuthChanged(RdsSourceConfig.AuthenticationConfig newAuthConfig) {
        RdsSourceConfig.AuthenticationConfig currentAuthConfig = this.currentSourceConfig.getAuthenticationConfig();
        return !Objects.equals(currentAuthConfig.getUsername(), newAuthConfig.getUsername()) || !Objects.equals(currentAuthConfig.getPassword(), newAuthConfig.getPassword());
    }

    /**
     * Builds the table metadata from the progress state of the global state
     * partition whose key equals the stream partition's partition key.
     *
     * @param streamPartition partition whose key identifies the global state
     * @return metadata built from the global state's progress state
     * @throws NoSuchElementException if the global state partition or its progress state is absent
     */
    private DbTableMetadata getDBTableMetadata(StreamPartition streamPartition) {
        String dbIdentifier = streamPartition.getPartitionKey();
        Optional<?> globalStatePartition = this.sourceCoordinator.getPartition(dbIdentifier);
        GlobalState globalState = (GlobalState)globalStatePartition.orElseThrow(
                () -> new NoSuchElementException("No global state partition found for " + dbIdentifier));
        return DbTableMetadata.fromMap(globalState.getProgressState().orElseThrow(
                () -> new NoSuchElementException("Global state partition for " + dbIdentifier + " has no progress state")));
    }
}

