/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.watermark;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.datastream.api.function.ProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkCombiner;

public final class WatermarkUtils {
    public static Set<AbstractInternalWatermarkDeclaration<?>> getInternalWatermarkDeclarationsFromStreamGraph(StreamGraph streamGraph) {
        Collection<StreamNode> streamNodes = streamGraph.getStreamNodes();
        Set<WatermarkDeclaration> declarations = streamNodes.stream().map(StreamNode::getOperatorFactory).filter(factory -> factory instanceof SimpleOperatorFactory || factory instanceof SourceOperatorFactory).map(factory -> {
            if (factory instanceof SimpleOperatorFactory) {
                return WatermarkUtils.getWatermarkDeclarations(((SimpleOperatorFactory)factory).getOperator());
            }
            return ((SourceOperatorFactory)factory).getSourceWatermarkDeclarations();
        }).flatMap(Collection::stream).collect(Collectors.toSet());
        return WatermarkUtils.convertToInternalWatermarkDeclarations(declarations);
    }

    private static Collection<? extends WatermarkDeclaration> getWatermarkDeclarations(StreamOperator<?> streamOperator) {
        Object f;
        if (streamOperator instanceof AbstractAsyncStateUdfStreamOperator && (f = ((AbstractAsyncStateUdfStreamOperator)streamOperator).getUserFunction()) instanceof ProcessFunction) {
            return ((ProcessFunction)f).declareWatermarks();
        }
        return Collections.emptySet();
    }

    public static Set<AbstractInternalWatermarkDeclaration<?>> convertToInternalWatermarkDeclarations(Set<WatermarkDeclaration> watermarkDeclarations) {
        return watermarkDeclarations.stream().map(AbstractInternalWatermarkDeclaration::from).collect(Collectors.toSet());
    }

    public static void addEventTimeWatermarkCombinerIfNeeded(Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet, Map<String, WatermarkCombiner> watermarkCombiners, int numberOfInputChannels) {
        if (watermarkDeclarationSet.stream().anyMatch(declaration -> EventTimeExtension.isEventTimeWatermark((String)declaration.getIdentifier()))) {
            EventTimeWatermarkCombiner eventTimeWatermarkCombiner = new EventTimeWatermarkCombiner(numberOfInputChannels);
            watermarkCombiners.put(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.getIdentifier(), eventTimeWatermarkCombiner);
            watermarkCombiners.put(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.getIdentifier(), eventTimeWatermarkCombiner);
        }
    }
}

