/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.CommitRetrier;
import org.apache.flink.streaming.runtime.operators.sink.CommitterHandler;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterStateHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.BiFunctionWithException;

class SinkOperator<InputT, CommT, WriterStateT>
extends AbstractStreamOperator<byte[]>
implements OneInputStreamOperator<InputT, byte[]>,
BoundedOneInput {
    private final Context<InputT> context;
    private Long currentWatermark = Long.MIN_VALUE;
    private SinkWriter<InputT, CommT, WriterStateT> sinkWriter;
    private final SinkWriterStateHandler<WriterStateT> sinkWriterStateHandler;
    private final CommitterHandler<CommT, CommT> committerHandler;
    private CommitRetrier commitRetrier;
    @Nullable
    private final SimpleVersionedSerializer<CommT> committableSerializer;
    private final BiFunctionWithException<Sink.InitContext, List<WriterStateT>, SinkWriter<InputT, CommT, WriterStateT>, IOException> writerFactory;
    private final MailboxExecutor mailboxExecutor;
    private Counter numRecordsOutCounter;
    private boolean endOfInput = false;

    SinkOperator(ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, BiFunctionWithException<Sink.InitContext, List<WriterStateT>, SinkWriter<InputT, CommT, WriterStateT>, IOException> writerFactory, SinkWriterStateHandler<WriterStateT> sinkWriterStateHandler, CommitterHandler<CommT, CommT> committerHandler, @Nullable SimpleVersionedSerializer<CommT> committableSerializer) {
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        this.mailboxExecutor = (MailboxExecutor)Preconditions.checkNotNull((Object)mailboxExecutor);
        this.writerFactory = (BiFunctionWithException)Preconditions.checkNotNull(writerFactory);
        this.sinkWriterStateHandler = (SinkWriterStateHandler)Preconditions.checkNotNull(sinkWriterStateHandler);
        this.committerHandler = (CommitterHandler)Preconditions.checkNotNull(committerHandler);
        this.committableSerializer = committableSerializer;
        this.context = new Context();
        this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler);
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<byte[]>> output) {
        super.setup(containingTask, config, output);
        this.numRecordsOutCounter = this.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OptionalLong checkpointId = context.getRestoredCheckpointId();
        this.sinkWriter = (SinkWriter)this.writerFactory.apply((Object)this.createInitContext(checkpointId.isPresent() ? Long.valueOf(checkpointId.getAsLong()) : null), this.sinkWriterStateHandler.initializeState(context));
        this.committerHandler.initializeState(context);
        this.commitRetrier.retryWithDelay();
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.sinkWriterStateHandler.snapshotState(arg_0 -> this.sinkWriter.snapshotState(arg_0), context.getCheckpointId());
        this.committerHandler.snapshotState(context);
    }

    @Override
    public void processElement(StreamRecord<InputT> element) throws Exception {
        ((Context)this.context).element = element;
        this.sinkWriter.write(element.getValue(), this.context);
        this.numRecordsOutCounter.inc();
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        super.prepareSnapshotPreBarrier(checkpointId);
        if (!this.endOfInput) {
            this.emitCommittables(this.committerHandler.processCommittables(this.sinkWriter.prepareCommit(false)));
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.emitCommittables(this.committerHandler.notifyCheckpointCompleted(checkpointId));
        this.commitRetrier.retryWithDelay();
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
        this.sinkWriter.writeWatermark(new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
    }

    @Override
    public void endInput() throws Exception {
        this.endOfInput = true;
        this.emitCommittables(this.committerHandler.processCommittables(this.sinkWriter.prepareCommit(true)));
        this.emitCommittables(this.committerHandler.endOfInput());
        this.commitRetrier.retryIndefinitely();
    }

    private void emitCommittables(Collection<CommT> committables) throws IOException {
        if (this.committableSerializer != null) {
            for (CommT committable : committables) {
                this.output.collect(new StreamRecord<byte[]>(SimpleVersionedSerialization.writeVersionAndSerialize(this.committableSerializer, committable)));
            }
        }
    }

    @Override
    public void close() throws Exception {
        IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this.committerHandler, this.sinkWriter, () -> super.close()});
    }

    private Sink.InitContext createInitContext(@Nullable Long restoredCheckpointId) {
        return new InitContextImpl(this.getRuntimeContext(), this.processingTimeService, this.mailboxExecutor, (SinkWriterMetricGroup)InternalSinkWriterMetricGroup.wrap((OperatorMetricGroup)this.getMetricGroup()), restoredCheckpointId);
    }

    private static class ProcessingTimerServiceImpl
    implements Sink.ProcessingTimeService {
        private final ProcessingTimeService processingTimeService;

        public ProcessingTimerServiceImpl(ProcessingTimeService processingTimeService) {
            this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        }

        public long getCurrentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        public void registerProcessingTimer(long time, Sink.ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) {
            Preconditions.checkNotNull((Object)processingTimerCallback);
            this.processingTimeService.registerTimer(time, arg_0 -> ((Sink.ProcessingTimeService.ProcessingTimeCallback)processingTimerCallback).onProcessingTime(arg_0));
        }
    }

    private static class InitContextImpl
    implements Sink.InitContext {
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final SinkWriterMetricGroup metricGroup;
        @Nullable
        private final Long restoredCheckpointId;
        private final StreamingRuntimeContext runtimeContext;

        public InitContextImpl(StreamingRuntimeContext runtimeContext, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup, @Nullable Long restoredCheckpointId) {
            this.runtimeContext = (StreamingRuntimeContext)((Object)Preconditions.checkNotNull((Object)((Object)runtimeContext)));
            this.mailboxExecutor = (MailboxExecutor)Preconditions.checkNotNull((Object)mailboxExecutor);
            this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
            this.metricGroup = (SinkWriterMetricGroup)Preconditions.checkNotNull((Object)metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
        }

        public UserCodeClassLoader getUserCodeClassLoader() {
            return new UserCodeClassLoader(){

                public ClassLoader asClassLoader() {
                    return runtimeContext.getUserCodeClassLoader();
                }

                public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
                    runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
                }
            };
        }

        public int getNumberOfParallelSubtasks() {
            return this.runtimeContext.getNumberOfParallelSubtasks();
        }

        public MailboxExecutor getMailboxExecutor() {
            return this.mailboxExecutor;
        }

        public Sink.ProcessingTimeService getProcessingTimeService() {
            return new ProcessingTimerServiceImpl(this.processingTimeService);
        }

        public int getSubtaskId() {
            return this.runtimeContext.getIndexOfThisSubtask();
        }

        public SinkWriterMetricGroup metricGroup() {
            return this.metricGroup;
        }

        public OptionalLong getRestoredCheckpointId() {
            return this.restoredCheckpointId == null ? OptionalLong.empty() : OptionalLong.of(this.restoredCheckpointId);
        }
    }

    private class Context<IN>
    implements SinkWriter.Context {
        private StreamRecord<IN> element;

        private Context() {
        }

        public long currentWatermark() {
            return SinkOperator.this.currentWatermark;
        }

        public Long timestamp() {
            if (this.element.hasTimestamp()) {
                return this.element.getTimestamp();
            }
            return null;
        }
    }
}

