package com.alibaba.ververica.connectors.jdbc.dim;

import com.alibaba.ververica.connectors.adb30.Adb30Options;
import com.alibaba.ververica.connectors.common.dim.cache.CacheStrategy;
import com.alibaba.ververica.connectors.common.dim.cache.ListAllCache;
import com.alibaba.ververica.connectors.common.dim.reload.CacheAllReloadConf;
import com.alibaba.ververica.connectors.common.dim.reload.SerializableRunnable;
import com.alibaba.ververica.connectors.common.util.DateUtil;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/jdbc/dim/JdbcAllCacheRowFetcherBase.class */
/**
 * Base class for JDBC dimension-table row fetchers that use the {@code CacheStrategy.all(...)}
 * strategy: the table is scanned with a single statement ({@code scanTemplate}), every row is
 * pushed into {@code allCacheHandler}, and lookups in {@link #flatMap(RowData, Collector)} are
 * served from that handler only.
 *
 * <p>The scan itself is performed by the inner {@link ReloadCacheRunner}, which is registered
 * with the superclass through {@code setAllCacheReloadRunner(...)} in the constructor.
 */
public abstract class JdbcAllCacheRowFetcherBase extends JdbcRowFetcherBase {
    private static final long serialVersionUID = 6941593589582551911L;
    private static final Logger LOG = LoggerFactory.getLogger(JdbcAllCacheRowFetcherBase.class);

    /** SQL text handed to {@code connection.prepareStatement(...)} for the full-table scan. */
    private final String scanTemplate;

    /* loaded from: input_file:com/alibaba/ververica/connectors/jdbc/dim/JdbcAllCacheRowFetcherBase$ReloadCacheRunner.class */
    /**
     * Runnable that (re)loads the whole table into {@code allCacheHandler}.
     *
     * <p>One invocation of {@link #run()} makes at most {@code maxRetries} attempts. Each attempt
     * ends with a call to {@link JdbcAllCacheRowFetcherBase#closePooledConnection()}.
     */
    private class ReloadCacheRunner extends SerializableRunnable {
        private ReloadCacheRunner() {
        }

