/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.SpannerConnectorTask;
import io.debezium.connector.spanner.db.metadata.SchemaRegistry;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.kafka.KafkaAdminClientFactory;
import io.debezium.connector.spanner.kafka.internal.KafkaConsumerAdminService;
import io.debezium.connector.spanner.kafka.internal.ProducerFactory;
import io.debezium.connector.spanner.kafka.internal.RebalancingConsumerFactory;
import io.debezium.connector.spanner.kafka.internal.RebalancingEventListener;
import io.debezium.connector.spanner.kafka.internal.SyncEventConsumerFactory;
import io.debezium.connector.spanner.kafka.internal.TaskSyncEventListener;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceEventMetadata;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.LowWatermarkCalculationJob;
import io.debezium.connector.spanner.task.LowWatermarkCalculator;
import io.debezium.connector.spanner.task.LowWatermarkHolder;
import io.debezium.connector.spanner.task.PartitionFactory;
import io.debezium.connector.spanner.task.PartitionOffsetProvider;
import io.debezium.connector.spanner.task.RebalanceHandler;
import io.debezium.connector.spanner.task.SyncEventHandler;
import io.debezium.connector.spanner.task.TaskStateChangeEventHandler;
import io.debezium.connector.spanner.task.TaskStateChangeEventProcessor;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.leader.LeaderAction;
import io.debezium.connector.spanner.task.leader.LeaderService;
import io.debezium.connector.spanner.task.leader.LowWatermarkStampPublisher;
import io.debezium.connector.spanner.task.leader.rebalancer.LeaderRebalanceStrategy;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionEqualSharingRebalancer;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionGreedyLeaderRebalancer;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionRebalancer;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ErrorHandler;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds and owns the collaborators a single {@link SpannerConnectorTask} uses to
 * coordinate with the other tasks of the connector: the task-sync and rebalancing
 * event listeners, the task-sync publisher, the leader action, the low watermark
 * calculation job and the task state change event processor.
 *
 * <p>Lifecycle: construct, call {@link #init()} once to start the components, and
 * call {@link #destroy()} to stop them. Failures raised by the components (or by
 * {@link #init()} itself) are forwarded to the supplied {@link ErrorHandler}.
 */
public class SynchronizationTaskContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizationTaskContext.class);

    /** Functional shape of one shutdown action; allows the action to throw any exception. */
    @FunctionalInterface
    private interface ShutdownStep {
        void run() throws Exception;
    }

    private final LeaderRebalanceStrategy leaderRebalanceStrategy = LeaderRebalanceStrategy.EQUAL_SHARING;
    private final SyncEventConsumerFactory<String, byte[]> syncEventConsumerFactory;
    private final RebalancingConsumerFactory<?, ?> rebalancingConsumerFactory;
    private final ProducerFactory<String, byte[]> producerFactory;
    private final LeaderAction leaderAction;
    private final RebalancingEventListener rebalancingEventListener;
    private final TaskSyncEventListener taskSyncEventListener;
    private final TaskSyncPublisher taskSyncPublisher;
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskStateChangeEventHandler taskStateChangeEventHandler;
    private final ErrorHandler errorHandler;
    private final PartitionFactory partitionFactory;
    private final LowWatermarkStampPublisher lowWatermarkStampPublisher;
    private final Runnable finishingHandler;
    private final TaskStateChangeEventProcessor taskStateChangeEventProcessor;
    private final SyncEventHandler syncEventHandler;
    private final RebalanceHandler rebalanceHandler;
    private final LowWatermarkCalculationJob lowWatermarkCalculationJob;
    private final SchemaRegistry schemaRegistry;
    private final SpannerConnectorTask task;
    private final SpannerConnectorConfig connectorConfig;

    /**
     * Creates all synchronization components and wires them together. Nothing is
     * started here; call {@link #init()} to start listening and processing.
     *
     * @param task the owning connector task; its UID identifies this task in publishers, listeners and logs
     * @param connectorConfig connector configuration (topic names, timeouts, queue capacity)
     * @param errorHandler receives every error reported by the components created here
     * @param partitionOffsetProvider passed to the partition factory and the low watermark calculator
     * @param changeStream passed to the task state change event handler
     * @param spannerEventDispatcher passed to the low watermark stamp publisher and the state change handler
     * @param adminClientFactory source of the Kafka admin client used by the leader action
     * @param schemaRegistry schema registry initialized during {@link #init()}
     * @param finishingHandler callback run when the task state change event handler signals completion
     * @param metricsEventPublisher metrics sink shared by several components
     * @param lowWatermarkHolder holder handed to the low watermark calculation job
     */
    public SynchronizationTaskContext(SpannerConnectorTask task,
                                      SpannerConnectorConfig connectorConfig,
                                      ErrorHandler errorHandler,
                                      PartitionOffsetProvider partitionOffsetProvider,
                                      ChangeStream changeStream,
                                      SpannerEventDispatcher spannerEventDispatcher,
                                      KafkaAdminClientFactory adminClientFactory,
                                      SchemaRegistry schemaRegistry,
                                      Runnable finishingHandler,
                                      MetricsEventPublisher metricsEventPublisher,
                                      LowWatermarkHolder lowWatermarkHolder) {
        String rebalancingTopic = connectorConfig.rebalancingTopic();
        String taskSyncTopic = connectorConfig.taskSyncTopic();
        String connectorName = connectorConfig.getConnectorName();

        // Assigned first: the this::onError callbacks handed out below read these fields.
        this.task = task;
        this.connectorConfig = connectorConfig;
        this.errorHandler = errorHandler;
        this.finishingHandler = finishingHandler;
        this.schemaRegistry = schemaRegistry;

        this.syncEventConsumerFactory = new SyncEventConsumerFactory<>(connectorConfig, false);
        this.rebalancingConsumerFactory = new RebalancingConsumerFactory<>(connectorConfig);
        this.producerFactory = new ProducerFactory<>(connectorConfig);
        this.taskSyncContextHolder = new TaskSyncContextHolder(metricsEventPublisher);
        this.taskSyncPublisher = new TaskSyncPublisher(task.getTaskUid(), taskSyncTopic,
                connectorConfig.syncEventPublisherWaitingTimeout(), this.producerFactory,
                this.taskSyncContextHolder, this::onError);

        KafkaConsumerAdminService kafkaAdminService =
                new KafkaConsumerAdminService(adminClientFactory.getAdminClient(), connectorName);
        this.partitionFactory = new PartitionFactory(partitionOffsetProvider, metricsEventPublisher);
        LeaderService leaderService = new LeaderService(this.taskSyncContextHolder, connectorConfig,
                (BlockingConsumer<TaskStateChangeEvent>) this::publishEvent, errorHandler,
                this.partitionFactory, metricsEventPublisher);
        this.lowWatermarkStampPublisher = new LowWatermarkStampPublisher(connectorConfig,
                spannerEventDispatcher, this::onError, this.taskSyncContextHolder);

        // The strategy is fixed to EQUAL_SHARING by the field initializer above.
        TaskPartitionRebalancer taskPartitionRebalancer =
                this.leaderRebalanceStrategy.equals(LeaderRebalanceStrategy.EQUAL_SHARING)
                        ? new TaskPartitionEqualSharingRebalancer()
                        : new TaskPartitionGreedyLeaderRebalancer();
        this.leaderAction = new LeaderAction(this.taskSyncContextHolder, kafkaAdminService, leaderService,
                taskPartitionRebalancer, this.taskSyncPublisher, this::onError);

        this.taskSyncEventListener = new TaskSyncEventListener(task.getTaskUid(), taskSyncTopic,
                this.syncEventConsumerFactory, true, this::onError);
        this.rebalancingEventListener = new RebalancingEventListener(task, connectorName, rebalancingTopic,
                connectorConfig.rebalancingTaskWaitingTimeout(), this.rebalancingConsumerFactory, this::onError);
        this.taskStateChangeEventHandler = new TaskStateChangeEventHandler(this.taskSyncContextHolder,
                this.taskSyncPublisher, changeStream, this.partitionFactory, spannerEventDispatcher,
                this::onFinish, connectorConfig, this::onError);
        this.rebalanceHandler = new RebalanceHandler(this.taskSyncContextHolder, this.taskSyncPublisher,
                this.leaderAction, this.lowWatermarkStampPublisher);
        this.syncEventHandler = new SyncEventHandler(this.taskSyncContextHolder, this.taskSyncPublisher,
                (BlockingConsumer<TaskStateChangeEvent>) this::publishEvent);

        LowWatermarkCalculator lowWatermarkCalculator =
                new LowWatermarkCalculator(connectorConfig, this.taskSyncContextHolder, partitionOffsetProvider);
        this.lowWatermarkCalculationJob = new LowWatermarkCalculationJob(connectorConfig, this::onError,
                lowWatermarkCalculator, lowWatermarkHolder, task.getTaskUid());
        this.taskStateChangeEventProcessor = new TaskStateChangeEventProcessor(
                connectorConfig.taskStateChangeEventQueueCapacity(), this.taskSyncContextHolder,
                this.taskStateChangeEventHandler, this::onError, metricsEventPublisher);
    }

    /**
     * Starts the synchronization components in order: initializes the sync context,
     * starts the task-sync listener and waits for its initialization, connects to the
     * rebalance topic, starts the low watermark job, initializes the schema registry,
     * starts the state change event processor and finally marks the context as
     * initialized.
     *
     * <p>This method does not throw: any exception is logged and forwarded to the
     * {@link ErrorHandler}. If the thread is interrupted, its interrupt flag is
     * restored before returning so the caller can observe the interruption.
     */
    public synchronized void init() {
        try {
            this.taskSyncContextHolder.init(TaskSyncContext.getInitialContext(this.task.getTaskUid(), this.connectorConfig));
            this.rebalanceHandler.init();

            this.taskSyncEventListener.subscribe(this.syncEventHandler::updateCurrentOffset);
            this.taskSyncEventListener.subscribe(this.syncEventHandler::process);
            this.taskSyncEventListener.subscribe(this.syncEventHandler::processPreviousStates);
            this.taskSyncEventListener.start();

            Duration awaitTimeout = this.connectorConfig.awaitInitializationTimeout();
            this.taskSyncContextHolder.awaitInitialization(awaitTimeout);

            LOGGER.info("{}, connecting to the rebalance topic", this.task.getTaskUid());
            this.rebalancingEventListener.listen((BlockingConsumer<RebalanceEventMetadata>) metadata ->
                    this.rebalanceHandler.process(metadata.isLeader(), metadata.getConsumerId(),
                            metadata.getRebalanceGenerationId()));

            LOGGER.info("{}, Start Low Watermark Calculation Job", this.task.getTaskUid());
            this.lowWatermarkCalculationJob.start();

            LOGGER.info("{}, Init Schema Registry", this.task.getTaskUid());
            try {
                this.schemaRegistry.init(this.task.getTaskUid());
            }
            catch (Exception e) {
                // The exception itself is logged by the outer catch after the rethrow.
                LOGGER.error("{}, Init Schema Registry failure", this.task.getTaskUid());
                throw e;
            }

            LOGGER.info("{}, Start Processing Task State Change Event Processor", this.task.getTaskUid());
            this.taskStateChangeEventProcessor.startProcessing();

            LOGGER.info("{}, TaskSyncContextHolder update initialized", this.task.getTaskUid());
            this.taskSyncContextHolder.update(context -> context.toBuilder().initialized(true).build());
            LOGGER.info("{}, Finished updating TaskSyncContextHolder", this.task.getTaskUid());
        }
        catch (InterruptedException ex) {
            LOGGER.error("Interrupted exception during SynchronizationTaskContext starting", ex);
            this.onError(ex);
            // Catching InterruptedException clears the flag; restore it for the caller.
            Thread.currentThread().interrupt();
        }
        catch (Exception ex) {
            LOGGER.error("Exception during SynchronizationTaskContext starting", ex);
            this.onError(ex);
        }
    }

    /**
     * Stops every component started by {@link #init()}.
     *
     * <p>Shutdown is best-effort: each component is stopped independently, so a
     * failure in one step is logged and the remaining components are still shut
     * down. This method does not propagate exceptions from the individual steps.
     */
    public void destroy() {
        // Use the task's own UID (as init() does) so logging does not depend on the
        // sync context holder having been initialized.
        String taskUid = this.task.getTaskUid();
        try {
            this.shutdownQuietly(taskUid, "rebalancingEventListener", this.rebalancingEventListener::shutdown);
            this.shutdownQuietly(taskUid, "TaskSyncEventListener", this.taskSyncEventListener::shutdown);
            this.shutdownQuietly(taskUid, "TaskSyncPublisher", this.taskSyncPublisher::close);
            this.shutdownQuietly(taskUid, "TaskStateChangeEventProcessor", this.taskStateChangeEventProcessor::stopProcessing);
            this.shutdownQuietly(taskUid, "LowWatermarkCalculationJob", this.lowWatermarkCalculationJob::stop);
            this.shutdownQuietly(taskUid, "rebalance handler", this.rebalanceHandler::destroy);
        }
        finally {
            LOGGER.info("Task {}, SynchronizationTaskContext end", taskUid);
        }
    }

    /**
     * Runs one shutdown step, logging success at INFO and any exception at WARN
     * without propagating it, so later steps still run.
     *
     * @param taskUid task UID used as log prefix
     * @param component human-readable component name used in the log messages
     * @param step the shutdown action to run
     */
    private void shutdownQuietly(String taskUid, String component, ShutdownStep step) {
        try {
            step.run();
            LOGGER.info("Task {}, Shut down {}", taskUid, component);
        }
        catch (Exception ex) {
            // SLF4J treats a trailing Throwable argument as the exception to log.
            LOGGER.warn("Task {}, Exception during sync context destroying ({} shutdown)", taskUid, component, ex);
        }
    }

    /**
     * Hands a task state change event to the event processor.
     *
     * @param event the event to process
     * @throws InterruptedException if the processor's {@code processEvent} call is interrupted
     */
    public void publishEvent(TaskStateChangeEvent event) throws InterruptedException {
        LoggerUtils.debug(LOGGER, "publishEvent: type: {}, event: {}", event.getClass().getSimpleName(), event);
        this.taskStateChangeEventProcessor.processEvent(event);
    }

    /**
     * Error callback shared by all components: logs the failure and forwards it to
     * the {@link ErrorHandler}.
     */
    private void onError(Throwable throwable) {
        // Logged with the task's own UID so that reporting an error raised before the
        // sync context holder is initialized cannot itself fail while logging.
        LOGGER.info("Task {}, enqueueing error in task", this.task.getTaskUid(), throwable);
        this.errorHandler.setProducerThrowable(throwable);
    }

    /** Completion callback given to the task state change event handler; runs the finishing handler. */
    private void onFinish() {
        this.finishingHandler.run();
    }
}

