package org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.state.v2.MapStateDescriptor;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.utils.OuterRecord;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.class */
public final class JoinRecordAsyncStateViews {

    /**
     * State view for an input side that has no unique key.
     *
     * <p>Records are kept in a {@link MapState} whose key is the record itself and whose value is
     * the number of times that record is currently stored.
     */
    private static final class InputSideHasNoUniqueKey implements JoinRecordAsyncStateView {
        /** Maps each stored record to its number of occurrences. */
        private final MapState<RowData, Integer> recordState;

        /**
         * Registers the backing map state.
         *
         * @param ctx runtime context used to obtain the map state
         * @param stateName name given to the state descriptor
         * @param recordType type information of the stored records (the map key)
         * @param ttlConfig TTL configuration; applied to the descriptor only when enabled
         */
        private InputSideHasNoUniqueKey(
                StreamingRuntimeContext ctx,
                String stateName,
                InternalTypeInfo<RowData> recordType,
                StateTtlConfig ttlConfig) {
            MapStateDescriptor<RowData, Integer> descriptor =
                    new MapStateDescriptor<>(stateName, recordType, Types.INT);
            if (ttlConfig.isEnabled()) {
                descriptor.enableTimeToLive(ttlConfig);
            }
            this.recordState = ctx.getMapState(descriptor);
        }

        /**
         * Increments the stored occurrence count of the given record, starting at 1 when the
         * record is not present yet.
         *
         * @param rowData the record to add
         * @return a future that completes once the updated count has been written
         */
        @Override
        public StateFuture<Void> addRecord(RowData rowData) {
            return recordState
                    .asyncGet(rowData)
                    // A missing entry means this is the first occurrence of the record.
                    .thenApply(count -> count == null ? 1 : count + 1)
                    .thenCompose(updated -> recordState.asyncPut(rowData, updated));
        }

        /**
         * Decrements the stored occurrence count of the given record and removes the entry when
         * the last occurrence is retracted. Does nothing when the record is not stored.
         *
         * @param rowData the record to retract
         * @return a future that completes once the state has been updated
         */
        @Override
        public StateFuture<Void> retractRecord(RowData rowData) {
            return recordState
                    .asyncGet(rowData)
                    .thenCompose(
                            count -> {
                                if (count == null) {
                                    // Nothing is stored for this record, nothing to retract.
                                    return StateFutureUtils.completedVoidFuture();
                                }
                                if (count > 1) {
                                    return recordState.asyncPut(rowData, count - 1);
                                }
                                // Last occurrence: drop the entry instead of storing a zero.
                                return recordState.asyncRemove(rowData);
                            });
        }

