/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the named {@link InternalTimerServiceImpl timer services} of one keyed operator.
 *
 * <p>Timer services are created lazily, one per name, and are backed by two priority queues
 * (processing time and event time) obtained from the {@link PriorityQueueSetFactory} handed to
 * {@link #create}. The manager can also write all registered services to, and restore them from,
 * raw keyed state, one key group at a time.
 *
 * @param <K> type of the keys the timers are scoped to
 */
@Internal
public class InternalTimeServiceManagerImpl<K>
implements InternalTimeServiceManager<K> {
    protected static final Logger LOG = LoggerFactory.getLogger(InternalTimeServiceManagerImpl.class);

    /** Common prefix of the names under which timer priority queues are created. */
    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";

    /** Name prefix of the processing-time timer queue; the timer service name is appended. */
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";

    /** Name prefix of the event-time timer queue; the timer service name is appended. */
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final TaskIOMetricGroup taskIOMetricGroup;
    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;
    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;
    private final StreamTaskCancellationContext cancellationContext;

    /** All timer services registered so far, keyed by their name. */
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    /**
     * Private on purpose: instances are obtained through {@link #create}, which also restores
     * previously snapshotted timers.
     *
     * <p>{@code taskIOMetricGroup} and {@code cancellationContext} are stored as given, without a
     * null check; all other arguments must be non-null.
     */
    private InternalTimeServiceManagerImpl(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange localKeyGroupRange, KeyContext keyContext, PriorityQueueSetFactory priorityQueueSetFactory, ProcessingTimeService processingTimeService, StreamTaskCancellationContext cancellationContext) {
        this.taskIOMetricGroup = taskIOMetricGroup;
        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.cancellationContext = cancellationContext;
        this.timerServices = new HashMap<>();
    }

    /**
     * Creates a manager for the key group range of {@code keyedStateBackend} and restores timer
     * state from every entry of {@code rawKeyedStates} into it.
     *
     * @param taskIOMetricGroup metric group passed on to each timer service
     * @param keyedStateBackend supplies the local key group range and the timer priority queues
     * @param userClassloader class loader used while reading the restored state
     * @param keyContext key context passed on to each timer service
     * @param processingTimeService processing time service passed on to each timer service
     * @param rawKeyedStates raw keyed state streams, one per key group, to restore from
     * @param cancellationContext cancellation context passed on to each timer service
     * @return the manager, with all given raw keyed state already restored
     * @throws IllegalArgumentException if a stream belongs to a key group outside the local range
     * @throws Exception if reading one of the streams fails
     */
    public static <K> InternalTimeServiceManagerImpl<K> create(TaskIOMetricGroup taskIOMetricGroup, CheckpointableKeyedStateBackend<K> keyedStateBackend, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates, StreamTaskCancellationContext cancellationContext) throws Exception {
        KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange();
        InternalTimeServiceManagerImpl<K> timeServiceManager = new InternalTimeServiceManagerImpl<>(taskIOMetricGroup, keyGroupRange, keyContext, keyedStateBackend, processingTimeService, cancellationContext);
        for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {
            int keyGroupIdx = streamProvider.getKeyGroupId();
            Preconditions.checkArgument(keyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");
            // Restore into the manager being built; this is a static method, so there is no
            // "this"/"super" to restore into.
            timeServiceManager.restoreStateForKeyGroup(streamProvider.getStream(), keyGroupIdx, userClassloader);
        }
        return timeServiceManager;
    }

    /**
     * Returns the timer service registered under {@code name}, creating it on first use, and
     * starts it with the given serializers and {@code triggerable}.
     *
     * @throws NullPointerException if {@code keySerializer} is null
     */
    @Override
    public <N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        Preconditions.checkNotNull(keySerializer, "Timers can only be used on keyed operators.");
        TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
        InternalTimerServiceImpl<K, N> timerService = this.registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(timerSerializer.getKeySerializer(), timerSerializer.getNamespaceSerializer(), triggerable);
        return timerService;
    }

    /**
     * Looks up the timer service called {@code name}; if none exists yet, creates one backed by
     * fresh processing-time and event-time queues and registers it.
     */
    // The map stores services with a wildcard namespace type; the caller determines N per name,
    // so the cast cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) this.timerServices.get(name);
        if (timerService == null) {
            timerService = new InternalTimerServiceImpl<>(this.taskIOMetricGroup, this.localKeyGroupRange, this.keyContext, this.processingTimeService, this.createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer), this.createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer), this.cancellationContext);
            this.timerServices.put(name, timerService);
        }
        return timerService;
    }

    /**
     * Asynchronous counterpart of {@link #getInternalTimerService}: returns the async timer
     * service registered under {@code name}, creating it on first use with the given
     * {@code asyncExecutionController}, and starts it.
     *
     * @throws NullPointerException if {@code keySerializer} is null
     */
    @Override
    public <N> InternalTimerService<N> getAsyncInternalTimerService(String name, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable, AsyncExecutionController<K> asyncExecutionController) {
        Preconditions.checkNotNull(keySerializer, "Timers can only be used on keyed operators.");
        TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
        InternalTimerServiceAsyncImpl<K, N> timerService = this.registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController);
        timerService.startTimerService(timerSerializer.getKeySerializer(), timerSerializer.getNamespaceSerializer(), triggerable);
        return timerService;
    }

    /**
     * Looks up the async timer service called {@code name}; if none exists yet, creates and
     * registers one.
     *
     * <p>Sync and async services share one registry, so a name already taken by a non-async
     * service makes the cast below fail with a {@link ClassCastException}.
     */
    // Same unchecked namespace-type cast as in registerOrGetTimerService.
    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(String name, TimerSerializer<K, N> timerSerializer, AsyncExecutionController<K> asyncExecutionController) {
        InternalTimerServiceAsyncImpl<K, N> timerService = (InternalTimerServiceAsyncImpl<K, N>) this.timerServices.get(name);
        if (timerService == null) {
            timerService = new InternalTimerServiceAsyncImpl<>(this.taskIOMetricGroup, this.localKeyGroupRange, this.keyContext, this.processingTimeService, this.createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer), this.createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer), this.cancellationContext, asyncExecutionController);
            this.timerServices.put(name, timerService);
        }
        return timerService;
    }

    /** Returns a read-only view of the registered timer services, keyed by name. */
    Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(this.timerServices);
    }

    /** Asks the priority queue factory for the timer queue called {@code name}. */
    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(String name, TimerSerializer<K, N> timerSerializer) {
        return this.priorityQueueSetFactory.create(name, timerSerializer);
    }

    /** Forwards the timestamp of {@code watermark} to every registered timer service. */
    @Override
    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<K, ?> service : this.timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

    /**
     * Writes the timers of every key group listed by {@code out} to that stream and then closes
     * it.
     *
     * @param out raw keyed state stream to write to; always closed before this method returns
     * @param operatorName only used in error and log messages
     * @throws Exception wrapping any failure that occurs while writing; a failure to close the
     *     stream is logged instead of thrown
     */
    @Override
    public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, String operatorName) throws Exception {
        try {
            KeyGroupsList allKeyGroups = out.getKeyGroupList();
            for (int keyGroupIdx : allKeyGroups) {
                out.startNewKeyGroup(keyGroupIdx);
                this.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx);
            }
        }
        catch (Exception exception) {
            throw new Exception("Could not write timer service of " + operatorName + " to checkpoint state stream.", exception);
        }
        finally {
            try {
                out.close();
            }
            catch (Exception closeException) {
                // Best effort: a close failure must not mask the outcome of the snapshot itself.
                LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", operatorName, closeException);
            }
        }
    }

    /** Serializes the timers of key group {@code keyGroupIdx} to {@code stream}. */
    private void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
        InternalTimerServiceSerializationProxy<K> serializationProxy = new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
        serializationProxy.write(stream);
    }

    /** Reads the timers of key group {@code keyGroupIdx} from {@code stream} into this manager. */
    private void restoreStateForKeyGroup(InputStream stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException {
        InternalTimerServiceSerializationProxy<K> serializationProxy = new InternalTimerServiceSerializationProxy<>(this, userCodeClassLoader, keyGroupIdx);
        serializationProxy.read(stream);
    }

    /** Returns the number of processing-time timers summed over all registered services. */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<K, ?> timerService : this.timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    /** Returns the number of event-time timers summed over all registered services. */
    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<K, ?> timerService : this.timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }
}

