/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.lookup.FullCacheLookupTable;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.lookup.PartitionLoader;
import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable;
import org.apache.paimon.flink.lookup.RefreshBlacklist;
import org.apache.paimon.flink.lookup.ReopenException;
import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Lookup function backed by a {@link FileStoreTable}.
 *
 * <p>On {@link #open(FunctionContext)} a working directory is created under one of the task
 * manager's temporary directories and a {@link LookupTable} is built on top of it. Every {@link
 * #lookup(RowData)} first gives the lookup table a chance to refresh (see {@link #tryRefresh()})
 * and then queries it, once per loaded partition when a {@link PartitionLoader} is configured.
 *
 * <p>The instance is {@link Serializable}; the working directory, the lookup table and the
 * refresh bookkeeping are transient and are only populated by {@code open}.
 */
public class FileStoreLookupFunction implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);

    private final FileStoreTable table;
    @Nullable
    private final PartitionLoader partitionLoader;
    private final List<String> projectFields;
    private final List<String> joinKeys;
    @Nullable
    private final Predicate predicate;
    @Nullable
    private final RefreshBlacklist refreshBlacklist;
    @Nullable
    private final ShuffleStrategy strategy;
    // Only used for debug logging of matched rows, see logRow.
    // NOTE(review): the getters are indexed by position in the table's row type, while they are
    // applied to rows returned by the lookup table - confirm those rows share that layout.
    private final List<InternalRow.FieldGetter> projectFieldsGetters;

    // Working directory handed to the lookup table; created in open(String), deleted in close().
    private transient File path;
    private transient LookupTable lookupTable;

    // Interval between two refreshes of the lookup table.
    private transient Duration refreshInterval;
    // Earliest wall-clock time (epoch millis) of the next refresh; -1 until the first refresh.
    private transient long nextRefreshTime;

    protected FunctionContext functionContext;

    @Nullable
    private Filter<InternalRow> cacheRowFilter;

    /**
     * Creates the lookup function.
     *
     * @param table the table to look up in
     * @param projection indexes (into the table's row type) of the fields to project
     * @param joinKeyIndex indexes into {@code projection} identifying the join key fields
     * @param predicate optional filter, passed to the full-cache lookup table
     * @param strategy optional shuffle strategy used to decide which buckets this subtask caches
     */
    public FileStoreLookupFunction(
            FileStoreTable table,
            int[] projection,
            int[] joinKeyIndex,
            @Nullable Predicate predicate,
            @Nullable ShuffleStrategy strategy) {
        if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
            TableScanUtils.streamingReadingValidate(table);
        }

        this.table = table;
        this.partitionLoader = PartitionLoader.of(table);

        RowType rowType = table.rowType();
        List<String> fieldNames = rowType.getFieldNames();

        // Both lists are modified below, so collect into an explicitly mutable ArrayList:
        // Collectors.toList() makes no guarantee about the mutability of its result.
        this.joinKeys =
                Arrays.stream(joinKeyIndex)
                        .mapToObj(i -> fieldNames.get(projection[i]))
                        .collect(Collectors.toCollection(ArrayList::new));
        this.projectFields =
                Arrays.stream(projection)
                        .mapToObj(fieldNames::get)
                        .collect(Collectors.toCollection(ArrayList::new));
        this.projectFieldsGetters =
                Arrays.stream(projection)
                        .mapToObj(i -> InternalRow.createFieldGetter(rowType.getTypeAt(i), i))
                        .collect(Collectors.toList());

        // Append every primary key that is not projected yet.
        for (String primaryKey : table.primaryKeys()) {
            if (!projectFields.contains(primaryKey)) {
                projectFields.add(primaryKey);
            }
        }

        if (partitionLoader != null) {
            partitionLoader.addPartitionKeysTo(joinKeys, projectFields);
        }

        this.predicate = predicate;
        this.refreshBlacklist =
                RefreshBlacklist.create(
                        table.options()
                                .get(FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
        this.strategy = strategy;
    }

    /**
     * Opens the function using a randomly chosen temporary directory of the task manager.
     *
     * @param context the Flink function context; stored in {@link #functionContext}
     * @throws Exception if the lookup table cannot be created or opened
     */
    public void open(FunctionContext context) throws Exception {
        this.functionContext = context;
        open(getTmpDirectory(context));
    }

    /**
     * Creates a fresh {@code lookup-<uuid>} working directory below {@code tmpDirectory} and opens
     * the lookup table on it.
     *
     * @param tmpDirectory parent directory of the working directory
     * @throws RuntimeException if the working directory cannot be created
     * @throws Exception if the lookup table cannot be created or opened
     */
    void open(String tmpDirectory) throws Exception {
        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
        if (!path.mkdirs()) {
            throw new RuntimeException("Failed to create dir: " + path);
        }
        open();
    }

    /**
     * Builds and opens the lookup table on {@link #path}.
     *
     * <p>In {@code AUTO} cache mode, when the join keys are exactly the primary keys, a {@link
     * PrimaryKeyPartialLookupTable} is preferred (remote if a remote service is available,
     * otherwise local). In every other case, or when the local partial table is unsupported, a
     * {@link FullCacheLookupTable} is created.
     */
    private void open() throws Exception {
        this.nextRefreshTime = -1L;

        Options options = Options.fromMap(table.options());
        this.refreshInterval =
                options.getOptional(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
                        .orElse(options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL));

        List<String> fieldNames = table.rowType().getFieldNames();
        int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
        LOG.info(
                "lookup projection fields in lookup table:{}, join fields in lookup table:{}",
                projectFields,
                joinKeys);

        LOG.info("Creating lookup table for {}.", table.name());
        boolean autoMode =
                options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE)
                        == FlinkConnectorOptions.LookupCacheMode.AUTO;
        if (autoMode && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
            if (RemoteTableQuery.isRemoteServiceAvailable(table)) {
                this.lookupTable =
                        PrimaryKeyPartialLookupTable.createRemoteTable(table, projection, joinKeys);
                LOG.info("Remote service is available. Created PrimaryKeyPartialLookupTable with remote service.");
            } else {
                try {
                    this.lookupTable =
                            PrimaryKeyPartialLookupTable.createLocalTable(
                                    table, projection, path, joinKeys, getRequireCachedBucketIds());
                    LOG.info("Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
                } catch (UnsupportedOperationException e) {
                    // Deliberate fallback: lookupTable stays null and the full cache is used.
                    LOG.info(
                            "Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor because {}. Will create FullCacheLookupTable.",
                            e.getMessage());
                }
            }
        }

        if (lookupTable == null) {
            FullCacheLookupTable.Context context =
                    new FullCacheLookupTable.Context(
                            table,
                            projection,
                            predicate,
                            createProjectedPredicate(projection),
                            path,
                            joinKeys,
                            getRequireCachedBucketIds());
            this.lookupTable =
                    FullCacheLookupTable.create(context, options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
            LOG.info("Created {}.", lookupTable.getClass().getSimpleName());
        }

        if (partitionLoader != null) {
            partitionLoader.open();
            partitionLoader.checkRefresh();
            List<BinaryRow> partitions = partitionLoader.partitions();
            if (!partitions.isEmpty()) {
                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
            }
        }

        if (cacheRowFilter != null) {
            lookupTable.specifyCacheRowFilter(cacheRowFilter);
        }

        lookupTable.open();
    }

    /**
     * Re-maps {@link #predicate} from table field indexes to indexes within {@code projection}.
     *
     * @param projection table field index of every projected field
     * @return the re-mapped predicate, or {@code null} when there is no predicate or the field
     *     mapping yields none
     */
    @Nullable
    private Predicate createProjectedPredicate(int[] projection) {
        if (predicate == null) {
            return null;
        }
        // For every table field: its position in the projection (Ints.indexOf gives -1 if absent).
        int[] fieldMapping =
                IntStream.range(0, table.rowType().getFieldCount())
                        .map(i -> Ints.indexOf(projection, i))
                        .toArray();
        return PredicateBuilder.transformFieldMapping(predicate, fieldMapping).orElse(null);
    }

    /**
     * Looks up all rows matching {@code keyRow}.
     *
     * <p>With a partition loader, the key is joined with each currently loaded partition and the
     * results are concatenated; no loaded partition means an empty result. When the lookup table
     * signals {@link ReopenException} or {@link OutOfRangeException}, it is reopened and the
     * lookup is retried.
     *
     * @param keyRow the join key
     * @return the matched rows, possibly empty
     * @throws RuntimeException wrapping any other failure
     */
    public Collection<RowData> lookup(RowData keyRow) {
        try {
            tryRefresh();

            if (LOG.isDebugEnabled()) {
                LOG.debug("lookup key:{}", keyRow.toString());
            }

            InternalRow key = new FlinkRowWrapper(keyRow);
            if (partitionLoader == null) {
                return lookupInternal(key);
            }

            List<BinaryRow> partitions = partitionLoader.partitions();
            if (partitions.isEmpty()) {
                return Collections.emptyList();
            }

            List<RowData> rows = new ArrayList<>();
            for (BinaryRow partition : partitions) {
                rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
            }
            return rows;
        } catch (ReopenException | OutOfRangeException e) {
            reopen();
            return lookup(keyRow);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Queries the lookup table with {@code key} and wraps every match as Flink {@link RowData}. */
    private List<RowData> lookupInternal(InternalRow key) throws IOException {
        List<InternalRow> lookupResults = lookupTable.get(key);
        List<RowData> rows = new ArrayList<>(lookupResults.size());
        for (InternalRow matchedRow : lookupResults) {
            rows.add(new FlinkRowData(matchedRow));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "matched rows in lookup table, size:{}, rows:{}",
                    lookupResults.size(),
                    lookupResults.stream()
                            .map(row -> logRow(projectFieldsGetters, row))
                            .collect(Collectors.toList()));
        }
        return rows;
    }

    /**
     * Closes the current lookup table and builds a new one on the same working directory.
     *
     * @throws RuntimeException wrapping any failure of close or open
     */
    private void reopen() {
        try {
            close();
            // close() deletes the working directory. Recreate it so that open() starts from the
            // same on-disk state (an existing directory) that open(String) set up originally.
            if (path != null && !path.isDirectory() && !path.mkdirs()) {
                throw new IOException("Failed to create dir: " + path);
            }
            open();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Refreshes the lookup table if it is due.
     *
     * <p>Nothing happens while the refresh blacklist forbids refreshing, or while a configured
     * partition loader has no partitions. When the partition loader reports a change, the lookup
     * table is closed and opened again with the new partition filter instead of being refreshed.
     */
    @VisibleForTesting
    void tryRefresh() throws Exception {
        if (refreshBlacklist != null && !refreshBlacklist.canRefresh()) {
            return;
        }

        if (partitionLoader != null) {
            boolean partitionChanged = partitionLoader.checkRefresh();
            List<BinaryRow> partitions = partitionLoader.partitions();
            if (partitions.isEmpty()) {
                return;
            }

            if (partitionChanged) {
                // The table is reopened with the new filter, so no additional refresh is done.
                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
                lookupTable.close();
                lookupTable.open();
                return;
            }
        }

        if (shouldRefreshLookupTable()) {
            lookupTable.refresh();
            nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
        }
    }

    /** Returns whether {@link #nextRefreshTime} has been reached (always true before the first refresh). */
    private boolean shouldRefreshLookupTable() {
        if (nextRefreshTime > System.currentTimeMillis()) {
            return false;
        }

        // nextRefreshTime is -1 until the first refresh; only log for subsequent ones.
        if (nextRefreshTime > 0L) {
            LOG.info(
                    "Lookup table {} has refreshed after {} second(s), refreshing",
                    table.name(),
                    refreshInterval.toMillis() / 1000L);
        }
        return true;
    }

    @VisibleForTesting
    LookupTable lookupTable() {
        return lookupTable;
    }

    /** Returns the blacklist's next check time, or {@code -1} when no blacklist is configured. */
    @VisibleForTesting
    long nextBlacklistCheckTime() {
        return refreshBlacklist == null ? -1L : refreshBlacklist.nextBlacklistCheckTime();
    }

    /**
     * Closes the lookup table and deletes the working directory.
     *
     * <p>The directory is deleted even when closing the lookup table fails, so a failing close
     * does not leave the temporary files behind.
     *
     * @throws IOException if closing the lookup table fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (lookupTable != null) {
                lookupTable.close();
                lookupTable = null;
            }
        } finally {
            if (path != null) {
                FileIOUtils.deleteDirectoryQuietly(path);
            }
        }
    }

    /**
     * Picks one of the task manager's temporary directories at random.
     *
     * <p>The runtime context is not exposed by {@link FunctionContext}, so it is read reflectively
     * from the declared field named {@code context}.
     *
     * @throws RuntimeException if the reflective access fails
     */
    private static String getTmpDirectory(FunctionContext context) {
        try {
            Field field = context.getClass().getDeclaredField("context");
            field.setAccessible(true);
            StreamingRuntimeContext runtimeContext =
                    extractStreamingRuntimeContext(field.get(context));
            String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Unwraps {@code runtimeContext} until a {@link StreamingRuntimeContext} is found, following
     * the declared field named {@code runtimeContext} of each wrapper.
     */
    private static StreamingRuntimeContext extractStreamingRuntimeContext(Object runtimeContext)
            throws NoSuchFieldException, IllegalAccessException {
        if (runtimeContext instanceof StreamingRuntimeContext) {
            return (StreamingRuntimeContext) runtimeContext;
        }

        Field field = runtimeContext.getClass().getDeclaredField("runtimeContext");
        field.setAccessible(true);
        return extractStreamingRuntimeContext(field.get(runtimeContext));
    }

    /**
     * Returns the bucket ids this subtask has to cache according to the shuffle strategy.
     *
     * @return the bucket ids, or {@code null} when no strategy is set or the subtask index is
     *     unavailable
     */
    protected Set<Integer> getRequireCachedBucketIds() {
        if (strategy == null) {
            return null;
        }

        Integer indexOfThisSubtask = RuntimeContextUtils.getIndexOfThisSubtask(functionContext);
        Integer numberOfParallelSubtasks =
                RuntimeContextUtils.getNumberOfParallelSubtasks(functionContext);
        if (indexOfThisSubtask == null) {
            // Subtask index and parallelism must be either both known or both unknown.
            Preconditions.checkState(numberOfParallelSubtasks == null);
            return null;
        }

        Preconditions.checkState(numberOfParallelSubtasks != null);
        return strategy.getRequiredCacheBucketIds(indexOfThisSubtask, numberOfParallelSubtasks);
    }

    /** Sets the row filter that is handed to the lookup table the next time it is opened. */
    protected void setCacheRowFilter(@Nullable Filter<InternalRow> cacheRowFilter) {
        this.cacheRowFilter = cacheRowFilter;
    }

    /** Renders the values read by {@code fieldGetters} from {@code row} as {@code [v1, v2, ...]}. */
    private String logRow(List<InternalRow.FieldGetter> fieldGetters, InternalRow row) {
        List<String> rowValues = new ArrayList<>(fieldGetters.size());
        for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
            // String.valueOf(Object) yields "null" for a null field value.
            rowValues.add(String.valueOf(fieldGetter.getFieldOrNull(row)));
        }
        return rowValues.toString();
    }
}