        /**
         * Collects every stored record accepted by the given condition. A record stored
         * {@code n} times contributes {@code n} {@link OuterRecord} entries to the result.
         *
         * @param function condition applied to each distinct stored record
         * @return a future holding the matched records in a list created for this call
         */
        @Override
        public StateFuture<List<OuterRecord>> findMatchedRecords(
                Function<RowData, Boolean> function) {
            List<OuterRecord> matched = new ArrayList<>();
            return recordState
                    .asyncEntries()
                    .thenCompose(
                            entries ->
                                    entries.onNext(
                                            entry -> {
                                                RowData stored = entry.getKey();
                                                int occurrences = entry.getValue();
                                                if (function.apply(stored)) {
                                                    for (int i = 0; i < occurrences; i++) {
                                                        matched.add(new OuterRecord(stored));
                                                    }
                                                }
                                            }))
                    .thenApply(ignored -> matched);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews$InputSideHasUniqueKey.class */
    private static final class InputSideHasUniqueKey implements JoinRecordAsyncStateView {
        private final MapState<RowData, RowData> recordState;
        private final KeySelector<RowData, RowData> uniqueKeySelector;

        private InputSideHasUniqueKey(StreamingRuntimeContext streamingRuntimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, KeySelector<RowData, RowData> keySelector, StateTtlConfig stateTtlConfig) {
            Preconditions.checkNotNull(internalTypeInfo2);
            Preconditions.checkNotNull(keySelector);
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor(str, internalTypeInfo2, internalTypeInfo);
            if (stateTtlConfig.isEnabled()) {
                mapStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = streamingRuntimeContext.getMapState(mapStateDescriptor);
            this.uniqueKeySelector = keySelector;
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView
        public StateFuture<Void> addRecord(RowData rowData) {
            return StateFutureUtils.completedVoidFuture().thenCompose(obj -> {
                return this.recordState.asyncPut((RowData) this.uniqueKeySelector.getKey(rowData), rowData);
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView
        public StateFuture<Void> retractRecord(RowData rowData) {
            return StateFutureUtils.completedVoidFuture().thenCompose(obj -> {
                return this.recordState.asyncRemove((RowData) this.uniqueKeySelector.getKey(rowData));
            });
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.asyncprocessing.state.JoinRecordAsyncStateView
        public StateFuture<List<OuterRecord>> findMatchedRecords(Function<RowData, Boolean> function) {
            ArrayList arrayList = new ArrayList();
            return this.recordState.asyncValues().thenCompose(stateIterator -> {
                return stateIterator.onNext(rowData -> {
                    if (((Boolean) function.apply(rowData)).booleanValue()) {
                        arrayList.add(new OuterRecord(rowData));
                    }
                });
            }).thenApply(r3 -> {
                return arrayList;
            });
        }
    }

    /**
     * State view used when the join key contains the unique key of the input side.
     *
     * <p>A single record is kept in a {@link ValueState}, so adding a record overwrites the
     * previous one and retracting clears the state.
     */
    private static final class JoinKeyContainsUniqueKey implements JoinRecordAsyncStateView {
        /** Holds the single record stored by this view. */
        private final ValueState<RowData> recordState;

        /**
         * Registers the backing value state.
         *
         * @param ctx runtime context used to obtain the value state
         * @param stateName name given to the state descriptor
         * @param recordType type information of the stored record
         * @param ttlConfig TTL configuration; applied to the descriptor only when enabled
         */
        private JoinKeyContainsUniqueKey(
                StreamingRuntimeContext ctx,
                String stateName,
                InternalTypeInfo<RowData> recordType,
                StateTtlConfig ttlConfig) {
            ValueStateDescriptor<RowData> descriptor =
                    new ValueStateDescriptor<>(stateName, recordType);
            if (ttlConfig.isEnabled()) {
                descriptor.enableTimeToLive(ttlConfig);
            }
            this.recordState = ctx.getValueState(descriptor);
        }

        /**
         * Stores the record, overwriting the previously stored one.
         *
         * @param rowData the record to store
         * @return a future that completes once the value has been updated
         */
        @Override
        public StateFuture<Void> addRecord(RowData rowData) {
            return recordState.asyncUpdate(rowData);
        }

        /**
         * Clears the stored record. The argument is not inspected; the state is cleared
         * unconditionally.
         *
         * @param rowData the record being retracted (unused)
         * @return a future that completes once the state has been cleared
         */
        @Override
        public StateFuture<Void> retractRecord(RowData rowData) {
            return recordState.asyncClear();
        }

        /**
         * Returns the stored record wrapped in a list if it exists and is accepted by the given
         * condition; otherwise returns an empty list.
         *
         * <p>Each call produces its own list. The result escapes through a {@link StateFuture},
         * so sharing one mutable list between calls would let a later call clear or overwrite a
         * result that an earlier caller is still holding.
         *
         * @param function condition applied to the stored record
         * @return a future holding a list with zero or one matched record
         */
        @Override
        public StateFuture<List<OuterRecord>> findMatchedRecords(
                Function<RowData, Boolean> function) {
            return recordState
                    .asyncValue()
                    .thenApply(
                            stored -> {
                                // At most one record can match, hence the capacity of 1.
                                List<OuterRecord> matched = new ArrayList<>(1);
                                if (stored != null && function.apply(stored)) {
                                    matched.add(new OuterRecord(stored));
                                }
                                return matched;
                            });
        }
    }

    /**
     * Creates the {@link JoinRecordAsyncStateView} implementation that fits the given input side
     * specification.
     *
     * <ul>
     *   <li>no unique key: {@code InputSideHasNoUniqueKey}
     *   <li>unique key contained in the join key: {@code JoinKeyContainsUniqueKey}
     *   <li>unique key not contained in the join key: {@code InputSideHasUniqueKey}
     * </ul>
     *
     * @param streamingRuntimeContext runtime context handed to the view to obtain its state
     * @param str name used for the view's state descriptor
     * @param joinInputSideSpec describes the unique-key properties of the input side
     * @param internalTypeInfo type information of the records to store
     * @param j value passed to {@link StateConfigUtil#createTtlConfig} to build the state TTL
     *     configuration
     * @return the state view matching {@code joinInputSideSpec}
     */
    public static JoinRecordAsyncStateView create(StreamingRuntimeContext streamingRuntimeContext, String str, JoinInputSideSpec joinInputSideSpec, InternalTypeInfo<RowData> internalTypeInfo, long j) {
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(j);
        if (!joinInputSideSpec.hasUniqueKey()) {
            return new InputSideHasNoUniqueKey(
                    streamingRuntimeContext, str, internalTypeInfo, ttlConfig);
        }
        if (joinInputSideSpec.joinKeyContainsUniqueKey()) {
            return new JoinKeyContainsUniqueKey(
                    streamingRuntimeContext, str, internalTypeInfo, ttlConfig);
        }
        return new InputSideHasUniqueKey(
                streamingRuntimeContext,
                str,
                internalTypeInfo,
                joinInputSideSpec.getUniqueKeyType(),
                joinInputSideSpec.getUniqueKeySelector(),
                ttlConfig);
    }
}
