package com.alibaba.ververica.connectors.jdbc.dim;

import com.alibaba.ververica.connectors.common.dim.cache.AllCache;
import com.alibaba.ververica.connectors.common.dim.cache.CacheFactory;
import com.alibaba.ververica.connectors.common.dim.cache.CacheStrategy;
import com.alibaba.ververica.connectors.common.exception.ConnectorException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.TypeTransformation;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/jdbc/dim/AllCacheLookupFunction.class */
/**
 * Base class for lookup table functions that keep every row of a dimension table in an
 * {@link AllCache} and refresh it periodically on a background thread.
 *
 * <p>The cache is obtained from {@link CacheFactory} by table name in {@link #open}. Only the
 * instance that wins the {@code isRegisteredTimer} compare-and-set creates the reload executor
 * and schedules {@link ReloadCacheRunner}; lookups in {@link #eval} wait until the cache reports
 * that it is loaded.
 *
 * @param <K> type of the cache key derived from the lookup arguments
 * @param <V> type of the rows emitted by this function
 */
public abstract class AllCacheLookupFunction<K, V> extends TableFunction<V> {
    private static final Logger LOG = LoggerFactory.getLogger(AllCacheLookupFunction.class);

    /** Cache holding the rows per key; assigned in {@link #open}. */
    private transient AllCache<K, List<V>> allCache;
    /** Factory the cache is obtained from and released to; assigned in {@link #open}. */
    private transient CacheFactory cacheFactory;
    /** Executor running the periodic reload; non-null only on the instance that scheduled it. */
    private transient ScheduledExecutorService reloadExecutor = null;
    private final CacheStrategy cacheStrategy = CacheStrategy.all();
    /** Name used as the cache identifier in {@link CacheFactory}. */
    private final String tableName;
    /** Delay in milliseconds between the end of one reload and the start of the next. */
    private final long ttlMs;

