/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.TimerKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.samza.operators.TimerRegistry;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TimerInternalsFactory} that hands out per-key {@link TimerInternals} and keeps the
 * timer state shared by all of them.
 *
 * <p>Event-time timers are held in memory: {@code timerMap} indexes them by {@link TimerKey} and
 * {@code eventTimeTimers} keeps them in sorted order so that {@link #removeReadyTimers()} can
 * drain the ones that lie before the input watermark. Processing-time timers are not stored
 * here; they are handed to the {@link TimerRegistry} passed to the constructor.
 *
 * <p>The class performs no synchronization of its own.
 *
 * @param <K> the type of the key that timers are scoped to
 */
public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);

    /** Currently registered event-time timers, indexed by (key, namespace, timer id). */
    private final Map<TimerKey<K>, KeyedTimerData<K>> timerMap = new HashMap<TimerKey<K>, KeyedTimerData<K>>();
    /** The same event-time timers as {@code timerMap}, kept in the set's sort order. */
    private final NavigableSet<KeyedTimerData<K>> eventTimeTimers = new TreeSet<KeyedTimerData<K>>();
    /** Coder used to serialize keys; {@code null} means the factory is unkeyed. */
    private final Coder<K> keyCoder;
    /** Registry that processing-time timers are delegated to. */
    private final TimerRegistry<TimerKey<K>> timerRegistry;
    // Both watermarks start at the same minimum. If the input watermark started below the output
    // watermark, the first setOutputWatermark call would clip the new value down to the input
    // watermark and then reject it as a regression.
    private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    /**
     * Creates a factory.
     *
     * @param keyCoder coder used to encode keys to bytes, or {@code null} for an unkeyed factory
     * @param timerRegistry registry that receives processing-time timer registrations and deletions
     */
    public SamzaTimerInternalsFactory(Coder<K> keyCoder, TimerRegistry<TimerKey<K>> timerRegistry) {
        this.keyCoder = keyCoder;
        this.timerRegistry = timerRegistry;
    }

    /**
     * Returns a {@link TimerInternals} whose timers are scoped to {@code key}.
     *
     * @param key the key to scope timers to; must be {@code null} when no key coder was supplied
     * @return a new {@link TimerInternals} backed by this factory's shared timer state
     * @throws IllegalArgumentException if the factory has no key coder but {@code key} is non-null
     * @throws RuntimeException if the key coder fails to encode {@code key}; the underlying
     *     {@link IOException} is attached as the cause
     */
    public TimerInternals timerInternalsForKey(@Nullable K key) {
        if (this.keyCoder == null) {
            if (key != null) {
                throw new IllegalArgumentException(String.format("Received non-null key for unkeyed timer factory. Key: %s", key));
            }
            return new SamzaTimerInternals(null, key);
        }

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            this.keyCoder.encode(key, baos, Coder.Context.OUTER);
        } catch (IOException e) {
            // Keep the original exception so the encoding failure can be diagnosed.
            throw new RuntimeException("Could not encode key: " + key, e);
        }
        return new SamzaTimerInternals(baos.toByteArray(), key);
    }

    /**
     * Advances the input watermark.
     *
     * @param watermark the new input watermark; must not be before the current one
     * @throws IllegalArgumentException if {@code watermark} is before the current input watermark
     */
    public void setInputWatermark(Instant watermark) {
        if (watermark.isBefore(this.inputWatermark)) {
            throw new IllegalArgumentException("New input watermark is before current watermark");
        }
        LOG.debug("Advancing input watermark from {} to {}.", this.inputWatermark, watermark);
        this.inputWatermark = watermark;
    }

    /**
     * Advances the output watermark. A value beyond the current input watermark is clipped to the
     * input watermark before it is applied.
     *
     * @param watermark the requested output watermark
     * @throws IllegalArgumentException if the (possibly clipped) value is before the current
     *     output watermark
     */
    public void setOutputWatermark(Instant watermark) {
        Instant effective = watermark;
        if (effective.isAfter(this.inputWatermark)) {
            LOG.debug("Clipping new output watermark from {} to {}.", effective, this.inputWatermark);
            effective = this.inputWatermark;
        }
        if (effective.isBefore(this.outputWatermark)) {
            throw new IllegalArgumentException("New output watermark is before current watermark");
        }
        LOG.debug("Advancing output watermark from {} to {}.", this.outputWatermark, effective);
        this.outputWatermark = effective;
    }

    /**
     * Removes and returns every event-time timer whose timestamp is strictly before the current
     * input watermark, in the order they are polled from the sorted timer set.
     *
     * @return the removed timers; empty if none are ready
     */
    public Collection<KeyedTimerData<K>> removeReadyTimers() {
        Collection<KeyedTimerData<K>> readyTimers = new ArrayList<KeyedTimerData<K>>();
        while (!this.eventTimeTimers.isEmpty()
                && this.eventTimeTimers.first().getTimerData().getTimestamp().isBefore(this.inputWatermark)) {
            KeyedTimerData<K> keyedTimerData = this.eventTimeTimers.pollFirst();
            TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
            TimerKey<K> timerKey = new TimerKey<K>(keyedTimerData.getKey(), keyedTimerData.getKeyBytes(), timerData.getNamespace(), timerData.getTimerId());
            // Keep the index in step with the sorted set.
            this.timerMap.remove(timerKey);
            readyTimers.add(keyedTimerData);
        }
        return readyTimers;
    }

    /** Returns the current input watermark. */
    public Instant getInputWatermark() {
        return this.inputWatermark;
    }

    /** Returns the current output watermark. */
    public Instant getOutputWatermark() {
        return this.outputWatermark;
    }

    /**
     * {@link TimerInternals} bound to a single key. All timer state lives in the enclosing
     * factory; this class only carries the key and its encoded bytes.
     */
    private class SamzaTimerInternals implements TimerInternals {
        private final byte[] keyBytes;
        private final K key;

        /**
         * @param keyBytes the encoded form of {@code key}, or {@code null} for an unkeyed factory
         * @param key the key that timers set through this instance are scoped to
         */
        public SamzaTimerInternals(byte[] keyBytes, K key) {
            this.keyBytes = keyBytes;
            this.key = key;
        }

        /** Builds a {@code TimerData} from the arguments and delegates to {@link #setTimer(TimerInternals.TimerData)}. */
        public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
            this.setTimer(TimerInternals.TimerData.of(timerId, namespace, target, timeDomain));
        }

        /**
         * Sets a timer for this key. An event-time timer replaces any existing event-time timer
         * with the same namespace and id; a processing-time timer is registered with the timer
         * registry at the timer's timestamp in milliseconds.
         *
         * @throws IllegalArgumentException if a timer with the same key is already stored under a
         *     different time domain
         * @throws UnsupportedOperationException if the time domain is neither event time nor
         *     processing time
         */
        public void setTimer(TimerInternals.TimerData timerData) {
            KeyedTimerData<K> keyedTimerData = new KeyedTimerData<K>(this.keyBytes, this.key, timerData);
            TimerKey<K> timerKey = new TimerKey<K>(this.key, this.keyBytes, timerData.getNamespace(), timerData.getTimerId());
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    KeyedTimerData<K> oldTimer = SamzaTimerInternalsFactory.this.timerMap.get(timerKey);
                    if (oldTimer != null) {
                        if (!oldTimer.getTimerData().getDomain().equals(timerData.getDomain())) {
                            throw new IllegalArgumentException(String.format("Attempt to set %s for time domain %s, but it is already set for time domain %s", timerData.getTimerId(), timerData.getDomain(), oldTimer.getTimerData().getDomain()));
                        }
                        // Drop the previous timer from both the index and the sorted set.
                        this.deleteTimer(oldTimer.getTimerData());
                    }
                    SamzaTimerInternalsFactory.this.eventTimeTimers.add(keyedTimerData);
                    SamzaTimerInternalsFactory.this.timerMap.put(timerKey, keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.register(timerKey, timerData.getTimestamp().getMillis());
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time or processing time", SamzaRunner.class));
                }
            }
        }

        /**
         * Deletes the timer with the given namespace and id in the given time domain. The
         * timestamp placed in the temporary {@code TimerData} is not used for the lookup.
         */
        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
            this.deleteTimer(TimerInternals.TimerData.of(timerId, namespace, Instant.now(), timeDomain));
        }

        /** Deletes the event-time timer with the given namespace and id. */
        public void deleteTimer(StateNamespace namespace, String timerId) {
            this.deleteTimer(TimerInternals.TimerData.of(timerId, namespace, Instant.now(), TimeDomain.EVENT_TIME));
        }

        /**
         * Deletes the timer identified by {@code timerData}'s namespace and id. Event-time timers
         * are removed from the factory's in-memory state (a no-op if absent); processing-time
         * timers are deleted from the timer registry.
         *
         * @throws UnsupportedOperationException if the time domain is neither event time nor
         *     processing time
         */
        public void deleteTimer(TimerInternals.TimerData timerData) {
            TimerKey<K> timerKey = new TimerKey<K>(this.key, this.keyBytes, timerData.getNamespace(), timerData.getTimerId());
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    KeyedTimerData<K> keyedTimerData = SamzaTimerInternalsFactory.this.timerMap.remove(timerKey);
                    if (keyedTimerData != null) {
                        SamzaTimerInternalsFactory.this.eventTimeTimers.remove(keyedTimerData);
                    }
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.delete(timerKey);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time or processing time", SamzaRunner.class));
                }
            }
        }

        /** Returns the current wall-clock time. */
        public Instant currentProcessingTime() {
            return new Instant();
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        public Instant currentSynchronizedProcessingTime() {
            throw new UnsupportedOperationException(String.format("%s does not currently support synchronized processing time", SamzaRunner.class));
        }

        /** Returns the enclosing factory's current input watermark. */
        public Instant currentInputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.inputWatermark;
        }

        /** Returns the enclosing factory's current output watermark. */
        public Instant currentOutputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.outputWatermark;
        }
    }
}

