/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SinkWriterOperatorTest
extends TestLogger {
    @Parameterized.Parameter
    public boolean stateful;

    @Parameterized.Parameters(name="Stateful: {0}")
    public static Collection<Object> data() {
        return Arrays.asList(true, false);
    }

    @Test
    public void nonBufferingWriterEmitsWithoutFlush() throws Exception {
        long initialTime = 0L;
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(new TestSink.DefaultSinkWriter<Integer>());
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        testHarness.snapshot(1L, 1L);
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(testHarness.getOutput()), (Matcher)Matchers.contains((Object[])new StreamElement[]{new Watermark(0L), new StreamRecord((Object)Tuple3.of((Object)1, (Object)1L, (Object)0L).toString()), new StreamRecord((Object)Tuple3.of((Object)2, (Object)2L, (Object)0L).toString())}));
    }

    @Test
    public void nonBufferingWriterEmitsOnFlush() throws Exception {
        long initialTime = 0L;
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(new TestSink.DefaultSinkWriter<Integer>());
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.endInput();
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(testHarness.getOutput()), (Matcher)Matchers.contains((Object[])new StreamElement[]{new Watermark(0L), new StreamRecord((Object)Tuple3.of((Object)1, (Object)1L, (Object)0L).toString()), new StreamRecord((Object)Tuple3.of((Object)2, (Object)2L, (Object)0L).toString())}));
    }

    @Test
    public void bufferingWriterDoesNotEmitWithoutFlush() throws Exception {
        long initialTime = 0L;
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(new BufferingSinkWriter());
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        testHarness.snapshot(1L, 1L);
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(testHarness.getOutput()), (Matcher)Matchers.contains((Object[])new StreamElement[]{new Watermark(0L)}));
    }

    @Test
    public void bufferingWriterEmitsOnFlush() throws Exception {
        long initialTime = 0L;
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(new BufferingSinkWriter());
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.endInput();
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(testHarness.getOutput()), (Matcher)Matchers.contains((Object[])new StreamElement[]{new Watermark(0L), new StreamRecord((Object)Tuple3.of((Object)1, (Object)1L, (Object)0L).toString()), new StreamRecord((Object)Tuple3.of((Object)2, (Object)2L, (Object)0L).toString())}));
    }

    @Test
    public void timeBasedBufferingSinkWriter() throws Exception {
        long initialTime = 0L;
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(new TimeBasedBufferingSinkWriter());
        testHarness.open();
        testHarness.setProcessingTime(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        MatcherAssert.assertThat((Object)testHarness.getOutput().size(), (Matcher)Matchers.equalTo((Object)0));
        testHarness.getProcessingTimeService().setCurrentTime(2001L);
        testHarness.prepareSnapshotPreBarrier(2L);
        testHarness.endInput();
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(testHarness.getOutput()), (Matcher)Matchers.contains((Object[])new StreamElement[]{new StreamRecord((Object)Tuple3.of((Object)1, (Object)1L, (Object)Long.MIN_VALUE).toString()), new StreamRecord((Object)Tuple3.of((Object)2, (Object)2L, (Object)Long.MIN_VALUE).toString())}));
    }

    @Test
    public void watermarkPropagatedToSinkWriter() throws Exception {
        long initialTime = 0L;
        TestSink.DefaultSinkWriter<Integer> writer = new TestSink.DefaultSinkWriter<Integer>();
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(writer);
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processWatermark(1L);
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(testHarness.getOutput()), (Matcher)Matchers.contains((Object[])new StreamElement[]{new Watermark(0L), new Watermark(1L)}));
        MatcherAssert.assertThat(writer.watermarks, (Matcher)Matchers.contains((Object[])new org.apache.flink.api.common.eventtime.Watermark[]{new org.apache.flink.api.common.eventtime.Watermark(0L), new org.apache.flink.api.common.eventtime.Watermark(1L)}));
    }

    @Test
    public void stateIsRestored() throws Exception {
        long initialTime = 0L;
        SnapshottingBufferingSinkWriter snapshottingWriter = new SnapshottingBufferingSinkWriter();
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(snapshottingWriter);
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
        MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])new Object[]{new Watermark(0L)}));
        MatcherAssert.assertThat((Object)snapshottingWriter.lastCheckpointId, (Matcher)Matchers.equalTo((Object)(this.stateful ? 1L : -1L)));
        testHarness.close();
        OneInputStreamOperatorTestHarness<Integer, byte[]> restoredTestHarness = this.createTestHarness(new SnapshottingBufferingSinkWriter());
        restoredTestHarness.initializeState(snapshot);
        restoredTestHarness.open();
        restoredTestHarness.endInput();
        ArrayList<StreamRecord> expectedOutput = new ArrayList<StreamRecord>();
        if (this.stateful) {
            expectedOutput.add(new StreamRecord((Object)Tuple3.of((Object)1, (Object)1L, (Object)0L).toString()));
            expectedOutput.add(new StreamRecord((Object)Tuple3.of((Object)2, (Object)2L, (Object)0L).toString()));
        }
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(restoredTestHarness.getOutput()), (Matcher)Matchers.equalTo(expectedOutput));
    }

    @Test
    public void loadPreviousSinkState() throws Exception {
        List<String> previousSinkInputs = Arrays.asList("bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");
        OneInputStreamOperatorTestHarness<String, String> previousSink = new OneInputStreamOperatorTestHarness<String, String>((OneInputStreamOperator<String, String>)new DummySinkOperator(), (TypeSerializer<String>)StringSerializer.INSTANCE);
        OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);
        OneInputStreamOperatorTestHarness<Integer, byte[]> compatibleWriterOperator = this.createCompatibleSinkOperator();
        ArrayList<StreamRecord> expectedOutput1 = this.stateful ? previousSinkInputs.stream().map(StreamRecord::new).collect(Collectors.toList()) : new ArrayList<StreamRecord>();
        expectedOutput1.add(new StreamRecord((Object)Tuple3.of((Object)1, (Object)1, (Object)Long.MIN_VALUE).toString()));
        compatibleWriterOperator.initializeState(previousSinkState);
        compatibleWriterOperator.open();
        compatibleWriterOperator.processElement(1, 1L);
        compatibleWriterOperator.endInput();
        OperatorSubtaskState operatorStateWithoutPreviousState = compatibleWriterOperator.snapshot(1L, 1L);
        compatibleWriterOperator.close();
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(compatibleWriterOperator.getOutput()), (Matcher)Matchers.containsInAnyOrder((Object[])expectedOutput1.toArray()));
        OneInputStreamOperatorTestHarness<Integer, byte[]> restoredSinkOperator = this.createCompatibleSinkOperator();
        List<StreamRecord> expectedOutput2 = Arrays.asList(new StreamRecord((Object)Tuple3.of((Object)2, (Object)2, (Object)Long.MIN_VALUE).toString()), new StreamRecord((Object)Tuple3.of((Object)3, (Object)3, (Object)Long.MIN_VALUE).toString()));
        restoredSinkOperator.initializeState(operatorStateWithoutPreviousState);
        restoredSinkOperator.open();
        restoredSinkOperator.processElement(2, 2L);
        restoredSinkOperator.processElement(3, 3L);
        restoredSinkOperator.endInput();
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(restoredSinkOperator.getOutput()), (Matcher)Matchers.containsInAnyOrder((Object[])expectedOutput2.toArray()));
    }

    @Test
    public void receivePreCommitWithoutCommitter() throws Exception {
        long initialTime = 0L;
        PreBarrierSinkWriter writer = new PreBarrierSinkWriter();
        OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness = this.createTestHarness(writer, false);
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);
        testHarness.prepareSnapshotPreBarrier(1L);
        Assertions.assertTrue((boolean)writer.hasReceivedPreCommit());
        testHarness.snapshot(1L, 1L);
        MatcherAssert.assertThat(writer.getElements(), (Matcher)Matchers.contains((Object[])new String[]{Tuple3.of((Object)1, (Object)1L, (Object)0L).toString(), Tuple3.of((Object)2, (Object)2L, (Object)0L).toString()}));
        MatcherAssert.assertThat(writer.getWatermarks(), (Matcher)Matchers.contains((Object[])new org.apache.flink.api.common.eventtime.Watermark[]{new org.apache.flink.api.common.eventtime.Watermark(0L)}));
    }

    private OneInputStreamOperatorTestHarness<Integer, byte[]> createCompatibleSinkOperator() throws Exception {
        return new OneInputStreamOperatorTestHarness<Integer, byte[]>((OneInputStreamOperatorFactory<Integer, byte[]>)new SinkOperatorFactory(this.getBuilder(new SnapshottingBufferingSinkWriter()).setCompatibleStateNames("dummy_sink_state").build(), false, true), (TypeSerializer<Integer>)IntSerializer.INSTANCE);
    }

    private OneInputStreamOperatorTestHarness<Integer, byte[]> createTestHarness(TestSink.DefaultSinkWriter<Integer> writer) throws Exception {
        return this.createTestHarness(writer, true);
    }

    private OneInputStreamOperatorTestHarness<Integer, byte[]> createTestHarness(TestSink.DefaultSinkWriter<Integer> writer, boolean withCommitter) throws Exception {
        return new OneInputStreamOperatorTestHarness<Integer, byte[]>((OneInputStreamOperatorFactory<Integer, byte[]>)new SinkOperatorFactory(this.getBuilder(writer).build(), false, withCommitter), (TypeSerializer<Integer>)IntSerializer.INSTANCE);
    }

    private TestSink.Builder<Integer> getBuilder(TestSink.DefaultSinkWriter<Integer> writer) {
        TestSink.Builder<Integer> builder = TestSink.newBuilder().setWriter(writer).setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE);
        if (this.stateful) {
            builder.withWriterState();
        }
        return builder;
    }

    private static class TimeBasedBufferingSinkWriter
    extends TestSink.DefaultSinkWriter<Integer>
    implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables = new ArrayList<String>();

        private TimeBasedBufferingSinkWriter() {
        }

        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of((Object)element, (Object)context.timestamp(), (Object)context.currentWatermark()).toString());
        }

        @Override
        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            super.setProcessingTimerService(processingTimerService);
            this.processingTimerService.registerProcessingTimer(1000L, (Sink.ProcessingTimeService.ProcessingTimeCallback)this);
        }

        public void onProcessingTime(long time) throws IOException {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(time + 1000L, (Sink.ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }

    private static class BufferingSinkWriter
    extends TestSink.DefaultSinkWriter<Integer> {
        private BufferingSinkWriter() {
        }

        @Override
        public List<String> prepareCommit(boolean flush) {
            if (!flush) {
                return Collections.emptyList();
            }
            List result = this.elements;
            this.elements = new ArrayList();
            return result;
        }
    }

    private static class DummySinkOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor("dummy_sink_state", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        ListState<String> sinkState;

        private DummySinkOperator() {
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sinkState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(SINK_STATE_DESC), (SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE);
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.sinkState.add(element.getValue());
        }
    }

    private static class SnapshottingBufferingSinkWriter
    extends BufferingSinkWriter {
        public static final int NOT_SNAPSHOTTED = -1;
        long lastCheckpointId = -1L;

        private SnapshottingBufferingSinkWriter() {
        }

        @Override
        public List<String> snapshotState(long checkpointId) throws IOException {
            this.lastCheckpointId = checkpointId;
            return this.elements;
        }

        @Override
        void restoredFrom(List<String> states) {
            this.elements = new ArrayList<String>(states);
        }
    }

    private static class PreBarrierSinkWriter
    extends TestSink.DefaultSinkWriter<Integer> {
        private boolean receivedPreCommit = false;

        private PreBarrierSinkWriter() {
        }

        @Override
        public List<String> prepareCommit(boolean flush) {
            this.receivedPreCommit = true;
            return Collections.emptyList();
        }

        public boolean hasReceivedPreCommit() {
            return this.receivedPreCommit;
        }

        public List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks() {
            return this.watermarks;
        }

        public List<String> getElements() {
            return this.elements;
        }
    }
}

