/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

public class TestSinkV2<InputT>
implements Sink<InputT> {
    public static final SimpleVersionedSerializer<Integer> WRITER_SERIALIZER = new SimpleVersionedSerializerAdapter((TypeSerializer)IntSerializer.INSTANCE);
    private final SinkWriter<InputT> writer;

    private TestSinkV2(SinkWriter<InputT> writer) {
        this.writer = writer;
    }

    public SinkWriter<InputT> createWriter(WriterInitContext context) {
        if (this.writer instanceof DefaultSinkWriter) {
            ((DefaultSinkWriter)this.writer).init(context);
        }
        return this.writer;
    }

    SinkWriter<InputT> getWriter() {
        return this.writer;
    }

    public static <InputT> Builder<InputT, Record<InputT>> newBuilder() {
        return new Builder();
    }

    public static <InputT> Builder<InputT, Record<InputT>> newBuilder(DefaultSinkWriter<InputT> writer) {
        return new Builder().setWriter(writer);
    }

    static class RetryOnceCommitter<CommT>
    extends DefaultCommitter<CommT> {
        private final Set<CommT> seen = new LinkedHashSet<CommT>();

        RetryOnceCommitter() {
        }

        @Override
        public void commit(Collection<Committer.CommitRequest<CommT>> committables) {
            committables.forEach(c -> {
                if (!this.seen.remove(c.getCommittable())) {
                    this.seen.add(c.getCommittable());
                    c.retryLater();
                }
            });
        }
    }

    public static class DefaultCommitter<CommT>
    implements Committer<CommT>,
    Serializable {
        private boolean isClosed = false;

        public void commit(Collection<Committer.CommitRequest<CommT>> committables) {
        }

        public void close() throws Exception {
            this.isClosed = true;
        }

        public boolean isClosed() {
            return this.isClosed;
        }

        public void init() {
        }
    }

    protected static class DefaultStatefulSinkWriter<InputT>
    extends DefaultCommittingSinkWriter<InputT>
    implements StatefulSinkWriter<InputT, Integer> {
        private int recordCount = 0;

        protected DefaultStatefulSinkWriter() {
        }

        @Override
        public void write(InputT element, SinkWriter.Context context) {
            super.write(element, context);
            ++this.recordCount;
        }

        public int getRecordCount() {
            return this.recordCount;
        }

        public List<Integer> snapshotState(long checkpointId) throws IOException {
            this.lastCheckpointId = checkpointId;
            return Collections.singletonList(this.recordCount);
        }

        protected void restore(Collection<Integer> recoveredState) {
            this.recordCount = recoveredState.isEmpty() ? 0 : recoveredState.iterator().next();
        }
    }

    protected static class ForwardCommittingSinkWriter<InputT>
    extends DefaultSinkWriter<InputT>
    implements CommittingSinkWriter<InputT, InputT>,
    Serializable {
        protected ForwardCommittingSinkWriter() {
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        public Collection<InputT> prepareCommit() {
            List result = this.elements.stream().map(Record::getValue).collect(Collectors.toList());
            this.elements = new ArrayList();
            return result;
        }
    }

    protected static class DefaultCommittingSinkWriter<InputT>
    extends DefaultSinkWriter<InputT>
    implements CommittingSinkWriter<InputT, Record<InputT>>,
    Serializable {
        protected DefaultCommittingSinkWriter() {
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        public Collection<Record<InputT>> prepareCommit() {
            List result = this.elements;
            this.elements = new ArrayList();
            return result;
        }
    }

    public static class DefaultSinkWriter<InputT>
    implements SinkWriter<InputT>,
    Serializable {
        protected List<Record<InputT>> elements = new ArrayList<Record<InputT>>();
        protected List<Watermark> watermarks = new ArrayList<Watermark>();
        public long lastCheckpointId = -1L;

        protected DefaultSinkWriter() {
        }

        public void write(InputT element, SinkWriter.Context context) {
            this.elements.add(new Record<InputT>(element, context.timestamp(), context.currentWatermark()));
        }

        public void flush(boolean endOfInput) throws IOException, InterruptedException {
            this.elements = new ArrayList<Record<InputT>>();
        }

        public List<Record<InputT>> getRecordsOfCurrentCheckpoint() {
            return this.elements;
        }

        public List<Watermark> getWatermarks() {
            return this.watermarks;
        }

        public long getLastCheckpointId() {
            return this.lastCheckpointId;
        }

        public void writeWatermark(Watermark watermark) {
            this.watermarks.add(watermark);
        }

        public void close() throws Exception {
        }

        public void init(WriterInitContext context) {
        }
    }

    public static class RecordSerializer<T>
    implements SimpleVersionedSerializer<Record<T>> {
        public int getVersion() {
            return 0;
        }

        public byte[] serialize(Record<T> record) throws IOException {
            return InstantiationUtil.serializeObject(record);
        }

        public Record<T> deserialize(int version, byte[] serialized) throws IOException {
            try {
                return (Record)InstantiationUtil.deserializeObject((byte[])serialized, (ClassLoader)this.getClass().getClassLoader());
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Record<T>
    implements Serializable {
        private final T value;
        private final Long timestamp;
        private final long watermark;

        public Record(T value, Long timestamp, long watermark) {
            this.value = value;
            this.timestamp = timestamp;
            this.watermark = watermark;
        }

        public T getValue() {
            return this.value;
        }

        public Long getTimestamp() {
            return this.timestamp;
        }

        public long getWatermark() {
            return this.watermark;
        }

        public boolean equals(Object object) {
            if (this == object) {
                return true;
            }
            if (object == null || this.getClass() != object.getClass()) {
                return false;
            }
            Record record = (Record)object;
            return Objects.equals(this.timestamp, record.timestamp) && this.watermark == record.watermark && Objects.equals(this.value, record.value);
        }

        public int hashCode() {
            return Objects.hash(this.value, this.timestamp, this.watermark);
        }

        public String toString() {
            return "Record{value=" + this.value + ", timestamp=" + this.timestamp + ", watermark=" + this.watermark + "}";
        }

        public Record<T> withValue(T value) {
            return new Record<T>(value, this.timestamp, this.watermark);
        }
    }

    private static class TestStatefulSinkV2<InputT, CommT>
    extends TestSinkV2WithPostCommitTopology<InputT, CommT>
    implements SupportsWriterState<InputT, Integer>,
    SupportsWriterState.WithCompatibleState {
        private final String compatibleState;

        public TestStatefulSinkV2(SinkWriter<InputT> writer, SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, Committer<CommT> committer, String compatibleState) {
            super(writer, commSerializerFactory, committer);
            this.compatibleState = compatibleState;
        }

        @Override
        public DefaultStatefulSinkWriter<InputT> createWriter(WriterInitContext context) {
            return (DefaultStatefulSinkWriter)super.createWriter(context);
        }

        public StatefulSinkWriter<InputT, Integer> restoreWriter(WriterInitContext context, Collection<Integer> recoveredState) {
            DefaultStatefulSinkWriter statefulWriter = (DefaultStatefulSinkWriter)this.getWriter();
            statefulWriter.restore(recoveredState);
            return statefulWriter;
        }

        public SimpleVersionedSerializer<Integer> getWriterStateSerializer() {
            return WRITER_SERIALIZER;
        }

        public Collection<String> getCompatibleWriterStateNames() {
            return this.compatibleState == null ? ImmutableSet.of() : ImmutableSet.of((Object)this.compatibleState);
        }
    }

    private static class TestSinkV2WithPreCommitTopology<InputT, CommT>
    extends TestSinkV2TwoPhaseCommittingSink<InputT, CommT>
    implements SupportsPreCommitTopology<CommT, CommT> {
        private final SerializableFunction<CommT, CommT> preCommitTopology;

        public TestSinkV2WithPreCommitTopology(SinkWriter<InputT> writer, SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, Committer<CommT> committer, SerializableFunction<CommT, CommT> preCommitTopology) {
            super(writer, commSerializerFactory, committer);
            this.preCommitTopology = preCommitTopology;
        }

        public DataStream<CommittableMessage<CommT>> addPreCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
            return committables.map((MapFunction & Serializable)m -> {
                if (m instanceof CommittableSummary) {
                    return m;
                }
                CommittableWithLineage withLineage = (CommittableWithLineage)m;
                return withLineage.map(this.preCommitTopology);
            }).returns(CommittableMessageTypeInfo.of((SerializableSupplier)this.commSerializerFactory));
        }

        public SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
            return (SimpleVersionedSerializer)this.commSerializerFactory.get();
        }
    }

    private static class TestSinkV2WithPostCommitTopology<InputT, CommT>
    extends TestSinkV2TwoPhaseCommittingSink<InputT, CommT>
    implements SupportsPostCommitTopology<CommT> {
        public TestSinkV2WithPostCommitTopology(SinkWriter<InputT> writer, SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, Committer<CommT> committer) {
            super(writer, commSerializerFactory, committer);
        }

        public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
            StandardSinkTopologies.addGlobalCommitter(committables, this::createCommitter, this::getCommittableSerializer);
        }
    }

    private static class TestSinkV2TwoPhaseCommittingSink<InputT, CommT>
    extends TestSinkV2<InputT>
    implements SupportsCommitter<CommT> {
        private final Committer<CommT> committer;
        protected final SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory;

        public TestSinkV2TwoPhaseCommittingSink(SinkWriter<InputT> writer, SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory, Committer<CommT> committer) {
            super(writer);
            this.committer = committer;
            this.commSerializerFactory = commSerializerFactory;
        }

        public Committer<CommT> createCommitter(CommitterInitContext context) {
            if (this.committer instanceof DefaultCommitter) {
                ((DefaultCommitter)this.committer).init();
            }
            return this.committer;
        }

        public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
            return (SimpleVersionedSerializer)this.commSerializerFactory.get();
        }
    }

    public static class Builder<InputT, CommT> {
        private SinkWriter<InputT> writer = null;
        private Committer<CommT> committer;
        private boolean withPostCommitTopology = false;
        private SerializableFunction<CommT, CommT> preCommitTopology = null;
        private boolean withWriterState = false;
        private String compatibleStateNames;
        private SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory;

        public <NewInputT, NewCommT> Builder<NewInputT, NewCommT> setWriter(CommittingSinkWriter<NewInputT, NewCommT> writer) {
            Builder self = this;
            self.writer = (SinkWriter)Preconditions.checkNotNull(writer);
            return self;
        }

        public <NewInputT> Builder<NewInputT, CommT> setWriter(SinkWriter<NewInputT> writer) {
            Builder self = this;
            self.writer = (SinkWriter)Preconditions.checkNotNull(writer);
            return self;
        }

        public Builder<InputT, CommT> setCommitter(Committer<CommT> committer, SerializableSupplier<SimpleVersionedSerializer<CommT>> commSerializerFactory) {
            this.committer = committer;
            this.commSerializerFactory = commSerializerFactory;
            return this;
        }

        public Builder<InputT, CommT> setWithPostCommitTopology(boolean withPostCommitTopology) {
            this.withPostCommitTopology = withPostCommitTopology;
            return this;
        }

        public Builder<InputT, CommT> setWithPreCommitTopology(SerializableFunction<CommT, CommT> preCommitTopology) {
            this.preCommitTopology = preCommitTopology;
            return this;
        }

        public Builder<InputT, CommT> setWriterState(boolean withWriterState) {
            this.withWriterState = withWriterState;
            return this;
        }

        public Builder<InputT, CommT> setCompatibleStateNames(String compatibleStateNames) {
            this.compatibleStateNames = compatibleStateNames;
            return this;
        }

        public TestSinkV2<InputT> build() {
            if (this.committer == null) {
                if (this.writer == null) {
                    this.writer = new DefaultSinkWriter();
                }
                return new TestSinkV2<InputT>(this.writer);
            }
            if (this.writer == null) {
                this.writer = new DefaultCommittingSinkWriter();
            }
            if (!this.withPostCommitTopology) {
                if (this.preCommitTopology == null) {
                    return new TestSinkV2TwoPhaseCommittingSink<InputT, CommT>(this.writer, this.commSerializerFactory, this.committer);
                }
                return new TestSinkV2WithPreCommitTopology<InputT, CommT>(this.writer, this.commSerializerFactory, this.committer, this.preCommitTopology);
            }
            if (this.withWriterState) {
                return new TestStatefulSinkV2<InputT, CommT>(this.writer, this.commSerializerFactory, this.committer, this.compatibleStateNames);
            }
            return new TestSinkV2WithPostCommitTopology<InputT, CommT>(this.writer, this.commSerializerFactory, this.committer);
        }
    }
}

