/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@Deprecated
public class StreamSourceContexts {
    public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(ProcessingTimeService processingTimeService, Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval, long idleTimeout, boolean emitProgressiveWatermarks) {
        ManualWatermarkContext ctx = new ManualWatermarkContext(output, processingTimeService, checkpointLock, idleTimeout, emitProgressiveWatermarks);
        return new SwitchingOnClose(ctx);
    }

    private static abstract class WatermarkContext<T>
    implements SourceFunction.SourceContext<T> {
        protected final ProcessingTimeService timeService;
        protected final Object checkpointLock;
        protected final long idleTimeout;
        private ScheduledFuture<?> nextCheck;
        private volatile boolean failOnNextCheck;

        public WatermarkContext(ProcessingTimeService timeService, Object checkpointLock, long idleTimeout) {
            this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
            this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
            if (idleTimeout != -1L) {
                Preconditions.checkArgument(idleTimeout >= 1L, "The idle timeout cannot be smaller than 1 ms.");
            }
            this.idleTimeout = idleTimeout;
            this.scheduleNextIdleDetectionTask();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public final void collect(T element) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);
                if (this.nextCheck != null) {
                    this.failOnNextCheck = false;
                } else {
                    this.scheduleNextIdleDetectionTask();
                }
                this.processAndCollect(element);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public final void collectWithTimestamp(T element, long timestamp) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);
                if (this.nextCheck != null) {
                    this.failOnNextCheck = false;
                } else {
                    this.scheduleNextIdleDetectionTask();
                }
                this.processAndCollectWithTimestamp(element, timestamp);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public final void emitWatermark(Watermark mark) {
            if (this.allowWatermark(mark)) {
                Object object = this.checkpointLock;
                synchronized (object) {
                    this.processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);
                    if (this.nextCheck != null) {
                        this.failOnNextCheck = false;
                    } else {
                        this.scheduleNextIdleDetectionTask();
                    }
                    this.processAndEmitWatermark(mark);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public final void markAsTemporarilyIdle() {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.processAndEmitWatermarkStatus(WatermarkStatus.IDLE);
            }
        }

        @Override
        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        @Override
        public void close() {
            this.cancelNextIdleDetectionTask();
        }

        private void scheduleNextIdleDetectionTask() {
            if (this.idleTimeout != -1L) {
                this.failOnNextCheck = true;
                this.nextCheck = this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + this.idleTimeout, new IdlenessDetectionTask());
            }
        }

        protected void cancelNextIdleDetectionTask() {
            ScheduledFuture<?> nextCheck = this.nextCheck;
            if (nextCheck != null) {
                nextCheck.cancel(true);
            }
        }

        protected abstract void processAndCollect(T var1);

        protected abstract void processAndCollectWithTimestamp(T var1, long var2);

        protected abstract boolean allowWatermark(Watermark var1);

        protected abstract void processAndEmitWatermark(Watermark var1);

        protected abstract void processAndEmitWatermarkStatus(WatermarkStatus var1);

