package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.class */
public final class CommitterOperatorFactory<CommT> extends AbstractStreamOperatorFactory<CommittableMessage<CommT>> implements OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>> {
    private final SupportsCommitter<CommT> sink;
    private final boolean isBatchMode;
    private final boolean isCheckpointingEnabled;

    public CommitterOperatorFactory(SupportsCommitter<CommT> supportsCommitter, boolean z, boolean z2) {
        this.sink = (SupportsCommitter) Preconditions.checkNotNull(supportsCommitter);
        this.isBatchMode = z;
        this.isCheckpointingEnabled = z2;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOperator(StreamOperatorParameters<CommittableMessage<CommT>> streamOperatorParameters) {
        try {
            CommitterOperator committerOperator = new CommitterOperator(this.processingTimeService, this.sink.getCommittableSerializer(), committerInitContext -> {
                return this.sink.createCommitter(committerInitContext);
            }, this.sink instanceof SupportsPostCommitTopology, this.isBatchMode, this.isCheckpointingEnabled);
            committerOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            return committerOperator;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot create commit operator for " + streamOperatorParameters.getStreamConfig().getOperatorName(), e);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return CommitterOperator.class;
    }
}
