/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.util;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.beam.runners.spark.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.spark.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.spark.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Maps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.spark.SparkEnv;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockResult;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import scala.Option;

/**
 * Static holder that queues per-source {@link SparkWatermarks} updates and publishes the
 * merged result through the Spark {@link BlockManager} under a fixed block id.
 *
 * <p>Producers enqueue updates with {@link #add(int, SparkWatermarks)}; {@link #advance()}
 * consumes at most one queued update per source and publishes the new watermarks; readers
 * obtain the latest view through {@link #get(Long)}.
 *
 * <p>All access to the pending-updates map goes through the class monitor.
 */
public class GlobalWatermarkHolder {
    /** Pending watermark updates keyed by source id. Guarded by the class monitor. */
    private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<Integer, Queue<SparkWatermarks>>();
    /** Id of the block under which the published watermarks map is stored. */
    private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
    /** The single key used for {@link #watermarkCache}. */
    private static final String CACHE_KEY = "SINGLETON";
    /** Last map published by {@link #advance()} in this JVM; {@code null} until then or after {@link #clear()}. */
    private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;
    /** Lazily created cache of the remotely stored watermarks; see {@link #initWatermarkCache(Long)}. */
    private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;

    /**
     * Enqueues a watermark update for the given source.
     *
     * <p>Synchronized on the class monitor because it mutates the same map that
     * {@link #advance()} iterates and {@link #clear()} empties under that monitor.
     *
     * @param sourceId id of the source the update belongs to
     * @param sparkWatermarks the update to enqueue
     */
    public static synchronized void add(int sourceId, SparkWatermarks sparkWatermarks) {
        Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
        if (timesQueue == null) {
            timesQueue = new ConcurrentLinkedQueue<SparkWatermarks>();
        }
        timesQueue.offer(sparkWatermarks);
        sourceTimes.put(sourceId, timesQueue);
    }

    /**
     * Drains every queue of the given map into the holder, preserving per-source order.
     *
     * <p>The supplied queues are emptied as a side effect.
     *
     * @param sourceTimes updates to enqueue, keyed by source id
     */
    @VisibleForTesting
    public static void addAll(Map<Integer, Queue<SparkWatermarks>> sourceTimes) {
        for (Map.Entry<Integer, Queue<SparkWatermarks>> en : sourceTimes.entrySet()) {
            int sourceId = en.getKey();
            Queue<SparkWatermarks> timesQueue = en.getValue();
            while (!timesQueue.isEmpty()) {
                GlobalWatermarkHolder.add(sourceId, timesQueue.poll());
            }
        }
    }

