package org.apache.flink.table.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.class */
public class SinkUpsertMaterializer extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(SinkUpsertMaterializer.class);
    private static final String STATE_CLEARED_WARN_MSG = "The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.";
    private final StateTtlConfig ttlConfig;
    private final GeneratedRecordEqualiser generatedRecordEqualiser;
    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
    private final TypeSerializer<RowData> serializer;
    private final int[] inputUpsertKey;
    private final boolean hasUpsertKey;
    private transient RecordEqualiser equaliser;
    private transient ValueState<List<RowData>> state;
    private transient TimestampedCollector<RowData> collector;
    private transient ProjectedRowData upsertKeyProjectedRow1;
    private transient ProjectedRowData upsertKeyProjectedRow2;

    /* renamed from: org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public SinkUpsertMaterializer(StateTtlConfig stateTtlConfig, TypeSerializer<RowData> typeSerializer, GeneratedRecordEqualiser generatedRecordEqualiser, @Nullable GeneratedRecordEqualiser generatedRecordEqualiser2, @Nullable int[] iArr) {
        this.ttlConfig = stateTtlConfig;
        this.serializer = typeSerializer;
        this.generatedRecordEqualiser = generatedRecordEqualiser;
        this.generatedUpsertKeyEqualiser = generatedRecordEqualiser2;
        this.inputUpsertKey = iArr;
        this.hasUpsertKey = null != iArr && iArr.length > 0;
        if (this.hasUpsertKey) {
            Preconditions.checkNotNull(generatedRecordEqualiser2, "GeneratedUpsertKeyEqualiser cannot be null when inputUpsertKey is not empty!");
        }
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void open() throws Exception {
        super.open();
        if (this.hasUpsertKey) {
            this.equaliser = this.generatedUpsertKeyEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
            this.upsertKeyProjectedRow1 = ProjectedRowData.from(this.inputUpsertKey);
            this.upsertKeyProjectedRow2 = ProjectedRowData.from(this.inputUpsertKey);
        } else {
            this.equaliser = this.generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
        }
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("values", new ListSerializer(this.serializer));
        if (this.ttlConfig.isEnabled()) {
            valueStateDescriptor.enableTimeToLive(this.ttlConfig);
        }
        this.state = getRuntimeContext().getState(valueStateDescriptor);
        this.collector = new TimestampedCollector<>(this.output);
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        List<RowData> list = (List) this.state.value();
        if (list == null) {
            list = new ArrayList(2);
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$types$RowKind[rowData.getRowKind().ordinal()]) {
            case 1:
            case 2:
                addRow(list, rowData);
                return;
            case 3:
            case 4:
                retractRow(list, rowData);
                return;
            default:
                return;
        }
    }

    private void addRow(List<RowData> list, RowData rowData) throws IOException {
        RowKind rowKind = list.isEmpty() ? RowKind.INSERT : RowKind.UPDATE_AFTER;
        if (this.hasUpsertKey) {
            int findFirst = findFirst(list, rowData);
            if (findFirst == -1) {
                list.add(rowData);
            } else {
                list.set(findFirst, rowData);
            }
        } else {
            list.add(rowData);
        }
        rowData.setRowKind(rowKind);
        this.collector.collect(rowData);
        this.state.update(list);
    }

    private void retractRow(List<RowData> list, RowData rowData) throws IOException {
        int size = list.size() - 1;
        int findFirst = findFirst(list, rowData);
        if (findFirst == -1) {
            LOG.info(STATE_CLEARED_WARN_MSG);
            return;
        }
        list.remove(findFirst);
        if (list.isEmpty()) {
            rowData.setRowKind(RowKind.DELETE);
            this.collector.collect(rowData);
        } else if (findFirst == size) {
            RowData rowData2 = list.get(list.size() - 1);
            rowData2.setRowKind(RowKind.UPDATE_AFTER);
            this.collector.collect(rowData2);
        }
        if (list.isEmpty()) {
            this.state.clear();
        } else {
            this.state.update(list);
        }
    }

    private int findFirst(List<RowData> list, RowData rowData) {
        Iterator<RowData> it = list.iterator();
        int i = 0;
        while (it.hasNext()) {
            if (equalsIgnoreRowKind(rowData, it.next())) {
                return i;
            }
            i++;
        }
        return -1;
    }

    private boolean equalsIgnoreRowKind(RowData rowData, RowData rowData2) {
        rowData.setRowKind(rowData2.getRowKind());
        return this.hasUpsertKey ? this.equaliser.equals(this.upsertKeyProjectedRow1.replaceRow(rowData), this.upsertKeyProjectedRow2.replaceRow(rowData2)) : this.equaliser.equals(rowData, rowData2);
    }
}
