/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source.reader;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;

public class PostgresPipelineRecordEmitter<T>
extends IncrementalSourceRecordEmitter<T> {
    private final PostgresSourceConfig sourceConfig;
    private final PostgresDialect postgresDialect;
    private Set<TableId> alreadySendCreateTableTables;
    private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
    private boolean isBounded = false;
    private final List<CreateTableEvent> createTableEventCache = new ArrayList<CreateTableEvent>();

    public PostgresPipelineRecordEmitter(DebeziumDeserializationSchema debeziumDeserializationSchema, SourceReaderMetrics sourceReaderMetrics, PostgresSourceConfig sourceConfig, OffsetFactory offsetFactory, PostgresDialect postgresDialect) {
        super(debeziumDeserializationSchema, sourceReaderMetrics, sourceConfig.isIncludeSchemaChanges(), offsetFactory);
        this.sourceConfig = sourceConfig;
        this.postgresDialect = postgresDialect;
        this.alreadySendCreateTableTables = new HashSet<TableId>();
        this.generateCreateTableEvent(sourceConfig);
        this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
    }

    @Override
    protected void processElement(SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) throws Exception {
        TableId tableId;
        if (this.shouldEmitAllCreateTableEventsInSnapshotMode && this.isBounded) {
            for (CreateTableEvent createTableEvent : this.createTableEventCache) {
                output.collect((Object)createTableEvent);
            }
            this.shouldEmitAllCreateTableEventsInSnapshotMode = false;
        } else if (WatermarkEvent.isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            TableId tableId2 = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
            if (!this.alreadySendCreateTableTables.contains(tableId2)) {
                try (PostgresConnection jdbc = this.postgresDialect.openJdbcConnection();){
                    this.sendCreateTableEvent(jdbc, tableId2, output);
                    this.alreadySendCreateTableTables.add(tableId2);
                }
            }
        } else if ((SourceRecordUtils.isDataChangeRecord(element) || SourceRecordUtils.isSchemaChangeEvent(element)) && !this.alreadySendCreateTableTables.contains(tableId = SourceRecordUtils.getTableId(element))) {
            for (CreateTableEvent createTableEvent : this.createTableEventCache) {
                if (createTableEvent == null) continue;
                output.collect((Object)createTableEvent);
            }
            this.alreadySendCreateTableTables.add(tableId);
        }
        super.processElement(element, output, splitState);
    }

    private void sendCreateTableEvent(PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
        Schema schema = PostgresSchemaUtils.getTableSchema(tableId, this.sourceConfig, jdbc);
        output.collect((Object)new CreateTableEvent(org.apache.flink.cdc.common.event.TableId.tableId((String)tableId.schema(), (String)tableId.table()), schema));
    }

    private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
        try (PostgresConnection jdbc = this.postgresDialect.openJdbcConnection();){
            List<TableId> capturedTableIds = TableDiscoveryUtils.listTables(sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters(), sourceConfig.includePartitionedTables());
            for (TableId tableId : capturedTableIds) {
                Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
                this.createTableEventCache.add(new CreateTableEvent(org.apache.flink.cdc.common.event.TableId.tableId((String)tableId.schema(), (String)tableId.table()), schema));
            }
        }
        catch (SQLException e) {
            throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
        }
    }
}