    /**
     * Periodic task that rebuilds the cache: initializes it, lets the subclass fill the temporary
     * map via {@link AllCacheLookupFunction#scanTable}, then switches the cache over.
     */
    private class ReloadCacheRunner implements Serializable, Runnable {
        private ReloadCacheRunner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            // NOTE(review): an exception thrown by initialize() or switchCache() is not caught
            // here; per ScheduledExecutorService semantics that suppresses all later reloads.
            AllCacheLookupFunction.this.allCache.initialize();
            try {
                AllCacheLookupFunction.this.scanTable(AllCacheLookupFunction.this.allCache.getTempCache());
            } catch (Throwable th) {
                // Record the failure on the cache instead of letting it escape the task.
                ConnectorException connectorException = new ConnectorException("Error happens in reload thread.", th);
                AllCacheLookupFunction.LOG.error("Error happens when scanning all data from MySQL.", connectorException);
                AllCacheLookupFunction.this.allCache.setException(connectorException);
            }
            // The switch is performed even when the scan failed, as in the original flow.
            AllCacheLookupFunction.this.allCache.switchCache();
        }
    }

    /**
     * @param str name of the table; used as the cache identifier
     * @param j   reload delay in milliseconds
     */
    public AllCacheLookupFunction(String str, long j) {
        this.tableName = str;
        this.ttlMs = j;
    }

    /** Opens whatever connection the subclass needs; called from {@link #open}. */
    public abstract void openConnection();

    /** Releases the connection opened by {@link #openConnection}; called from {@link #close}. */
    public abstract void closeConnection();

    /**
     * Builds the cache key for the given lookup arguments.
     *
     * <p>NOTE(review): the name looks like a typo of {@code getCacheKey}; it is kept because
     * subclasses implement it under this name.
     *
     * @param objArr the arguments passed to {@link #eval}
     * @return the key used to look up rows in the cache
     */
    public abstract K geCacheKey(Object... objArr);

    /**
     * Fills the given map with the rows of the table, grouped by cache key. Invoked on the reload
     * thread with the cache's temporary map.
     *
     * @param map destination map to populate
     * @throws SQLException           if the scan fails
     * @throws ClassNotFoundException presumably when a JDBC driver class cannot be loaded
     */
    public abstract void scanTable(Map<K, List<V>> map) throws SQLException, ClassNotFoundException;

    /** @return the data types of the lookup arguments, in argument order */
    public abstract List<DataType> getArgumentDataTypes();

    /** @return the data type of the rows emitted by this function */
    public abstract DataType getOutputDataType();

    /**
     * Builds the type inference from {@link #getArgumentDataTypes()} and
     * {@link #getOutputDataType()}, with each type transformed by
     * {@link TypeTransformations#TO_INTERNAL_CLASS}.
     *
     * @param dataTypeFactory unused
     * @return type inference with explicitly typed arguments and an explicit output type
     */
    public TypeInference getTypeInference(DataTypeFactory dataTypeFactory) {
        List<DataType> internalArguments = getArgumentDataTypes().stream()
                .map(AllCacheLookupFunction::toInternalClass)
                .collect(Collectors.toList());
        return TypeInference.newBuilder()
                .typedArguments(internalArguments)
                .outputTypeStrategy(TypeStrategies.explicit(toInternalClass(getOutputDataType())))
                .build();
    }

    /** Applies the {@code TO_INTERNAL_CLASS} transformation to a single data type. */
    private static DataType toInternalClass(DataType dataType) {
        return DataTypeUtils.transform(dataType, new TypeTransformation[]{TypeTransformations.TO_INTERNAL_CLASS});
    }

    /**
     * Obtains the cache for this table, opens the subclass connection and, if no reload timer is
     * registered on the cache yet, schedules the periodic reload starting immediately.
     *
     * @param functionContext passed through to the superclass
     * @throws Exception if the superclass or the setup fails
     */
    public void open(FunctionContext functionContext) throws Exception {
        this.cacheFactory = CacheFactory.getInstance();
        this.allCache = (AllCache) this.cacheFactory.getCache(this.tableName, this.cacheStrategy, true);
        super.open(functionContext);
        openConnection();
        // Only the instance that flips the flag creates the executor and schedules the reload.
        if (this.allCache.isRegisteredTimer.compareAndSet(false, true)) {
            this.reloadExecutor = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactoryBuilder().setNameFormat("allCache-reload").setDaemon(true).build());
            // The runner is submitted directly; wrapping it in a Thread adds nothing because the
            // executor only ever calls run() on its own worker thread.
            ScheduledFuture<?> reloadFuture = this.reloadExecutor.scheduleWithFixedDelay(
                    new ReloadCacheRunner(), 0L, this.ttlMs, TimeUnit.MILLISECONDS);
            this.allCache.setScheduledFuture(reloadFuture);
        }
    }

    /**
     * Looks up the rows cached for the given arguments and emits each of them via
     * {@code collect}. Polls every 10 ms until the cache reports that it is loaded.
     *
     * @param objArr lookup arguments, converted to a cache key by {@link #geCacheKey}
     * @throws ConnectorException if the waiting thread is interrupted
     */
    public void eval(Object... objArr) {
        while (!this.allCache.isLoadedOrThrowException()) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new ConnectorException("AllCache exception.", e);
            }
        }
        K cacheKey = geCacheKey(objArr);
        List<V> rows;
        // Hold the read lock only for the lookup and release it exactly once; rows are emitted
        // afterwards so a failure in collect() cannot trigger a second unlock.
        this.allCache.lock.readLock().lock();
        try {
            rows = this.allCache.get(cacheKey);
        } finally {
            this.allCache.lock.readLock().unlock();
        }
        if (rows != null) {
            for (V row : rows) {
                collect(row);
            }
        }
    }

    /**
     * Cancels the scheduled reload, shuts down the executor owned by this instance, closes the
     * subclass connection, clears the timer flag and removes the cache from the factory.
     *
     * <p>Exceptions raised while stopping the reload or closing the connection are logged and do
     * not prevent the cache from being released. Fields that {@link #open} never assigned
     * (because it failed early) are skipped.
     *
     * @throws Exception if removing the cache or the superclass close fails
     */
    public void close() throws Exception {
        // allCache is null when open() failed before the cache was obtained.
        final boolean hasAllCache = this.cacheStrategy.isAllCache() && this.allCache != null;
        try {
            if (hasAllCache) {
                ScheduledFuture<?> scheduledFuture = this.allCache.getScheduledFuture();
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
                if (null != this.reloadExecutor && !this.reloadExecutor.isShutdown()) {
                    this.reloadExecutor.shutdownNow();
                    this.reloadExecutor = null;
                }
            }
            closeConnection();
        } catch (Exception e) {
            LOG.error("Error happens when shutdown reload executor.", e);
        } finally {
            // Clear the flag on every path (success, logged exception, propagating error).
            if (hasAllCache) {
                this.allCache.isRegisteredTimer.compareAndSet(true, false);
            }
        }
        LOG.info("Release cache of table : {} ", this.tableName);
        if (this.cacheFactory != null) {
            this.cacheFactory.removeCache(this.tableName);
        }
        super.close();
    }
}
