/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@Internal
public class InternalTimeServiceManager<K, N> {
    private final int totalKeyGroups;
    private final KeyGroupsList localKeyGroupRange;
    private final KeyContext keyContext;
    private final ProcessingTimeService processingTimeService;
    private final Map<String, HeapInternalTimerService<K, N>> timerServices;

    InternalTimeServiceManager(int totalKeyGroups, KeyGroupsList localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService) {
        Preconditions.checkArgument((totalKeyGroups > 0 ? 1 : 0) != 0);
        this.totalKeyGroups = totalKeyGroups;
        this.localKeyGroupRange = (KeyGroupsList)Preconditions.checkNotNull((Object)localKeyGroupRange);
        this.keyContext = (KeyContext)Preconditions.checkNotNull((Object)keyContext);
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        this.timerServices = new HashMap<String, HeapInternalTimerService<K, N>>();
    }

    public InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        HeapInternalTimerService<K, N> timerService = this.timerServices.get(name);
        if (timerService == null) {
            timerService = new HeapInternalTimerService(this.totalKeyGroups, this.localKeyGroupRange, this.keyContext, this.processingTimeService);
            this.timerServices.put(name, timerService);
        }
        timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
        return timerService;
    }

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (HeapInternalTimerService<K, N> service : this.timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

    public void snapshotStateForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
        stream.writeInt(this.timerServices.size());
        for (Map.Entry<String, HeapInternalTimerService<K, N>> entry : this.timerServices.entrySet()) {
            String serviceName = entry.getKey();
            HeapInternalTimerService<K, N> timerService = entry.getValue();
            stream.writeUTF(serviceName);
            timerService.snapshotTimersForKeyGroup(stream, keyGroupIdx);
        }
    }

    public void restoreStateForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        int noOfTimerServices = stream.readInt();
        for (int i = 0; i < noOfTimerServices; ++i) {
            String serviceName = stream.readUTF();
            HeapInternalTimerService<K, N> timerService = this.timerServices.get(serviceName);
            if (timerService == null) {
                timerService = new HeapInternalTimerService(this.totalKeyGroups, this.localKeyGroupRange, this.keyContext, this.processingTimeService);
                this.timerServices.put(serviceName, timerService);
            }
            timerService.restoreTimersForKeyGroup(stream, keyGroupIdx, userCodeClassLoader);
        }
    }

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (HeapInternalTimerService<K, N> timerService : this.timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (HeapInternalTimerService<K, N> timerService : this.timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }
}

