/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.OptionalLong;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.CommitterInitContextImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

class CommitterOperator<CommT>
extends AbstractStreamOperator<CommittableMessage<CommT>>
implements OneInputStreamOperator<CommittableMessage<CommT>, CommittableMessage<CommT>>,
BoundedOneInput {
    private final SimpleVersionedSerializer<CommT> committableSerializer;
    private final FunctionWithException<CommitterInitContext, Committer<CommT>, IOException> committerSupplier;
    private final boolean emitDownstream;
    private final boolean isBatchMode;
    private final boolean isCheckpointingEnabled;
    private SinkCommitterMetricGroup metricGroup;
    private Committer<CommT> committer;
    private CommittableCollector<CommT> committableCollector;
    private long lastCompletedCheckpointId = -1L;
    private int maxRetries;
    private boolean endInput = false;
    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<byte[]>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
    private ListState<CommittableCollector<CommT>> committableCollectorState;

    public CommitterOperator(StreamOperatorParameters<CommittableMessage<CommT>> parameters, ProcessingTimeService processingTimeService, SimpleVersionedSerializer<CommT> committableSerializer, FunctionWithException<CommitterInitContext, Committer<CommT>, IOException> committerSupplier, boolean emitDownstream, boolean isBatchMode, boolean isCheckpointingEnabled) {
        super(parameters);
        this.emitDownstream = emitDownstream;
        this.isBatchMode = isBatchMode;
        this.isCheckpointingEnabled = isCheckpointingEnabled;
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.committableSerializer = Preconditions.checkNotNull(committableSerializer);
        this.committerSupplier = Preconditions.checkNotNull(committerSupplier);
    }

    @Override
    protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<CommittableMessage<CommT>>> output) {
        super.setup(containingTask, config, output);
        this.metricGroup = InternalSinkCommitterMetricGroup.wrap(this.getMetricGroup());
        this.committableCollector = CommittableCollector.of(this.metricGroup);
        this.maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OptionalLong checkpointId = context.getRestoredCheckpointId();
        CommitterInitContextImpl initContext = new CommitterInitContextImpl(this.getRuntimeContext(), this.metricGroup, checkpointId);
        this.committer = this.committerSupplier.apply(initContext);
        this.committableCollectorState = new SimpleVersionedListState<CommT>(context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC), new CommittableCollectorSerializer<CommT>(this.committableSerializer, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), this.metricGroup));
        if (context.isRestored()) {
            ((Iterable)this.committableCollectorState.get()).forEach(cc -> this.committableCollector.merge((CommittableCollector<CommT>)cc));
            this.lastCompletedCheckpointId = checkpointId.getAsLong();
            this.commitAndEmitCheckpoints();
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.committableCollectorState.update(Collections.singletonList(this.committableCollector.copy()));
    }

    @Override
    public void endInput() throws Exception {
        this.endInput = true;
        if (!this.isCheckpointingEnabled || this.isBatchMode) {
            this.commitAndEmitCheckpoints();
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.lastCompletedCheckpointId = Math.max(this.lastCompletedCheckpointId, checkpointId);
        this.commitAndEmitCheckpoints();
    }

    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
        long completedCheckpointId = this.endInput ? Long.MAX_VALUE : this.lastCompletedCheckpointId;
        for (CheckpointCommittableManager<CommT> checkpointManager : this.committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
            this.commitAndEmit(checkpointManager);
            this.committableCollector.remove(checkpointManager);
        }
    }

    private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager) throws IOException, InterruptedException {
        committableManager.commit(this.committer, this.maxRetries);
        if (this.emitDownstream) {
            this.emit(committableManager);
        }
    }

    private void emit(CheckpointCommittableManager<CommT> committableManager) {
        int subtaskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        int numberOfSubtasks = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        long checkpointId = committableManager.getCheckpointId();
        Collection<CommT> committables = committableManager.getSuccessfulCommittables();
        this.output.collect(new StreamRecord(new CommittableSummary(subtaskId, numberOfSubtasks, checkpointId, committables.size(), committableManager.getNumFailed())));
        for (CommT committable : committables) {
            this.output.collect(new StreamRecord<CommittableWithLineage<CommT>>(new CommittableWithLineage<CommT>(committable, checkpointId, subtaskId)));
        }
    }

    @Override
    public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
        this.committableCollector.addMessage(element.getValue());
    }

    @Override
    public void close() throws Exception {
        IOUtils.closeAll(this.committer, () -> super.close());
    }
}

