/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.source.WatermarkToDataOutput;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.RelativeClock;

@Internal
public class ProgressiveTimestampsAndWatermarks<T>
implements TimestampsAndWatermarks<T> {
    /** Timestamp assigner handed to the main output and to every per-split output. */
    private final TimestampAssigner<T> timestampAssigner;
    /** Creates one watermark generator for the main output and one per split output. */
    private final WatermarkGeneratorSupplier<T> watermarksFactory;
    /** Builds the generator context from a clock whenever a watermark generator is created. */
    private final TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider;
    /** Used to schedule the periodic watermark emission. */
    private final ProcessingTimeService timeService;
    /**
     * Periodic emission interval in milliseconds. Zero means periodic emission is never
     * scheduled; an interval that overflowed a long is stored as Long.MAX_VALUE.
     */
    private final long periodicWatermarkInterval;
    /** Clock passed to the context of the main output's watermark generator. */
    private final RelativeClock mainInputActivityClock;
    /** Base clock that each per-split PausableRelativeClock is created from. */
    private final Clock clock;
    /** Per-split clocks are (un)registered here as back-pressure listeners. */
    private final TaskIOMetricGroup taskIOMetricGroup;
    /** Per-split outputs; null until createMainOutput has been called. */
    @Nullable
    private SplitLocalOutputs<T> currentPerSplitOutputs;
    /** Main reader output; null until createMainOutput has been called. */
    @Nullable
    private StreamingReaderOutput<T> currentMainOutput;
    /** Handle of the scheduled periodic emission; null while none is scheduled. */
    @Nullable
    private ScheduledFuture<?> periodicEmitHandle;

    /**
     * Stores the collaborators and converts the periodic watermark interval to milliseconds.
     *
     * @param timestampAssigner assigner passed on to the main and per-split outputs
     * @param watermarksFactory supplier of the watermark generators
     * @param watermarksContextProvider creates the generator context from a clock
     * @param timeService service used to schedule periodic watermark emission
     * @param periodicWatermarkInterval emission interval; zero disables periodic emission
     * @param mainInputActivityClock clock given to the main output's generator context
     * @param clock base clock for the per-split input activity clocks
     * @param taskIOMetricGroup metric group the per-split clocks are registered with
     */
    public ProgressiveTimestampsAndWatermarks(
            TimestampAssigner<T> timestampAssigner,
            WatermarkGeneratorSupplier<T> watermarksFactory,
            TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider,
            ProcessingTimeService timeService,
            Duration periodicWatermarkInterval,
            RelativeClock mainInputActivityClock,
            Clock clock,
            TaskIOMetricGroup taskIOMetricGroup) {
        // Duration#toMillis throws ArithmeticException on long overflow; clamp such
        // intervals to the largest representable value instead of failing.
        long intervalMillis;
        try {
            intervalMillis = periodicWatermarkInterval.toMillis();
        } catch (ArithmeticException ignored) {
            intervalMillis = Long.MAX_VALUE;
        }
        this.periodicWatermarkInterval = intervalMillis;

        this.timestampAssigner = timestampAssigner;
        this.watermarksFactory = watermarksFactory;
        this.watermarksContextProvider = watermarksContextProvider;
        this.timeService = timeService;
        this.mainInputActivityClock = mainInputActivityClock;
        this.clock = clock;
        this.taskIOMetricGroup = taskIOMetricGroup;
    }

    /**
     * Creates the main reader output together with the per-split output bookkeeping.
     *
     * <p>Both outputs write watermarks through their own idleness-aware view of one shared
     * {@link WatermarkToDataOutput}, so the shared output is marked idle only once both views
     * are idle.
     *
     * @param output the record output that records and watermarks are forwarded to
     * @param watermarkUpdateListener listener handed to the watermark output and to the
     *     per-split outputs
     * @return the newly created main output
     * @throws IllegalStateException if a main output was already created
     */
    @Override
    public ReaderOutput<T> createMainOutput(
            PushingAsyncDataInput.DataOutput<T> output,
            TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener) {
        Preconditions.checkState(
                this.currentMainOutput == null && this.currentPerSplitOutputs == null,
                "already created a main output");

        final WatermarkToDataOutput watermarkOutput =
                new WatermarkToDataOutput(output, watermarkUpdateListener);
        final IdlenessManager idlenessManager = new IdlenessManager(watermarkOutput);

        // Typed generator (instead of a raw one) keeps the constructor call below free of
        // unchecked conversions.
        final WatermarkGenerator<T> watermarkGenerator =
                this.watermarksFactory.createWatermarkGenerator(
                        (WatermarkGeneratorSupplier.Context)
                                this.watermarksContextProvider.create(this.mainInputActivityClock));

        this.currentPerSplitOutputs =
                new SplitLocalOutputs<>(
                        output,
                        idlenessManager.getSplitLocalOutput(),
                        watermarkUpdateListener,
                        this.timestampAssigner,
                        this.watermarksFactory,
                        this.watermarksContextProvider,
                        this.clock,
                        this.taskIOMetricGroup);
        this.currentMainOutput =
                new StreamingReaderOutput<>(
                        output,
                        idlenessManager.getMainOutput(),
                        this.timestampAssigner,
                        watermarkGenerator,
                        this.currentPerSplitOutputs);
        return this.currentMainOutput;
    }

    /**
     * Schedules the periodic watermark emission on the processing time service.
     *
     * <p>Does nothing when the configured interval is zero.
     *
     * @throws IllegalStateException if the periodic emitter was already started
     */
    @Override
    public void startPeriodicWatermarkEmits() {
        Preconditions.checkState(this.periodicEmitHandle == null, "periodic emitter already started");

        final long interval = this.periodicWatermarkInterval;
        if (interval != 0L) {
            // The same value is used as initial delay and as delay between emissions.
            this.periodicEmitHandle =
                    this.timeService.scheduleWithFixedDelay(
                            this::emitImmediateWatermark, interval, interval);
        }
    }

    /**
     * Cancels the scheduled periodic emission, if any, without interrupting a running
     * emission, and clears the handle so emission can be started again.
     */
    @Override
    public void stopPeriodicWatermarkEmits() {
        final ScheduledFuture<?> handle = this.periodicEmitHandle;
        if (handle == null) {
            // Nothing scheduled.
            return;
        }
        handle.cancel(false);
        this.periodicEmitHandle = null;
    }

    /**
     * Triggers a periodic watermark emission, first on the per-split outputs and then on the
     * main output. Outputs that have not been created yet are skipped.
     *
     * @param wallClockTimestamp not used by this implementation
     */
    @Override
    public void emitImmediateWatermark(long wallClockTimestamp) {
        final SplitLocalOutputs<T> perSplitOutputs = this.currentPerSplitOutputs;
        if (perSplitOutputs != null) {
            perSplitOutputs.emitPeriodicWatermark();
        }

        final StreamingReaderOutput<T> mainOutput = this.currentMainOutput;
        if (mainOutput != null) {
            mainOutput.emitPeriodicWatermark();
        }
    }

    /**
     * Pauses and resumes the input activity clocks of the given splits by delegating to the
     * per-split outputs.
     *
     * @param splitsToPause ids of the splits whose clocks are paused
     * @param splitsToResume ids of the splits whose clocks are un-paused
     * @throws IllegalStateException if the main output (and with it the per-split outputs)
     *     has not been created yet
     */
    @Override
    public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        // The per-split outputs are @Nullable until createMainOutput has run. Report that
        // state explicitly instead of failing with a message-less NullPointerException.
        Preconditions.checkState(
                this.currentPerSplitOutputs != null, "main output has not been created yet");
        this.currentPerSplitOutputs.pauseOrResumeSplits(splitsToPause, splitsToResume);
    }

    private static class IdlenessManager {
        private final WatermarkOutput underlyingOutput;
        private final IdlenessAwareWatermarkOutput splitLocalOutput;
        private final IdlenessAwareWatermarkOutput mainOutput;

        IdlenessManager(WatermarkOutput underlyingOutput) {
            this.underlyingOutput = underlyingOutput;
            this.splitLocalOutput = new IdlenessAwareWatermarkOutput(underlyingOutput);
            this.mainOutput = new IdlenessAwareWatermarkOutput(underlyingOutput);
        }

        IdlenessAwareWatermarkOutput getSplitLocalOutput() {
            return this.splitLocalOutput;
        }

        IdlenessAwareWatermarkOutput getMainOutput() {
            return this.mainOutput;
        }

        void maybeMarkUnderlyingOutputAsIdle() {
            if (this.splitLocalOutput.isIdle && this.mainOutput.isIdle) {
                this.underlyingOutput.markIdle();
            }
        }

        private class IdlenessAwareWatermarkOutput
        implements WatermarkOutput {
            private final WatermarkOutput underlyingOutput;
            private boolean isIdle = true;

            private IdlenessAwareWatermarkOutput(WatermarkOutput underlyingOutput) {
                this.underlyingOutput = underlyingOutput;
            }

            public void emitWatermark(Watermark watermark) {
                this.underlyingOutput.emitWatermark(watermark);
                this.isIdle = false;
            }

            public void markIdle() {
                this.isIdle = true;
                IdlenessManager.this.maybeMarkUnderlyingOutputAsIdle();
            }

            public void markActive() {
                this.isIdle = false;
                this.underlyingOutput.markActive();
            }
        }
    }

    private static final class SplitLocalOutputs<T> {
        private final WatermarkOutputMultiplexer watermarkMultiplexer;
        private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
        private final Map<String, PausableRelativeClock> inputActivityClocks = new HashMap<String, PausableRelativeClock>();
        private final PushingAsyncDataInput.DataOutput<T> recordOutput;
        private final TimestampAssigner<T> timestampAssigner;
        private final WatermarkGeneratorSupplier<T> watermarksFactory;
        private final TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider;
        private final TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener;
        private final Clock clock;
        private final TaskIOMetricGroup taskIOMetricGroup;

        private SplitLocalOutputs(PushingAsyncDataInput.DataOutput<T> recordOutput, WatermarkOutput watermarkOutput, TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener, TimestampAssigner<T> timestampAssigner, WatermarkGeneratorSupplier<T> watermarksFactory, TimestampsAndWatermarks.TimestampsAndWatermarksContextProvider watermarksContextProvider, Clock clock, TaskIOMetricGroup taskIOMetricGroup) {
            this.recordOutput = recordOutput;
            this.timestampAssigner = timestampAssigner;
            this.watermarksFactory = watermarksFactory;
            this.watermarksContextProvider = watermarksContextProvider;
            this.watermarkUpdateListener = watermarkUpdateListener;
            this.clock = clock;
            this.taskIOMetricGroup = taskIOMetricGroup;
            this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
            this.localOutputs = new LinkedHashMap<String, SourceOutputWithWatermarks<T>>();
        }

        SourceOutput<T> createOutputForSplit(final String splitId) {
            SourceOutputWithWatermarks<T> previous = this.localOutputs.get(splitId);
            if (previous != null) {
                return previous;
            }
            PausableRelativeClock inputActivityClock = this.createInputActivityClock(splitId);
            this.watermarkMultiplexer.registerNewOutput(splitId, new WatermarkOutputMultiplexer.WatermarkUpdateListener(){

                public void onWatermarkUpdate(long watermark) {
                    watermarkUpdateListener.updateCurrentSplitWatermark(splitId, watermark);
                }

                public void onIdleUpdate(boolean idle) {
                    watermarkUpdateListener.updateCurrentSplitIdle(splitId, idle);
                }
            });
            WatermarkOutput onEventOutput = this.watermarkMultiplexer.getImmediateOutput(splitId);
            WatermarkOutput periodicOutput = this.watermarkMultiplexer.getDeferredOutput(splitId);
            WatermarkGenerator watermarks = this.watermarksFactory.createWatermarkGenerator((WatermarkGeneratorSupplier.Context)this.watermarksContextProvider.create(inputActivityClock));
            SourceOutputWithWatermarks<T> localOutput = SourceOutputWithWatermarks.createWithSeparateOutputs(this.recordOutput, onEventOutput, periodicOutput, this.timestampAssigner, watermarks);
            this.localOutputs.put(splitId, localOutput);
            return localOutput;
        }

        private PausableRelativeClock createInputActivityClock(String splitId) {
            PausableRelativeClock inputActivityClock = new PausableRelativeClock(this.clock);
            this.inputActivityClocks.put(splitId, inputActivityClock);
            this.taskIOMetricGroup.registerBackPressureListener(inputActivityClock);
            return inputActivityClock;
        }

        void releaseOutputForSplit(String splitId) {
            this.watermarkUpdateListener.splitFinished(splitId);
            this.localOutputs.remove(splitId);
            this.watermarkMultiplexer.unregisterOutput(splitId);
            PausableRelativeClock inputActivityClock = Objects.requireNonNull(this.inputActivityClocks.remove(splitId));
            this.taskIOMetricGroup.unregisterBackPressureListener(inputActivityClock);
        }

        void emitPeriodicWatermark() {
            for (SourceOutputWithWatermarks<T> output : this.localOutputs.values()) {
                output.emitPeriodicWatermark();
            }
            this.watermarkMultiplexer.onPeriodicEmit();
        }

        public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
            for (String splitId : splitsToPause) {
                this.inputActivityClocks.get(splitId).pause();
            }
            for (String splitId : splitsToResume) {
                this.inputActivityClocks.get(splitId).unPause();
            }
        }
    }

    private static final class StreamingReaderOutput<T>
    extends SourceOutputWithWatermarks<T>
    implements ReaderOutput<T> {
        private final SplitLocalOutputs<T> splitLocalOutputs;

        StreamingReaderOutput(PushingAsyncDataInput.DataOutput<T> output, WatermarkOutput watermarkOutput, TimestampAssigner<T> timestampAssigner, WatermarkGenerator<T> watermarkGenerator, SplitLocalOutputs<T> splitLocalOutputs) {
            super(output, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
            this.splitLocalOutputs = splitLocalOutputs;
        }

        public SourceOutput<T> createOutputForSplit(String splitId) {
            return this.splitLocalOutputs.createOutputForSplit(splitId);
        }

        public void releaseOutputForSplit(String splitId) {
            this.splitLocalOutputs.releaseOutputForSplit(splitId);
        }
    }
}

