/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.sink.lib;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.operators.sink.InitContextBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class OutputFormatSink<IN>
implements Sink<IN> {
    private final OutputFormat<IN> format;

    public OutputFormatSink(OutputFormat<IN> format) {
        this.format = format;
    }

    public SinkWriter<IN> createWriter(WriterInitContext writerContext) throws IOException {
        RuntimeContext runtimeContext = null;
        if (writerContext instanceof InitContextBase) {
            runtimeContext = ((InitContextBase)writerContext).getRuntimeContext();
        }
        return new InputFormatSinkWriter<IN>(writerContext, this.format, runtimeContext);
    }

    private static class InputFormatSinkWriter<IN>
    implements SinkWriter<IN> {
        private static final Logger LOG = LoggerFactory.getLogger(InputFormatSinkWriter.class);
        private final OutputFormat<IN> format;
        private boolean cleanupCalled = false;

        public InputFormatSinkWriter(final WriterInitContext writerContext, OutputFormat<IN> format, @Nullable RuntimeContext runtimeContext) throws IOException {
            this.format = format;
            if (format instanceof RichOutputFormat) {
                ((RichOutputFormat)format).setRuntimeContext(runtimeContext);
            }
            if (runtimeContext instanceof StreamingRuntimeContext) {
                format.configure(((StreamingRuntimeContext)runtimeContext).getJobConfiguration());
            } else {
                format.configure(new Configuration());
            }
            final int indexInSubtaskGroup = writerContext.getTaskInfo().getIndexOfThisSubtask();
            final int currentNumberOfSubtasks = writerContext.getTaskInfo().getNumberOfParallelSubtasks();
            format.open(new OutputFormat.InitializationContext(){

                public int getNumTasks() {
                    return currentNumberOfSubtasks;
                }

                public int getTaskNumber() {
                    return indexInSubtaskGroup;
                }

                public int getAttemptNumber() {
                    return writerContext.getTaskInfo().getAttemptNumber();
                }
            });
        }

        public void write(IN element, SinkWriter.Context context) throws IOException {
            try {
                this.format.writeRecord(element);
            }
            catch (Exception ex) {
                this.cleanup();
                throw ex;
            }
        }

        public void flush(boolean endOfInput) {
        }

        public void close() throws Exception {
            try {
                this.format.close();
            }
            catch (Exception ex) {
                this.cleanup();
                throw ex;
            }
        }

        private void cleanup() {
            try {
                if (!this.cleanupCalled && this.format instanceof CleanupWhenUnsuccessful) {
                    this.cleanupCalled = true;
                    ((CleanupWhenUnsuccessful)this.format).tryCleanupOnError();
                }
            }
            catch (Throwable t) {
                LOG.error("Cleanup on error failed.", t);
            }
        }
    }
}

