/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * A {@link TimerInternals} implementation that keeps its timers in an in-memory set and takes its
 * watermarks and synchronized processing time from
 * {@link GlobalWatermarkHolder.SparkWatermarks}.
 *
 * <p>Instances are created through {@link #forStreamFromSources(List, Map)} or
 * {@link #global(Map)}. Only the {@link TimerInternals.TimerData}-based set/delete operations are
 * implemented; the ID-based variants throw {@link UnsupportedOperationException}.
 */
public class SparkTimerInternals implements TimerInternals {
    /** Value the input watermark is moved to by {@link #advanceWatermark()}. */
    private final Instant highWatermark;
    /** Value returned by {@link #currentSynchronizedProcessingTime()}. */
    private final Instant synchronizedProcessingTime;
    /** Timers currently held by this instance. */
    private final Set<TimerInternals.TimerData> timers = Sets.newHashSet();
    /** Current input watermark; starts at the low watermark passed to the constructor. */
    private Instant inputWatermark;

    private SparkTimerInternals(Instant lowWatermark, Instant highWatermark, Instant synchronizedProcessingTime) {
        this.inputWatermark = lowWatermark;
        this.highWatermark = highWatermark;
        this.synchronizedProcessingTime = synchronizedProcessingTime;
    }

    /**
     * Builds timer internals from the watermarks of the given sources.
     *
     * <p>If {@code watermarks} is {@code null} or empty, or none of {@code sourceIds} has an entry
     * in it, both watermarks default to {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and the
     * synchronized processing time to {@code new Instant(0L)}. Otherwise the earliest low
     * watermark and the earliest high watermark among the listed sources are used; source ids
     * without an entry are skipped.
     *
     * @param sourceIds ids of the sources to consider
     * @param watermarks per-source watermarks, may be {@code null}
     * @return timer internals initialised from the selected watermarks
     * @throws IllegalArgumentException if the listed sources report different synchronized
     *     processing times
     */
    public static SparkTimerInternals forStreamFromSources(List<Integer> sourceIds, Map<Integer, GlobalWatermarkHolder.SparkWatermarks> watermarks) {
        if (watermarks == null || watermarks.isEmpty() || Collections.disjoint(sourceIds, watermarks.keySet())) {
            return new SparkTimerInternals(BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0L));
        }
        // Start from the maximum so that the first source found always replaces it.
        Instant slowestLowWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        Instant slowestHighWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        Instant synchronizedProcessingTime = null;
        for (Integer sourceId : sourceIds) {
            GlobalWatermarkHolder.SparkWatermarks sparkWatermarks = watermarks.get(sourceId);
            if (sparkWatermarks == null) {
                continue;
            }
            slowestLowWatermark = earlierOf(slowestLowWatermark, sparkWatermarks.getLowWatermark());
            slowestHighWatermark = earlierOf(slowestHighWatermark, sparkWatermarks.getHighWatermark());
            if (synchronizedProcessingTime == null) {
                // First value seen becomes the reference for the remaining sources.
                synchronizedProcessingTime = sparkWatermarks.getSynchronizedProcessingTime();
            } else {
                // Compare from the side known to be non-null, so a missing value on a later
                // source fails this check instead of raising a NullPointerException.
                Preconditions.checkArgument(synchronizedProcessingTime.equals(sparkWatermarks.getSynchronizedProcessingTime()), "Synchronized time is expected to keep synchronized across sources.");
            }
        }
        return new SparkTimerInternals(slowestLowWatermark, slowestHighWatermark, synchronizedProcessingTime);
    }

    /** Returns {@code current} if it is strictly before {@code candidate}, otherwise {@code candidate}. */
    private static Instant earlierOf(Instant current, Instant candidate) {
        return current.isBefore(candidate) ? current : candidate;
    }

    /**
     * Builds timer internals from every source present in {@code watermarks}.
     *
     * @param watermarks per-source watermarks; {@code null} yields the same defaults as
     *     {@link #forStreamFromSources(List, Map)} called with a {@code null} map
     * @return timer internals covering all sources in the map
     */
    public static SparkTimerInternals global(Map<Integer, GlobalWatermarkHolder.SparkWatermarks> watermarks) {
        if (watermarks == null) {
            return SparkTimerInternals.forStreamFromSources(Collections.emptyList(), null);
        }
        return SparkTimerInternals.forStreamFromSources(Lists.newArrayList(watermarks.keySet()), watermarks);
    }

    /**
     * Returns the timers held by this instance.
     *
     * <p>The returned collection is the internal set itself, not a copy, so changes made through
     * it are visible to this instance.
     */
    public Collection<TimerInternals.TimerData> getTimers() {
        return this.timers;
    }

    /** Adds every timer produced by the iterator to the held timers. */
    void addTimers(Iterator<TimerInternals.TimerData> timers) {
        while (timers.hasNext()) {
            this.timers.add(timers.next());
        }
    }

    /** Adds the given timer to the held timers. */
    public void setTimer(TimerInternals.TimerData timer) {
        this.timers.add(timer);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
    }

    /** Removes the given timer from the held timers, if present. */
    public void deleteTimer(TimerInternals.TimerData timer) {
        this.timers.remove(timer);
    }

    /** Returns the current wall-clock time as reported by {@link Instant#now()}. */
    public Instant currentProcessingTime() {
        return Instant.now();
    }

    /** Returns the synchronized processing time this instance was created with. */
    public @Nullable Instant currentSynchronizedProcessingTime() {
        return this.synchronizedProcessingTime;
    }

    /** Returns the current input watermark. */
    public Instant currentInputWatermarkTime() {
        return this.inputWatermark;
    }

    /** Moves the input watermark to the high watermark this instance was created with. */
    public void advanceWatermark() {
        this.inputWatermark = this.highWatermark;
    }

    /** Always returns {@code null}; this implementation does not report an output watermark. */
    public @Nullable Instant currentOutputWatermarkTime() {
        return null;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void setTimer(StateNamespace namespace, String timerId, String timerFamilyId, Instant target, Instant outputTimestamp, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("Setting a timer by ID not yet supported.");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
        throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
    }

    /**
     * Converts the timers to byte arrays by delegating to {@link CoderHelpers#toByteArrays} with
     * the given coder.
     */
    public static Collection<byte[]> serializeTimers(Collection<TimerInternals.TimerData> timers, TimerInternals.TimerDataCoderV2 timerDataCoder) {
        return CoderHelpers.toByteArrays(timers, timerDataCoder);
    }

    /**
     * Converts byte arrays back to timers by delegating to {@link CoderHelpers#fromByteArrays}
     * with the given coder, and returns an iterator over the result.
     */
    public static Iterator<TimerInternals.TimerData> deserializeTimers(Collection<byte[]> serTimers, TimerInternals.TimerDataCoderV2 timerDataCoder) {
        return CoderHelpers.fromByteArrays(serTimers, timerDataCoder).iterator();
    }

    public String toString() {
        return "SparkTimerInternals{highWatermark=" + this.highWatermark + ", synchronizedProcessingTime=" + this.synchronizedProcessingTime + ", timers=" + this.timers + ", inputWatermark=" + this.inputWatermark + '}';
    }
}

