/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.spark.streaming.Durations;
import org.joda.time.Duration;

public final class StreamingWindowPipelineDetector
extends SparkRunner.Evaluator {
    private static final org.apache.spark.streaming.Duration SPARK_MIN_WINDOW = Durations.milliseconds((long)500L);
    private boolean windowing;
    private org.apache.spark.streaming.Duration batchDuration;
    private static final TransformTranslator.FieldGetter WINDOW_FG = new TransformTranslator.FieldGetter(Window.Bound.class);

    public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
        super(translator);
    }

    @Override
    protected <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode node) {
        PTransform transform = node.getTransform();
        Class<Window.Bound> transformClass = transform.getClass();
        if (transformClass.isAssignableFrom(Window.Bound.class)) {
            WindowFn windowFn = (WindowFn)WINDOW_FG.get("windowFn", transform);
            if (windowFn instanceof FixedWindows) {
                this.setBatchDuration(((FixedWindows)windowFn).getSize());
            } else if (windowFn instanceof SlidingWindows) {
                if (((SlidingWindows)windowFn).getOffset().getMillis() > 0L) {
                    throw new UnsupportedOperationException("Spark does not support window offsets");
                }
                this.setBatchDuration(((SlidingWindows)windowFn).getSize());
            } else if (!(windowFn instanceof GlobalWindows)) {
                throw new IllegalStateException("Windowing function not supported: " + windowFn);
            }
        }
    }

    private void setBatchDuration(Duration duration) {
        Long durationMillis = duration.getMillis();
        if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
            throw new IllegalArgumentException("Windowing of size " + durationMillis + "msec is not supported!");
        }
        if (!this.windowing || this.batchDuration.milliseconds() > durationMillis) {
            this.batchDuration = Durations.milliseconds((long)durationMillis);
        }
        this.windowing = true;
    }

    public boolean isWindowing() {
        return this.windowing;
    }

    public org.apache.spark.streaming.Duration getBatchDuration() {
        return this.batchDuration;
    }
}

