/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.deltajoin;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache;
import org.apache.flink.table.runtime.operators.join.lookup.CalcCollectionCollector;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncDeltaJoinRunner
extends RichAsyncFunction<RowData, RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDeltaJoinRunner.class);
    private static final long serialVersionUID = 1L;
    private static final String METRIC_DELTA_JOIN_LEFT_CALL_ASYNC_FETCH_COST_TIME = "deltaJoinLeftCallAsyncFetchCostTime";
    private static final String METRIC_DELTA_JOIN_RIGHT_CALL_ASYNC_FETCH_COST_TIME = "deltaJoinRightCallAsyncFetchCostTime";
    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
    private final DataStructureConverter<RowData, Object> fetcherConverter;
    @Nullable
    private final GeneratedFunction<FlatMapFunction<RowData, RowData>> lookupSideGeneratedCalc;
    private final GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture;
    private final int asyncBufferCapacity;
    private transient AsyncFunction<RowData, Object> fetcher;
    protected final RowDataSerializer lookupSideRowSerializer;
    private final boolean treatRightAsLookupTable;
    private final boolean enableCache;
    private final RowDataKeySelector leftJoinKeySelector;
    private final RowDataKeySelector leftUpsertKeySelector;
    private final RowDataKeySelector rightJoinKeySelector;
    private final RowDataKeySelector rightUpsertKeySelector;
    private transient DeltaJoinCache cache;
    private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
    private transient List<JoinedRowResultFuture> allResultFutures;
    private transient long callAsyncFetchCostTime = 0L;

    public AsyncDeltaJoinRunner(GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher, DataStructureConverter<RowData, Object> fetcherConverter, @Nullable GeneratedFunction<FlatMapFunction<RowData, RowData>> lookupSideGeneratedCalc, GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture, RowDataSerializer lookupSideRowSerializer, RowDataKeySelector leftJoinKeySelector, RowDataKeySelector leftUpsertKeySelector, RowDataKeySelector rightJoinKeySelector, RowDataKeySelector rightUpsertKeySelector, int asyncBufferCapacity, boolean treatRightAsLookupTable, boolean enableCache) {
        this.generatedFetcher = generatedFetcher;
        this.fetcherConverter = fetcherConverter;
        this.lookupSideGeneratedCalc = lookupSideGeneratedCalc;
        this.generatedResultFuture = generatedResultFuture;
        this.lookupSideRowSerializer = lookupSideRowSerializer;
        this.leftJoinKeySelector = leftJoinKeySelector;
        this.leftUpsertKeySelector = leftUpsertKeySelector;
        this.rightJoinKeySelector = rightJoinKeySelector;
        this.rightUpsertKeySelector = rightUpsertKeySelector;
        this.asyncBufferCapacity = asyncBufferCapacity;
        this.treatRightAsLookupTable = treatRightAsLookupTable;
        this.enableCache = enableCache;
    }

    public void setCache(DeltaJoinCache cache) {
        this.cache = cache;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.fetcher = (AsyncFunction)this.generatedFetcher.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(this.fetcher, (RuntimeContext)this.getRuntimeContext());
        FunctionUtils.openFunction(this.fetcher, (OpenContext)openContext);
        if (this.lookupSideGeneratedCalc != null) {
            this.lookupSideGeneratedCalc.compile(this.getRuntimeContext().getUserCodeClassLoader());
        }
        this.generatedResultFuture.compile(this.getRuntimeContext().getUserCodeClassLoader());
        this.fetcherConverter.open(this.getRuntimeContext().getUserCodeClassLoader());
        this.resultFutureBuffer = new ArrayBlockingQueue<JoinedRowResultFuture>(this.asyncBufferCapacity + 1);
        this.allResultFutures = new ArrayList<JoinedRowResultFuture>();
        LOG.info("Begin to initialize reusable result futures with size {}", (Object)(this.asyncBufferCapacity + 1));
        for (int i = 0; i < this.asyncBufferCapacity + 1; ++i) {
            JoinedRowResultFuture rf = new JoinedRowResultFuture(this.resultFutureBuffer, this.createCalcFunction(openContext), this.createFetcherResultFuture(openContext), this.fetcherConverter, this.treatRightAsLookupTable, this.leftUpsertKeySelector, this.rightUpsertKeySelector, this.lookupSideRowSerializer, this.enableCache, this.cache);
            this.resultFutureBuffer.add(rf);
            this.allResultFutures.add(rf);
        }
        LOG.info("Finish initializing reusable result futures");
        this.getRuntimeContext().getMetricGroup().gauge(this.treatRightAsLookupTable ? METRIC_DELTA_JOIN_LEFT_CALL_ASYNC_FETCH_COST_TIME : METRIC_DELTA_JOIN_RIGHT_CALL_ASYNC_FETCH_COST_TIME, () -> this.callAsyncFetchCostTime);
    }

    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
        Optional<Collection<Object>> dataFromCache;
        JoinedRowResultFuture outResultFuture = this.resultFutureBuffer.take();
        RowData streamJoinKey = null;
        if (this.enableCache) {
            if (this.treatRightAsLookupTable) {
                streamJoinKey = (RowData)this.leftJoinKeySelector.getKey(input);
                this.cache.requestRightCache();
            } else {
                streamJoinKey = (RowData)this.rightJoinKeySelector.getKey(input);
                this.cache.requestLeftCache();
            }
        }
        outResultFuture.reset(streamJoinKey, input, resultFuture);
        if (this.enableCache && (dataFromCache = this.tryGetDataFromCache(streamJoinKey)).isPresent()) {
            outResultFuture.complete(dataFromCache.get(), true);
            return;
        }
        long startTime = System.currentTimeMillis();
        this.fetcher.asyncInvoke((Object)input, (ResultFuture)outResultFuture);
        this.callAsyncFetchCostTime = System.currentTimeMillis() - startTime;
    }

    @Nullable
    private FlatMapFunction<RowData, RowData> createCalcFunction(OpenContext openContext) throws Exception {
        FlatMapFunction calc = null;
        if (this.lookupSideGeneratedCalc != null) {
            calc = (FlatMapFunction)this.lookupSideGeneratedCalc.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
            FunctionUtils.setFunctionRuntimeContext((Function)calc, (RuntimeContext)this.getRuntimeContext());
            FunctionUtils.openFunction((Function)calc, (OpenContext)openContext);
        }
        return calc;
    }

    public TableFunctionResultFuture<RowData> createFetcherResultFuture(OpenContext openContext) throws Exception {
        TableFunctionResultFuture resultFuture = (TableFunctionResultFuture)((Object)this.generatedResultFuture.newInstance(this.getRuntimeContext().getUserCodeClassLoader()));
        FunctionUtils.setFunctionRuntimeContext((Function)resultFuture, (RuntimeContext)this.getRuntimeContext());
        FunctionUtils.openFunction((Function)resultFuture, (OpenContext)openContext);
        return resultFuture;
    }

    public void close() throws Exception {
        super.close();
        if (this.fetcher != null) {
            FunctionUtils.closeFunction(this.fetcher);
        }
        if (this.allResultFutures != null) {
            for (JoinedRowResultFuture rf : this.allResultFutures) {
                rf.close();
            }
        }
    }

    @VisibleForTesting
    public AsyncFunction<RowData, Object> getFetcher() {
        return this.fetcher;
    }

    @VisibleForTesting
    public DeltaJoinCache getCache() {
        return this.cache;
    }

    private Optional<Collection<Object>> tryGetDataFromCache(RowData joinKey) {
        Preconditions.checkState((boolean)this.enableCache);
        if (this.treatRightAsLookupTable) {
            LinkedHashMap<RowData, Object> rightRows = this.cache.getData(joinKey, true);
            if (rightRows != null) {
                this.cache.hitRightCache();
                return Optional.of(rightRows.values());
            }
        } else {
            LinkedHashMap<RowData, Object> leftRows = this.cache.getData(joinKey, false);
            if (leftRows != null) {
                this.cache.hitLeftCache();
                return Optional.of(leftRows.values());
            }
        }
        return Optional.empty();
    }

    @VisibleForTesting
    public static final class JoinedRowResultFuture
    implements ResultFuture<Object> {
        private final BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
        @Nullable
        private final FlatMapFunction<RowData, RowData> calcFunction;
        private final CalcCollectionCollector calcCollector;
        private final TableFunctionResultFuture<RowData> joinConditionResultFuture;
        private final DataStructureConverter<RowData, Object> resultConverter;
        private final boolean enableCache;
        private final DeltaJoinCache cache;
        private final DelegateResultFuture delegate;
        private final boolean treatRightAsLookupTable;
        private final RowDataKeySelector leftUpsertKeySelector;
        private final RowDataKeySelector rightUpsertKeySelector;
        @Nullable
        private RowData streamJoinKey;
        private RowData streamRow;
        private ResultFuture<RowData> realOutput;

        private JoinedRowResultFuture(BlockingQueue<JoinedRowResultFuture> resultFutureBuffer, @Nullable FlatMapFunction<RowData, RowData> calcFunction, TableFunctionResultFuture<RowData> joinConditionResultFuture, DataStructureConverter<RowData, Object> resultConverter, boolean treatRightAsLookupTable, RowDataKeySelector leftUpsertKeySelector, RowDataKeySelector rightUpsertKeySelector, RowDataSerializer lookupSideRowSerializer, boolean enableCache, DeltaJoinCache cache) {
            this.resultFutureBuffer = resultFutureBuffer;
            this.calcFunction = calcFunction;
            this.calcCollector = new CalcCollectionCollector(lookupSideRowSerializer);
            this.joinConditionResultFuture = joinConditionResultFuture;
            this.resultConverter = resultConverter;
            this.enableCache = enableCache;
            this.cache = cache;
            this.delegate = new DelegateResultFuture();
            this.treatRightAsLookupTable = treatRightAsLookupTable;
            this.leftUpsertKeySelector = leftUpsertKeySelector;
            this.rightUpsertKeySelector = rightUpsertKeySelector;
        }

        public void reset(@Nullable RowData joinKey, RowData row, ResultFuture<RowData> realOutput) {
            Preconditions.checkState((this.enableCache && joinKey != null || !this.enableCache && joinKey == null ? 1 : 0) != 0);
            this.realOutput = realOutput;
            this.streamJoinKey = joinKey;
            this.streamRow = row;
            this.joinConditionResultFuture.setInput(row);
            this.joinConditionResultFuture.setResultFuture(this.delegate);
            this.delegate.reset();
            this.calcCollector.reset();
        }

        public void complete(Collection<Object> result) {
            this.complete(result, false);
        }

        public void complete(Collection<Object> result, boolean fromCache) {
            Collection<RowData> rowDataCollection;
            if (result == null) {
                result = Collections.emptyList();
            }
            Collection<RowData> lookupRowsAfterCalc = rowDataCollection = this.convertToInternalData(result);
            if (!fromCache && this.calcFunction != null && rowDataCollection != null) {
                for (RowData row : rowDataCollection) {
                    try {
                        this.calcFunction.flatMap((Object)row, (Collector)this.calcCollector);
                    }
                    catch (Exception e) {
                        this.completeExceptionally(e);
                    }
                }
                lookupRowsAfterCalc = this.calcCollector.getCollection();
            }
            try {
                this.updateCacheIfNecessary(lookupRowsAfterCalc);
            }
            catch (Throwable t) {
                LOG.info("Failed to update the cache", t);
                this.completeExceptionally(t);
                return;
            }
            try {
                this.joinConditionResultFuture.complete(lookupRowsAfterCalc);
            }
            catch (Throwable t) {
                this.completeExceptionally(t);
                return;
            }
            Collection<RowData> lookupRowsAfterJoin = this.delegate.collection;
            if (lookupRowsAfterJoin == null || lookupRowsAfterJoin.isEmpty()) {
                this.realOutput.complete(Collections.emptyList());
            } else {
                ArrayList<JoinedRowData> outRows = new ArrayList<JoinedRowData>();
                for (RowData lookupRow : lookupRowsAfterJoin) {
                    JoinedRowData outRow = this.treatRightAsLookupTable ? new JoinedRowData(this.streamRow.getRowKind(), this.streamRow, lookupRow) : new JoinedRowData(this.streamRow.getRowKind(), lookupRow, this.streamRow);
                    outRows.add(outRow);
                }
                this.realOutput.complete(outRows);
            }
            try {
                this.resultFutureBuffer.put(this);
            }
            catch (InterruptedException e) {
                this.completeExceptionally(e);
            }
        }

        public void completeExceptionally(Throwable error) {
            this.realOutput.completeExceptionally(error);
        }

        public void complete(CollectionSupplier<Object> supplier) {
            throw new UnsupportedOperationException();
        }

        public void close() throws Exception {
            this.joinConditionResultFuture.close();
        }

        private void updateCacheIfNecessary(Collection<RowData> lookupRows) throws Exception {
            if (!this.enableCache) {
                return;
            }
            if (this.treatRightAsLookupTable) {
                LinkedHashMap<RowData, Object> leftCacheData;
                if (this.cache.getData(this.streamJoinKey, true) == null) {
                    this.cache.buildCache(this.streamJoinKey, this.buildMapWithUkAsKeys(lookupRows, true), true);
                }
                if ((leftCacheData = this.cache.getData(this.streamJoinKey, false)) != null) {
                    RowData uk = (RowData)this.leftUpsertKeySelector.getKey(this.streamRow);
                    this.cache.upsertCache(this.streamJoinKey, uk, this.streamRow, false);
                }
            } else {
                LinkedHashMap<RowData, Object> rightCacheData;
                if (this.cache.getData(this.streamJoinKey, false) == null) {
                    this.cache.buildCache(this.streamJoinKey, this.buildMapWithUkAsKeys(lookupRows, false), false);
                }
                if ((rightCacheData = this.cache.getData(this.streamJoinKey, true)) != null) {
                    RowData uk = (RowData)this.rightUpsertKeySelector.getKey(this.streamRow);
                    this.cache.upsertCache(this.streamJoinKey, uk, this.streamRow, true);
                }
            }
        }

        private LinkedHashMap<RowData, Object> buildMapWithUkAsKeys(Collection<RowData> lookupRows, boolean treatRightAsLookupTable) throws Exception {
            LinkedHashMap<RowData, Object> map = new LinkedHashMap<RowData, Object>();
            for (RowData lookupRow : lookupRows) {
                RowData uk;
                RowData rowData = this.convertToInternalData(lookupRow);
                if (treatRightAsLookupTable) {
                    uk = (RowData)this.rightUpsertKeySelector.getKey(rowData);
                    map.put(uk, lookupRow);
                    continue;
                }
                uk = (RowData)this.leftUpsertKeySelector.getKey(rowData);
                map.put(uk, lookupRow);
            }
            return map;
        }

        private RowData convertToInternalData(Object data) {
            if (this.resultConverter.isIdentityConversion()) {
                return (RowData)data;
            }
            return this.resultConverter.toInternal(data);
        }

        private Collection<RowData> convertToInternalData(Collection<Object> data) {
            if (this.resultConverter.isIdentityConversion()) {
                return data;
            }
            ArrayList<RowData> result = new ArrayList<RowData>(data.size());
            for (Object element : data) {
                result.add(this.resultConverter.toInternal(element));
            }
            return result;
        }

        private final class DelegateResultFuture
        implements ResultFuture<RowData> {
            private Collection<RowData> collection;

            private DelegateResultFuture() {
            }

            public void reset() {
                this.collection = null;
            }

            public void complete(Collection<RowData> result) {
                this.collection = result;
            }

            public void completeExceptionally(Throwable error) {
                JoinedRowResultFuture.this.completeExceptionally(error);
            }

            public void complete(CollectionSupplier<RowData> supplier) {
                throw new UnsupportedOperationException();
            }
        }
    }
}