        private class IdlenessDetectionTask
        implements ProcessingTimeService.ProcessingTimeCallback {
            private IdlenessDetectionTask() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onProcessingTime(long timestamp) throws Exception {
                Object object = WatermarkContext.this.checkpointLock;
                synchronized (object) {
                    WatermarkContext.this.nextCheck = null;
                    if (WatermarkContext.this.failOnNextCheck) {
                        WatermarkContext.this.markAsTemporarilyIdle();
                    } else {
                        WatermarkContext.this.scheduleNextIdleDetectionTask();
                    }
                }
            }
        }
    }

    private static class ManualWatermarkContext<T>
    extends WatermarkContext<T> {
        private final boolean emitProgressiveWatermarks;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;
        private boolean idle = false;

        private ManualWatermarkContext(Output<StreamRecord<T>> output, ProcessingTimeService timeService, Object checkpointLock, long idleTimeout, boolean emitProgressiveWatermarks) {
            super(timeService, checkpointLock, idleTimeout);
            this.emitProgressiveWatermarks = emitProgressiveWatermarks;
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
            this.reuse = new StreamRecord<Object>(null);
        }

        @Override
        protected void processAndCollect(T element) {
            this.output.collect(this.reuse.replace(element));
        }

        @Override
        protected void processAndCollectWithTimestamp(T element, long timestamp) {
            this.output.collect(this.reuse.replace(element, timestamp));
        }

        @Override
        protected void processAndEmitWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }

        @Override
        protected void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus) {
            if (this.idle != watermarkStatus.isIdle()) {
                this.output.emitWatermarkStatus(watermarkStatus);
            }
            this.idle = watermarkStatus.isIdle();
        }

        @Override
        protected boolean allowWatermark(Watermark mark) {
            return this.emitProgressiveWatermarks || mark.getTimestamp() == Long.MAX_VALUE;
        }
    }

    private static class AutomaticWatermarkContext<T>
    extends WatermarkContext<T> {
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;
        private final long watermarkInterval;
        private volatile ScheduledFuture<?> nextWatermarkTimer;
        private volatile long nextWatermarkTime;
        private long lastRecordTime;
        private boolean idle = false;

        private AutomaticWatermarkContext(Output<StreamRecord<T>> output, long watermarkInterval, ProcessingTimeService timeService, Object checkpointLock, long idleTimeout) {
            super(timeService, checkpointLock, idleTimeout);
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
            Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
            this.watermarkInterval = watermarkInterval;
            this.reuse = new StreamRecord<Object>(null);
            this.lastRecordTime = Long.MIN_VALUE;
            long now = this.timeService.getCurrentProcessingTime();
            this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval, new WatermarkEmittingTask(this.timeService, checkpointLock, output));
        }

        @Override
        protected void processAndCollect(T element) {
            this.lastRecordTime = this.timeService.getCurrentProcessingTime();
            this.output.collect(this.reuse.replace(element, this.lastRecordTime));
            if (this.lastRecordTime > this.nextWatermarkTime) {
                long watermarkTime = this.lastRecordTime - this.lastRecordTime % this.watermarkInterval;
                this.nextWatermarkTime = watermarkTime + this.watermarkInterval;
                this.output.emitWatermark(new Watermark(watermarkTime));
            }
        }

        @Override
        protected void processAndCollectWithTimestamp(T element, long timestamp) {
            this.processAndCollect(element);
        }

        @Override
        protected boolean allowWatermark(Watermark mark) {
            return mark.getTimestamp() == Long.MAX_VALUE && this.nextWatermarkTime != Long.MAX_VALUE;
        }

        @Override
        protected void processAndEmitWatermark(Watermark mark) {
            this.nextWatermarkTime = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
            ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
            if (nextWatermarkTimer != null) {
                nextWatermarkTimer.cancel(true);
            }
        }

        @Override
        protected void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus) {
            if (this.idle != watermarkStatus.isIdle()) {
                this.output.emitWatermarkStatus(watermarkStatus);
            }
            this.idle = watermarkStatus.isIdle();
        }

        @Override
        public void close() {
            super.close();
            ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
            if (nextWatermarkTimer != null) {
                nextWatermarkTimer.cancel(true);
            }
        }

        private class WatermarkEmittingTask
        implements ProcessingTimeService.ProcessingTimeCallback {
            private final ProcessingTimeService timeService;
            private final Object lock;
            private final Output<StreamRecord<T>> output;

            private WatermarkEmittingTask(ProcessingTimeService timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
                this.timeService = timeService;
                this.lock = checkpointLock;
                this.output = output;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onProcessingTime(long timestamp) {
                long currentTime = this.timeService.getCurrentProcessingTime();
                Object object = this.lock;
                synchronized (object) {
                    if (!AutomaticWatermarkContext.this.idle) {
                        if (AutomaticWatermarkContext.this.idleTimeout != -1L && currentTime - AutomaticWatermarkContext.this.lastRecordTime > AutomaticWatermarkContext.this.idleTimeout) {
                            AutomaticWatermarkContext.this.markAsTemporarilyIdle();
                            AutomaticWatermarkContext.this.cancelNextIdleDetectionTask();
                        } else if (currentTime > AutomaticWatermarkContext.this.nextWatermarkTime) {
                            long watermarkTime = currentTime - currentTime % AutomaticWatermarkContext.this.watermarkInterval;
                            this.output.emitWatermark(new Watermark(watermarkTime));
                            AutomaticWatermarkContext.this.nextWatermarkTime = watermarkTime + AutomaticWatermarkContext.this.watermarkInterval;
                        }
                    }
                }
                long nextWatermark = currentTime + AutomaticWatermarkContext.this.watermarkInterval;
                AutomaticWatermarkContext.this.nextWatermarkTimer = this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, this.lock, this.output));
            }
        }
    }

    private static class NonTimestampContext<T>
    implements SourceFunction.SourceContext<T> {
        private final Object lock;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
            this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
            this.reuse = new StreamRecord<Object>(null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void collect(T element) {
            Object object = this.lock;
            synchronized (object) {
                this.output.collect(this.reuse.replace(element));
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            this.collect(element);
        }

        @Override
        public void emitWatermark(Watermark mark) {
        }

        @Override
        public void markAsTemporarilyIdle() {
        }

        @Override
        public Object getCheckpointLock() {
            return this.lock;
        }

        @Override
        public void close() {
        }
    }

    private static class ClosedContext<T>
    implements SourceFunction.SourceContext<T> {
        private final Object checkpointLock;

        private ClosedContext(Object checkpointLock) {
            this.checkpointLock = checkpointLock;
        }

        @Override
        public void collect(T element) {
            this.throwException();
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            this.throwException();
        }

        @Override
        public void emitWatermark(Watermark mark) {
            this.throwException();
        }

        @Override
        public void markAsTemporarilyIdle() {
            this.throwException();
        }

        @Override
        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        @Override
        public void close() {
        }

        private void throwException() {
            throw new FlinkRuntimeException("The Source Context has been closed already.");
        }
    }

    private static class SwitchingOnClose<T>
    implements SourceFunction.SourceContext<T> {
        private SourceFunction.SourceContext<T> nestedContext;

        private SwitchingOnClose(SourceFunction.SourceContext<T> nestedContext) {
            this.nestedContext = nestedContext;
        }

        @Override
        public void collect(T element) {
            this.nestedContext.collect(element);
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            this.nestedContext.collectWithTimestamp(element, timestamp);
        }

        @Override
        public void emitWatermark(Watermark mark) {
            this.nestedContext.emitWatermark(mark);
        }

        @Override
        public void markAsTemporarilyIdle() {
            this.nestedContext.markAsTemporarilyIdle();
        }

        @Override
        public Object getCheckpointLock() {
            return this.nestedContext.getCheckpointLock();
        }

        @Override
        public void close() {
            this.nestedContext.close();
            this.nestedContext = new ClosedContext(this.nestedContext.getCheckpointLock());
        }
    }
}

