/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

public class TestSink<T>
implements Sink<T, String, String, String> {
    public static final String END_OF_INPUT_STR = "end of input";
    private final DefaultSinkWriter<T> writer;
    @Nullable
    private final SimpleVersionedSerializer<String> writerStateSerializer;
    @Nullable
    private final Committer<String> committer;
    @Nullable
    private final SimpleVersionedSerializer<String> committableSerializer;
    @Nullable
    private final GlobalCommitter<String, String> globalCommitter;
    @Nullable
    private final SimpleVersionedSerializer<String> globalCommittableSerializer;
    private final Collection<String> compatibleStateNames;

    private TestSink(DefaultSinkWriter<T> writer, @Nullable SimpleVersionedSerializer<String> writerStateSerializer, @Nullable Committer<String> committer, @Nullable SimpleVersionedSerializer<String> committableSerializer, @Nullable GlobalCommitter<String, String> globalCommitter, @Nullable SimpleVersionedSerializer<String> globalCommittableSerializer, Collection<String> compatibleStateNames) {
        this.writer = writer;
        this.writerStateSerializer = writerStateSerializer;
        this.committer = committer;
        this.committableSerializer = committableSerializer;
        this.globalCommitter = globalCommitter;
        this.globalCommittableSerializer = globalCommittableSerializer;
        this.compatibleStateNames = compatibleStateNames;
    }

    public SinkWriter<T, String, String> createWriter(Sink.InitContext context, List<String> states) {
        this.writer.init(context);
        this.writer.restoredFrom(states);
        this.writer.setProcessingTimerService(context.getProcessingTimeService());
        return this.writer;
    }

    public Optional<Committer<String>> createCommitter() {
        return Optional.ofNullable(this.committer);
    }

    public Optional<GlobalCommitter<String, String>> createGlobalCommitter() {
        return Optional.ofNullable(this.globalCommitter);
    }

    public Optional<SimpleVersionedSerializer<String>> getCommittableSerializer() {
        return Optional.ofNullable(this.committableSerializer);
    }

    public Optional<SimpleVersionedSerializer<String>> getGlobalCommittableSerializer() {
        return Optional.ofNullable(this.globalCommittableSerializer);
    }

    public Optional<SimpleVersionedSerializer<String>> getWriterStateSerializer() {
        return Optional.ofNullable(this.writerStateSerializer);
    }

    public Collection<String> getCompatibleStateNames() {
        return this.compatibleStateNames;
    }

    public static Builder<Integer> newBuilder() {
        return new Builder<Integer>();
    }

    public static class StringCommittableSerializer
    implements SimpleVersionedSerializer<String>,
    Serializable {
        public static final StringCommittableSerializer INSTANCE = new StringCommittableSerializer();

        public int getVersion() {
            return SimpleVersionedStringSerializer.INSTANCE.getVersion();
        }

        public byte[] serialize(String obj) {
            return SimpleVersionedStringSerializer.INSTANCE.serialize(obj);
        }

        public String deserialize(int version, byte[] serialized) throws IOException {
            return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized);
        }
    }

    static class RetryOnceGlobalCommitter
    extends DefaultGlobalCommitter {
        private final Set<String> seen = new LinkedHashSet<String>();

        RetryOnceGlobalCommitter() {
        }

        @Override
        public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
            return globalCommittables;
        }

        @Override
        public String combine(List<String> committables) {
            return String.join((CharSequence)"|", committables);
        }

        @Override
        public void endOfInput() {
        }

        @Override
        public List<String> commit(List<String> committables) {
            committables.forEach(c -> {
                if (this.seen.remove(c)) {
                    Preconditions.checkNotNull((Object)this.committedData);
                    this.committedData.add(c);
                } else {
                    this.seen.add((String)c);
                }
            });
            return new ArrayList<String>(this.seen);
        }
    }

    static class DefaultGlobalCommitter
    extends DefaultCommitter
    implements GlobalCommitter<String, String> {
        static final Function<List<String>, String> COMBINER = strings -> {
            Collections.sort(strings);
            return String.join((CharSequence)"+", strings);
        };
        private final String committedSuccessData;

        DefaultGlobalCommitter() {
            this("");
        }

        DefaultGlobalCommitter(String committedSuccessData) {
            this.committedSuccessData = committedSuccessData;
        }

        DefaultGlobalCommitter(Supplier<Queue<String>> queueSupplier) {
            super(queueSupplier);
            this.committedSuccessData = "";
        }

        public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
            if (this.committedSuccessData == null) {
                return globalCommittables;
            }
            return globalCommittables.stream().filter(s -> !s.equals(this.committedSuccessData)).collect(Collectors.toList());
        }

        public String combine(List<String> committables) {
            return COMBINER.apply(committables);
        }

        public void endOfInput() {
            this.commit(Collections.singletonList(TestSink.END_OF_INPUT_STR));
        }
    }

    static class RetryOnceCommitter
    extends DefaultCommitter
    implements Committer<String> {
        private final Set<String> seen = new LinkedHashSet<String>();

        RetryOnceCommitter() {
        }

        @Override
        public List<String> commit(List<String> committables) {
            committables.forEach(c -> {
                if (this.seen.remove(c)) {
                    Preconditions.checkNotNull((Object)this.committedData);
                    this.committedData.add(c);
                } else {
                    this.seen.add((String)c);
                }
            });
            return new ArrayList<String>(this.seen);
        }
    }

    static class DefaultCommitter
    implements Committer<String>,
    Serializable {
        @Nullable
        protected Queue<String> committedData;
        private boolean isClosed;
        @Nullable
        private final Supplier<Queue<String>> queueSupplier;

        public DefaultCommitter() {
            this.committedData = new ConcurrentLinkedQueue<String>();
            this.isClosed = false;
            this.queueSupplier = null;
        }

        public DefaultCommitter(@Nullable Supplier<Queue<String>> queueSupplier) {
            this.queueSupplier = queueSupplier;
            this.isClosed = false;
            this.committedData = null;
        }

        public List<String> getCommittedData() {
            if (this.committedData != null) {
                return new ArrayList<String>(this.committedData);
            }
            return Collections.emptyList();
        }

        public List<String> commit(List<String> committables) {
            if (this.committedData == null) {
                Assert.assertNotNull(this.queueSupplier);
                this.committedData = this.queueSupplier.get();
            }
            this.committedData.addAll(committables);
            return Collections.emptyList();
        }

        public void close() throws Exception {
            this.isClosed = true;
        }

        public boolean isClosed() {
            return this.isClosed;
        }
    }

    public static class DefaultSinkWriter<T>
    implements SinkWriter<T, String, String>,
    Serializable {
        protected List<String> elements = new ArrayList<String>();
        protected List<Watermark> watermarks = new ArrayList<Watermark>();
        protected Sink.ProcessingTimeService processingTimerService;

        protected DefaultSinkWriter() {
        }

        public void write(T element, SinkWriter.Context context) {
            this.elements.add(Tuple3.of(element, (Object)context.timestamp(), (Object)context.currentWatermark()).toString());
        }

        public void writeWatermark(Watermark watermark) throws IOException {
            this.watermarks.add(watermark);
        }

        public List<String> prepareCommit(boolean flush) {
            List<String> result = this.elements;
            this.elements = new ArrayList<String>();
            return result;
        }

        public List<String> snapshotState(long checkpointId) throws IOException {
            return Collections.emptyList();
        }

        public void close() throws Exception {
        }

        void restoredFrom(List<String> states) {
        }

        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            this.processingTimerService = processingTimerService;
        }

        public void init(Sink.InitContext context) {
        }
    }

    public static class Builder<T> {
        private DefaultSinkWriter writer = new DefaultSinkWriter();
        private SimpleVersionedSerializer<String> writerStateSerializer;
        private Committer<String> committer;
        private SimpleVersionedSerializer<String> committableSerializer;
        private GlobalCommitter<String, String> globalCommitter;
        private SimpleVersionedSerializer<String> globalCommittableSerializer;
        private Collection<String> compatibleStateNames = Collections.emptyList();

        public <W> Builder<W> setWriter(DefaultSinkWriter<W> writer) {
            this.writer = (DefaultSinkWriter)Preconditions.checkNotNull(writer);
            return this;
        }

        public Builder<T> withWriterState() {
            this.writerStateSerializer = StringCommittableSerializer.INSTANCE;
            return this;
        }

        public Builder<T> setCommitter(Committer<String> committer) {
            this.committer = committer;
            return this;
        }

        public Builder<T> setCommittableSerializer(SimpleVersionedSerializer<String> committableSerializer) {
            this.committableSerializer = committableSerializer;
            return this;
        }

        public Builder<T> setDefaultCommitter() {
            this.committer = new DefaultCommitter();
            this.committableSerializer = StringCommittableSerializer.INSTANCE;
            return this;
        }

        public Builder<T> setDefaultCommitter(Supplier<Queue<String>> queueSupplier) {
            this.committer = new DefaultCommitter(queueSupplier);
            this.committableSerializer = StringCommittableSerializer.INSTANCE;
            return this;
        }

        public Builder<T> setGlobalCommitter(GlobalCommitter<String, String> globalCommitter) {
            this.globalCommitter = globalCommitter;
            return this;
        }

        public Builder<T> setGlobalCommittableSerializer(SimpleVersionedSerializer<String> globalCommittableSerializer) {
            this.globalCommittableSerializer = globalCommittableSerializer;
            return this;
        }

        public Builder<T> setDefaultGlobalCommitter() {
            this.globalCommitter = new DefaultGlobalCommitter("");
            this.globalCommittableSerializer = StringCommittableSerializer.INSTANCE;
            return this;
        }

        public Builder<T> setGlobalCommitter(Supplier<Queue<String>> queueSupplier) {
            this.globalCommitter = new DefaultGlobalCommitter(queueSupplier);
            this.globalCommittableSerializer = StringCommittableSerializer.INSTANCE;
            return this;
        }

        public Builder<T> setCompatibleStateNames(Collection<String> compatibleStateNames) {
            this.compatibleStateNames = compatibleStateNames;
            return this;
        }

        public Builder<T> setCompatibleStateNames(String ... compatibleStateNames) {
            return this.setCompatibleStateNames(Arrays.asList(compatibleStateNames));
        }

        public TestSink<T> build() {
            return new TestSink(this.writer, this.writerStateSerializer, this.committer, this.committableSerializer, this.globalCommitter, this.globalCommittableSerializer, this.compatibleStateNames);
        }
    }
}

