package org.apache.flink.table.runtime.functions.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.ThreadSafeSimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.class */
public class CachingAsyncLookupFunction extends AsyncLookupFunction {
    private static final long serialVersionUID = 1;
    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
    private static final long UNINITIALIZED = -1;
    private final AsyncLookupFunction delegate;
    private LookupCache cache;
    private transient String cacheIdentifier;
    private transient CacheMetricGroup cacheMetricGroup;
    private transient Counter loadCounter;
    private transient Counter numLoadFailuresCounter;
    private volatile long latestLoadTime = UNINITIALIZED;

    public CachingAsyncLookupFunction(LookupCache lookupCache, AsyncLookupFunction asyncLookupFunction) {
        this.cache = lookupCache;
        this.delegate = asyncLookupFunction;
    }

    public void open(FunctionContext functionContext) throws Exception {
        this.cacheIdentifier = functionIdentifier();
        this.cache = LookupCacheManager.getInstance().registerCacheIfAbsent(this.cacheIdentifier, this.cache);
        this.cacheMetricGroup = new InternalCacheMetricGroup(functionContext.getMetricGroup(), "cache");
        this.loadCounter = new ThreadSafeSimpleCounter();
        this.cacheMetricGroup.loadCounter(this.loadCounter);
        this.numLoadFailuresCounter = new ThreadSafeSimpleCounter();
        this.cacheMetricGroup.numLoadFailuresCounter(this.numLoadFailuresCounter);
        this.cache.open(this.cacheMetricGroup);
        this.delegate.open(functionContext);
    }

    public CompletableFuture<Collection<RowData>> asyncLookup(RowData rowData) {
        Collection ifPresent = this.cache.getIfPresent(rowData);
        if (ifPresent != null) {
            return CompletableFuture.completedFuture(ifPresent);
        }
        long currentTimeMillis = System.currentTimeMillis();
        return this.delegate.asyncLookup(rowData).whenComplete((collection, th) -> {
            if (th != null) {
                this.numLoadFailuresCounter.inc();
                throw new RuntimeException(String.format("Failed to lookup key '%s'", rowData), th);
            }
            updateLatestLoadTime(System.currentTimeMillis() - currentTimeMillis);
            this.loadCounter.inc();
            Collection collection = collection;
            if (collection == null || collection.isEmpty()) {
                collection = Collections.emptyList();
            }
            this.cache.put(rowData, collection);
        });
    }

    public void close() throws Exception {
        this.delegate.close();
        if (this.cacheIdentifier != null) {
            LookupCacheManager.getInstance().unregisterCache(this.cacheIdentifier);
        }
    }

    @VisibleForTesting
    public LookupCache getCache() {
        return this.cache;
    }

    private synchronized void updateLatestLoadTime(long j) {
        if (this.latestLoadTime == UNINITIALIZED) {
            this.cacheMetricGroup.latestLoadTimeGauge(() -> {
                return Long.valueOf(this.latestLoadTime);
            });
        }
        this.latestLoadTime = j;
    }
}