        /**
         * Scans the table and swaps the freshly filled cache in.
         *
         * <ul>
         *   <li>If the current time is inside {@code reloadConf.timeRangeBlackList} and the cache
         *       has already been loaded once, the reload is skipped.</li>
         *   <li>A failed attempt is retried (after a bounded sleep) while the attempt number is
         *       below {@code maxRetries}; the last failure is reported through
         *       {@code allCacheHandler.setException(...)}.</li>
         *   <li>{@link InterruptedException} and {@link OutOfMemoryError} are reported immediately
         *       and end this run without further retries.</li>
         * </ul>
         */
        @Override // com.alibaba.ververica.connectors.common.dim.reload.SerializableRunnable, java.lang.Runnable
        public void run() {
            final JdbcAllCacheRowFetcherBase fetcher = JdbcAllCacheRowFetcherBase.this;
            // attempt is 1-based: it is both the loop counter and the number shown in the retry log.
            for (int attempt = 1; attempt <= fetcher.maxRetries; attempt++) {
                try {
                    long now = System.currentTimeMillis();
                    if (DateUtil.isTimeInRange(fetcher.reloadConf.timeRangeBlackList, now)) {
                        if (fetcher.allCacheHandler.isLoaded()) {
                            JdbcAllCacheRowFetcherBase.LOG.info("Current time {} is in reload black list, so try to reload one2oneCache next time.", now);
                            return;
                        }
                        // Nothing has been loaded yet, so the black list is ignored for the initial load.
                        JdbcAllCacheRowFetcherBase.LOG.info("Current time {} is in reload black list, but this is the first time to load one2oneCache, so still load.", now);
                    }

                    fetcher.createConnection();
                    JdbcAllCacheRowFetcherBase.LOG.info("Reloading all data from MySQL '{}' ...", fetcher.sqlTableName);
                    fetcher.allCacheHandler.initialize();

                    long startNanos = System.nanoTime();
                    int rowCount = 0;
                    // try-with-resources: statement and result set are released even when
                    // next() or updateCache() throws part-way through the scan.
                    try (PreparedStatement statement = fetcher.connection.prepareStatement(fetcher.scanTemplate);
                            ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            rowCount++;
                            fetcher.updateCache(resultSet);
                        }
                    }
                    long elapsedMs = (System.nanoTime() - startNanos) / 1000000;
                    JdbcAllCacheRowFetcherBase.LOG.info("Loaded {} rows from MySQL '{}' into cache, used {} ms.", rowCount, fetcher.sqlTableName, elapsedMs);

                    fetcher.allCacheHandler.switchCache();
                    return;
                } catch (Throwable th) {
                    RuntimeException failure = new RuntimeException("Error happens in reload thread.", th);
                    boolean interrupted = th instanceof InterruptedException;
                    if (interrupted || (th instanceof OutOfMemoryError)) {
                        JdbcAllCacheRowFetcherBase.LOG.error("Error happens when scanning all data from MySQL.", failure);
                        fetcher.allCacheHandler.setException(failure);
                        if (interrupted) {
                            // Keep the interruption visible to whoever owns this thread.
                            Thread.currentThread().interrupt();
                        }
                        // The failure has been reported; retrying after an interrupt or OOM is pointless.
                        return;
                    }
                    if (attempt < fetcher.maxRetries) {
                        JdbcAllCacheRowFetcherBase.LOG.warn("Error happens when scanning all data from MySQL, try for the {} time.", attempt, failure);
                        try {
                            if (!fetcher.isClosed) {
                                // Back off a little longer on every attempt, capped by the shared limit.
                                Thread.sleep(Math.min(1000 * attempt, Adb30Options.MAX_RETRY_SLEEP_TIME));
                            }
                        } catch (InterruptedException e) {
                            JdbcAllCacheRowFetcherBase.LOG.error("Cache reload thread is interrupted", e);
                            // Thread.sleep cleared the flag when it threw; set it again.
                            Thread.currentThread().interrupt();
                        }
                    } else {
                        // Last attempt failed: hand the error over to the cache handler.
                        JdbcAllCacheRowFetcherBase.LOG.error("Error happens when scanning all data from MySQL.", failure);
                        fetcher.allCacheHandler.setException(failure);
                    }
                } finally {
                    // Runs on every exit path of an attempt: skip, success, retry and failure.
                    fetcher.closePooledConnection();
                }
            }
        }
    }

    /**
     * Creates the fetcher and registers the reload runner with the superclass.
     *
     * <p>All arguments except {@code cacheAllReloadConf} are forwarded unchanged to the
     * {@link JdbcRowFetcherBase} constructor; their meaning is defined there.
     *
     * @param str forwarded to the superclass constructor
     * @param tableSchema forwarded to the superclass constructor
     * @param strArr forwarded to the superclass constructor
     * @param cacheAllReloadConf reload configuration; its {@code ttlMs} builds the cache strategy
     *     and the object itself is passed to {@code setAllCacheReloadRunner}
     * @param i forwarded to the superclass constructor
     * @param i2 forwarded to the superclass constructor
     * @param str2 forwarded to the superclass constructor and also kept as the scan statement
     */
    public JdbcAllCacheRowFetcherBase(String str, TableSchema tableSchema, String[] strArr, CacheAllReloadConf cacheAllReloadConf, int i, int i2, String str2) {
        super(str, tableSchema, strArr, CacheStrategy.all(cacheAllReloadConf.ttlMs), i, i2, str2);
        // Assigned before the runner is registered so the runner can never observe a null
        // template, should registration start it right away (not visible from this class).
        this.scanTemplate = str2;
        setAllCacheReloadRunner(new ReloadCacheRunner(), cacheAllReloadConf);
        LOG.info("scanTemplate is : {}", str2);
    }

    /**
     * Looks up the rows cached for the join key of {@code rowData} and emits each of them.
     *
     * <p>Rows whose key is {@code null} produce no output. The call polls every 10 ms until
     * {@code allCacheHandler.isLoadedOrThrowException()} returns {@code true}.
     *
     * @param rowData the probe row
     * @param collector receives every cached row found for the key
     * @throws Exception anything thrown while waiting, reading the cache or collecting
     */
    @Override // com.alibaba.ververica.connectors.jdbc.dim.JdbcRowFetcherBase
    public void flatMap(RowData rowData, Collector<RowData> collector) throws Exception {
        Object sourceKey = getSourceKey(rowData);
        if (sourceKey == null) {
            LOG.debug("Join {} on an empty key of row: {}", getConnectorType(), rowData);
            return;
        }
        while (!this.allCacheHandler.isLoadedOrThrowException()) {
            Thread.sleep(10L);
        }

        List<?> cachedRows;
        this.allCacheHandler.lock.readLock().lock();
        try {
            cachedRows = (List<?>) this.allCacheHandler.get(sourceKey);
        } finally {
            // Released exactly once, and before collecting, so a failing collector
            // can neither hold the lock nor trigger a second unlock.
            this.allCacheHandler.lock.readLock().unlock();
        }

        if (cachedRows != null) {
            for (Object cachedRow : cachedRows) {
                collector.collect((RowData) cachedRow);
            }
        }
    }

    /**
     * Hook invoked by the reload runner at the end of every reload attempt.
     * The default implementation does nothing.
     */
    protected void closePooledConnection() {
    }

    /**
     * Converts the current row of {@code resultSet} and stores it in the cache under the key
     * computed by {@code prepareCacheKey}.
     *
     * <p>With a primary key the entry is written with {@code put}; otherwise the row is added
     * through {@code ListAllCache.append} under the same key.
     *
     * @param resultSet result set positioned on the row to cache
     * @throws Exception if conversion or the cache update fails
     */
    protected void updateCache(ResultSet resultSet) throws Exception {
        RowData row = this.jdbcRowConverter.toInternal(resultSet);
        Object cacheKey = prepareCacheKey(row);
        List<RowData> singleRow = Collections.singletonList(row);
        if (this.hasPrimaryKey) {
            this.allCacheHandler.put(cacheKey, singleRow);
        } else {
            ((ListAllCache) this.allCacheHandler).append(cacheKey, singleRow);
        }
    }

    /**
     * Bridge overload (marked bridge/synthetic in the decompiled output) that casts its
     * arguments and delegates to {@link #flatMap(RowData, Collector)}.
     */
    @Override // com.alibaba.ververica.connectors.jdbc.dim.JdbcRowFetcherBase
    public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
        flatMap((RowData) obj, (Collector<RowData>) collector);
    }
}