    /**
     * Returns the current watermarks per source id.
     *
     * <p>If {@link #advance()} has published watermarks in this JVM, that map is returned
     * directly. Otherwise the value is served from a cache that loads the map stored under
     * the watermarks block id (an empty map when no such block exists).
     *
     * @param cacheInterval used only when the cache is first created: entries expire after
     *     half this many milliseconds; it is unboxed at that point, so it must not be
     *     {@code null} then
     * @return the watermarks map
     * @throws RuntimeException wrapping the {@link ExecutionException} if loading fails
     */
    public static Map<Integer, SparkWatermarks> get(Long cacheInterval) {
        // Read the volatile field once: a concurrent clear() between a null check and a
        // second read could otherwise make this method return null.
        Map<Integer, SparkWatermarks> published = driverWatermarks;
        if (published != null) {
            return published;
        }
        if (watermarkCache == null) {
            GlobalWatermarkHolder.initWatermarkCache(cacheInterval);
        }
        try {
            return watermarkCache.get(CACHE_KEY);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the watermark cache if it does not exist yet.
     *
     * @param batchDuration entries expire {@code batchDuration / 2} milliseconds after write
     */
    private static synchronized void initWatermarkCache(Long batchDuration) {
        if (watermarkCache == null) {
            watermarkCache = CacheBuilder.newBuilder().expireAfterWrite(batchDuration / 2L, TimeUnit.MILLISECONDS).build(new WatermarksLoader());
        }
    }

    /**
     * Consumes at most one queued update per source and publishes the resulting watermarks.
     *
     * <p>For every source with a pending update, the new low and high watermarks are the
     * later of the previously published value and the queued value (a source with no
     * previous entry starts from {@link BoundedWindow#TIMESTAMP_MIN_VALUE}); the synchronized
     * processing time is taken from the queued update. If at least one source was updated,
     * the new map replaces the stored block and becomes the value returned by
     * {@link #get(Long)}. Only sources updated in this call are present in the new map.
     *
     * <p>Does nothing when no updates have ever been queued.
     *
     * @throws IllegalStateException if a resulting low watermark is after the high watermark,
     *     or if the synchronized processing time does not strictly advance; updates already
     *     polled during the failing call are not restored and nothing is published
     */
    public static synchronized void advance() {
        // Check for work before touching SparkEnv so an idle call cannot fail on it.
        if (sourceTimes.isEmpty()) {
            return;
        }
        BlockManager blockManager = SparkEnv.get().blockManager();
        // Previously published watermarks; fetched lazily so the block manager is only
        // contacted when some queue actually has an update, and only once per call.
        Map<Integer, SparkWatermarks> current = null;
        Map<Integer, SparkWatermarks> newValues = new HashMap<Integer, SparkWatermarks>();
        for (Map.Entry<Integer, Queue<SparkWatermarks>> en : sourceTimes.entrySet()) {
            Queue<SparkWatermarks> timesQueue = en.getValue();
            if (timesQueue.isEmpty()) {
                continue;
            }
            if (current == null) {
                current = GlobalWatermarkHolder.fetchOrInitRemoteWatermarks(blockManager);
            }
            Integer sourceId = en.getKey();
            Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
            SparkWatermarks currentTimes = current.get(sourceId);
            if (currentTimes != null) {
                currentLowWatermark = currentTimes.getLowWatermark();
                currentHighWatermark = currentTimes.getHighWatermark();
                currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
            }
            SparkWatermarks next = timesQueue.poll();
            // Watermarks never move backwards: keep the later of current and next.
            Instant nextLowWatermark = next.getLowWatermark().isAfter(currentLowWatermark) ? next.getLowWatermark() : currentLowWatermark;
            Instant nextHighWatermark = next.getHighWatermark().isAfter(currentHighWatermark) ? next.getHighWatermark() : currentHighWatermark;
            Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime();
            // Template overload: the message is only formatted when the check fails.
            Preconditions.checkState(!nextLowWatermark.isAfter(nextHighWatermark), "Low watermark %s cannot be later then high watermark %s", nextLowWatermark, nextHighWatermark);
            Preconditions.checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime), "Synchronized processing time must advance.");
            newValues.put(sourceId, new SparkWatermarks(nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
        }
        if (!newValues.isEmpty()) {
            driverWatermarks = newValues;
            blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
            blockManager.putSingle(WATERMARKS_BLOCK_ID, newValues, StorageLevel.MEMORY_ONLY(), true);
        }
    }

    /**
     * Returns the stored watermarks map, storing and returning a new empty map when the
     * block does not exist yet.
     */
    private static Map<Integer, SparkWatermarks> fetchOrInitRemoteWatermarks(BlockManager blockManager) {
        Map<Integer, SparkWatermarks> stored = GlobalWatermarkHolder.readRemoteWatermarks(blockManager);
        if (stored != null) {
            return stored;
        }
        Map<Integer, SparkWatermarks> empty = Maps.newHashMap();
        blockManager.putSingle(WATERMARKS_BLOCK_ID, empty, StorageLevel.MEMORY_ONLY(), true);
        return empty;
    }

    /** Returns the map stored under the watermarks block id, or {@code null} if there is none. */
    private static Map<Integer, SparkWatermarks> readRemoteWatermarks(BlockManager blockManager) {
        Option<BlockResult> blockResultOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
        if (!blockResultOption.isDefined()) {
            return null;
        }
        // The block is only ever written by this class with a Map<Integer, SparkWatermarks>.
        @SuppressWarnings("unchecked")
        Map<Integer, SparkWatermarks> watermarks = (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
        return watermarks;
    }

    /**
     * Discards all queued updates and the locally published watermarks, and removes the
     * stored block when a {@link SparkEnv} is available. The watermark cache is left
     * untouched.
     */
    @VisibleForTesting
    public static synchronized void clear() {
        sourceTimes.clear();
        driverWatermarks = null;
        SparkEnv sparkEnv = SparkEnv.get();
        if (sparkEnv != null) {
            BlockManager blockManager = sparkEnv.blockManager();
            blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
        }
    }

    /** Cache loader that reads the stored watermarks map, ignoring the cache key. */
    private static class WatermarksLoader
    extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
        private WatermarksLoader() {
        }

        /**
         * Loads the watermarks stored under the watermarks block id.
         *
         * @param key ignored
         * @return the stored map, or a new empty map when no block exists
         */
        @Override
        public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
            Map<Integer, SparkWatermarks> stored = GlobalWatermarkHolder.readRemoteWatermarks(SparkEnv.get().blockManager());
            if (stored != null) {
                return stored;
            }
            return Maps.newHashMap();
        }
    }

    /** Streaming listener that calls {@link GlobalWatermarkHolder#advance()} after every completed batch. */
    public static class WatermarksListener
    extends JavaStreamingListener {
        public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
            GlobalWatermarkHolder.advance();
        }
    }

    /** Immutable, serializable triple of low watermark, high watermark and synchronized processing time. */
    public static class SparkWatermarks
    implements Serializable {
        private final Instant lowWatermark;
        private final Instant highWatermark;
        private final Instant synchronizedProcessingTime;

        /**
         * @param lowWatermark the low watermark
         * @param highWatermark the high watermark
         * @param synchronizedProcessingTime the synchronized processing time
         */
        @VisibleForTesting
        public SparkWatermarks(Instant lowWatermark, Instant highWatermark, Instant synchronizedProcessingTime) {
            this.lowWatermark = lowWatermark;
            this.highWatermark = highWatermark;
            this.synchronizedProcessingTime = synchronizedProcessingTime;
        }

        /** @return the low watermark */
        public Instant getLowWatermark() {
            return this.lowWatermark;
        }

        /** @return the high watermark */
        public Instant getHighWatermark() {
            return this.highWatermark;
        }

        /** @return the synchronized processing time */
        public Instant getSynchronizedProcessingTime() {
            return this.synchronizedProcessingTime;
        }

        @Override
        public String toString() {
            return "SparkWatermarks{lowWatermark=" + this.lowWatermark + ", highWatermark=" + this.highWatermark + ", synchronizedProcessingTime=" + this.synchronizedProcessingTime + '}';
        }
    }
}

