/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.operators.sink.SinkV1WriterCommittableSerializer;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.OptionalAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

abstract class SinkWriterOperatorTestBase {
    SinkWriterOperatorTestBase() {
    }

    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        SinkAndSuppliers sinkAndSuppliers = this.sinkWithoutCommitter();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory(sinkAndSuppliers.sink));
        testHarness.open();
        testHarness.processElement(1, 1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly((Object[])new String[]{"(1,1,-9223372036854775808)"});
        testHarness.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
        testHarness.close();
    }

    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        long initialTime = 0L;
        SinkAndSuppliers sinkAndSuppliers = this.sinkWithoutCommitter();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory(sinkAndSuppliers.sink));
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processWatermark(1L);
        Assertions.assertThat(testHarness.getOutput()).containsExactly(new Object[]{new Watermark(0L), new Watermark(1L)});
        Assertions.assertThat(sinkAndSuppliers.watermarkSupplier.get()).containsExactly((Object[])new org.apache.flink.api.common.eventtime.Watermark[]{new org.apache.flink.api.common.eventtime.Watermark(0L), new org.apache.flink.api.common.eventtime.Watermark(1L)});
        testHarness.close();
    }

    @Test
    void testTimeBasedBufferingSinkWriter() throws Exception {
        long initialTime = 0L;
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory(this.sinkWithTimeBasedWriter().sink));
        testHarness.open();
        testHarness.setProcessingTime(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 0, 1L);
        testHarness.getOutput().poll();
        testHarness.getProcessingTimeService().setCurrentTime(2001L);
        testHarness.prepareSnapshotPreBarrier(2L);
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 2, 2L);
        testHarness.close();
    }

    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory(this.sinkWithCommitter().sink));
        testHarness.open();
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 2, 1L);
        testHarness.close();
    }

    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(this.sinkWithCommitter().sink);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(writerOperatorFactory);
        testHarness.open();
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        testHarness.processElement(1, 1L);
        testHarness.endInput();
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testStateRestore(boolean stateful) throws Exception {
        long initialTime = 0L;
        SinkAndSuppliers sinkAndSuppliers = this.sinkWithSnapshottingWriter(stateful, null);
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = SinkWriterOperatorTestBase.createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
        ((AbstractCollectionAssert)Assertions.assertThat(testHarness.getOutput()).hasSize(2)).contains(new Object[]{new Watermark(0L)});
        Assertions.assertThat((long)sinkAndSuppliers.lastCheckpointSupplier.getAsLong()).isEqualTo(stateful ? 1L : -1L);
        testHarness.close();
        SinkAndSuppliers restoredSink = this.sinkWithSnapshottingWriter(stateful, null);
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredTestHarness = SinkWriterOperatorTestBase.createTestHarnessWithBufferingSinkWriter(restoredSink.sink);
        restoredTestHarness.initializeState(snapshot);
        restoredTestHarness.open();
        restoredTestHarness.endInput();
        long checkpointId = 2L;
        restoredTestHarness.prepareSnapshotPreBarrier(2L);
        restoredTestHarness.notifyOfCompletedCheckpoint(2L);
        if (stateful) {
            SinkWriterOperatorTestBase.assertBasicOutput(restoredTestHarness.getOutput(), 2, Long.MAX_VALUE);
        } else {
            ((ObjectAssert)Assertions.assertThat((Object)SinkTestUtil.fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class)).satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs).hasOverallCommittables(0).hasPendingCommittables(0).hasFailedCommittables(0)});
        }
        restoredTestHarness.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
        List<String> previousSinkInputs = Arrays.asList("bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");
        SinkAndSuppliers sinkAndSuppliers = this.sinkWithSnapshottingWriter(stateful, "dummy_sink_state");
        OneInputStreamOperatorTestHarness<String, String> previousSink = new OneInputStreamOperatorTestHarness<String, String>((OneInputStreamOperator<String, String>)new DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()), (TypeSerializer<String>)StringSerializer.INSTANCE);
        OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> compatibleWriterOperator = SinkWriterOperatorTestBase.createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
        ArrayList<String> expectedOutput1 = stateful ? new ArrayList<String>(previousSinkInputs) : new ArrayList();
        expectedOutput1.add(Tuple3.of((Object)1, (Object)1, (Object)Long.MIN_VALUE).toString());
        compatibleWriterOperator.initializeState(previousSinkState);
        compatibleWriterOperator.open();
        compatibleWriterOperator.processElement(1, 1L);
        compatibleWriterOperator.endInput();
        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState operatorStateWithoutPreviousState = compatibleWriterOperator.snapshot(1L, 1L);
        compatibleWriterOperator.close();
        SinkWriterOperatorTestBase.assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput());
        SinkAndSuppliers sinkAndSuppliers2 = this.sinkWithSnapshottingWriter(stateful, "dummy_sink_state");
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredSinkOperator = SinkWriterOperatorTestBase.createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers2.sink);
        List<String> expectedOutput2 = Arrays.asList(Tuple3.of((Object)2, (Object)2, (Object)Long.MIN_VALUE).toString(), Tuple3.of((Object)3, (Object)3, (Object)Long.MIN_VALUE).toString());
        restoredSinkOperator.initializeState(operatorStateWithoutPreviousState);
        restoredSinkOperator.open();
        restoredSinkOperator.processElement(2, 2L);
        restoredSinkOperator.processElement(3, 3L);
        restoredSinkOperator.endInput();
        restoredSinkOperator.prepareSnapshotPreBarrier(2L);
        SinkWriterOperatorTestBase.assertEmitted(expectedOutput2, restoredSinkOperator.getOutput());
        restoredSinkOperator.close();
    }

    @Test
    void testRestoreCommitterState() throws Exception {
        List<String> committables = Arrays.asList("state1", "state2");
        SinkAndSuppliers sinkAndSuppliers = this.sinkWithCommitter();
        OneInputStreamOperatorTestHarness<String, String> committer = new OneInputStreamOperatorTestHarness<String, String>((OneInputStreamOperator<String, String>)new TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()), (TypeSerializer<String>)StringSerializer.INSTANCE);
        OperatorSubtaskState committerState = TestHarnessUtil.buildSubtaskState(committer, committables);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory(sinkAndSuppliers.sink));
        testHarness.initializeState(committerState);
        testHarness.open();
        testHarness.prepareSnapshotPreBarrier(2L);
        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(4);
        ((ObjectAssert)Assertions.assertThat((Object)output.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class)).satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs).hasPendingCommittables(committables.size()).hasCheckpointId(1L).hasOverallCommittables(committables.size()).hasFailedCommittables(0)});
        SinkWriterOperatorTestBase.assertRestoredCommitterCommittable(output.get(1).asRecord().getValue(), committables.get(0));
        SinkWriterOperatorTestBase.assertRestoredCommitterCommittable(output.get(2).asRecord().getValue(), committables.get(1));
        ((ObjectAssert)Assertions.assertThat((Object)output.get(3).asRecord().getValue()).isInstanceOf(CommittableSummary.class)).satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs).hasPendingCommittables(0).hasCheckpointId(2L).hasOverallCommittables(0).hasFailedCommittables(0)});
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        SinkAndSuppliers sinkAndSuppliers = this.sinkWithCommitter();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory(sinkAndSuppliers.sink));
        testHarness.open();
        testHarness.processElement(1, 1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        String record = "(1,1,-9223372036854775808)";
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly((Object[])new String[]{"(1,1,-9223372036854775808)"});
        testHarness.endInput();
        if (isCheckpointingEnabled) {
            testHarness.prepareSnapshotPreBarrier(1L);
        }
        SinkWriterOperatorTestBase.assertEmitted(Collections.singletonList("(1,1,-9223372036854775808)"), testHarness.getOutput());
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
        testHarness.close();
    }

    @Test
    void testInitContext() throws Exception {
        AtomicReference initContext = new AtomicReference();
        Sink & Serializable sink = (Sink & Serializable)context -> {
            initContext.set(context);
            return null;
        };
        boolean subtaskId = true;
        int parallelism = 10;
        StringSerializer typeSerializer = StringSerializer.INSTANCE;
        JobID jobID = new JobID();
        MockEnvironment environment = MockEnvironment.builder().setSubtaskIndex(1).setParallelism(10).setMaxParallelism(10).setJobID(jobID).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory((Sink)sink), typeSerializer, environment);
        testHarness.open();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).getUserCodeClassLoader()).isNotNull();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).getMailboxExecutor()).isNotNull();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).getProcessingTimeService()).isNotNull();
        Assertions.assertThat((int)((Sink.InitContext)initContext.get()).getTaskInfo().getIndexOfThisSubtask()).isEqualTo(1);
        Assertions.assertThat((int)((Sink.InitContext)initContext.get()).getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(10);
        Assertions.assertThat((int)((Sink.InitContext)initContext.get()).getTaskInfo().getAttemptNumber()).isZero();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).metricGroup()).isNotNull();
        Assertions.assertThat((OptionalLong)((Sink.InitContext)initContext.get()).getRestoredCheckpointId()).isNotPresent();
        Assertions.assertThat((boolean)((Sink.InitContext)initContext.get()).isObjectReuseEnabled()).isTrue();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).createInputSerializer()).isEqualTo((Object)typeSerializer);
        Assertions.assertThat((Comparable)((Sink.InitContext)initContext.get()).getJobInfo().getJobId()).isEqualTo((Object)jobID);
        testHarness.close();
    }

    @Test
    void testInitContextWrapper() throws Exception {
        final AtomicReference initContext = new AtomicReference();
        final AtomicReference originalContext = new AtomicReference();
        AtomicBoolean consumed = new AtomicBoolean(false);
        final Consumer<AtomicBoolean> metadataConsumer = element -> element.set(true);
        Sink<String> sink = new Sink<String>(){

            public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
                WriterInitContext decoratedContext = (WriterInitContext)Proxy.newProxyInstance(WriterInitContext.class.getClassLoader(), new Class[]{WriterInitContext.class}, (proxy, method, args) -> {
                    if (method.getName().equals("metadataConsumer")) {
                        return Optional.of(metadataConsumer);
                    }
                    return method.invoke((Object)context, args);
                });
                originalContext.set(decoratedContext);
                return super.createWriter(decoratedContext);
            }

            public SinkWriter<String> createWriter(Sink.InitContext context) {
                initContext.set(context);
                return null;
            }
        };
        boolean subtaskId = true;
        int parallelism = 10;
        StringSerializer typeSerializer = StringSerializer.INSTANCE;
        JobID jobID = new JobID();
        MockEnvironment environment = MockEnvironment.builder().setSubtaskIndex(1).setParallelism(10).setMaxParallelism(10).setJobID(jobID).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory((Sink)sink), typeSerializer, environment);
        testHarness.open();
        SinkWriterOperatorTestBase.assertContextsEqual((Sink.InitContext)initContext.get(), (WriterInitContext)originalContext.get());
        ((OptionalAssert)Assertions.assertThat((Optional)((Sink.InitContext)initContext.get()).metadataConsumer()).isPresent()).hasValueSatisfying(consumer -> {
            consumer.accept(consumed);
            Assertions.assertThat((AtomicBoolean)consumed).isTrue();
        });
        testHarness.close();
    }

    private static void assertContextsEqual(Sink.InitContext initContext, WriterInitContext original) {
        Assertions.assertThat((Object)initContext.getUserCodeClassLoader().asClassLoader()).isEqualTo((Object)original.getUserCodeClassLoader().asClassLoader());
        Assertions.assertThat((Object)initContext.getMailboxExecutor()).isEqualTo((Object)original.getMailboxExecutor());
        Assertions.assertThat((Object)initContext.getProcessingTimeService()).isEqualTo((Object)original.getProcessingTimeService());
        Assertions.assertThat((int)initContext.getTaskInfo().getIndexOfThisSubtask()).isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
        Assertions.assertThat((int)initContext.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
        Assertions.assertThat((int)initContext.getTaskInfo().getAttemptNumber()).isEqualTo(original.getTaskInfo().getAttemptNumber());
        Assertions.assertThat((Object)initContext.metricGroup()).isEqualTo((Object)original.metricGroup());
        Assertions.assertThat((OptionalLong)initContext.getRestoredCheckpointId()).isEqualTo((Object)original.getRestoredCheckpointId());
        Assertions.assertThat((boolean)initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled());
        Assertions.assertThat((Object)initContext.createInputSerializer()).isEqualTo((Object)original.createInputSerializer());
        Assertions.assertThat((Comparable)initContext.getJobInfo().getJobId()).isEqualTo((Object)original.getJobInfo().getJobId());
        Assertions.assertThat((Optional)initContext.metadataConsumer()).isEqualTo((Object)original.metadataConsumer());
    }

    private static void assertRestoredCommitterCommittable(Object record, String committable) {
        ((ObjectAssert)Assertions.assertThat((Object)record).isInstanceOf(CommittableWithLineage.class)).satisfies(new ThrowingConsumer[]{cl -> SinkV2Assertions.assertThat((CommittableWithLineage)cl).hasCommittable(committable).hasCheckpointId(1L).hasSubtaskId(0)});
    }

    private static void assertEmitted(List<String> records, Queue<Object> output) {
        List<StreamElement> collected = SinkTestUtil.fromOutput(output);
        Assertions.assertThat(collected).hasSize(records.size() + 1);
        ((ObjectAssert)Assertions.assertThat((Object)collected.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class)).satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs).hasPendingCommittables(records.size()).hasOverallCommittables(records.size()).hasFailedCommittables(0)});
        ArrayList<Object> committables = new ArrayList<Object>();
        for (int i = 1; i <= records.size(); ++i) {
            Object value = collected.get(i).asRecord().getValue();
            Assertions.assertThat((Object)value).isInstanceOf(CommittableWithLineage.class);
            committables.add(((CommittableWithLineage)value).getCommittable());
        }
        Assertions.assertThat(committables).containsExactlyInAnyOrderElementsOf(records);
    }

    private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> createTestHarnessWithBufferingSinkWriter(Sink sink) throws Exception {
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(sink);
        return new OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>((OneInputStreamOperatorFactory<Integer, CommittableMessage<Integer>>)writerOperatorFactory);
    }

    private static void assertBasicOutput(Collection<Object> queuedOutput, int numberOfCommittables, @Nullable Long checkpointId) {
        List<StreamElement> output = SinkTestUtil.fromOutput(queuedOutput);
        Assertions.assertThat(output).hasSize(numberOfCommittables + 1);
        ((ObjectAssert)Assertions.assertThat((Object)output.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class)).satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs).hasOverallCommittables(numberOfCommittables).hasPendingCommittables(numberOfCommittables).hasFailedCommittables(0)});
        for (int i = 1; i <= numberOfCommittables; ++i) {
            ((ObjectAssert)Assertions.assertThat((Object)output.get(i).asRecord().getValue()).isInstanceOf(CommittableWithLineage.class)).satisfies(new ThrowingConsumer[]{cl -> SinkV2Assertions.assertThat((CommittableWithLineage)cl).hasCheckpointId(checkpointId).hasSubtaskId(0)});
        }
    }

    abstract SinkAndSuppliers sinkWithoutCommitter();

    abstract SinkAndSuppliers sinkWithTimeBasedWriter();

    abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean var1, String var2);

    abstract SinkAndSuppliers sinkWithCommitter();

    static class SinkAndSuppliers {
        Sink<Integer> sink;
        Supplier<List<String>> elementSupplier;
        Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> watermarkSupplier;
        LongSupplier lastCheckpointSupplier;
        Supplier<SimpleVersionedSerializer<String>> serializerSupplier;

        public SinkAndSuppliers(Sink<Integer> sink, Supplier<List<String>> elementSupplier, Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> watermarkSupplier, LongSupplier lastCheckpointSupplier, Supplier<SimpleVersionedSerializer<String>> serializerSupplier) {
            this.sink = sink;
            this.elementSupplier = elementSupplier;
            this.watermarkSupplier = watermarkSupplier;
            this.lastCheckpointSupplier = lastCheckpointSupplier;
            this.serializerSupplier = serializerSupplier;
        }
    }

    private static class TestingCommittableSerializer
    extends SinkV1WriterCommittableSerializer<String> {
        private final SimpleVersionedSerializer<String> committableSerializer;

        public TestingCommittableSerializer(SimpleVersionedSerializer<String> committableSerializer) {
            super(committableSerializer);
            this.committableSerializer = committableSerializer;
        }

        public byte[] serialize(List<String> obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(this.committableSerializer, obj, (DataOutputView)out);
            return out.getCopyOfBuffer();
        }
    }

    private static class DummySinkOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor("dummy_sink_state", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        ListState<String> sinkState;
        private final SimpleVersionedSerializer<String> serializer;

        public DummySinkOperator(SimpleVersionedSerializer<String> serializer) {
            this.serializer = serializer;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sinkState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(SINK_STATE_DESC), this.serializer);
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.sinkState.add(element.getValue());
        }
    }

    private static class TestCommitterOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        private ListState<List<String>> committerState;
        private final List<String> buffer = new ArrayList<String>();
        private final SimpleVersionedSerializer<String> serializer;

        public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) {
            this.serializer = serializer;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.committerState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC), (SimpleVersionedSerializer)new TestingCommittableSerializer(this.serializer));
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.buffer.add((String)element.getValue());
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.committerState.add(this.buffer);
        }
    }
}

