/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl.common;

import java.util.function.Supplier;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class KeyCheckedOutputCollector<KEY, OUT>
extends TimestampCollector<OUT> {
    private final TimestampCollector<OUT> collector;
    private final KeySelector<OUT, KEY> outKeySelector;
    private final Supplier<KEY> currentKeyGetter;

    public KeyCheckedOutputCollector(TimestampCollector<OUT> collector, KeySelector<OUT, KEY> outKeySelector, Supplier<KEY> currentKeyGetter) {
        this.collector = collector;
        this.outKeySelector = outKeySelector;
        this.currentKeyGetter = currentKeyGetter;
    }

    public void collect(OUT outputRecord) {
        this.checkOutputKey(outputRecord);
        this.collector.collect(outputRecord);
    }

    public void collectAndOverwriteTimestamp(OUT outputRecord, long timestamp) {
        this.checkOutputKey(outputRecord);
        this.collector.collectAndOverwriteTimestamp(outputRecord, timestamp);
    }

    private void checkOutputKey(OUT outputRecord) {
        try {
            KEY currentKey = this.currentKeyGetter.get();
            Object outputKey = ((KeySelector)Preconditions.checkNotNull(this.outKeySelector)).getKey(outputRecord);
            if (!outputKey.equals(currentKey)) {
                throw new IllegalStateException("Output key must equals to input key if the output key selector is not null.");
            }
        }
        catch (Exception e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
    }
}

