/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source.enumerator;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitAssignedEvent;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.util.FlinkRuntimeException;

/**
 * Split enumerator of the Postgres incremental source.
 *
 * <p>On top of {@link IncrementalSourceEnumerator} this class:
 *
 * <ul>
 *   <li>makes sure the replication slot used by the stream split exists before the enumerator
 *       starts, and
 *   <li>when scanning newly added tables is enabled, holds back split assignment while the
 *       assigner is in the "newly added assigning" status until the reader that owns the stream
 *       split has answered an {@link OffsetCommitEvent} with an {@link OffsetCommitAckEvent}.
 * </ul>
 */
@Internal
public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {
    private final PostgresDialect postgresDialect;
    private final PostgresSourceConfig sourceConfig;

    /**
     * Whether the reader owning the stream split has acknowledged the last {@link
     * OffsetCommitEvent}. Set on {@link OffsetCommitAckEvent}, cleared again when a {@link
     * StreamSplitAssignedEvent} arrives.
     */
    private volatile boolean receiveOffsetCommitAck = false;

    /**
     * Creates the enumerator.
     *
     * @param context enumerator context, forwarded to the base enumerator
     * @param sourceConfig source configuration, forwarded to the base enumerator and also kept
     *     here to query {@code isScanNewlyAddedTableEnabled()}
     * @param splitAssigner split assigner, forwarded to the base enumerator
     * @param postgresDialect dialect used to open connections and to look up the slot/plugin name
     * @param boundedness boundedness of the source, forwarded to the base enumerator
     */
    public PostgresSourceEnumerator(
            SplitEnumeratorContext<SourceSplitBase> context,
            PostgresSourceConfig sourceConfig,
            SplitAssigner splitAssigner,
            PostgresDialect postgresDialect,
            Boundedness boundedness) {
        super(context, sourceConfig, splitAssigner, boundedness);
        this.postgresDialect = postgresDialect;
        this.sourceConfig = sourceConfig;
    }

    /** Ensures the replication slot exists, then starts the base enumerator. */
    @Override
    public void start() {
        this.createSlotForGlobalStreamSplit();
        super.start();
    }

    /**
     * Assigns splits through the base enumerator, unless assignment is currently gated.
     *
     * <p>Assignment is skipped when all of the following hold: scanning newly added tables is
     * enabled, a stream split task id is known, no offset-commit acknowledgement has been
     * received yet, and the assigner status is "newly added assigning".
     */
    @Override
    protected void assignSplits() {
        boolean waitingForOffsetCommitAck =
                this.sourceConfig.isScanNewlyAddedTableEnabled()
                        && this.streamSplitTaskId != null
                        && !this.receiveOffsetCommitAck
                        && AssignerStatus.isNewlyAddedAssigning(
                                this.splitAssigner.getAssignerStatus());
        if (waitingForOffsetCommitAck) {
            // Nothing is assigned in this round; a later call assigns once the ack has arrived.
            return;
        }
        super.assignSplits();
    }

    /**
     * Handles events sent by the source readers.
     *
     * <p>{@link OffsetCommitAckEvent} is consumed here and is not forwarded to the base
     * enumerator; every other event is delegated to {@code super}.
     *
     * @param subtaskId subtask that sent the event
     * @param sourceEvent the received event
     * @throws RuntimeException if an {@link OffsetCommitAckEvent} arrives from a subtask other
     *     than the one recorded in {@code streamSplitTaskId}, or while that id is still unknown
     */
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof StreamSplitAssignedEvent && this.receiveOffsetCommitAck) {
            // Clear the flag so that a fresh acknowledgement is required before the gate in
            // assignSplits() opens again.
            this.receiveOffsetCommitAck = false;
        }

        if (!(sourceEvent instanceof OffsetCommitAckEvent)) {
            super.handleSourceEvent(subtaskId, sourceEvent);
            return;
        }

        if (this.streamSplitTaskId == null || this.streamSplitTaskId != subtaskId) {
            throw new RuntimeException(
                    String.format(
                            "Receive OffsetCommitAckEvent from wrong subtask %d, the stream split task is %s",
                            subtaskId, this.streamSplitTaskId));
        }
        this.receiveOffsetCommitAck = true;
    }

    /**
     * Runs the base synchronisation and, while no acknowledgement has been received, sends an
     * {@link OffsetCommitEvent} to the reader that owns the stream split.
     *
     * @param subtaskIds forwarded to the base enumerator
     * @param t forwarded to the base enumerator
     */
    @Override
    protected void syncWithReaders(int[] subtaskIds, Throwable t) {
        super.syncWithReaders(subtaskIds, t);

        boolean shouldNotifyStreamReader =
                !this.receiveOffsetCommitAck
                        && this.sourceConfig.isScanNewlyAddedTableEnabled()
                        && this.streamSplitTaskId != null;
        if (!shouldNotifyStreamReader) {
            return;
        }

        AssignerStatus assignerStatus = this.splitAssigner.getAssignerStatus();
        // True only when the assigner is in neither of the two "newly added" statuses.
        // NOTE(review): the flag presumably tells the reader whether it may commit offsets;
        // confirm against OffsetCommitEvent.
        boolean notHandlingNewlyAddedTables =
                !AssignerStatus.isNewlyAddedAssigning(assignerStatus)
                        && !AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus);
        this.context.sendEventToSourceReader(
                this.streamSplitTaskId, new OffsetCommitEvent(notHandlingNewlyAddedTables));
    }

    /**
     * Creates the replication slot named by the dialect unless it already exists.
     *
     * @throws FlinkRuntimeException if looking up or creating the slot fails for any reason
     */
    private void createSlotForGlobalStreamSplit() {
        try (PostgresConnection connection = this.postgresDialect.openJdbcConnection()) {
            SlotState slotInfo =
                    connection.getReplicationSlotState(
                            this.postgresDialect.getSlotName(),
                            this.postgresDialect.getPluginName());
            if (slotInfo != null) {
                // The slot already exists; nothing to create.
                return;
            }

            PostgresReplicationConnection replicationConnection =
                    this.postgresDialect.openPostgresReplicationConnection(connection);
            try {
                replicationConnection.createReplicationSlot();
            } finally {
                // Release the replication connection even when slot creation fails.
                // try-with-resources is deliberately not used: it would call the no-arg close()
                // instead of close(false).
                // NOTE(review): the boolean is presumably Debezium's "dropSlot" flag; confirm.
                replicationConnection.close(false);
            }
        } catch (Throwable t) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Fail to get or create slot for global stream split, the slot name is %s. Due to: ",
                            this.postgresDialect.getSlotName()),
                    t);
        }
    }
}

