/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

public class TestSinkV2<InputT>
implements Sink<InputT> {
    /** The single writer instance this sink hands out; fixed at construction time. */
    private final DefaultSinkWriter<InputT> writer;

    /**
     * Wraps the given writer. Private: within this file, instances are created by
     * {@link Builder#build()} and by the nested sink subclasses.
     *
     * @param writer the writer returned by {@code createWriter}
     */
    private TestSinkV2(DefaultSinkWriter<InputT> writer) {
        this.writer = writer;
    }

    /**
     * Initializes the wrapped writer with {@code context} and returns it.
     *
     * <p>No new writer is created; the same instance is returned on every call.
     *
     * @param context init context forwarded to {@link DefaultSinkWriter#init(Sink.InitContext)}
     * @return the wrapped writer
     */
    public SinkWriter<InputT> createWriter(Sink.InitContext context) {
        final DefaultSinkWriter<InputT> initialized = writer;
        initialized.init(context);
        return initialized;
    }

    /** Returns the wrapped writer without initializing it. */
    DefaultSinkWriter<InputT> getWriter() {
        return writer;
    }

    /**
     * Creates an empty {@link Builder} for a {@code TestSinkV2}.
     *
     * @param <InputT> element type of the sink being built
     * @return a new builder with no writer or committer configured
     */
    public static <InputT> Builder<InputT> newBuilder() {
        // Diamond instead of the raw type, so the result is a proper Builder<InputT>
        // without an unchecked conversion.
        return new Builder<>();
    }

    /**
     * {@link Serializable} string serializer that forwards every call to
     * {@link SimpleVersionedStringSerializer#INSTANCE}.
     */
    public static class StringSerializer implements SimpleVersionedSerializer<String>, Serializable {
        /** Shared instance; the class can still be instantiated directly. */
        public static final StringSerializer INSTANCE = new StringSerializer();

        /** Returns the version reported by the delegate serializer. */
        public int getVersion() {
            return delegate().getVersion();
        }

        /** Serializes {@code obj} using the delegate serializer. */
        public byte[] serialize(String obj) {
            return delegate().serialize(obj);
        }

        /**
         * Deserializes {@code serialized} using the delegate serializer.
         *
         * @throws IOException if the delegate fails to deserialize
         */
        public String deserialize(int version, byte[] serialized) throws IOException {
            return delegate().deserialize(version, serialized);
        }

        /** Single access point for the serializer all calls are forwarded to. */
        private static SimpleVersionedStringSerializer delegate() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    /**
     * Committer that asks for a retry the first time it sees a request and records the
     * request as committed the next time the same request is presented.
     */
    static class RetryOnceCommitter extends DefaultCommitter {
        /** Requests that have been told to retry and have not yet been committed. */
        private final Set<Committer.CommitRequest<String>> seen = new LinkedHashSet<>();

        RetryOnceCommitter() {
        }

        /**
         * For each request: if it was already deferred once, remove it from the pending set
         * and append it to {@code committedData}; otherwise remember it and call
         * {@code retryLater()} on it.
         */
        @Override
        public void commit(Collection<Committer.CommitRequest<String>> committables) {
            for (Committer.CommitRequest<String> request : committables) {
                final boolean alreadyDeferred = this.seen.remove(request);
                if (!alreadyDeferred) {
                    // First encounter: remember the request and push it back for a retry.
                    this.seen.add(request);
                    request.retryLater();
                    continue;
                }
                Preconditions.checkNotNull(this.committedData);
                this.committedData.add(request);
            }
        }
    }

    /**
     * Committer that records every commit request it receives in a queue.
     *
     * <p>The queue is either created eagerly (no-arg constructor) or obtained from a supplier
     * on the first {@link #commit(Collection)} call.
     */
    public static class DefaultCommitter implements Committer<String>, Serializable {
        /** Recorded requests; {@code null} until the first commit when a supplier is used. */
        @Nullable
        protected Queue<Committer.CommitRequest<String>> committedData;
        /** Set to {@code true} by {@link #close()}. */
        private boolean isClosed;
        /** Source of the queue when it is not created eagerly; {@code null} otherwise. */
        @Nullable
        private final Supplier<Queue<Committer.CommitRequest<String>>> queueSupplier;

        /** Creates a committer that records into its own {@link ConcurrentLinkedQueue}. */
        public DefaultCommitter() {
            this.queueSupplier = null;
            this.committedData = new ConcurrentLinkedQueue<>();
            this.isClosed = false;
        }

        /**
         * Creates a committer whose queue is fetched from {@code queueSupplier} on first commit.
         *
         * @param queueSupplier provider of the recording queue
         */
        public DefaultCommitter(@Nullable Supplier<Queue<Committer.CommitRequest<String>>> queueSupplier) {
            this.committedData = null;
            this.isClosed = false;
            this.queueSupplier = queueSupplier;
        }

        /**
         * Returns a copy of the requests recorded so far, or an empty list when no queue
         * has been set up yet.
         */
        public List<Committer.CommitRequest<String>> getCommittedData() {
            final Queue<Committer.CommitRequest<String>> recorded = this.committedData;
            return recorded == null ? Collections.emptyList() : new ArrayList<>(recorded);
        }

        /**
         * Appends all {@code committables} to the recording queue, obtaining the queue from
         * the supplier first if none exists yet (asserting that the supplier is non-null).
         */
        public void commit(Collection<Committer.CommitRequest<String>> committables) {
            Queue<Committer.CommitRequest<String>> target = this.committedData;
            if (target == null) {
                Assert.assertNotNull(this.queueSupplier);
                target = this.queueSupplier.get();
                this.committedData = target;
            }
            target.addAll(committables);
        }

        /** Marks this committer as closed. */
        public void close() throws Exception {
            this.isClosed = true;
        }

        /** Returns whether {@link #close()} has been called. */
        public boolean isClosed() {
            return this.isClosed;
        }

        /** Hook invoked when the committer is handed out; does nothing here. */
        public void init() {
        }
    }

    /**
     * Committing writer that additionally exposes its buffered elements as writer state and
     * can be re-seeded from recovered state.
     */
    protected static class DefaultStatefulSinkWriter<InputT> extends DefaultCommittingSinkWriter<InputT>
            implements StatefulSinkWriter<InputT, String> {
        protected DefaultStatefulSinkWriter() {
        }

        /**
         * Returns the current element buffer as the state snapshot. The list is the live
         * buffer, not a copy; {@code checkpointId} is not used.
         */
        public List<String> snapshotState(long checkpointId) throws IOException {
            return elements;
        }

        /** Replaces the element buffer with a fresh copy of {@code recoveredState}. */
        protected void restore(Collection<String> recoveredState) {
            final List<String> restored = new ArrayList<>(recoveredState);
            elements = restored;
        }
    }

    /**
     * Writer that hands its buffered elements over as committables.
     *
     * <p>Unlike {@link DefaultSinkWriter}, {@link #flush(boolean)} keeps the buffer intact;
     * the buffer is only drained by {@link #prepareCommit()}.
     */
    protected static class DefaultCommittingSinkWriter<InputT>
    extends DefaultSinkWriter<InputT>
    implements CommittingSinkWriter<InputT, String>,
    Serializable {
        protected DefaultCommittingSinkWriter() {
        }

        /** Intentionally a no-op so buffered elements survive until {@link #prepareCommit()}. */
        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        /**
         * Returns all elements buffered so far and starts a new, empty buffer.
         *
         * @return the previous buffer (the list itself, not a copy)
         */
        public Collection<String> prepareCommit() {
            // Typed locals instead of raw List/ArrayList: no unchecked conversion on return.
            List<String> result = this.elements;
            this.elements = new ArrayList<>();
            return result;
        }
    }

    /**
     * Basic writer that buffers a string rendering of every written element and remembers
     * every watermark it is given.
     */
    public static class DefaultSinkWriter<InputT> implements SinkWriter<InputT>, Serializable {
        /** String renderings of the elements written since the buffer was last replaced. */
        protected List<String> elements = new ArrayList<>();
        /** All watermarks passed to {@link #writeWatermark(Watermark)}, in arrival order. */
        protected List<Watermark> watermarks = new ArrayList<>();

        protected DefaultSinkWriter() {
        }

        /**
         * Buffers the {@code toString()} of a {@link Tuple3} built from the element, the
         * context's timestamp and the context's current watermark.
         */
        public void write(InputT element, SinkWriter.Context context) {
            final String rendered =
                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString();
            elements.add(rendered);
        }

        /** Discards the buffered elements by swapping in a new empty list. */
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
            elements = new ArrayList<>();
        }

        /** Records the given watermark. */
        public void writeWatermark(Watermark watermark) {
            watermarks.add(watermark);
        }

        /** Does nothing. */
        public void close() throws Exception {
        }

        /** Initialization hook called when the sink hands out this writer; does nothing here. */
        public void init(Sink.InitContext context) {
        }
    }

    /**
     * Post-commit-topology sink whose writer carries state and which can advertise one
     * compatible writer-state name.
     */
    private static class TestStatefulSinkV2<InputT>
    extends TestSinkV2WithPostCommitTopology<InputT>
    implements SupportsWriterState<InputT, String>,
    SupportsWriterState.WithCompatibleState {
        /** Compatible state name to advertise; {@code null} means none. */
        private final String compatibleState;

        /**
         * @param writer stateful writer handed out by this sink
         * @param committableSerializer serializer for committables
         * @param committer committer handed out by this sink
         * @param compatibleState compatible writer-state name, or {@code null} for none
         */
        public TestStatefulSinkV2(DefaultStatefulSinkWriter<InputT> writer, SimpleVersionedSerializer<String> committableSerializer, DefaultCommitter committer, String compatibleState) {
            super(writer, committableSerializer, committer);
            this.compatibleState = compatibleState;
        }

        /** Same as the inherited method, but with the stateful writer type as return type. */
        @Override
        public DefaultStatefulSinkWriter<InputT> createWriter(Sink.InitContext context) {
            // Parameterized cast instead of the raw one: no unchecked conversion on return.
            return (DefaultStatefulSinkWriter<InputT>) super.createWriter(context);
        }

        /**
         * Re-seeds the sink's writer with {@code recoveredState} and returns it.
         * {@code context} is not used and the writer's {@code init} is not called here.
         */
        public StatefulSinkWriter<InputT, String> restoreWriter(WriterInitContext context, Collection<String> recoveredState) {
            DefaultStatefulSinkWriter<InputT> statefulWriter = (DefaultStatefulSinkWriter<InputT>) this.getWriter();
            statefulWriter.restore(recoveredState);
            return statefulWriter;
        }

        /** Returns a new {@link StringSerializer} for the writer state. */
        public SimpleVersionedSerializer<String> getWriterStateSerializer() {
            return new StringSerializer();
        }

        /** Returns the configured compatible state name as a one-element set, or an empty set. */
        public Collection<String> getCompatibleWriterStateNames() {
            if (this.compatibleState == null) {
                return ImmutableSet.of();
            }
            // No (Object) cast: the element must stay a String for the set to be a
            // Collection<String>.
            return ImmutableSet.of(this.compatibleState);
        }
    }

    /**
     * Two-phase-committing sink with a pre-commit topology that rewrites every committable
     * before it reaches the committer.
     */
    private static class TestSinkV2WithPreCommitTopology<InputT>
    extends TestSinkV2TwoPhaseCommittingSink<InputT>
    implements SupportsPreCommitTopology<String, String> {
        public TestSinkV2WithPreCommitTopology(DefaultSinkWriter<InputT> writer, SimpleVersionedSerializer<String> committableSerializer, DefaultCommitter committer) {
            super(writer, committableSerializer, committer);
        }

        /**
         * Maps the committable stream: {@link CommittableSummary} messages pass through
         * unchanged; every other message is cast to {@link CommittableWithLineage} and its
         * committable gets the suffix {@code "Transformed"} appended.
         *
         * @param committables stream of messages produced by the writer
         * @return the mapped stream, typed via {@link CommittableMessageTypeInfo}
         */
        public DataStream<CommittableMessage<String>> addPreCommitTopology(DataStream<CommittableMessage<String>> committables) {
            // Fully typed lambda (no raw MapFunction / CommittableWithLineage), so the
            // stream keeps its element type without unchecked conversions.
            return committables.map(m -> {
                if (m instanceof CommittableSummary) {
                    return m;
                }
                CommittableWithLineage<String> withLineage = (CommittableWithLineage<String>) m;
                return withLineage.map(old -> old + "Transformed");
            }).returns(CommittableMessageTypeInfo.of(StringSerializer::new));
        }

        /** Returns a new {@link StringSerializer} for the writer's results. */
        public SimpleVersionedSerializer<String> getWriteResultSerializer() {
            return new StringSerializer();
        }
    }

    /** Two-phase-committing sink that declares a post-commit topology which adds nothing. */
    private static class TestSinkV2WithPostCommitTopology<InputT> extends TestSinkV2TwoPhaseCommittingSink<InputT>
            implements SupportsPostCommitTopology<String> {
        /**
         * @param writer writer handed out by this sink
         * @param committableSerializer serializer for committables
         * @param committer committer handed out by this sink
         */
        public TestSinkV2WithPostCommitTopology(
                DefaultSinkWriter<InputT> writer,
                SimpleVersionedSerializer<String> committableSerializer,
                DefaultCommitter committer) {
            super(writer, committableSerializer, committer);
        }

        /** Leaves the committable stream untouched; no operators are attached. */
        public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
            // Intentionally empty.
        }
    }

    /** Sink that, in addition to a writer, hands out a fixed committer and committable serializer. */
    private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends TestSinkV2<InputT>
            implements SupportsCommitter<String> {
        /** Committer returned by every {@code createCommitter} call. */
        private final DefaultCommitter committer;
        /** Serializer returned by {@code getCommittableSerializer}. */
        private final SimpleVersionedSerializer<String> committableSerializer;

        /**
         * @param writer writer handed out by this sink
         * @param committableSerializer serializer for committables
         * @param committer committer handed out by this sink
         */
        public TestSinkV2TwoPhaseCommittingSink(
                DefaultSinkWriter<InputT> writer,
                SimpleVersionedSerializer<String> committableSerializer,
                DefaultCommitter committer) {
            super(writer);
            this.committableSerializer = committableSerializer;
            this.committer = committer;
        }

        /** Calls {@code init()} on the configured committer and returns it; {@code context} is unused. */
        public Committer<String> createCommitter(CommitterInitContext context) {
            final DefaultCommitter initialized = committer;
            initialized.init();
            return initialized;
        }

        /** Returns the serializer supplied at construction time. */
        public SimpleVersionedSerializer<String> getCommittableSerializer() {
            return committableSerializer;
        }

        /** Delegates writer creation to the superclass. */
        public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
            return super.createWriter(context);
        }
    }

    public static class Builder<InputT> {
        private DefaultSinkWriter<InputT> writer = null;
        private DefaultCommitter committer;
        private SimpleVersionedSerializer<String> committableSerializer;
        private boolean withPostCommitTopology = false;
        private boolean withPreCommitTopology = false;
        private boolean withWriterState = false;
        private String compatibleStateNames;

        public Builder<InputT> setWriter(DefaultSinkWriter<InputT> writer) {
            this.writer = (DefaultSinkWriter)Preconditions.checkNotNull(writer);
            return this;
        }

        public Builder<InputT> setCommitter(DefaultCommitter committer) {
            this.committer = committer;
            return this;
        }

        public Builder<InputT> setCommittableSerializer(SimpleVersionedSerializer<String> committableSerializer) {
            this.committableSerializer = committableSerializer;
            return this;
        }

        public Builder<InputT> setDefaultCommitter() {
            this.committer = new DefaultCommitter();
            this.committableSerializer = StringSerializer.INSTANCE;
            return this;
        }

        public Builder<InputT> setDefaultCommitter(Supplier<Queue<Committer.CommitRequest<String>>> queueSupplier) {
            this.committer = new DefaultCommitter(queueSupplier);
            this.committableSerializer = StringSerializer.INSTANCE;
            return this;
        }

        public Builder<InputT> setWithPostCommitTopology(boolean withPostCommitTopology) {
            this.withPostCommitTopology = withPostCommitTopology;
            return this;
        }

        public Builder<InputT> setWithPreCommitTopology(boolean withPreCommitTopology) {
            this.withPreCommitTopology = withPreCommitTopology;
            return this;
        }

        public Builder<InputT> setWriterState(boolean withWriterState) {
            this.withWriterState = withWriterState;
            return this;
        }

        public Builder<InputT> setCompatibleStateNames(String compatibleStateNames) {
            this.compatibleStateNames = compatibleStateNames;
            return this;
        }

        public TestSinkV2<InputT> build() {
            if (this.committer == null) {
                if (this.writer == null) {
                    this.writer = new DefaultSinkWriter();
                }
                return new TestSinkV2(this.writer);
            }
            if (this.writer == null) {
                this.writer = new DefaultCommittingSinkWriter();
            }
            if (!this.withPostCommitTopology) {
                if (!this.withPreCommitTopology) {
                    return new TestSinkV2TwoPhaseCommittingSink<InputT>(this.writer, this.committableSerializer, this.committer);
                }
                Preconditions.checkArgument((boolean)(this.writer instanceof DefaultCommittingSinkWriter), (Object)"Please provide a DefaultCommittingSinkWriter instance");
                return new TestSinkV2WithPreCommitTopology((DefaultCommittingSinkWriter)this.writer, this.committableSerializer, this.committer);
            }
            if (this.withWriterState) {
                Preconditions.checkArgument((boolean)(this.writer instanceof DefaultStatefulSinkWriter), (Object)"Please provide a DefaultStatefulSinkWriter instance");
                return new TestStatefulSinkV2((DefaultStatefulSinkWriter)this.writer, this.committableSerializer, this.committer, this.compatibleStateNames);
            }
            Preconditions.checkArgument((boolean)(this.writer instanceof DefaultCommittingSinkWriter), (Object)"Please provide a DefaultCommittingSinkWriter instance");
            return new TestSinkV2WithPostCommitTopology((DefaultCommittingSinkWriter)this.writer, this.committableSerializer, this.committer);
        }
    }
}

