/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.oceanbase.source;

import com.oceanbase.clogproxy.client.LogProxyClient;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.util.ClientUtil;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.LogMessage;
import io.debezium.connector.SnapshotRecord;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
import org.apache.flink.cdc.connectors.oceanbase.source.connection.OceanBaseConnection;
import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBaseSourceInfo;
import org.apache.flink.cdc.connectors.oceanbase.source.schema.OceanBaseDatabaseSchema;
import org.apache.flink.cdc.connectors.oceanbase.source.schema.OceanBaseSchema;
import org.apache.flink.cdc.connectors.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OceanBaseRichSourceFunction<T>
extends RichSourceFunction<T>
implements CheckpointListener,
CheckpointedFunction,
ResultTypeQueryable<T> {
    private static final long serialVersionUID = 2844054619864617340L;
    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseRichSourceFunction.class);
    private final StartupOptions startupOptions;
    private final String username;
    private final String password;
    private final String tenantName;
    private final String databaseName;
    private final String tableName;
    private final String tableList;
    private final String serverTimeZone;
    private final Duration connectTimeout;
    private final String hostname;
    private final Integer port;
    private final String compatibleMode;
    private final String jdbcDriver;
    private final Properties jdbcProperties;
    private final String logProxyHost;
    private final Integer logProxyPort;
    private final String logProxyClientId;
    private final ObReaderConfig obReaderConfig;
    private final Properties debeziumProperties;
    private final DebeziumDeserializationSchema<T> deserializer;
    private final List<SourceRecord> changeRecordBuffer = new LinkedList<SourceRecord>();
    private transient OceanBaseConnectorConfig connectorConfig;
    private transient OceanBaseSourceInfo sourceInfo;
    private transient Set<TableId> tableSet;
    private transient OceanBaseSchema obSchema;
    private transient OceanBaseDatabaseSchema databaseSchema;
    private volatile transient long resolvedTimestamp;
    private volatile transient Exception logProxyClientException;
    private volatile transient OceanBaseConnection snapshotConnection;
    private transient LogProxyClient logProxyClient;
    private transient ListState<Long> offsetState;
    private transient OutputCollector<T> outputCollector;

    /**
     * Creates the source function and stores all connection and reader settings.
     *
     * <p>{@code startupOptions}, {@code username}, {@code password}, {@code serverTimeZone},
     * {@code connectTimeout}, {@code hostname}, {@code port}, {@code compatibleMode},
     * {@code jdbcDriver} and {@code deserializer} are passed through
     * {@code Preconditions.checkNotNull}; every other argument is stored as given and may be
     * {@code null}.
     *
     * @param startupOptions controls whether the snapshot and/or the change stream is read
     * @param username user for the snapshot connection
     * @param password password for the snapshot connection
     * @param tenantName tenant name, used for the log proxy white list and client id
     * @param databaseName database name passed to the table lookup query
     * @param tableName table name passed to the table lookup query
     * @param tableList comma separated list of {@code database.table} entries
     * @param serverTimeZone server time zone handed to the connector config
     * @param connectTimeout timeout for the snapshot connection and for the log proxy start-up
     * @param hostname host of the snapshot connection
     * @param port port of the snapshot connection
     * @param compatibleMode compatible mode; {@code "mysql"} selects the MySQL table id layout
     * @param jdbcDriver JDBC driver class name
     * @param jdbcProperties extra JDBC properties
     * @param logProxyHost host of the log proxy
     * @param logProxyPort port of the log proxy
     * @param logProxyClientId optional fixed client id for the log proxy client
     * @param obReaderConfig reader configuration sent to the log proxy
     * @param debeziumProperties properties handed to the connector config
     * @param deserializer converts source records into the produced type
     */
    public OceanBaseRichSourceFunction(
            StartupOptions startupOptions,
            String username,
            String password,
            String tenantName,
            String databaseName,
            String tableName,
            String tableList,
            String serverTimeZone,
            Duration connectTimeout,
            String hostname,
            Integer port,
            String compatibleMode,
            String jdbcDriver,
            Properties jdbcProperties,
            String logProxyHost,
            Integer logProxyPort,
            String logProxyClientId,
            ObReaderConfig obReaderConfig,
            Properties debeziumProperties,
            DebeziumDeserializationSchema<T> deserializer) {
        // The null checks run in declaration order, interleaved with the plain assignments.
        this.startupOptions = Preconditions.checkNotNull(startupOptions);
        this.username = Preconditions.checkNotNull(username);
        this.password = Preconditions.checkNotNull(password);
        this.tenantName = tenantName;
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.tableList = tableList;
        this.serverTimeZone = Preconditions.checkNotNull(serverTimeZone);
        this.connectTimeout = Preconditions.checkNotNull(connectTimeout);
        this.hostname = Preconditions.checkNotNull(hostname);
        this.port = Preconditions.checkNotNull(port);
        this.compatibleMode = Preconditions.checkNotNull(compatibleMode);
        this.jdbcDriver = Preconditions.checkNotNull(jdbcDriver);
        this.jdbcProperties = jdbcProperties;
        this.logProxyHost = logProxyHost;
        this.logProxyPort = logProxyPort;
        this.logProxyClientId = logProxyClientId;
        this.obReaderConfig = obReaderConfig;
        this.debeziumProperties = debeziumProperties;
        this.deserializer = Preconditions.checkNotNull(deserializer);
    }

    /**
     * Initializes the transient runtime helpers: the output collector, the connector
     * configuration and the source info used to stamp emitted records.
     *
     * @param config the configuration forwarded to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open(Configuration config) throws Exception {
        super.open(config);
        outputCollector = new OutputCollector<>();
        connectorConfig =
                new OceanBaseConnectorConfig(compatibleMode, serverTimeZone, debeziumProperties);
        sourceInfo = new OceanBaseSourceInfo(connectorConfig, tenantName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Main loop of the source: resolves the table white list, optionally reads a snapshot of
     * every table, then optionally streams change records from the log proxy.
     *
     * <p>The snapshot is read only when no positive {@code resolvedTimestamp} is known and the
     * startup options are not stream-only. {@link #cancel()} is always invoked on exit.
     *
     * @param ctx the context records are emitted to
     * @throws Exception if table resolution, snapshot reading or change reading fails
     */
    @Override
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        outputCollector.context = ctx;
        try {
            LOG.info("Start to initial table whitelist");
            initTableWhiteList();

            final boolean snapshotNeeded = resolvedTimestamp <= 0L && !startupOptions.isStreamOnly();
            if (!snapshotNeeded) {
                LOG.info("Snapshot reading skipped");
            } else {
                sourceInfo.setSnapshot(SnapshotRecord.TRUE);
                // Captured before the scan so change reading resumes from this point.
                final long snapshotStart = getSnapshotConnection().getCurrentTimestampS();
                LOG.info("Snapshot reading started from timestamp: {}", snapshotStart);
                readSnapshotRecords();
                sourceInfo.setSnapshot(SnapshotRecord.FALSE);
                LOG.info("Snapshot reading finished");
                resolvedTimestamp = snapshotStart;
            }

            if (startupOptions.isSnapshotOnly()) {
                return;
            }
            sourceInfo.setSnapshot(SnapshotRecord.FALSE);
            LOG.info("Change events reading started");
            readChangeRecords();
        } finally {
            cancel();
        }
    }

    /**
     * Returns the connection used for snapshot queries and table/schema lookups, creating it
     * from the configured connection settings on first use.
     *
     * @return the cached or newly created connection
     */
    private OceanBaseConnection getSnapshotConnection() {
        if (snapshotConnection != null) {
            return snapshotConnection;
        }
        snapshotConnection =
                new OceanBaseConnection(
                        hostname,
                        port,
                        username,
                        password,
                        connectTimeout,
                        compatibleMode,
                        jdbcDriver,
                        jdbcProperties,
                        getClass().getClassLoader());
        return snapshotConnection;
    }

    /**
     * Closes the cached snapshot connection, if there is one, and clears the field.
     * A {@link SQLException} raised while closing is logged and not rethrown.
     */
    private void closeSnapshotConnection() {
        if (snapshotConnection == null) {
            return;
        }
        try {
            snapshotConnection.close();
        } catch (SQLException closeFailure) {
            LOG.error("Failed to close snapshotConnection", closeFailure);
        }
        snapshotConnection = null;
    }

    /**
     * Builds a {@link TableId} for the given database and table.
     *
     * <p>When the compatible mode is {@code "mysql"} (case-insensitive) the database name is
     * passed as the first {@code TableId} argument; otherwise it is passed as the second one.
     *
     * @param databaseName the database name
     * @param tableName the table name
     * @return the table id in the layout matching the compatible mode
     */
    private TableId tableId(String databaseName, String tableName) {
        final boolean mysqlMode = "mysql".equalsIgnoreCase(compatibleMode);
        return mysqlMode
                ? new TableId(databaseName, null, tableName)
                : new TableId(null, databaseName, tableName);
    }

    /**
     * Resolves the set of tables to capture and publishes it to the reader configuration.
     *
     * <p>Tables come from two sources that are merged: the explicit {@code tableList}
     * (comma separated {@code database.table} entries) and the tables returned by the snapshot
     * connection for {@code databaseName} / {@code tableName}. Does nothing when the table set
     * has already been populated.
     *
     * @throws FlinkRuntimeException if a {@code tableList} entry is not of the form
     *     {@code database.table}, if the table lookup query fails, or if no table is found
     */
    private void initTableWhiteList() {
        if (this.tableSet != null && !this.tableSet.isEmpty()) {
            return;
        }
        HashSet<TableId> localTableSet = new HashSet<TableId>();
        if (StringUtils.isNotBlank(this.tableList)) {
            for (String entry : this.tableList.split(",")) {
                if (!StringUtils.isNotBlank(entry)) {
                    continue;
                }
                String[] parts = entry.split("\\.");
                // Fail with a clear message instead of an ArrayIndexOutOfBoundsException (or a
                // table id with an empty name) when an entry lacks the database or table part.
                if (parts.length < 2
                        || !StringUtils.isNotBlank(parts[0])
                        || !StringUtils.isNotBlank(parts[1])) {
                    throw new FlinkRuntimeException(
                            String.format(
                                    "Invalid table list entry '%s', expected format 'database.table'",
                                    entry.trim()));
                }
                localTableSet.add(this.tableId(parts[0].trim(), parts[1].trim()));
            }
        }
        if (StringUtils.isNotBlank(this.databaseName) && StringUtils.isNotBlank(this.tableName)) {
            try {
                List<TableId> tableIds = this.getSnapshotConnection().getTables(this.databaseName, this.tableName);
                LOG.info("Pattern matched tables: {}", tableIds);
                localTableSet.addAll(tableIds);
            }
            catch (SQLException e) {
                LOG.error(String.format("Query table list by 'databaseName' %s and 'tableName' %s failed.", this.databaseName, this.tableName), (Throwable)e);
                throw new FlinkRuntimeException((Throwable)e);
            }
        }
        if (localTableSet.isEmpty()) {
            throw new FlinkRuntimeException("No valid table found");
        }
        LOG.info("Table list: {}", localTableSet);
        this.tableSet = localTableSet;
        if (this.obReaderConfig != null) {
            // The white list sent to the log proxy is "tenant.<tableId>" entries joined by '|'.
            this.obReaderConfig.setTableWhiteList(localTableSet.stream().map(id -> String.format("%s.%s", this.tenantName, id.toString())).collect(Collectors.joining("|")));
        }
    }

    /**
     * Returns the schema of the given table, loading it through the snapshot connection and
     * registering it in the database schema when it is not known yet.
     *
     * @param tableId the table whose schema is requested
     * @return the schema held by the database schema after the lookup
     */
    private TableSchema getTableSchema(TableId tableId) {
        if (databaseSchema == null) {
            databaseSchema =
                    new OceanBaseDatabaseSchema(connectorConfig, t -> this.tableSet.contains(t), false);
        }
        TableSchema schema = databaseSchema.schemaFor(tableId);
        if (schema == null) {
            // Unknown table: fetch its definition and register it before looking it up again.
            if (obSchema == null) {
                obSchema = new OceanBaseSchema();
            }
            TableChanges.TableChange change = obSchema.getTableSchema(getSnapshotConnection(), tableId);
            databaseSchema.refresh(change.getTable());
            schema = databaseSchema.schemaFor(tableId);
        }
        return schema;
    }

    /** Reads a full snapshot of every table in the resolved table set, one table at a time. */
    protected void readSnapshotRecords() {
        for (TableId table : this.tableSet) {
            readSnapshotRecordsByTable(table);
        }
    }

    /**
     * Reads all rows of one table with {@code SELECT *} and hands each row to the deserializer
     * as a "read" envelope record.
     *
     * <p>The connection returned by {@link #getSnapshotConnection()} is used as the
     * try-with-resources resource, so it is closed when the query finishes.
     *
     * @param tableId the table to scan
     * @throws FlinkRuntimeException if the query fails or a row cannot be deserialized
     */
    private void readSnapshotRecordsByTable(TableId tableId) {
        final String quotedName = getSnapshotConnection().quotedTableIdString(tableId);
        sourceInfo.tableEvent(tableId);
        try (OceanBaseConnection conn = getSnapshotConnection()) {
            LOG.info("Start to read snapshot from {}", conn.quotedTableIdString(tableId));
            conn.query(
                    "SELECT * FROM " + quotedName,
                    resultSet -> {
                        final TableSchema schema = getTableSchema(tableId);
                        final List<Field> columns = schema.valueSchema().fields();
                        while (resultSet.next()) {
                            // Place every column value at the index the value schema expects.
                            final Object[] row = new Object[columns.size()];
                            for (Field column : columns) {
                                row[column.index()] = resultSet.getObject(column.name());
                            }
                            final Struct rowStruct = schema.valueFromColumnData(row);
                            final Instant readTime = Instant.now();
                            final Struct envelope =
                                    schema.getEnvelopeSchema().read(rowStruct, sourceInfo.struct(), readTime);
                            final SourceRecord record =
                                    new SourceRecord(
                                            null,
                                            null,
                                            tableId.identifier(),
                                            null,
                                            null,
                                            null,
                                            envelope.schema(),
                                            envelope);
                            try {
                                deserializer.deserialize(record, outputCollector);
                            } catch (Exception failure) {
                                LOG.error("Deserialize snapshot record failed ", failure);
                                throw new FlinkRuntimeException(failure);
                            }
                        }
                    });
            LOG.info("Read snapshot from {} finished", quotedName);
        } catch (SQLException sqlFailure) {
            LOG.error("Read snapshot from table " + quotedName + " failed", sqlFailure);
            throw new FlinkRuntimeException(sqlFailure);
        }
    }

    /**
     * Starts a {@link LogProxyClient}, streams change records through the deserializer and
     * blocks until the client terminates.
     *
     * <p>When a positive {@code resolvedTimestamp} is known, the reader configuration is told to
     * resume from it. Row changes are buffered per transaction and emitted on {@code COMMIT};
     * the commit's safe timestamp then becomes the new {@code resolvedTimestamp} if it is larger.
     * Row changes that arrive before the first {@code HEARTBEAT} or {@code BEGIN} are ignored.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the first message
     * @throws TimeoutException if no {@code HEARTBEAT}/{@code BEGIN} message arrives within
     *     {@code connectTimeout}; an exception already reported by the client is attached as cause
     * @throws RuntimeException if the client reported an exception before it terminated
     */
    protected void readChangeRecords() throws InterruptedException, TimeoutException {
        if (this.resolvedTimestamp > 0L) {
            this.obReaderConfig.updateCheckpoint(Long.toString(this.resolvedTimestamp));
            LOG.info("Restore from timestamp: {}", (Object)this.resolvedTimestamp);
        }
        ClientConf clientConf = ClientConf.builder().clientId(this.logProxyClientId != null ? this.logProxyClientId : String.format("%s_%s_%s", ClientUtil.generateClientId(), Thread.currentThread().getId(), this.tenantName)).maxReconnectTimes(0).connectTimeoutMs((int)this.connectTimeout.toMillis()).build();
        this.logProxyClient = new LogProxyClient(this.logProxyHost, this.logProxyPort, this.obReaderConfig, clientConf);
        // Released by the listener once the first HEARTBEAT or BEGIN message is seen.
        final CountDownLatch latch = new CountDownLatch(1);
        this.logProxyClient.addListener(new RecordListener(){
            boolean started = false;

            @Override
            public void notify(LogMessage message) {
                switch (message.getOpt()) {
                    case HEARTBEAT: 
                    case BEGIN: {
                        if (this.started) break;
                        this.started = true;
                        latch.countDown();
                        break;
                    }
                    case INSERT: 
                    case UPDATE: 
                    case DELETE: {
                        SourceRecord record;
                        // Skip rows seen before start-up and rows of tables outside the white list.
                        if (!this.started || (record = OceanBaseRichSourceFunction.this.getChangeRecord(message)) == null) break;
                        OceanBaseRichSourceFunction.this.changeRecordBuffer.add(record);
                        break;
                    }
                    case COMMIT: {
                        // Flush the buffered transaction, then advance the resolved timestamp.
                        OceanBaseRichSourceFunction.this.changeRecordBuffer.forEach(r -> {
                            try {
                                OceanBaseRichSourceFunction.this.deserializer.deserialize((SourceRecord)r, OceanBaseRichSourceFunction.this.outputCollector);
                            }
                            catch (Exception e) {
                                throw new FlinkRuntimeException((Throwable)e);
                            }
                        });
                        OceanBaseRichSourceFunction.this.changeRecordBuffer.clear();
                        long timestamp = Long.parseLong(message.getSafeTimestamp());
                        if (timestamp <= OceanBaseRichSourceFunction.this.resolvedTimestamp) break;
                        OceanBaseRichSourceFunction.this.resolvedTimestamp = timestamp;
                        break;
                    }
                    case DDL: {
                        LOG.trace("Ddl: {}", (Object)message.getFieldList().get(0).getValue().toString());
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException("Unsupported type: " + message.getOpt());
                    }
                }
            }

            @Override
            public void onException(LogProxyClientException e) {
                // Remember the failure for the calling thread and stop the client.
                OceanBaseRichSourceFunction.this.logProxyClientException = e;
                OceanBaseRichSourceFunction.this.logProxyClient.stop();
            }
        });
        LOG.info("Try to start LogProxyClient with client id: {}, config: {}", (Object)clientConf.getClientId(), (Object)this.obReaderConfig);
        this.logProxyClient.start();
        // Wait in milliseconds: Duration.getSeconds() would truncate a sub-second timeout to 0
        // and make await() return immediately, reporting a timeout that never really elapsed.
        if (!latch.await(this.connectTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            TimeoutException timeout = new TimeoutException("Timeout to receive log messages in LogProxyClient.RecordListener");
            Exception clientFailure = this.logProxyClientException;
            if (clientFailure != null) {
                // Surface the client failure that most likely prevented the start-up.
                timeout.initCause(clientFailure);
            }
            throw timeout;
        }
        LOG.info("LogProxyClient started successfully");
        this.logProxyClient.join();
        if (this.logProxyClientException != null) {
            throw new RuntimeException("LogProxyClient exception", this.logProxyClientException);
        }
    }

    /**
     * Converts an INSERT, UPDATE or DELETE log message into a {@link SourceRecord} carrying the
     * matching envelope.
     *
     * @param message the row-change message received from the log proxy
     * @return the record, or {@code null} when the message's table is not in the table set
     * @throws UnsupportedOperationException if the message is not an INSERT, UPDATE or DELETE
     */
    private SourceRecord getChangeRecord(LogMessage message) {
        // The tenant prefix is removed from the database name reported by the message.
        final String dbName = message.getDbName().replace(this.tenantName + ".", "");
        final TableId tableId = tableId(dbName, message.getTableName());
        if (!this.tableSet.contains(tableId)) {
            return null;
        }

        sourceInfo.tableEvent(tableId);
        sourceInfo.setSourceTime(Instant.ofEpochSecond(Long.parseLong(message.getTimestamp())));
        final Struct source = sourceInfo.struct();
        final TableSchema tableSchema = getTableSchema(tableId);
        final Schema valueSchema = tableSchema.valueSchema();
        final List<Field> fields = valueSchema.fields();

        // Split the message fields into the row image before and after the change.
        final HashMap<String, Object> oldValues = new HashMap<String, Object>();
        final HashMap<String, Object> newValues = new HashMap<String, Object>();
        for (DataMessage.Record.Field column : message.getFieldList()) {
            final HashMap<String, Object> target = column.isPrev() ? oldValues : newValues;
            target.put(column.getFieldname(), getFieldValue(column));
        }

        final Struct envelope;
        switch (message.getOpt()) {
            case INSERT: {
                Struct after = buildRowStruct(tableSchema, fields, newValues);
                envelope = tableSchema.getEnvelopeSchema().create(after, source, Instant.now());
                break;
            }
            case DELETE: {
                Struct before = buildRowStruct(tableSchema, fields, oldValues);
                envelope = tableSchema.getEnvelopeSchema().delete(before, source, Instant.now());
                break;
            }
            case UPDATE: {
                Struct before = buildRowStruct(tableSchema, fields, oldValues);
                Struct after = buildRowStruct(tableSchema, fields, newValues);
                envelope = tableSchema.getEnvelopeSchema().update(before, after, source, Instant.now());
                break;
            }
            default: {
                throw new UnsupportedOperationException();
            }
        }
        return new SourceRecord(
                null, null, tableId.identifier(), null, null, null, envelope.schema(), envelope);
    }

    /** Orders the named values by schema field index and turns them into a row struct. */
    private static Struct buildRowStruct(
            TableSchema tableSchema, List<Field> fields, HashMap<String, Object> valuesByName) {
        final Object[] row = new Object[fields.size()];
        for (Field column : fields) {
            row[column.index()] = valuesByName.get(column.name());
        }
        return tableSchema.valueFromColumnData(row);
    }

    /**
     * Extracts the value of a log message field.
     *
     * @param field the message field
     * @return {@code null} when the field has no value, the raw bytes when the field encoding is
     *     {@code "binary"} (case-insensitive), otherwise the value decoded with that encoding
     */
    private Object getFieldValue(DataMessage.Record.Field field) {
        if (field.getValue() == null) {
            return null;
        }
        final String charset = field.getEncoding();
        return "binary".equalsIgnoreCase(charset)
                ? field.getValue().getBytes()
                : field.getValue().toString(charset);
    }

    /**
     * Checkpoint completion callback. This implementation does nothing.
     *
     * @param l the value passed by the caller; unused here (presumably the id of the completed
     *     checkpoint — confirm against the {@code CheckpointListener} contract)
     */
    public void notifyCheckpointComplete(long l) {
    }

    /**
     * Reports the type of the emitted records, as declared by the deserializer.
     *
     * @return the deserializer's produced type
     */
    @Override
    public TypeInformation<T> getProducedType() {
        final TypeInformation<T> producedType = deserializer.getProducedType();
        return producedType;
    }

    /**
     * Stores the current {@code resolvedTimestamp} as the only element of the offset state.
     *
     * <p>The volatile timestamp is read once so that the logged value is exactly the value
     * written to state, even if it is advanced concurrently.
     *
     * @param context provides the id of the checkpoint being taken
     * @throws Exception if the state cannot be updated
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        final long offset = this.resolvedTimestamp;
        LOG.info("snapshotState checkpoint: {} at resolvedTimestamp: {}", context.getCheckpointId(), offset);
        this.offsetState.clear();
        this.offsetState.add(offset);
    }

    /**
     * Registers the {@code resolvedTimestampState} list state and, when the job is restored,
     * takes the first stored element as the {@code resolvedTimestamp} to resume from.
     *
     * @param context gives access to the operator state store and the restore flag
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        LOG.info("initialize checkpoint");
        this.offsetState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "resolvedTimestampState", LongSerializer.INSTANCE));
        if (!context.isRestored()) {
            return;
        }
        final Iterator<Long> restored = this.offsetState.get().iterator();
        if (restored.hasNext()) {
            // Only the first stored element is used.
            final Long offset = restored.next();
            this.resolvedTimestamp = offset;
            LOG.info("Restore State from resolvedTimestamp: {}", this.resolvedTimestamp);
        }
    }

    /**
     * Releases the resources held by the source: closes the snapshot connection and stops the
     * log proxy client when one has been created.
     */
    @Override
    public void cancel() {
        closeSnapshotConnection();
        if (logProxyClient == null) {
            return;
        }
        logProxyClient.stop();
    }

    private static class OutputCollector<T>
    implements Collector<T> {
        private SourceFunction.SourceContext<T> context;

        private OutputCollector() {
        }

        public void collect(T record) {
            this.context.collect(record);
        }

        public void close() {
        }
    }
}

