package com.alibaba.ververica.connectors.common.dim;

import com.alibaba.ververica.connectors.common.dim.cache.AllCache;
import com.alibaba.ververica.connectors.common.dim.cache.Cache;
import com.alibaba.ververica.connectors.common.dim.cache.CacheFactory;
import com.alibaba.ververica.connectors.common.dim.cache.CacheStrategy;
import com.alibaba.ververica.connectors.common.dim.reload.CacheAllReloadConf;
import com.alibaba.ververica.connectors.common.dim.reload.SerializableRunnable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/dim/DimJoinFetcher.class */
/**
 * Base class for join fetchers that look up rows of a table through a cache.
 *
 * <p>The class takes care of the lifecycle shared by all concrete fetchers: it asks the subclass
 * to open/close its connection, obtains the per-table {@link Cache} from {@link CacheFactory},
 * and, when the cache strategy is an "all cache" strategy, schedules the periodic reload task
 * registered through {@link #setAllCacheReloadRunner}.
 *
 * <p>NOTE(review): the {@code isRegisteredTimer} flag and the scheduled future live on the
 * {@link AllCache} instance returned by the factory, which is presumably shared between all
 * fetchers of the same table inside one JVM - confirm against {@code CacheFactory}.
 */
public abstract class DimJoinFetcher extends AbstractRichFunction {
    private static final Logger LOG = LoggerFactory.getLogger(DimJoinFetcher.class);

    /**
     * Number of fetchers that have entered {@link #open(Configuration)} and not yet completed the
     * bookkeeping step of {@link #close()}. Incremented once per open and decremented exactly
     * once per close.
     */
    static AtomicInteger counter = new AtomicInteger(0);

    /** Name of the table; also the key under which the cache is requested from the factory. */
    protected final String sqlTableName;
    /** Names of the lookup key columns, as passed to the constructor. */
    protected final String[] lookupKeys;
    /** Cache strategy handed to the cache factory when the cache is requested. */
    protected final CacheStrategy cacheStrategy;
    /** Task that reloads the whole cache; only stored when the strategy is an "all cache". */
    protected SerializableRunnable cacheReloadRunner;
    /** Reload configuration; its {@code ttlMs} is used as the delay between two reloads. */
    protected CacheAllReloadConf reloadConf;
    protected transient CacheFactory<Object, RowData> one2oneCacheFactory;
    protected transient Cache<Object, RowData> one2oneCache;
    /** Only non-null on the fetcher instance that won the right to schedule the reload task. */
    protected transient ScheduledExecutorService reloadExecutor;
    /** The cache viewed as an {@link AllCache}; only set when the strategy is an "all cache". */
    protected transient AllCache<Object, RowData> one2oneAllCacheHandler;

    /**
     * Creates a fetcher for the given table.
     *
     * @param str the table name, must not be null
     * @param strArr the lookup key column names, must not be null
     * @param cacheStrategy the cache strategy, must not be null
     * @throws IllegalArgumentException if any argument is null
     */
    protected DimJoinFetcher(String str, String[] strArr, CacheStrategy cacheStrategy) {
        Preconditions.checkArgument(null != str, "sqlTableName cannot be null!");
        Preconditions.checkArgument(null != strArr, "lookupKeys cannot be null!");
        Preconditions.checkArgument(null != cacheStrategy, "cacheStrategy cannot be null!");
        this.lookupKeys = strArr;
        this.sqlTableName = str;
        this.cacheStrategy = cacheStrategy;
    }

    /**
     * Registers the task used to reload the full cache and its configuration.
     *
     * <p>The call is a no-op unless the cache strategy reports {@code isAllCache()}.
     *
     * @param serializableRunnable the reload task
     * @param cacheAllReloadConf the reload configuration; its {@code timeRangeBlackList} must be
     *     non-null as well
     * @throws NullPointerException if the strategy is an "all cache" and any of the above is null
     */
    public void setAllCacheReloadRunner(SerializableRunnable serializableRunnable, CacheAllReloadConf cacheAllReloadConf) {
        if (!this.cacheStrategy.isAllCache()) {
            return;
        }
        Objects.requireNonNull(serializableRunnable);
        Objects.requireNonNull(cacheAllReloadConf);
        Objects.requireNonNull(cacheAllReloadConf.timeRangeBlackList);
        this.cacheReloadRunner = serializableRunnable;
        this.reloadConf = cacheAllReloadConf;
    }

    /** Opens the subclass-specific connection; invoked from {@link #open(Configuration)}. */
    public abstract void openConnection(Configuration configuration);

    /** Closes the subclass-specific connection; invoked from {@link #close()}. */
    public abstract void closeConnection();

