/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.DataFormatConverters;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

/**
 * Async function that joins each incoming (left) row against the rows returned by a
 * generated asynchronous fetcher.
 *
 * <p>For every input row the runner borrows a pooled {@link JoinedRowResultFuture}, hands it to
 * the fetcher, and lets that future run the generated join-condition result future over the
 * fetched rows before emitting {@link JoinedRow}s to the real output. The pool holds
 * {@code asyncBufferCapacity + 1} futures; {@link #asyncInvoke} blocks while none is available.
 */
public class AsyncLookupJoinRunner
extends RichAsyncFunction<BaseRow, BaseRow> {
    private static final long serialVersionUID = -6664660022391632480L;
    private final GeneratedFunction<AsyncFunction<BaseRow, Object>> generatedFetcher;
    private final GeneratedResultFuture<TableFunctionResultFuture<BaseRow>> generatedResultFuture;
    private final boolean isLeftOuterJoin;
    private final int asyncBufferCapacity;
    private final TypeInformation<?> fetcherReturnType;
    private final BaseRowTypeInfo rightRowTypeInfo;
    private transient AsyncFunction<BaseRow, Object> fetcher;
    /** Pool of result futures that are currently free to be used by {@link #asyncInvoke}. */
    private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
    /** Every result future created in {@link #open}, kept so that {@link #close} can close them all. */
    private transient List<JoinedRowResultFuture> allResultFutures;

    /**
     * @param generatedFetcher generated code of the asynchronous lookup function
     * @param generatedResultFuture generated code of the join-condition result future
     * @param fetcherReturnType type of the rows produced by the fetcher; must be a
     *     {@link RowTypeInfo} or a {@link BaseRowTypeInfo}, otherwise {@link #open} fails
     * @param rightRowTypeInfo type of the right (lookup) side; its arity sizes the row used to
     *     pad unmatched left rows
     * @param isLeftOuterJoin whether a left row without any match is still emitted
     * @param asyncBufferCapacity the number of pooled result futures is this value plus one
     */
    public AsyncLookupJoinRunner(GeneratedFunction<AsyncFunction<BaseRow, Object>> generatedFetcher, GeneratedResultFuture<TableFunctionResultFuture<BaseRow>> generatedResultFuture, TypeInformation<?> fetcherReturnType, BaseRowTypeInfo rightRowTypeInfo, boolean isLeftOuterJoin, int asyncBufferCapacity) {
        this.generatedFetcher = generatedFetcher;
        this.generatedResultFuture = generatedResultFuture;
        this.isLeftOuterJoin = isLeftOuterJoin;
        this.asyncBufferCapacity = asyncBufferCapacity;
        this.fetcherReturnType = fetcherReturnType;
        this.rightRowTypeInfo = rightRowTypeInfo;
    }

    /**
     * Instantiates and opens the fetcher, then creates and opens the pool of result futures.
     *
     * @throws IllegalStateException if {@code fetcherReturnType} is neither a
     *     {@link RowTypeInfo} nor a {@link BaseRowTypeInfo}
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = this.generatedFetcher.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(this.fetcher, this.getRuntimeContext());
        FunctionUtils.openFunction(this.fetcher, parameters);

        // Compile the generated result future once up front, before any instance is created.
        this.generatedResultFuture.compile(this.getRuntimeContext().getUserCodeClassLoader());

        // A converter is only needed when the fetcher emits external Row objects.
        final DataFormatConverters.RowConverter rowConverter;
        if (this.fetcherReturnType instanceof RowTypeInfo) {
            rowConverter = (DataFormatConverters.RowConverter) DataFormatConverters.getConverterForDataType(TypeConversions.fromLegacyInfoToDataType(this.fetcherReturnType));
        } else if (this.fetcherReturnType instanceof BaseRowTypeInfo) {
            rowConverter = null;
        } else {
            throw new IllegalStateException("This should never happen, currently fetcherReturnType can only be BaseRowTypeInfo or RowTypeInfo");
        }

        final int poolSize = this.asyncBufferCapacity + 1;
        this.resultFutureBuffer = new ArrayBlockingQueue<>(poolSize);
        this.allResultFutures = new ArrayList<>(poolSize);
        for (int i = 0; i < poolSize; ++i) {
            JoinedRowResultFuture rf = new JoinedRowResultFuture(this.resultFutureBuffer, this.createFetcherResultFuture(parameters), rowConverter, this.isLeftOuterJoin, this.rightRowTypeInfo.getArity());
            // Track the future for close() first, so it is released even if pooling it fails.
            this.allResultFutures.add(rf);
            // add() throws if the queue is full; the queue is sized to hold exactly poolSize entries.
            this.resultFutureBuffer.add(rf);
        }
    }

    /**
     * Borrows a pooled result future, binds it to {@code input} and {@code resultFuture}, and
     * passes it to the fetcher. Blocks while the pool is empty.
     */
    @Override
    public void asyncInvoke(BaseRow input, ResultFuture<BaseRow> resultFuture) throws Exception {
        JoinedRowResultFuture outResultFuture = this.resultFutureBuffer.take();
        outResultFuture.reset(input, resultFuture);
        this.fetcher.asyncInvoke(input, outResultFuture);
    }

    /**
     * Creates a new instance of the generated join-condition result future, gives it this
     * function's runtime context and opens it.
     */
    public TableFunctionResultFuture<BaseRow> createFetcherResultFuture(Configuration parameters) throws Exception {
        TableFunctionResultFuture<BaseRow> resultFuture = this.generatedResultFuture.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(resultFuture, this.getRuntimeContext());
        FunctionUtils.openFunction(resultFuture, parameters);
        return resultFuture;
    }

    /**
     * Closes the fetcher and every result future created by {@link #open}. Both are null-checked
     * because {@code open()} may have failed before they were assigned.
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (this.fetcher != null) {
            FunctionUtils.closeFunction(this.fetcher);
        }
        if (this.allResultFutures != null) {
            for (JoinedRowResultFuture rf : this.allResultFutures) {
                rf.close();
            }
        }
    }

    /**
     * Result future handed to the fetcher. It converts the fetched rows if needed, runs them
     * through the join-condition result future, joins what that produces with the left row and
     * completes the real output. After a successful completion it puts itself back into the pool.
     */
    private static final class JoinedRowResultFuture
    implements ResultFuture<Object> {
        private final BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
        private final TableFunctionResultFuture<BaseRow> joinConditionResultFuture;
        /** Converts external {@link Row}s to {@link BaseRow}s; {@code null} if no conversion is done. */
        private final DataFormatConverters.RowConverter rowConverter;
        private final boolean isLeftOuterJoin;
        private final DelegateResultFuture delegate;
        /** Right-side row of the right arity used when a left row has no match. */
        private final GenericRow nullRow;
        private BaseRow leftRow;
        private ResultFuture<BaseRow> realOutput;

        private JoinedRowResultFuture(BlockingQueue<JoinedRowResultFuture> resultFutureBuffer, TableFunctionResultFuture<BaseRow> joinConditionResultFuture, @Nullable DataFormatConverters.RowConverter rowConverter, boolean isLeftOuterJoin, int rightArity) {
            this.resultFutureBuffer = resultFutureBuffer;
            this.joinConditionResultFuture = joinConditionResultFuture;
            this.rowConverter = rowConverter;
            this.isLeftOuterJoin = isLeftOuterJoin;
            this.delegate = new DelegateResultFuture();
            this.nullRow = new GenericRow(rightArity);
        }

        /** Binds this future to a new left row and output, and clears the previous delegate result. */
        public void reset(BaseRow row, ResultFuture<BaseRow> realOutput) {
            this.realOutput = realOutput;
            this.leftRow = row;
            this.joinConditionResultFuture.setInput(row);
            this.joinConditionResultFuture.setResultFuture(this.delegate);
            this.delegate.reset();
        }

        /**
         * Receives the fetched rows, applies the join condition and emits the joined rows.
         *
         * @param result rows produced by the fetcher; {@link Row}s when a converter is set,
         *     otherwise used as {@link BaseRow}s without conversion
         */
        @Override
        public void complete(Collection<Object> result) {
            Collection<BaseRow> baseRows;
            if (this.rowConverter == null) {
                // No converter means the fetcher was declared to return BaseRow already.
                @SuppressWarnings("unchecked")
                Collection<BaseRow> internalRows = (Collection<BaseRow>) (Collection<?>) result;
                baseRows = internalRows;
            } else {
                baseRows = new ArrayList<>(result.size());
                for (Object element : result) {
                    Row row = (Row) element;
                    baseRows.add((BaseRow) this.rowConverter.toInternal(row));
                }
            }

            try {
                this.joinConditionResultFuture.complete(baseRows);
            } catch (Throwable t) {
                // Forward the failure to the real output instead of throwing into the caller.
                this.completeExceptionally(t);
                return;
            }

            // The join-condition future reports its rows through the delegate.
            Collection<BaseRow> rightRows = this.delegate.collection;
            if (rightRows == null || rightRows.isEmpty()) {
                if (this.isLeftOuterJoin) {
                    BaseRow outRow = new JoinedRow(this.leftRow, this.nullRow);
                    outRow.setHeader(this.leftRow.getHeader());
                    this.realOutput.complete(Collections.singleton(outRow));
                } else {
                    this.realOutput.complete(Collections.<BaseRow>emptyList());
                }
            } else {
                List<BaseRow> outRows = new ArrayList<>(rightRows.size());
                for (BaseRow rightRow : rightRows) {
                    BaseRow outRow = new JoinedRow(this.leftRow, rightRow);
                    outRow.setHeader(this.leftRow.getHeader());
                    outRows.add(outRow);
                }
                this.realOutput.complete(outRows);
            }

            try {
                // Return this future to the pool only after the output has been completed.
                this.resultFutureBuffer.put(this);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                this.completeExceptionally(e);
            }
        }

        @Override
        public void completeExceptionally(Throwable error) {
            this.realOutput.completeExceptionally(error);
        }

        /** Closes the wrapped join-condition result future. */
        public void close() throws Exception {
            this.joinConditionResultFuture.close();
        }

        /**
         * Result future given to the join-condition result future; it only stores the rows it
         * is completed with so that the enclosing future can read them afterwards.
         */
        private final class DelegateResultFuture
        implements ResultFuture<BaseRow> {
            private Collection<BaseRow> collection;

            private DelegateResultFuture() {
            }

            public void reset() {
                this.collection = null;
            }

            @Override
            public void complete(Collection<BaseRow> result) {
                this.collection = result;
            }

            @Override
            public void completeExceptionally(Throwable error) {
                JoinedRowResultFuture.this.completeExceptionally(error);
            }
        }
    }
}

