/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.IndexedCombinedWatermarkStatus;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContextHandler;
import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.YieldingOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton {@link StreamOperator} implementation that bundles the plumbing shared by concrete
 * operators: references to the containing task, configuration and output, metric and latency
 * tracking, state access through a {@link StreamOperatorStateHandler}, timer services and
 * watermark / watermark-status / record-attribute forwarding for up to two inputs.
 *
 * <p>All runtime fields are {@code transient}; they are populated by {@link #setup}, {@link
 * #initializeState(StreamTaskStateInitializer)} and {@link #open()} rather than restored through
 * Java serialization.
 *
 * @param <OUT> type of the elements carried by the {@link StreamRecord}s written to {@link #output}
 */
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                YieldingOperator<OUT>,
                StreamOperatorStateHandler.CheckpointedStreamOperator,
                KeyContextHandler,
                Serializable {
    private static final long serialVersionUID = 1L;

    /** Logger shared with subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);

    /** Task that hosts this operator; assigned in {@link #setup}. */
    private transient StreamTask<?, ?> container;

    /** Operator configuration; assigned in {@link #setup}. */
    protected transient StreamConfig config;

    /** Downstream output; assigned in {@link #setup}. */
    protected transient Output<StreamRecord<OUT>> output;

    /** Combines the watermarks and idleness of the two inputs (created for two inputs in setup). */
    protected transient IndexedCombinedWatermarkStatus combinedWatermark;

    /** Runtime context created in {@link #setup}. */
    private transient StreamingRuntimeContext runtimeContext;

    /** Mailbox executor supplied through {@link #setMailboxExecutor}; may never be set. */
    @Nullable
    private transient MailboxExecutor mailboxExecutor;

    /** Only created in {@link #open()} when splittable timers are in use; otherwise {@code null}. */
    @Nullable
    private transient MailboxWatermarkProcessor watermarkProcessor;

    /** State partitioner read from the config for index 0; {@code null} means no key context. */
    protected transient KeySelector<?, ?> stateKeySelector1;

    /** State partitioner read from the config for index 1; {@code null} means no key context. */
    protected transient KeySelector<?, ?> stateKeySelector2;

    /** Created in {@link #initializeState(StreamTaskStateInitializer)}. */
    protected transient StreamOperatorStateHandler stateHandler;

    /** Timer service manager taken from the state context; may be {@code null}. */
    protected transient InternalTimeServiceManager<?> timeServiceManager;

    /** Metric group of this operator; assigned in {@link #setup}. */
    protected transient InternalOperatorMetricGroup metrics;

    /** Receives every latency marker seen by {@link #reportOrForwardLatencyMarker}. */
    protected transient LatencyStats latencyStats;

    /** Set by the parameterised constructor or by {@link #setProcessingTimeService}. */
    protected transient ProcessingTimeService processingTimeService;

    /** Most recent record attributes received via {@link #processRecordAttributes1}. */
    protected transient RecordAttributes lastRecordAttributes1;

    /** Most recent record attributes received via {@link #processRecordAttributes2}. */
    protected transient RecordAttributes lastRecordAttributes2;

    /**
     * Creates an operator without any runtime wiring; {@link #setup} and {@link
     * #setProcessingTimeService} have to be invoked separately.
     */
    public AbstractStreamOperator() {
    }

    /**
     * Creates an operator and, when {@code parameters} is non-null, wires it up immediately.
     *
     * @param parameters task, config, output and processing-time service; {@code null} leaves the
     *     operator unconfigured, exactly like the no-arg constructor
     * @throws NullPointerException if {@code parameters} carries no processing-time service
     */
    public AbstractStreamOperator(StreamOperatorParameters<OUT> parameters) {
        if (parameters != null) {
            // The service must be in place BEFORE setup(): setup() passes
            // getProcessingTimeService() to the runtime context it builds, so assigning it
            // afterwards would leave that context holding null.
            this.processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService());
            this.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
        }
    }

    /**
     * Stores the task, config and output, registers the operator metric group, sets up latency
     * tracking, creates the runtime context and reads the two state partitioners from the config.
     *
     * @param containingTask task hosting this operator
     * @param config configuration of this operator
     * @param output output that results are written to
     */
    protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        Environment environment = containingTask.getEnvironment();
        this.container = containingTask;
        this.config = config;
        this.output = output;
        this.metrics = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
        this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(2);
        this.latencyStats = this.createLatencyStats(environment);
        this.runtimeContext = new StreamingRuntimeContext(
                environment,
                environment.getAccumulatorRegistry().getUserMap(),
                this.getMetricGroup(),
                this.getOperatorID(),
                this.getProcessingTimeService(),
                null,
                environment.getExternalResourceInfoProvider());
        this.stateKeySelector1 = config.getStatePartitioner(0, this.getUserCodeClassloader());
        this.stateKeySelector2 = config.getStatePartitioner(1, this.getUserCodeClassloader());
        this.lastRecordAttributes1 = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
        this.lastRecordAttributes2 = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
    }

    /**
     * Builds the {@link LatencyStats} from the task manager configuration, falling back to an
     * unregistered single-granularity instance if anything goes wrong.
     */
    private LatencyStats createLatencyStats(Environment environment) {
        try {
            Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();

            int historySize = taskManagerConfig.get(MetricOptions.LATENCY_HISTORY_SIZE);
            if (historySize <= 0) {
                LOG.warn(
                        "{} has been set to a value equal or below 0: {}. Using default.",
                        MetricOptions.LATENCY_HISTORY_SIZE,
                        historySize);
                historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
            }

            String configuredGranularity = taskManagerConfig.get(MetricOptions.LATENCY_SOURCE_GRANULARITY);
            LatencyStats.Granularity granularity;
            try {
                granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException iae) {
                // Unknown name: fall back to operator granularity instead of failing setup.
                granularity = LatencyStats.Granularity.OPERATOR;
                LOG.warn(
                        "Configured value {} option for {} is invalid. Defaulting to {}.",
                        configuredGranularity,
                        MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
                        granularity);
            }

            MetricGroup taskMetricGroup = this.metrics.getTaskMetricGroup();
            return new LatencyStats(
                    taskMetricGroup.addGroup("latency"),
                    historySize,
                    this.container.getIndexInSubtaskGroup(),
                    this.getOperatorID(),
                    granularity);
        } catch (Exception e) {
            // Latency metrics are best effort; never let them break operator setup.
            LOG.warn("An error occurred while instantiating latency metrics.", e);
            return new LatencyStats(
                    UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().addGroup("latency"),
                    1,
                    0,
                    new OperatorID(),
                    LatencyStats.Granularity.SINGLE);
        }
    }

    /**
     * Replaces the processing-time service.
     *
     * @param processingTimeService the service to use; must not be {@code null}
     * @throws NullPointerException if {@code processingTimeService} is {@code null}
     */
    protected void setProcessingTimeService(ProcessingTimeService processingTimeService) {
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
    }

    /** Returns the operator metric group registered in {@link #setup}. */
    @Override
    public OperatorMetricGroup getMetricGroup() {
        return this.metrics;
    }

    /**
     * Hook invoked by {@link #initializeState(StreamTaskStateInitializer)} after the state handler
     * and timer service manager exist but before operator state is initialized. No-op by default.
     */
    protected void beforeInitializeStateHandler() {
    }

    /**
     * Obtains the operator's state context from the given initializer, creates the {@link
     * StreamOperatorStateHandler}, selects the timer service manager, initializes operator state
     * and finally publishes the keyed state store (or {@code null}) to the runtime context.
     *
     * @param streamTaskStateManager source of the {@link StreamOperatorStateContext}
     * @throws Exception if creating the state context or initializing the state fails
     */
    @Override
    public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
        TypeSerializer<?> keySerializer = this.config.getStateKeySerializer(this.getUserCodeClassloader());
        StreamTask<?, ?> containingTask = Preconditions.checkNotNull(this.getContainingTask());
        CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables());
        StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(
                this.getOperatorID(),
                this.getClass().getSimpleName(),
                this.getProcessingTimeService(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                this.metrics,
                this.config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                        ManagedMemoryUseCase.STATE_BACKEND,
                        this.runtimeContext.getJobConfiguration(),
                        this.runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                        this.runtimeContext.getUserCodeClassLoader()),
                this.isUsingCustomRawKeyedState(),
                this.isAsyncKeyOrderedProcessingEnabled());
        this.stateHandler = new StreamOperatorStateHandler(context, this.getExecutionConfig(), streamTaskCloseableRegistry);
        // Async key-ordered processing uses its own timer service manager.
        this.timeServiceManager = this.isAsyncKeyOrderedProcessingEnabled()
                ? context.asyncInternalTimerServiceManager()
                : context.internalTimerServiceManager();
        this.beforeInitializeStateHandler();
        this.stateHandler.initializeOperatorState(this);
        this.runtimeContext.setKeyedStateStore(this.stateHandler.getKeyedStateStore().orElse(null));
    }

    /**
     * Flag forwarded to the state context and to snapshots; {@code false} unless overridden.
     *
     * @return {@code false}
     */
    @Internal
    protected boolean isUsingCustomRawKeyedState() {
        return false;
    }

    /**
     * Flag selecting the async timer service manager and forwarded to the state context and to
     * snapshots; {@code false} unless overridden.
     *
     * @return {@code false}
     */
    @Internal
    public boolean isAsyncKeyOrderedProcessingEnabled() {
        return false;
    }

    /** Stores the mailbox executor that {@link #open()} hands to the watermark processor. */
    @Override
    @Internal
    public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    /**
     * Whether this operator opts in to splittable timers; {@code false} unless overridden.
     *
     * @return {@code false}
     */
    @Internal
    public boolean useSplittableTimers() {
        return false;
    }

    /** Evaluates {@link #areSplittableTimersConfigured(StreamConfig)} for this operator's config. */
    @Internal
    private boolean areSplittableTimersConfigured() {
        return AbstractStreamOperator.areSplittableTimersConfigured(this.config);
    }

    /**
     * Tells whether the configuration enables splittable timers.
     *
     * @param config configuration to inspect
     * @return {@code true} only if checkpointing, unaligned checkpoints and the splittable-timers
     *     option for unaligned checkpoints are all enabled
     */
    static boolean areSplittableTimersConfigured(StreamConfig config) {
        return config.isCheckpointingEnabled() && config.isUnalignedCheckpointsEnabled() && config.isUnalignedCheckpointsSplittableTimersEnabled();
    }

    /**
     * Creates the {@link MailboxWatermarkProcessor} when the operator opts in to splittable timers,
     * the configuration enables them and a timer service manager is available; otherwise does
     * nothing.
     */
    @Override
    public void open() throws Exception {
        if (this.useSplittableTimers() && this.areSplittableTimersConfigured() && this.getTimeServiceManager().isPresent()) {
            this.watermarkProcessor = new MailboxWatermarkProcessor<OUT>(this.output, this.mailboxExecutor, this.getTimeServiceManager().get());
        }
    }

    /** No-op by default. */
    @Override
    public void finish() throws Exception {
    }

    /** Disposes the state handler if one has been created. */
    @Override
    public void close() throws Exception {
        if (this.stateHandler != null) {
            this.stateHandler.dispose();
        }
    }

    /** No-op by default. */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    }

    /**
     * Delegates the snapshot to the state handler, passing along the (possibly absent) timer
     * service manager, the operator name and the raw-keyed-state / async-processing flags.
     *
     * @return the snapshot futures produced by the state handler
     */
    @Override
    public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {
        return this.stateHandler.snapshotState(
                this,
                Optional.ofNullable(this.timeServiceManager),
                this.getOperatorName(),
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                this.isUsingCustomRawKeyedState(),
                this.isAsyncKeyOrderedProcessingEnabled());
    }

    /** Hook for subclasses to write custom state during a snapshot; no-op by default. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
    }

    /** Hook for subclasses to restore custom state; no-op by default. */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
    }

    /** Forwards the notification to the state handler. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.stateHandler.notifyCheckpointComplete(checkpointId);
    }

    /** Forwards the notification to the state handler. */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.stateHandler.notifyCheckpointAborted(checkpointId);
    }

    /** Returns the execution config of the containing task. */
    public ExecutionConfig getExecutionConfig() {
        return this.container.getExecutionConfig();
    }

    /** Returns the configuration passed to {@link #setup}. */
    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    /** Returns the task passed to {@link #setup}. */
    public StreamTask<?, ?> getContainingTask() {
        return this.container;
    }

    /** Returns the user-code class loader of the containing task. */
    public ClassLoader getUserCodeClassloader() {
        return this.container.getUserCodeClassLoader();
    }

    /**
     * Returns the task name with subtask information from the runtime context, or the simple class
     * name when no runtime context exists yet.
     */
    protected String getOperatorName() {
        if (this.runtimeContext == null) {
            return this.getClass().getSimpleName();
        }
        return this.runtimeContext.getTaskInfo().getTaskNameWithSubtasks();
    }

    /** Returns the runtime context created in {@link #setup}. */
    @VisibleForTesting
    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    /** Returns the keyed state backend held by the state handler. */
    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return this.stateHandler.getKeyedStateBackend();
    }

    /** Returns the operator state backend held by the state handler. */
    @VisibleForTesting
    public OperatorStateBackend getOperatorStateBackend() {
        return this.stateHandler.getOperatorStateBackend();
    }

    /** Returns the processing-time service, or {@code null} if none has been set yet. */
    @VisibleForTesting
    public ProcessingTimeService getProcessingTimeService() {
        return this.processingTimeService;
    }

    /**
     * Fetches partitioned state in the {@link VoidNamespace}.
     *
     * @param stateDescriptor descriptor of the requested state
     * @return the state object supplied by the state handler
     */
    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return this.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    /** Delegates to {@link StreamOperatorStateHandler#getOrCreateKeyedState}. */
    protected <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        return this.stateHandler.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
    }

    /** Delegates to {@link StreamOperatorStateHandler#getPartitionedState}. */
    public <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return this.stateHandler.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
    }

    /** Sets the current key from {@code record} using the first state key selector, if any. */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setKeyContextElement1(StreamRecord record) throws Exception {
        this.setKeyContextElement(record, this.stateKeySelector1);
    }

    /** Sets the current key from {@code record} using the second state key selector, if any. */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setKeyContextElement2(StreamRecord record) throws Exception {
        this.setKeyContextElement(record, this.stateKeySelector2);
    }

    /** Returns {@code true} when a first state key selector is configured. */
    @Override
    @Internal
    public boolean hasKeyContext1() {
        return this.stateKeySelector1 != null;
    }

    /** Returns {@code true} when a second state key selector is configured. */
    @Override
    @Internal
    public boolean hasKeyContext2() {
        return this.stateKeySelector2 != null;
    }

    /** Extracts the key from the record's value and makes it current; no-op without selector. */
    private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
        if (selector == null) {
            return;
        }
        this.setCurrentKey(selector.getKey(record.getValue()));
    }

    /** Forwards the key to the state handler. */
    @Override
    public void setCurrentKey(Object key) {
        this.stateHandler.setCurrentKey(key);
    }

    /** Returns the current key as reported by the state handler. */
    @Override
    public Object getCurrentKey() {
        return this.stateHandler.getCurrentKey();
    }

    /**
     * Returns the keyed state store, or {@code null} when the state handler has not been created
     * or exposes no keyed state store.
     */
    public KeyedStateStore getKeyedStateStore() {
        if (this.stateHandler == null) {
            return null;
        }
        return this.stateHandler.getKeyedStateStore().orElse(null);
    }

    /** Returns the first state key selector; may be {@code null}. */
    protected KeySelector<?, ?> getStateKeySelector1() {
        return this.stateKeySelector1;
    }

    /** Returns the second state key selector; may be {@code null}. */
    protected KeySelector<?, ?> getStateKeySelector2() {
        return this.stateKeySelector2;
    }

    /** Reports and forwards the marker via {@link #reportOrForwardLatencyMarker}. */
    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    /** Reports and forwards the marker via {@link #reportOrForwardLatencyMarker}. */
    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    /** Reports and forwards the marker via {@link #reportOrForwardLatencyMarker}. */
    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    /** Records the marker in {@link #latencyStats} and then emits it on {@link #output}. */
    protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        this.latencyStats.reportLatency(marker);
        this.output.emitLatencyMarker(marker);
    }

    /**
     * Obtains a named timer service from the timer service manager.
     *
     * @param name name identifying the timer service
     * @param namespaceSerializer serializer for the timer namespace
     * @param triggerable callback invoked when timers fire
     * @param <K> key type
     * @param <N> namespace type
     * @return the timer service supplied by the timer service manager
     * @throws RuntimeException if no timer service manager has been initialized
     * @throws IllegalStateException if the state handler has no key serializer
     */
    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        // The manager is stored with a wildcard key type; the caller's K is trusted here.
        @SuppressWarnings("unchecked")
        InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) this.timeServiceManager;
        @SuppressWarnings("unchecked")
        TypeSerializer<K> keySerializer = (TypeSerializer<K>) this.stateHandler.getKeySerializer();
        Preconditions.checkState(keySerializer != null, "Timers can only be used on keyed operators.");
        return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);
    }

    /**
     * Handles a watermark: routed through the mailbox watermark processor when one was created in
     * {@link #open()}, otherwise the timer service manager is advanced (if present) and the
     * watermark is emitted directly.
     */
    public void processWatermark(Watermark mark) throws Exception {
        if (this.watermarkProcessor == null) {
            this.emitWatermarkDirectly(mark);
        } else {
            this.watermarkProcessor.emitWatermarkInsideMailbox(mark);
        }
    }

    /** Advances the timer service manager, if any, and then emits the watermark downstream. */
    private void emitWatermarkDirectly(Watermark mark) throws Exception {
        if (this.timeServiceManager != null) {
            this.timeServiceManager.advanceWatermark(mark);
        }
        this.output.emitWatermark(mark);
    }

    /**
     * Records the watermark for the given input index and, if the combined watermark was updated,
     * processes a new watermark carrying the combined timestamp.
     */
    private void processWatermark(Watermark mark, int index) throws Exception {
        boolean combinedChanged = this.combinedWatermark.updateWatermark(index, mark.getTimestamp());
        if (combinedChanged) {
            this.processWatermark(new Watermark(this.combinedWatermark.getCombinedWatermark()));
        }
    }

    /** Processes a watermark arriving on the first input (index 0). */
    public void processWatermark1(Watermark mark) throws Exception {
        this.processWatermark(mark, 0);
    }

    /** Processes a watermark arriving on the second input (index 1). */
    public void processWatermark2(Watermark mark) throws Exception {
        this.processWatermark(mark, 1);
    }

    /** Forwards the watermark status unchanged. */
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        this.output.emitWatermarkStatus(watermarkStatus);
    }

    /**
     * Records the idleness of one input. If that updates the combined watermark, the combined
     * watermark is processed; if the combined idleness flipped, the incoming status is emitted
     * downstream.
     *
     * @param watermarkStatus status received on the input
     * @param index zero-based input index
     */
    protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index) throws Exception {
        boolean wasIdle = this.combinedWatermark.isIdle();
        if (this.combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
            this.processWatermark(new Watermark(this.combinedWatermark.getCombinedWatermark()));
        }
        // Re-read idleness after the update: only a change in combined idleness is propagated.
        if (wasIdle != this.combinedWatermark.isIdle()) {
            this.output.emitWatermarkStatus(watermarkStatus);
        }
    }

    /** Processes a watermark status arriving on the first input (index 0). */
    public final void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
        this.processWatermarkStatus(watermarkStatus, 0);
    }

    /** Processes a watermark status arriving on the second input (index 1). */
    public final void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
        this.processWatermarkStatus(watermarkStatus, 1);
    }

    /** Returns the operator id stored in the configuration. */
    @Override
    public OperatorID getOperatorID() {
        return this.config.getOperatorID();
    }

    /** Returns the timer service manager, or an empty optional if none was initialized. */
    protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
        return Optional.ofNullable(this.timeServiceManager);
    }

    /** Emits record attributes built from the single given instance. */
    @Experimental
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        this.output.emitRecordAttributes(new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build());
    }

    /** Remembers the attributes of the first input and emits the combination of both inputs. */
    @Experimental
    public void processRecordAttributes1(RecordAttributes recordAttributes) {
        this.lastRecordAttributes1 = recordAttributes;
        this.emitCombinedRecordAttributes();
    }

    /** Remembers the attributes of the second input and emits the combination of both inputs. */
    @Experimental
    public void processRecordAttributes2(RecordAttributes recordAttributes) {
        this.lastRecordAttributes2 = recordAttributes;
        this.emitCombinedRecordAttributes();
    }

    /** Builds record attributes from the last attributes of both inputs and emits them. */
    private void emitCombinedRecordAttributes() {
        this.output.emitRecordAttributes(new RecordAttributesBuilder(Arrays.asList(this.lastRecordAttributes1, this.lastRecordAttributes2)).build());
    }

    /** Forwards the watermark event unchanged. */
    @Experimental
    public void processWatermark(WatermarkEvent watermark) throws Exception {
        this.output.emitWatermark(watermark);
    }

    /** Forwards a watermark event from the first input unchanged. */
    @Experimental
    public void processWatermark1(WatermarkEvent watermark) throws Exception {
        this.output.emitWatermark(watermark);
    }

    /** Forwards a watermark event from the second input unchanged. */
    @Experimental
    public void processWatermark2(WatermarkEvent watermark) throws Exception {
        this.output.emitWatermark(watermark);
    }
}