    /**
     * Opens the connection, obtains the cache for this table and, for an "all cache" strategy,
     * makes sure the periodic reload task is scheduled.
     *
     * <p>Only the fetcher that flips {@code isRegisteredTimer} from {@code false} to {@code true}
     * creates the executor and schedules the reload; every other fetcher just uses the cache.
     *
     * @param configuration the configuration passed on to the superclass and the subclass
     * @throws NullPointerException if this fetcher has to schedule the reload but no reload task
     *     or configuration was registered
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        LOG.info("the {}th started worker, begin to prepare connection...", counter.incrementAndGet());
        openConnection(configuration);

        LOG.info("table {} preparing one2oneCache...", this.sqlTableName);
        this.one2oneCacheFactory = CacheFactory.getInstance();
        this.one2oneCache = this.one2oneCacheFactory.getCache(this.sqlTableName, this.cacheStrategy);
        LOG.info("table {} one2oneCache prepared, strategy:{}", this.sqlTableName, this.cacheStrategy);

        if (!this.cacheStrategy.isAllCache()) {
            return;
        }
        // Raw cast kept from the original code: the factory only exposes the Cache type.
        this.one2oneAllCacheHandler = (AllCache) this.one2oneCache;
        if (this.one2oneAllCacheHandler.isRegisteredTimer.compareAndSet(false, true)) {
            startReloadTimer();
        }
    }

    /** Creates the single-threaded daemon executor and schedules the cache reload task on it. */
    private void startReloadTimer() {
        // Validate before allocating the executor so a misconfigured fetcher fails with a clear
        // message instead of silently scheduling a task that does nothing.
        Objects.requireNonNull(this.cacheReloadRunner, "cacheReloadRunner must be set before open() for an all-cache strategy");
        Objects.requireNonNull(this.reloadConf, "reloadConf must be set before open() for an all-cache strategy");

        this.reloadExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("one2oneCache-reload").setDaemon(true).build());
        // The runner is scheduled directly; wrapping it in a never-started Thread only to use it
        // as a Runnable adds nothing.
        ScheduledFuture<?> reloadFuture = this.reloadExecutor.scheduleWithFixedDelay(
                this.cacheReloadRunner, 0L, this.reloadConf.ttlMs, TimeUnit.MILLISECONDS);
        this.one2oneAllCacheHandler.setScheduledFuture(reloadFuture);
    }

    /**
     * Stops the reload task (for an "all cache" strategy), closes the connection, updates the
     * worker bookkeeping and finally releases the cache of this table.
     *
     * <p>Exceptions raised while stopping the reload task or closing the connection are logged
     * and not propagated. The worker counter is decremented exactly once per call, whatever the
     * outcome of the previous steps. Failures while releasing the cache or in
     * {@code super.close()} are propagated to the caller.
     */
    public void close() throws Exception {
        try {
            if (this.cacheStrategy.isAllCache()) {
                LOG.info("start to cancel reloading thread...");
                cancelReload();
            }
            LOG.info("start to close connection...");
            closeConnection();
        } catch (Exception e) {
            LOG.error("Error happens when shutdown reload executor.", e);
        } finally {
            releaseWorker();
        }

        LOG.info("start to release cache of table:{} ...", this.sqlTableName);
        if (this.one2oneCacheFactory != null) {
            LOG.info("table {} one2oneCache removing...", this.sqlTableName);
            this.one2oneCacheFactory.removeCache(this.sqlTableName);
            LOG.info("table {} one2oneCache removed", this.sqlTableName);
        }
        super.close();
    }

    /** Cancels the scheduled reload (if any) and shuts down the executor owned by this fetcher. */
    private void cancelReload() {
        // The handler is null when open() failed before the cache was obtained.
        if (this.one2oneAllCacheHandler != null) {
            ScheduledFuture<?> scheduledFuture = this.one2oneAllCacheHandler.getScheduledFuture();
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }
        if (this.reloadExecutor != null && !this.reloadExecutor.isShutdown()) {
            this.reloadExecutor.shutdownNow();
            this.reloadExecutor = null;
        }
    }

    /**
     * Decrements the worker counter once and, when it reaches zero for an "all cache" strategy,
     * clears the {@code isRegisteredTimer} flag so that a later open can schedule the reload again.
     */
    private void releaseWorker() {
        boolean lastWorker = counter.decrementAndGet() == 0;
        if (lastWorker && this.cacheStrategy.isAllCache() && this.one2oneAllCacheHandler != null) {
            this.one2oneAllCacheHandler.isRegisteredTimer.compareAndSet(true, false);
        }
    }

    /**
     * Builds the cache key for a row from the given key fields.
     *
     * <p>With a single key field the field value itself is the key (possibly {@code null}). With
     * several key fields the key is a {@link GenericRowData} holding the values in order, or
     * {@code null} as soon as one of them is null.
     *
     * @param rowData the row to read from
     * @param list positions of the key fields inside {@code rowData}
     * @param logicalTypeArr types of the key fields, indexed by position in {@code list} (not by
     *     position in the row)
     * @return the key, or {@code null} if a key field is null
     */
    public Object getKey(RowData rowData, List<Integer> list, LogicalType[] logicalTypeArr) {
        int keyCount = list.size();
        if (keyCount == 1) {
            return safeGet(rowData, list.get(0), logicalTypeArr[0]);
        }
        GenericRowData compositeKey = new GenericRowData(keyCount);
        for (int pos = 0; pos < keyCount; pos++) {
            Object value = safeGet(rowData, list.get(pos), logicalTypeArr[pos]);
            if (value == null) {
                return null;
            }
            compositeKey.setField(pos, value);
        }
        return compositeKey;
    }

    /**
     * Reads field {@code i} of the row, tolerating a null row and null fields.
     *
     * @return the field value, or {@code null} if the row is null or the field is null
     */
    protected Object safeGet(RowData rowData, int i, LogicalType logicalType) {
        if (rowData == null || rowData.isNullAt(i)) {
            return null;
        }
        return RowData.get(rowData, i, logicalType);
    }

    /**
     * Returns the index of the first element of {@code strArr} equal to {@code str}.
     *
     * @param str the column name to look for, must not be null
     * @param strArr the column names to search
     * @return the zero-based index, or {@code -1} if the name is not present
     */
    protected int getColumnIndex(String str, String[] strArr) {
        for (int idx = 0; idx < strArr.length; idx++) {
            if (str.equals(strArr[idx])) {
                return idx;
            }
        }
        return -1;
    }
}
