/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.generated;

import java.util.ArrayList;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.table.watermark.WatermarkEmitStrategy;
import org.apache.flink.table.watermark.WatermarkParams;

@Internal
public class GeneratedWatermarkGeneratorSupplier
implements WatermarkGeneratorSupplier<RowData> {
    private static final long serialVersionUID = 1L;
    private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
    private final WatermarkParams watermarkParams;

    public GeneratedWatermarkGeneratorSupplier(GeneratedWatermarkGenerator generatedWatermarkGenerator, @Nullable WatermarkParams watermarkParams) {
        this.generatedWatermarkGenerator = generatedWatermarkGenerator;
        this.watermarkParams = watermarkParams;
    }

    public org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        ArrayList<Object> references = new ArrayList<Object>(Arrays.asList(this.generatedWatermarkGenerator.getReferences()));
        references.add(context);
        WatermarkGenerator innerWatermarkGenerator = (WatermarkGenerator)((Object)new GeneratedWatermarkGenerator(this.generatedWatermarkGenerator.getClassName(), this.generatedWatermarkGenerator.getCode(), references.toArray()).newInstance(Thread.currentThread().getContextClassLoader()));
        try {
            innerWatermarkGenerator.open(DefaultOpenContext.INSTANCE);
        }
        catch (Exception e) {
            throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
        }
        WatermarkEmitStrategy watermarkEmitStrategy = this.watermarkParams == null ? WatermarkEmitStrategy.ON_PERIODIC : this.watermarkParams.getEmitStrategy();
        return new DefaultWatermarkGenerator(innerWatermarkGenerator, watermarkEmitStrategy);
    }

    public static class DefaultWatermarkGenerator
    implements org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> {
        private static final long serialVersionUID = 1L;
        private final WatermarkGenerator innerWatermarkGenerator;
        private final WatermarkEmitStrategy watermarkEmitStrategy;
        private Long currentWatermark = Long.MIN_VALUE;

        public DefaultWatermarkGenerator(WatermarkGenerator watermarkGenerator, WatermarkEmitStrategy watermarkEmitStrategy) {
            this.innerWatermarkGenerator = watermarkGenerator;
            this.watermarkEmitStrategy = watermarkEmitStrategy;
        }

        public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
            try {
                Long watermark = this.innerWatermarkGenerator.currentWatermark(event);
                if (watermark != null) {
                    this.currentWatermark = watermark;
                    if (this.watermarkEmitStrategy.isOnEvent()) {
                        output.emitWatermark(new Watermark(this.currentWatermark.longValue()));
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException(String.format("Generated WatermarkGenerator fails to generate for row: %s.", event), e);
            }
        }

        public void onPeriodicEmit(WatermarkOutput output) {
            if (this.watermarkEmitStrategy.isOnPeriodic()) {
                output.emitWatermark(new Watermark(this.currentWatermark.longValue()));
            }
        }
    }
}

