package com.alibaba.ververica.connectors.adb30.sink;

import com.alibaba.ververica.connectors.adb30.Adb30Options;
import com.alibaba.ververica.connectors.adb30.dialect.Adb30Dialect;
import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.errorcode.ConnectorErrors;
import com.alibaba.ververica.connectors.common.exception.ConnectorException;
import com.alibaba.ververica.connectors.common.exception.ErrorUtils;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import com.alibaba.ververica.connectors.common.sink.HasRetryTimeout;
import com.alibaba.ververica.connectors.common.sink.Syncable;
import com.alibaba.ververica.connectors.common.source.resolver.DirtyDataStrategy;
import com.alibaba.ververica.connectors.common.util.ConnectionPool;
import com.alibaba.ververica.connectors.jdbc.util.JdbcRowConverter;
import com.alibaba.ververica.connectors.jdbc.util.SQLExceptionSkipPolicy;
import com.alibaba.ververica.connectors.jdbc.util.TpsLimitUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/adb30/sink/Adb30OutputFormat.class */
public class Adb30OutputFormat extends RichOutputFormat<RowData> implements Syncable, HasRetryTimeout {
    private static final Logger LOG = LoggerFactory.getLogger(Adb30OutputFormat.class);
    private static final ConnectionPool<BasicDataSource> DATASOURCE_POOL = new ConnectionPool<>();
    private final String dataSourceKey;
    private final int connectionMaxActive;
    private final long connectionMaxWait;
    private final String url;
    private final String tableName;
    private final String userName;
    private final String password;
    private final String[] fieldNames;
    private final LogicalType[] fieldTypes;
    private final String[] pkNames;
    private int[] pkIndices;
    private LogicalType[] pkTypes;
    private final List<String> exceptUpdateFields;
    private final int maxRetryTimes;
    private final int bufferSize;
    private final int batchSize;
    private final long flushIntervalMs;
    private final boolean ignoreDelete;
    private final boolean replaceMode;
    private transient Timer flusher;
    private final long maxSinkTps;
    private final DirtyDataStrategy dirtyDataStrategy;
    private Meter outTps;
    private Meter outBps;
    private Counter sinkSkipCounter;
    private SimpleGauge latencyGauge;
    private Counter deleteCounter;
    private final String insertStatement;
    private String upsertStatement;
    private String replaceStatement;
    private String deleteStatement;
    private final JdbcRowConverter rowConverter;
    private JdbcRowConverter pkConverter;
    protected final RowDataSerializer rowDataSerializer;
    private transient BasicDataSource dataSource = null;
    private transient Connection connection = null;
    private volatile transient Exception flushException = null;
    private volatile long lastFlushTime = 0;
    private final Map<RowData, RowData> mapReduceBuffer = new HashMap();
    private int numTasks = 1;
    private long currentCount = 0;
    private final Adb30Dialect adb30Dialect = new Adb30Dialect();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.alibaba.ververica.connectors.adb30.sink.Adb30OutputFormat$2, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/ververica/connectors/adb30/sink/Adb30OutputFormat$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public Adb30OutputFormat(String str, String str2, String str3, String str4, int i, int i2, int i3, long j, boolean z, List<String> list, boolean z2, TableSchema tableSchema, int i4, long j2, long j3, DirtyDataStrategy dirtyDataStrategy) {
        this.pkConverter = null;
        this.url = str;
        this.tableName = str2;
        this.userName = str3;
        this.password = str4;
        this.dataSourceKey = str + str3 + str4 + str2;
        this.fieldNames = tableSchema.getFieldNames();
        this.fieldTypes = (LogicalType[]) Arrays.asList(tableSchema.getFieldDataTypes()).stream().map((v0) -> {
            return v0.getLogicalType();
        }).toArray(i5 -> {
            return new LogicalType[i5];
        });
        this.pkNames = (String[]) tableSchema.getPrimaryKey().map(uniqueConstraint -> {
            return (String[]) uniqueConstraint.getColumns().toArray(new String[0]);
        }).orElse(null);
        this.maxRetryTimes = i;
        this.connectionMaxActive = i4;
        this.connectionMaxWait = j2;
        this.ignoreDelete = z;
        this.batchSize = i3;
        this.bufferSize = i2;
        this.flushIntervalMs = j;
        this.maxSinkTps = j3;
        this.replaceMode = z2;
        this.dirtyDataStrategy = dirtyDataStrategy;
        this.exceptUpdateFields = list;
        if (existsPrimaryKeys()) {
            list.addAll(Arrays.asList(this.pkNames));
            List asList = Arrays.asList(this.fieldNames);
            this.pkIndices = new int[this.pkNames.length];
            this.pkTypes = new LogicalType[this.pkNames.length];
            for (int i6 = 0; i6 < this.pkNames.length; i6++) {
                Preconditions.checkArgument(asList.contains(this.pkNames[i6]), "keyName %s can't find in fieldNames %s.", new Object[]{this.pkNames[i6], asList});
                int indexOf = asList.indexOf(this.pkNames[i6]);
                this.pkIndices[i6] = indexOf;
                this.pkTypes[i6] = this.fieldTypes[indexOf];
            }
            this.pkConverter = new JdbcRowConverter(this.pkTypes);
            this.upsertStatement = this.adb30Dialect.getUpsertStatement(str2, this.fieldNames, this.pkNames, (String[]) list.toArray(new String[0])).get();
            this.replaceStatement = this.adb30Dialect.getReplaceStatement(str2, this.fieldNames);
            this.deleteStatement = this.adb30Dialect.getDeleteStatement(str2, this.pkNames);
        }
        this.insertStatement = this.adb30Dialect.getInsertIntoStatement(str2, this.fieldNames);
        this.rowConverter = new JdbcRowConverter(this.fieldTypes);
        this.rowDataSerializer = new RowDataSerializer(this.fieldTypes);
    }

