/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import com.google.auto.value.AutoValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.runners.samza.runtime.AutoValue_SamzaTimerInternalsFactory_TimerKey;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaTimerInternalsFactory<K>
implements TimerInternalsFactory<K> {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
    private final NavigableSet<KeyedTimerData<K>> eventTimeBuffer;
    private final Coder<K> keyCoder;
    private final Scheduler<KeyedTimerData<K>> timerRegistry;
    private final SamzaTimerState state;
    private final PCollection.IsBounded isBounded;
    private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private final int maxEventTimerBufferSize;
    private long maxEventTimeInBuffer;
    private final long maxReadyTimersToProcessOnce;

    private SamzaTimerInternalsFactory(Coder<K> keyCoder, Scheduler<KeyedTimerData<K>> timerRegistry, String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, Coder<BoundedWindow> windowCoder, PCollection.IsBounded isBounded, SamzaPipelineOptions pipelineOptions) {
        this.keyCoder = keyCoder;
        this.timerRegistry = timerRegistry;
        this.eventTimeBuffer = new TreeSet<KeyedTimerData<K>>();
        this.maxEventTimerBufferSize = pipelineOptions.getEventTimerBufferSize();
        this.maxEventTimeInBuffer = Long.MAX_VALUE;
        this.maxReadyTimersToProcessOnce = pipelineOptions.getMaxReadyTimersToProcessOnce();
        this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
        this.isBounded = isBounded;
    }

    static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(Coder<K> keyCoder, Scheduler<KeyedTimerData<K>> timerRegistry, String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, WindowingStrategy<?, BoundedWindow> windowingStrategy, PCollection.IsBounded isBounded, SamzaPipelineOptions pipelineOptions) {
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        return new SamzaTimerInternalsFactory<K>(keyCoder, timerRegistry, timerStateId, nonKeyedStateInternalsFactory, (Coder<BoundedWindow>)windowCoder, isBounded, pipelineOptions);
    }

    public TimerInternals timerInternalsForKey(@Nullable K key) {
        byte[] keyBytes;
        if (this.keyCoder == null) {
            if (key != null) {
                throw new IllegalArgumentException(String.format("Received non-null key for unkeyed timer factory. Key: %s", key));
            }
            keyBytes = null;
        } else {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                this.keyCoder.encode(key, (OutputStream)baos);
            }
            catch (IOException e) {
                throw new RuntimeException("Could not encode key: " + key, e);
            }
            keyBytes = baos.toByteArray();
        }
        return new SamzaTimerInternals(keyBytes, key);
    }

    public void setInputWatermark(Instant watermark) {
        if (watermark.isBefore((ReadableInstant)this.inputWatermark)) {
            throw new IllegalArgumentException("New input watermark is before current watermark");
        }
        LOG.debug("Advancing input watermark from {} to {}.", (Object)this.inputWatermark, (Object)watermark);
        this.inputWatermark = watermark;
    }

    public void setOutputWatermark(Instant watermark) {
        if (watermark.isAfter((ReadableInstant)this.inputWatermark)) {
            LOG.debug("Clipping new output watermark from {} to {}.", (Object)watermark, (Object)this.inputWatermark);
            watermark = this.inputWatermark;
        }
        if (watermark.isBefore((ReadableInstant)this.outputWatermark)) {
            throw new IllegalArgumentException("New output watermark is before current watermark");
        }
        LOG.debug("Advancing output watermark from {} to {}.", (Object)this.outputWatermark, (Object)watermark);
        this.outputWatermark = watermark;
    }

    public Collection<KeyedTimerData<K>> removeReadyTimers() {
        ArrayList<KeyedTimerData<K>> readyTimers = new ArrayList<KeyedTimerData<K>>();
        while (!this.eventTimeBuffer.isEmpty() && !((KeyedTimerData)this.eventTimeBuffer.first()).getTimerData().getTimestamp().isAfter((ReadableInstant)this.inputWatermark) && (long)readyTimers.size() < this.maxReadyTimersToProcessOnce) {
            KeyedTimerData<K> keyedTimerData = this.eventTimeBuffer.pollFirst();
            readyTimers.add(keyedTimerData);
            this.state.deletePersisted(keyedTimerData);
            if (!this.eventTimeBuffer.isEmpty()) continue;
            this.state.reloadEventTimeTimers();
        }
        LOG.debug("Removed {} ready timers", (Object)readyTimers.size());
        if ((long)readyTimers.size() == this.maxReadyTimersToProcessOnce && !this.eventTimeBuffer.isEmpty() && ((KeyedTimerData)this.eventTimeBuffer.first()).getTimerData().getTimestamp().isBefore((ReadableInstant)this.inputWatermark)) {
            LOG.warn("Loaded {} expired timers, the remaining will be processed at next watermark.", (Object)this.maxReadyTimersToProcessOnce);
        }
        return readyTimers;
    }

    public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
        this.state.deletePersisted(keyedTimerData);
    }

    public Instant getInputWatermark() {
        return this.inputWatermark;
    }

    public Instant getOutputWatermark() {
        return this.outputWatermark;
    }

    NavigableSet<KeyedTimerData<K>> getEventTimeBuffer() {
        return this.eventTimeBuffer;
    }

    public static class TimerKeyCoder<K>
    extends StructuredCoder<TimerKey<K>> {
        private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
        private final Coder<K> keyCoder;
        private final Coder<? extends BoundedWindow> windowCoder;

        TimerKeyCoder(Coder<K> keyCoder, Coder<? extends BoundedWindow> windowCoder) {
            this.keyCoder = keyCoder;
            this.windowCoder = windowCoder;
        }

        public void encode(TimerKey<K> value, OutputStream outStream) throws CoderException, IOException {
            STRING_CODER.encode(value.getTimerId(), outStream);
            STRING_CODER.encode(value.getStateNamespace().stringKey(), outStream);
            if (this.keyCoder != null) {
                this.keyCoder.encode(value.getKey(), outStream);
            }
            STRING_CODER.encode(value.getTimerFamilyId(), outStream);
        }

        public TimerKey<K> decode(InputStream inStream) throws CoderException, IOException {
            String timerId = STRING_CODER.decode(inStream);
            StateNamespace namespace = StateNamespaces.fromString((String)STRING_CODER.decode(inStream), this.windowCoder);
            Object key = null;
            if (this.keyCoder != null) {
                key = this.keyCoder.decode(inStream);
            }
            String timerFamilyId = inStream.available() > 0 ? STRING_CODER.decode(inStream) : "";
            return TimerKey.builder().setTimerId(timerId).setStateNamespace(namespace).setKey(key).setTimerFamilyId(timerFamilyId).build();
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.keyCoder, this.windowCoder);
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }
    }

    @AutoValue
    static abstract class TimerKey<K> {
        TimerKey() {
        }

        abstract @Nullable K getKey();

        abstract StateNamespace getStateNamespace();

        abstract String getTimerId();

        abstract String getTimerFamilyId();

        static <K> Builder<K> builder() {
            return new AutoValue_SamzaTimerInternalsFactory_TimerKey.Builder();
        }

        static <K> TimerKey<K> of(KeyedTimerData<K> keyedTimerData) {
            TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
            return TimerKey.of(keyedTimerData.getKey(), timerData.getNamespace(), timerData.getTimerId(), timerData.getTimerFamilyId());
        }

        static <K> TimerKey<K> of(K key, StateNamespace namespace, String timerId, String timerFamilyId) {
            return TimerKey.builder().setKey(key).setStateNamespace(namespace).setTimerId(timerId).setTimerFamilyId(timerFamilyId).build();
        }

        static <K> KeyedTimerData<K> toKeyedTimerData(TimerKey<K> timerKey, long timestamp, TimeDomain domain, Coder<K> keyCoder) {
            byte[] keyBytes = null;
            if (keyCoder != null && timerKey.getKey() != null) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                try {
                    keyCoder.encode(timerKey.getKey(), (OutputStream)baos);
                }
                catch (IOException e) {
                    throw new RuntimeException("Could not encode key: " + timerKey.getKey(), e);
                }
                keyBytes = baos.toByteArray();
            }
            return new KeyedTimerData<K>(keyBytes, timerKey.getKey(), TimerInternals.TimerData.of((String)timerKey.getTimerId(), (String)timerKey.getTimerFamilyId(), (StateNamespace)timerKey.getStateNamespace(), (Instant)new Instant(timestamp), (Instant)new Instant(timestamp), (TimeDomain)domain));
        }

        @AutoValue.Builder
        static abstract class Builder<K> {
            Builder() {
            }

            abstract Builder<K> setKey(K var1);

            abstract Builder<K> setStateNamespace(StateNamespace var1);

            abstract Builder<K> setTimerId(String var1);

            abstract Builder<K> setTimerFamilyId(String var1);

            abstract TimerKey<K> build();
        }
    }

    private class SamzaTimerState {
        private final SamzaMapState<TimerKey<K>, Long> eventTimeTimerState;
        private final SamzaSetState<KeyedTimerData<K>> timestampSortedEventTimeTimerState;
        private final SamzaMapState<TimerKey<K>, Long> processingTimeTimerState;

        SamzaTimerState(String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, Coder<BoundedWindow> windowCoder) {
            this.eventTimeTimerState = (SamzaMapState)nonKeyedStateInternalsFactory.stateInternalsForKey(null).state(StateNamespaces.global(), StateTags.map((String)(timerStateId + "-et"), new TimerKeyCoder(SamzaTimerInternalsFactory.this.keyCoder, windowCoder), (Coder)VarLongCoder.of()));
            this.timestampSortedEventTimeTimerState = (SamzaSetState)nonKeyedStateInternalsFactory.stateInternalsForKey(null).state(StateNamespaces.global(), StateTags.set((String)(timerStateId + "-ts"), new KeyedTimerData.KeyedTimerDataCoder(SamzaTimerInternalsFactory.this.keyCoder, windowCoder)));
            this.processingTimeTimerState = (SamzaMapState)nonKeyedStateInternalsFactory.stateInternalsForKey(null).state(StateNamespaces.global(), StateTags.map((String)(timerStateId + "-pt"), new TimerKeyCoder(SamzaTimerInternalsFactory.this.keyCoder, windowCoder), (Coder)VarLongCoder.of()));
            this.init();
        }

        Long get(KeyedTimerData<K> keyedTimerData) {
            return this.get(TimerKey.of(keyedTimerData), keyedTimerData.getTimerData().getDomain());
        }

        Long get(TimerKey<K> key, TimeDomain domain) {
            switch (domain) {
                case EVENT_TIME: {
                    return (Long)this.eventTimeTimerState.get(key).read();
                }
                case PROCESSING_TIME: {
                    return (Long)this.processingTimeTimerState.get(key).read();
                }
            }
            throw new UnsupportedOperationException(String.format("%s currently only supports event time", SamzaRunner.class));
        }

        void persist(KeyedTimerData<K> keyedTimerData) {
            TimerKey timerKey = TimerKey.of(keyedTimerData);
            switch (keyedTimerData.getTimerData().getDomain()) {
                case EVENT_TIME: {
                    Long timestamp = (Long)this.eventTimeTimerState.get(timerKey).read();
                    if (timestamp != null) {
                        KeyedTimerData keyedTimerDataInStore = TimerKey.toKeyedTimerData(timerKey, timestamp, TimeDomain.EVENT_TIME, SamzaTimerInternalsFactory.this.keyCoder);
                        this.timestampSortedEventTimeTimerState.remove(keyedTimerDataInStore);
                    }
                    this.eventTimeTimerState.put(timerKey, keyedTimerData.getTimerData().getTimestamp().getMillis());
                    this.timestampSortedEventTimeTimerState.add(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    this.processingTimeTimerState.put(timerKey, keyedTimerData.getTimerData().getTimestamp().getMillis());
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time or processing time", SamzaRunner.class));
                }
            }
        }

        void deletePersisted(KeyedTimerData<K> keyedTimerData) {
            TimerKey timerKey = TimerKey.of(keyedTimerData);
            switch (keyedTimerData.getTimerData().getDomain()) {
                case EVENT_TIME: {
                    this.eventTimeTimerState.remove(timerKey);
                    this.timestampSortedEventTimeTimerState.remove(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    this.processingTimeTimerState.remove(timerKey);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time or processing time", SamzaRunner.class));
                }
            }
        }

        private void reloadEventTimeTimers() {
            Iterator iter = (Iterator)this.timestampSortedEventTimeTimerState.readIterator().read();
            while (iter.hasNext() && SamzaTimerInternalsFactory.this.eventTimeBuffer.size() < SamzaTimerInternalsFactory.this.maxEventTimerBufferSize) {
                KeyedTimerData keyedTimerData = (KeyedTimerData)iter.next();
                SamzaTimerInternalsFactory.this.eventTimeBuffer.add(keyedTimerData);
                SamzaTimerInternalsFactory.this.maxEventTimeInBuffer = keyedTimerData.getTimerData().getTimestamp().getMillis();
            }
            this.timestampSortedEventTimeTimerState.closeIterators();
            LOG.info("Loaded {} event time timers in memory", (Object)SamzaTimerInternalsFactory.this.eventTimeBuffer.size());
            if (SamzaTimerInternalsFactory.this.eventTimeBuffer.size() < SamzaTimerInternalsFactory.this.maxEventTimerBufferSize) {
                LOG.debug("Event time timers in State is empty, filled {} timers out of {} buffer capacity", (Object)SamzaTimerInternalsFactory.this.eventTimeBuffer.size(), (Object)SamzaTimerInternalsFactory.this.maxEventTimeInBuffer);
                SamzaTimerInternalsFactory.this.maxEventTimeInBuffer = Long.MAX_VALUE;
            }
        }

        private void loadProcessingTimeTimers() {
            Iterator iter = (Iterator)this.processingTimeTimerState.readIterator().read();
            int count = 0;
            while (iter.hasNext()) {
                Map.Entry entry = (Map.Entry)iter.next();
                KeyedTimerData keyedTimerData = TimerKey.toKeyedTimerData((TimerKey)entry.getKey(), (Long)entry.getValue(), TimeDomain.PROCESSING_TIME, SamzaTimerInternalsFactory.this.keyCoder);
                SamzaTimerInternalsFactory.this.timerRegistry.schedule(keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
                ++count;
            }
            this.processingTimeTimerState.closeIterators();
            LOG.info("Loaded {} processing time timers in memory", (Object)count);
        }

        private void init() {
            Iterator eventTimersIter = (Iterator)this.eventTimeTimerState.readIterator().read();
            if (eventTimersIter.hasNext()) {
                Iterator sortedEventTimerIter = (Iterator)this.timestampSortedEventTimeTimerState.readIterator().read();
                if (!sortedEventTimerIter.hasNext()) {
                    while (eventTimersIter.hasNext()) {
                        Map.Entry entry = (Map.Entry)eventTimersIter.next();
                        KeyedTimerData keyedTimerData = TimerKey.toKeyedTimerData((TimerKey)entry.getKey(), (Long)entry.getValue(), TimeDomain.EVENT_TIME, SamzaTimerInternalsFactory.this.keyCoder);
                        this.timestampSortedEventTimeTimerState.add(keyedTimerData);
                    }
                }
                this.timestampSortedEventTimeTimerState.closeIterators();
            }
            this.eventTimeTimerState.closeIterators();
            this.reloadEventTimeTimers();
            this.loadProcessingTimeTimers();
        }
    }

    private class SamzaTimerInternals
    implements TimerInternals {
        private final byte[] keyBytes;
        private final K key;

        public SamzaTimerInternals(byte[] keyBytes, K key) {
            this.keyBytes = keyBytes;
            this.key = key;
        }

        public void setTimer(StateNamespace namespace, String timerId, String timerFamilyId, Instant target, Instant outputTimestamp, TimeDomain timeDomain) {
            this.setTimer(TimerInternals.TimerData.of((String)timerId, (String)timerFamilyId, (StateNamespace)namespace, (Instant)target, (Instant)outputTimestamp, (TimeDomain)timeDomain));
        }

        public void setTimer(TimerInternals.TimerData timerData) {
            if (SamzaTimerInternalsFactory.this.isBounded == PCollection.IsBounded.UNBOUNDED && timerData.getTimestamp().getMillis() > GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
                return;
            }
            KeyedTimerData keyedTimerData = new KeyedTimerData(this.keyBytes, this.key, timerData);
            if (SamzaTimerInternalsFactory.this.eventTimeBuffer.contains(keyedTimerData)) {
                return;
            }
            Long lastTimestamp = SamzaTimerInternalsFactory.this.state.get(keyedTimerData);
            Long newTimestamp = timerData.getTimestamp().getMillis();
            if (newTimestamp.equals(lastTimestamp)) {
                return;
            }
            if (lastTimestamp != null) {
                this.deleteTimer(timerData.getNamespace(), timerData.getTimerId(), timerData.getTimerFamilyId(), new Instant((Object)lastTimestamp), new Instant((Object)lastTimestamp), timerData.getDomain());
            }
            SamzaTimerInternalsFactory.this.state.persist(keyedTimerData);
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    if (newTimestamp >= SamzaTimerInternalsFactory.this.maxEventTimeInBuffer) break;
                    SamzaTimerInternalsFactory.this.eventTimeBuffer.add(keyedTimerData);
                    if (SamzaTimerInternalsFactory.this.eventTimeBuffer.size() <= SamzaTimerInternalsFactory.this.maxEventTimerBufferSize) break;
                    SamzaTimerInternalsFactory.this.eventTimeBuffer.pollLast();
                    SamzaTimerInternalsFactory.this.maxEventTimeInBuffer = ((KeyedTimerData)SamzaTimerInternalsFactory.this.eventTimeBuffer.last()).getTimerData().getTimestamp().getMillis();
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports even time or processing time", SamzaRunner.class));
                }
            }
        }

        @Deprecated
        public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
            this.deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
        }

        @Deprecated
        public void deleteTimer(TimerInternals.TimerData timerData) {
            this.deleteTimer(timerData.getNamespace(), timerData.getTimerId(), timerData.getTimerFamilyId(), timerData.getDomain());
        }

        public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
            TimerKey timerKey = TimerKey.of(this.key, namespace, timerId, timerFamilyId);
            Long lastTimestamp = SamzaTimerInternalsFactory.this.state.get(timerKey, timeDomain);
            if (lastTimestamp == null) {
                return;
            }
            Instant timestamp = Instant.ofEpochMilli((long)lastTimestamp);
            this.deleteTimer(namespace, timerId, timerFamilyId, timestamp, timestamp, timeDomain);
        }

        private void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
            TimerInternals.TimerData timerData = TimerInternals.TimerData.of((String)timerId, (String)timerFamilyId, (StateNamespace)namespace, (Instant)timestamp, (Instant)outputTimestamp, (TimeDomain)timeDomain);
            KeyedTimerData keyedTimerData = new KeyedTimerData(this.keyBytes, this.key, timerData);
            SamzaTimerInternalsFactory.this.state.deletePersisted(keyedTimerData);
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    SamzaTimerInternalsFactory.this.eventTimeBuffer.remove(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.delete(keyedTimerData);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time or processing time", SamzaRunner.class));
                }
            }
        }

        public Instant currentProcessingTime() {
            return new Instant();
        }

        public Instant currentSynchronizedProcessingTime() {
            throw new UnsupportedOperationException(String.format("%s does not currently support synchronized processing time", SamzaRunner.class));
        }

        public Instant currentInputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.inputWatermark;
        }

        public Instant currentOutputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.outputWatermark;
        }
    }
}

