package org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.state.v2.MapStateDescriptor;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.utils.OuterRecord;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews.class */
public final class OuterJoinRecordAsyncStateViews {

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews$InputSideHasNoUniqueKey.class */
    private static final class InputSideHasNoUniqueKey implements OuterJoinRecordAsyncStateView {
        private final MapState<RowData, Tuple2<Integer, Integer>> recordState;

        private InputSideHasNoUniqueKey(StreamingRuntimeContext streamingRuntimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, StateTtlConfig stateTtlConfig) {
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor(str, internalTypeInfo, new TupleTypeInfo(new TypeInformation[]{Types.INT, Types.INT}));
            if (stateTtlConfig.isEnabled()) {
                mapStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = streamingRuntimeContext.getMapState(mapStateDescriptor);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<Void> addRecord(RowData rowData, int i) {
            return this.recordState.asyncGet(rowData).thenApply(tuple2 -> {
                if (tuple2 == null) {
                    return Tuple2.of(1, Integer.valueOf(i));
                }
                tuple2.f0 = Integer.valueOf(((Integer) tuple2.f0).intValue() + 1);
                tuple2.f1 = Integer.valueOf(i);
                return tuple2;
            }).thenCompose(tuple22 -> {
                return this.recordState.asyncPut(rowData, tuple22);
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<Void> updateNumOfAssociations(RowData rowData, int i) {
            return this.recordState.asyncGet(rowData).thenApply(tuple2 -> {
                if (tuple2 == null) {
                    return Tuple2.of(1, Integer.valueOf(i));
                }
                tuple2.f1 = Integer.valueOf(i);
                return tuple2;
            }).thenCompose(tuple22 -> {
                return this.recordState.asyncPut(rowData, tuple22);
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView
        public StateFuture<Void> retractRecord(RowData rowData) {
            return this.recordState.asyncGet(rowData).thenCompose(tuple2 -> {
                if (tuple2 == null) {
                    return StateFutureUtils.completedVoidFuture();
                }
                if (((Integer) tuple2.f0).intValue() <= 1) {
                    return this.recordState.asyncRemove(rowData);
                }
                tuple2.f0 = Integer.valueOf(((Integer) tuple2.f0).intValue() - 1);
                return this.recordState.asyncPut(rowData, tuple2);
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<List<OuterRecord>> findMatchedRecordsAndNumOfAssociations(Function<RowData, Boolean> function) {
            ArrayList arrayList = new ArrayList();
            return this.recordState.asyncEntries().thenCompose(stateIterator -> {
                return stateIterator.onNext(entry -> {
                    RowData rowData = (RowData) entry.getKey();
                    int intValue = ((Integer) ((Tuple2) entry.getValue()).f0).intValue();
                    int intValue2 = ((Integer) ((Tuple2) entry.getValue()).f1).intValue();
                    if (((Boolean) function.apply(rowData)).booleanValue()) {
                        for (int i = 0; i < intValue; i++) {
                            arrayList.add(new OuterRecord(rowData, intValue2));
                        }
                    }
                });
            }).thenApply(r3 -> {
                return arrayList;
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews$InputSideHasUniqueKey.class */
    private static final class InputSideHasUniqueKey implements OuterJoinRecordAsyncStateView {
        private final MapState<RowData, Tuple2<RowData, Integer>> recordState;
        private final KeySelector<RowData, RowData> uniqueKeySelector;

        private InputSideHasUniqueKey(StreamingRuntimeContext streamingRuntimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, KeySelector<RowData, RowData> keySelector, StateTtlConfig stateTtlConfig) {
            Preconditions.checkNotNull(internalTypeInfo2);
            Preconditions.checkNotNull(keySelector);
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor(str, internalTypeInfo2, new TupleTypeInfo(new TypeInformation[]{internalTypeInfo, Types.INT}));
            if (stateTtlConfig.isEnabled()) {
                mapStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = streamingRuntimeContext.getMapState(mapStateDescriptor);
            this.uniqueKeySelector = keySelector;
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<Void> addRecord(RowData rowData, int i) {
            return StateFutureUtils.completedVoidFuture().thenCompose(obj -> {
                return this.recordState.asyncPut((RowData) this.uniqueKeySelector.getKey(rowData), Tuple2.of(rowData, Integer.valueOf(i)));
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<Void> updateNumOfAssociations(RowData rowData, int i) {
            return StateFutureUtils.completedVoidFuture().thenCompose(obj -> {
                return this.recordState.asyncPut((RowData) this.uniqueKeySelector.getKey(rowData), Tuple2.of(rowData, Integer.valueOf(i)));
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView
        public StateFuture<Void> retractRecord(RowData rowData) {
            return StateFutureUtils.completedVoidFuture().thenCompose(obj -> {
                return this.recordState.asyncRemove((RowData) this.uniqueKeySelector.getKey(rowData));
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<List<OuterRecord>> findMatchedRecordsAndNumOfAssociations(Function<RowData, Boolean> function) {
            ArrayList arrayList = new ArrayList();
            return this.recordState.asyncValues().thenCompose(stateIterator -> {
                return stateIterator.onNext(tuple2 -> {
                    if (((Boolean) function.apply((RowData) tuple2.f0)).booleanValue()) {
                        arrayList.add(new OuterRecord((RowData) tuple2.f0, ((Integer) tuple2.f1).intValue()));
                    }
                });
            }).thenApply(r3 -> {
                return arrayList;
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/OuterJoinRecordAsyncStateViews$JoinKeyContainsUniqueKey.class */
    private static final class JoinKeyContainsUniqueKey implements OuterJoinRecordAsyncStateView {
        private final ValueState<Tuple2<RowData, Integer>> recordState;
        private final List<OuterRecord> reusedList;

        private JoinKeyContainsUniqueKey(StreamingRuntimeContext streamingRuntimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, StateTtlConfig stateTtlConfig) {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor(str, new TupleTypeInfo(new TypeInformation[]{internalTypeInfo, Types.INT}));
            if (stateTtlConfig.isEnabled()) {
                valueStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = streamingRuntimeContext.getValueState(valueStateDescriptor);
            this.reusedList = new ArrayList(1);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<Void> addRecord(RowData rowData, int i) {
            return this.recordState.asyncUpdate(Tuple2.of(rowData, Integer.valueOf(i)));
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<Void> updateNumOfAssociations(RowData rowData, int i) {
            return this.recordState.asyncUpdate(Tuple2.of(rowData, Integer.valueOf(i)));
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView
        public StateFuture<Void> retractRecord(RowData rowData) {
            return this.recordState.asyncClear();
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.OuterJoinRecordAsyncStateView
        public StateFuture<List<OuterRecord>> findMatchedRecordsAndNumOfAssociations(Function<RowData, Boolean> function) {
            return this.recordState.asyncValue().thenCompose(tuple2 -> {
                this.reusedList.clear();
                if (tuple2 != null && ((Boolean) function.apply((RowData) tuple2.f0)).booleanValue()) {
                    this.reusedList.add(new OuterRecord((RowData) tuple2.f0, ((Integer) tuple2.f1).intValue()));
                }
                return StateFutureUtils.completedFuture(this.reusedList);
            });
        }
    }

    public static OuterJoinRecordAsyncStateView create(StreamingRuntimeContext streamingRuntimeContext, String str, JoinInputSideSpec joinInputSideSpec, InternalTypeInfo<RowData> internalTypeInfo, long j) {
        StateTtlConfig createTtlConfig = StateConfigUtil.createTtlConfig(j);
        return joinInputSideSpec.hasUniqueKey() ? joinInputSideSpec.joinKeyContainsUniqueKey() ? new JoinKeyContainsUniqueKey(streamingRuntimeContext, str, internalTypeInfo, createTtlConfig) : new InputSideHasUniqueKey(streamingRuntimeContext, str, internalTypeInfo, joinInputSideSpec.getUniqueKeyType(), joinInputSideSpec.getUniqueKeySelector(), createTtlConfig) : new InputSideHasNoUniqueKey(streamingRuntimeContext, str, internalTypeInfo, createTtlConfig);
    }
}