    protected void scheduleFlusher() {
        this.flusher = new Timer(String.format("%s sink buffer flusher", Adb30Options.CONNECTOR_TYPE));
        this.flusher.schedule(new TimerTask() { // from class: com.alibaba.ververica.connectors.adb30.sink.Adb30OutputFormat.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    if (System.currentTimeMillis() - Adb30OutputFormat.this.lastFlushTime >= Adb30OutputFormat.this.flushIntervalMs) {
                        Adb30OutputFormat.this.sync();
                    }
                } catch (Exception e) {
                    Adb30OutputFormat.LOG.error(String.format("%s sink flush buffer to db failed.", Adb30Options.CONNECTOR_TYPE), e);
                    Adb30OutputFormat.this.flushException = e;
                }
            }
        }, this.flushIntervalMs, this.flushIntervalMs);
    }

    @Override // com.alibaba.ververica.connectors.common.sink.HasRetryTimeout
    public long getRetryTimeout() {
        return 0L;
    }

    public void configure(Configuration configuration) {
    }

    public void open(int i, int i2) throws IOException {
        LOG.info("Adb30 output format using url=" + this.url + ", tableName=" + this.tableName + ", maxRetryTimes=" + this.maxRetryTimes + ", bufferSize=" + this.bufferSize + ", batchSize=" + this.batchSize + ", connectionMaxActive=" + this.connectionMaxActive + ", connectionMaxWait=" + this.connectionMaxWait + ", flushIntervalMs=" + this.flushIntervalMs + ", excludeUpdateColumns=" + this.exceptUpdateFields + ", ignoreDelete=" + this.ignoreDelete + ", replaceMode=" + this.replaceMode);
        this.numTasks = i2;
        synchronized (Adb30OutputFormat.class) {
            if (DATASOURCE_POOL.contains(this.dataSourceKey)) {
                this.dataSource = DATASOURCE_POOL.get(this.dataSourceKey);
            } else {
                this.dataSource = new BasicDataSource();
                this.dataSource.setUrl(this.url);
                this.dataSource.setUsername(this.userName);
                this.dataSource.setPassword(this.password);
                this.dataSource.setDriverClassName(Adb30Options.DRIVER_CLASS);
                this.dataSource.setMaxActive(this.connectionMaxActive);
                this.dataSource.setMaxIdle(this.connectionMaxActive);
                this.dataSource.setMaxWait(this.connectionMaxWait);
                this.dataSource.setInitialSize(1);
                this.dataSource.setMinIdle(0);
                this.dataSource.setPoolPreparedStatements(false);
                this.dataSource.setValidationQuery("select 1");
                this.dataSource.setTestWhileIdle(true);
                this.dataSource.setTestOnBorrow(false);
                this.dataSource.setTestOnReturn(false);
                this.dataSource.setTimeBetweenEvictionRunsMillis(180000L);
                this.dataSource.setMinEvictableIdleTimeMillis(3600000L);
                this.dataSource.setNumTestsPerEvictionRun(10);
                this.dataSource.setRemoveAbandoned(true);
                this.dataSource.setRemoveAbandonedTimeout(300);
                DATASOURCE_POOL.put(this.dataSourceKey, this.dataSource);
            }
            try {
                long currentTimeMillis = System.currentTimeMillis();
                tryEstablishNewConnection();
                LOG.info("Get Connection in Open Method " + (System.currentTimeMillis() - currentTimeMillis));
            } catch (Exception e) {
                LOG.error("Error when establish new connection", e);
                ErrorUtils.throwException(ConnectorErrors.INST.rdsGetConnectionError(Adb30Options.CONNECTOR_TYPE), e);
            }
        }
        if (existsPrimaryKeys()) {
            scheduleFlusher();
        }
        this.outTps = MetricUtils.registerNumRecordsOutRate(getRuntimeContext());
        this.outBps = MetricUtils.registerNumBytesOutRate(getRuntimeContext(), Adb30Options.CONNECTOR_TYPE);
        this.latencyGauge = MetricUtils.registerCurrentSendTime(getRuntimeContext());
        this.sinkSkipCounter = MetricUtils.registerNumRecordsOutErrors(getRuntimeContext());
        this.deleteCounter = MetricUtils.registerSinkDeleteCounter(getRuntimeContext());
    }

    public void writeRecord(RowData rowData) throws IOException {
        if (this.flushException != null) {
            throw new RuntimeException(this.flushException);
        }
        if (rowData.getRowKind() == RowKind.DELETE) {
            if (this.ignoreDelete) {
                return;
            } else {
                this.deleteCounter.inc();
            }
        }
        if (!existsPrimaryKeys()) {
            if (rowData.getRowKind() == RowKind.INSERT) {
                executeSql(this.insertStatement, Collections.singletonList(rowData), this.rowConverter);
                return;
            } else {
                if (rowData.getRowKind() != RowKind.DELETE) {
                    throw new UnsupportedOperationException("Only INSERT and DELETE record are supported if keys are not defined");
                }
                eagerDeleteRecord(rowData);
                return;
            }
        }
        this.currentCount++;
        synchronized (this.mapReduceBuffer) {
            RowData copy = this.rowDataSerializer.copy(rowData);
            this.mapReduceBuffer.put(getPrimaryKey(copy), copy);
        }
        if (this.currentCount >= this.bufferSize) {
            sync();
        }
    }

    private void executeSql(String str, List<RowData> list, JdbcRowConverter jdbcRowConverter) {
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= this.maxRetryTimes) {
                break;
            }
            PreparedStatement preparedStatement = null;
            try {
                preparedStatement = this.connection.prepareStatement(str);
                Iterator<RowData> it = list.iterator();
                while (it.hasNext()) {
                    jdbcRowConverter.toExternal(it.next(), preparedStatement);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                TpsLimitUtils.limitTps(this.maxSinkTps, this.numTasks, currentTimeMillis, list.size());
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (Exception e) {
                        LOG.debug("close statement error", e);
                    }
                }
            } catch (SQLException e2) {
                try {
                    LOG.error(String.format("Insert into db error, sql: %s, retryTimes: %d", str, Integer.valueOf(i)), e2);
                    if (i == this.maxRetryTimes) {
                        ConnectorException exception = ErrorUtils.getException(ConnectorErrors.INST.rdsWriteError(Adb30Options.CONNECTOR_TYPE, str), e2);
                        if (SQLExceptionSkipPolicy.judge(this.dirtyDataStrategy, e2.getErrorCode(), exception)) {
                            this.sinkSkipCounter.inc();
                            LOG.error(exception.getErrorMessage() + " sql:" + str);
                        }
                    }
                    try {
                        if (!this.connection.isValid(10)) {
                            tryEstablishNewConnection();
                        }
                        try {
                            Thread.sleep(Math.min(1000 * i, Adb30Options.MAX_RETRY_SLEEP_TIME));
                        } catch (Exception e3) {
                        }
                        if (preparedStatement != null) {
                            try {
                                preparedStatement.close();
                            } catch (Exception e4) {
                                LOG.debug("close statement error", e4);
                            }
                        }
                    } catch (SQLException e5) {
                        LOG.error("JDBC connection is not valid, and reestablish connection failed", e5);
                        throw new RuntimeException("Reestablish JDBC connection failed", e5);
                    }
                } catch (Throwable th) {
                    if (preparedStatement != null) {
                        try {
                            preparedStatement.close();
                        } catch (Exception e6) {
                            LOG.debug("close statement error", e6);
                        }
                    }
                    throw th;
                }
            }
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        if (this.latencyGauge != null) {
            this.latencyGauge.report(currentTimeMillis2 - currentTimeMillis, list.size());
        }
        if (this.outTps != null) {
            this.outTps.markEvent(list.size());
        }
        if (this.outBps != null) {
            this.outBps.markEvent(list.size() * str.length() * 2);
        }
    }

    private void tryEstablishNewConnection() throws SQLException {
        Preconditions.checkNotNull(this.dataSource);
        if (this.connection != null) {
            this.connection.close();
        }
        this.connection = this.dataSource.getConnection();
    }

    @Override // com.alibaba.ververica.connectors.common.sink.Syncable
    public synchronized void sync() throws IOException {
        if (existsPrimaryKeys()) {
            synchronized (this.mapReduceBuffer) {
                List<RowData> arrayList = new ArrayList<>();
                List<RowData> arrayList2 = new ArrayList<>();
                for (Map.Entry<RowData, RowData> entry : this.mapReduceBuffer.entrySet()) {
                    switch (AnonymousClass2.$SwitchMap$org$apache$flink$types$RowKind[entry.getValue().getRowKind().ordinal()]) {
                        case 1:
                        case 2:
                            arrayList.add(entry.getValue());
                            break;
                        case 3:
                            arrayList2.add(entry.getKey());
                            break;
                        default:
                            throw new RuntimeException("Not supported row kind " + entry.getValue().getRowKind());
                    }
                }
                batchAdd(arrayList);
                batchDelete(arrayList2);
                this.mapReduceBuffer.clear();
            }
            this.lastFlushTime = System.currentTimeMillis();
            this.currentCount = 0L;
        }
    }

    private void batchAdd(List<RowData> list) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("BatchAdd Size {}.", Integer.valueOf(list.size()));
        }
        batchExecute(this.replaceMode ? this.replaceStatement : this.upsertStatement, list, this.rowConverter);
    }

    private void batchDelete(List<RowData> list) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("BatchDelete Size {}.", Integer.valueOf(list.size()));
        }
        batchExecute(this.deleteStatement, list, this.pkConverter);
    }

    private void batchExecute(String str, List<RowData> list, JdbcRowConverter jdbcRowConverter) {
        Preconditions.checkState(existsPrimaryKeys());
        if (list == null || list.isEmpty()) {
            return;
        }
        int i = 0;
        int i2 = this.batchSize;
        while (true) {
            int i3 = i2;
            if (i3 >= list.size()) {
                break;
            }
            executeSql(str, list.subList(i, i3), jdbcRowConverter);
            i = i3;
            i2 = i3 + this.batchSize;
        }
        if (i != list.size()) {
            executeSql(str, list.subList(i, list.size()), jdbcRowConverter);
        }
    }

    private void eagerDeleteRecord(RowData rowData) throws IOException {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < rowData.getArity(); i++) {
            if (rowData.isNullAt(i)) {
                hashSet.add(Integer.valueOf(i));
            }
        }
        String deleteStatementWithNull = this.adb30Dialect.getDeleteStatementWithNull(this.tableName, this.fieldNames, hashSet);
        if (hashSet.size() <= 0) {
            executeSql(deleteStatementWithNull, Collections.singletonList(rowData), this.rowConverter);
            return;
        }
        LogicalType[] logicalTypeArr = new LogicalType[rowData.getArity() - hashSet.size()];
        GenericRowData genericRowData = new GenericRowData(rowData.getArity() - hashSet.size());
        int i2 = 0;
        for (int i3 = 0; i3 < rowData.getArity(); i3++) {
            if (!hashSet.contains(Integer.valueOf(i3))) {
                logicalTypeArr[i2] = this.fieldTypes[i3];
                genericRowData.setField(i2, RowData.createFieldGetter(logicalTypeArr[i2], i3).getFieldOrNull(rowData));
                i2++;
            }
        }
        executeSql(deleteStatementWithNull, Collections.singletonList(genericRowData), new JdbcRowConverter(logicalTypeArr));
    }

    public void close() throws IOException {
        if (this.flusher != null) {
            this.flusher.cancel();
            this.flusher = null;
        }
        sync();
        try {
            if (this.connection != null && !this.connection.isClosed()) {
                this.connection.close();
            }
        } catch (Exception e) {
        }
        synchronized (Adb30OutputFormat.class) {
            if (DATASOURCE_POOL.remove(this.dataSourceKey) && !this.dataSource.isClosed()) {
                try {
                    this.dataSource.close();
                } catch (SQLException e2) {
                    LOG.error("", e2);
                }
                this.dataSource = null;
            }
        }
    }

    private boolean existsPrimaryKeys() {
        return this.pkNames != null && this.pkNames.length > 0;
    }

    private RowData getPrimaryKey(RowData rowData) {
        GenericRowData genericRowData = new GenericRowData(this.pkIndices.length);
        for (int i = 0; i < this.pkIndices.length; i++) {
            genericRowData.setField(i, safeGet(rowData, this.pkIndices[i], this.pkTypes[i]));
        }
        return genericRowData;
    }

    protected Object safeGet(RowData rowData, int i, LogicalType logicalType) {
        if (rowData == null || rowData.isNullAt(i)) {
            return null;
        }
        return RowData.createFieldGetter(logicalType, i).getFieldOrNull(rowData);
    }
}
