/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceWithAsyncState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;

public class BatchExecutionInternalTimeServiceManager<K>
implements InternalTimeServiceManager<K>,
KeyedStateBackend.KeySelectionListener<K> {
    private final ProcessingTimeService processingTimeService;
    private final Map<String, BatchExecutionInternalTimeService<K, ?>> timerServices = new HashMap();
    private final boolean asyncStateProcessingMode;

    public BatchExecutionInternalTimeServiceManager(ProcessingTimeService processingTimeService, boolean asyncStateProcessingMode) {
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        this.asyncStateProcessingMode = asyncStateProcessingMode;
    }

    @Override
    public <N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        BatchExecutionInternalTimeService<K, ?> timerService = this.timerServices.get(name);
        if (timerService == null) {
            timerService = this.asyncStateProcessingMode ? new BatchExecutionInternalTimeServiceWithAsyncState<K, N>(this.processingTimeService, triggerable) : new BatchExecutionInternalTimeService<K, N>(this.processingTimeService, triggerable);
            this.timerServices.put(name, timerService);
        }
        return timerService;
    }

    @Override
    public void advanceWatermark(Watermark watermark) {
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            this.keySelected(null);
        }
    }

    @Override
    public boolean tryAdvanceWatermark(Watermark watermark, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) {
        this.advanceWatermark(watermark);
        return true;
    }

    @Override
    public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream context, String operatorName) throws Exception {
        throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution");
    }

    public static <K> InternalTimeServiceManager<K> create(TaskIOMetricGroup taskIOMetricGroup, PriorityQueueSetFactory factory, KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates, StreamTaskCancellationContext cancellationContext) {
        CheckpointableKeyedStateBackend keyedStateBackend;
        BatchExecutionKeyedStateBackend theFactory = null;
        boolean asyncStateProcessingMode = false;
        if (factory instanceof BatchExecutionKeyedStateBackend) {
            theFactory = (BatchExecutionKeyedStateBackend)factory;
        } else if (factory instanceof AsyncKeyedStateBackendAdaptor && (keyedStateBackend = ((AsyncKeyedStateBackendAdaptor)factory).getKeyedStateBackend()) instanceof BatchExecutionKeyedStateBackend) {
            theFactory = (BatchExecutionKeyedStateBackend)keyedStateBackend;
            asyncStateProcessingMode = true;
        }
        if (theFactory == null) {
            throw new IllegalStateException("Batch execution specific time service can work only with BatchExecutionKeyedStateBackend");
        }
        BatchExecutionInternalTimeServiceManager<K> timeServiceManager = new BatchExecutionInternalTimeServiceManager<K>(processingTimeService, asyncStateProcessingMode);
        theFactory.registerKeySelectionListener(timeServiceManager);
        return timeServiceManager;
    }

    @Override
    public void keySelected(K newKey) {
        try {
            for (BatchExecutionInternalTimeService<K, ?> value : this.timerServices.values()) {
                value.setCurrentKey(newKey);
            }
        }
        catch (Exception e) {
            throw new WrappingRuntimeException((Throwable)e);
        }
    }
}

