package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.class */
class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, ListSink> {

    /**
     * A {@link CheckpointCommitter} used for failure testing: the first call to
     * {@code commitCheckpoint} and the first call to {@code isCheckpointCommitted} each throw a
     * {@link RuntimeException} with the message {@code "Expected exception"}.
     *
     * <p>After its one failure, {@code commitCheckpoint} records every pair it receives, while
     * {@code isCheckpointCommitted} always answers {@code false}.
     */
    private static class FailingCommitter extends CheckpointCommitter {
        private static final long serialVersionUID = 1L;

        /** Recorded (second argument, first argument) pairs; allocated by createResource(). */
        private List<Tuple2<Long, Integer>> checkpoints;

        /** True until isCheckpointCommitted() has thrown once. */
        private boolean failIsCommitted = true;

        /** True until commitCheckpoint() has thrown once. */
        private boolean failCommit = true;

        /** No-op. */
        public void open() throws Exception {}

        /** No-op. */
        public void close() throws Exception {}

        /** Replaces the record of committed checkpoints with a fresh, empty list. */
        public void createResource() throws Exception {
            checkpoints = new ArrayList<>();
        }

        /**
         * Throws on the first invocation; on later invocations stores the pair {@code (j, i)}.
         *
         * @param i stored as the second tuple field (presumably the subtask index - confirm
         *     against CheckpointCommitter)
         * @param j stored as the first tuple field (presumably the checkpoint id - confirm against
         *     CheckpointCommitter)
         * @throws RuntimeException on the first invocation only
         */
        public void commitCheckpoint(int i, long j) {
            if (!failCommit) {
                checkpoints.add(new Tuple2<>(j, i));
                return;
            }
            // Fail exactly once: clear the flag before throwing so that a retry succeeds.
            failCommit = false;
            throw new RuntimeException("Expected exception");
        }

        /**
         * Throws on the first invocation and returns {@code false} on every later one; the
         * recorded checkpoints are never consulted.
         *
         * @param i ignored
         * @param j ignored
         * @return always {@code false} once the initial failure has happened
         * @throws RuntimeException on the first invocation only
         */
        public boolean isCheckpointCommitted(int i, long j) {
            if (failIsCommitted) {
                // Fail exactly once: clear the flag before throwing.
                failIsCommitted = false;
                throw new RuntimeException("Expected exception");
            }
            return false;
        }
    }

    /**
     * A {@link GenericWriteAheadSink} over {@code Tuple1<Integer>} that collects the first field
     * of every value handed to {@code sendValues} in the public {@link #values} list.
     *
     * <p>It is built with a {@link SimpleCommitter}, a serializer derived from a sample
     * {@code Tuple1<Integer>}, and the string {@code "job"}.
     */
    public static class ListSink extends GenericWriteAheadSink<Tuple1<Integer>> {
        private static final long serialVersionUID = 1L;

        /** Integers received through sendValues(), in the order they were appended. */
        public List<Integer> values = new ArrayList<>();

        /**
         * Creates the sink.
         *
         * @throws Exception declared by this constructor; the source of a failure is not visible
         *     here
         */
        public ListSink() throws Exception {
            super(
                    new SimpleCommitter(),
                    TypeExtractor.getForObject(new Tuple1<>(1))
                            .createSerializer(new SerializerConfigImpl()),
                    "job");
        }

        /**
         * Appends {@code f0} of each tuple to {@link #values}.
         *
         * @param iterable the tuples to record
         * @param j unused (presumably the checkpoint id - confirm against GenericWriteAheadSink)
         * @param j2 unused (presumably a timestamp - confirm against GenericWriteAheadSink)
         * @return always {@code true}
         */
        protected boolean sendValues(Iterable<Tuple1<Integer>> iterable, long j, long j2)
                throws Exception {
            for (Tuple1<Integer> tuple : iterable) {
                values.add(tuple.f0);
            }
            return true;
        }
    }

    /**
     * A {@link GenericWriteAheadSink} over {@code Tuple1<Integer>} that behaves like a
     * list-collecting sink but is wired to a {@link FailingCommitter}.
     *
     * <p>The first field of every value handed to {@code sendValues} is appended to the public
     * {@link #values} list.
     */
    public static class ListSink2 extends GenericWriteAheadSink<Tuple1<Integer>> {
        private static final long serialVersionUID = 1L;

        /** Integers received through sendValues(), in the order they were appended. */
        public List<Integer> values = new ArrayList<>();

        /**
         * Creates the sink with a {@link FailingCommitter}, a serializer derived from a sample
         * {@code Tuple1<Integer>}, and the string {@code "job"}.
         *
         * @throws Exception declared by this constructor; the source of a failure is not visible
         *     here
         */
        public ListSink2() throws Exception {
            super(
                    new FailingCommitter(),
                    TypeExtractor.getForObject(new Tuple1<>(1))
                            .createSerializer(new SerializerConfigImpl()),
                    "job");
        }

        /**
         * Appends {@code f0} of each tuple to {@link #values}.
         *
         * @param iterable the tuples to record
         * @param j unused (presumably the checkpoint id - confirm against GenericWriteAheadSink)
         * @param j2 unused (presumably a timestamp - confirm against GenericWriteAheadSink)
         * @return always {@code true}
         */
        protected boolean sendValues(Iterable<Tuple1<Integer>> iterable, long j, long j2)
                throws Exception {
            for (Tuple1<Integer> tuple : iterable) {
                values.add(tuple.f0);
            }
            return true;
        }
    }

    /**
     * A {@link CheckpointCommitter} that keeps its committed checkpoints in a plain in-memory
     * list: {@code commitCheckpoint} records a pair and {@code isCheckpointCommitted} reports
     * whether an equal pair has been recorded.
     */
    private static class SimpleCommitter extends CheckpointCommitter {
        private static final long serialVersionUID = 1L;

        /** Recorded (second argument, first argument) pairs; allocated by createResource(). */
        private List<Tuple2<Long, Integer>> checkpoints;

        /** No-op. */
        public void open() throws Exception {}

        /** No-op. */
        public void close() throws Exception {}

        /** Replaces the record of committed checkpoints with a fresh, empty list. */
        public void createResource() throws Exception {
            checkpoints = new ArrayList<>();
        }

        /**
         * Records the pair {@code (j, i)} as committed.
         *
         * @param i stored as the second tuple field (presumably the subtask index - confirm
         *     against CheckpointCommitter)
         * @param j stored as the first tuple field (presumably the checkpoint id - confirm against
         *     CheckpointCommitter)
         */
        public void commitCheckpoint(int i, long j) {
            checkpoints.add(new Tuple2<>(j, i));
        }

        /**
         * Looks up the pair {@code (j, i)} among the recorded checkpoints.
         *
         * @param i compared against the second tuple field
         * @param j compared against the first tuple field
         * @return {@code true} if an equal pair was recorded by {@code commitCheckpoint}
         */
        public boolean isCheckpointCommitted(int i, long j) {
            Tuple2<Long, Integer> wanted = new Tuple2<>(j, i);
            return checkpoints.contains(wanted);
        }
    }

    /** Package-private no-argument constructor; it performs no initialization of its own. */
    GenericWriteAheadSinkTest() {}

    /**
     * Supplies the sink under test; overrides the hook declared in
     * {@code WriteAheadSinkTestBase}.
     *
     * @return a newly constructed {@link ListSink}
     * @throws Exception if the {@link ListSink} constructor fails
     */
    @Override
    public ListSink createSink() throws Exception {
        ListSink sink = new ListSink();
        return sink;
    }

    /**
     * Supplies the type information for the records used by this test; overrides the hook
     * declared in {@code WriteAheadSinkTestBase}.
     *
     * @return basic tuple type information built from the single field class {@code Integer}
     */
    @Override
    public TupleTypeInfo<Tuple1<Integer>> createTypeInfo() {
        // Varargs call; equivalent to passing a one-element Class array.
        return TupleTypeInfo.getBasicTupleTypeInfo(Integer.class);
    }

    /**
     * Builds one input record; overrides the hook declared in {@code WriteAheadSinkTestBase}.
     *
     * @param i the integer wrapped into the returned tuple
     * @param i2 not used by this implementation
     * @return a {@code Tuple1} whose only field is {@code i}
     */
    @Override
    public Tuple1<Integer> generateValue(int i, int i2) {
        Tuple1<Integer> record = new Tuple1<>(i);
        return record;
    }

    /**
     * Checks that the sink emitted each of the ids 1 through 60 and exactly 60 values in total;
     * overrides the hook declared in {@code WriteAheadSinkTestBase}.
     *
     * @param listSink the sink whose collected {@code values} are inspected
     */
    @Override
    public void verifyResultsIdealCircumstances(ListSink listSink) {
        // Start from every expected id and cross off the ones the sink produced.
        List<Integer> missingIds = new ArrayList<>();
        for (int id = 1; id <= 60; id++) {
            missingIds.add(id);
        }
        // The loop variable stays a boxed Integer so that List.remove(Object) is selected
        // rather than the index-based List.remove(int).
        for (Integer emitted : listSink.values) {
            missingIds.remove(emitted);
        }

        Assertions.assertThat(missingIds)
                .as("The following ID's where not found in the result list: " + missingIds)
                .isEmpty();
        Assertions.assertThat(listSink.values)
                .as("The sink emitted to many values: " + (listSink.values.size() - 60))
                .hasSize(60);
    }

    /**
     * Checks that the sink emitted each of the ids 1 through 60 and exactly 60 values in total;
     * overrides the hook declared in {@code WriteAheadSinkTestBase}.
     *
     * @param listSink the sink whose collected {@code values} are inspected
     */
    @Override
    public void verifyResultsDataPersistenceUponMissedNotify(ListSink listSink) {
        // Start from every expected id and cross off the ones the sink produced.
        List<Integer> missingIds = new ArrayList<>();
        for (int id = 1; id <= 60; id++) {
            missingIds.add(id);
        }
        // The loop variable stays a boxed Integer so that List.remove(Object) is selected
        // rather than the index-based List.remove(int).
        for (Integer emitted : listSink.values) {
            missingIds.remove(emitted);
        }

        Assertions.assertThat(missingIds)
                .as("The following ID's where not found in the result list: " + missingIds)
                .isEmpty();
        Assertions.assertThat(listSink.values)
                .as("The sink emitted to many values: " + (listSink.values.size() - 60))
                .hasSize(60);
    }

    /**
     * Checks that the sink emitted each of the ids 1 through 20 and 41 through 60, and exactly 40
     * values in total; overrides the hook declared in {@code WriteAheadSinkTestBase}.
     *
     * @param listSink the sink whose collected {@code values} are inspected
     */
    @Override
    public void verifyResultsDataDiscardingUponRestore(ListSink listSink) {
        // Expected ids are the two ranges 1..20 and 41..60; 21..40 are not expected.
        List<Integer> missingIds = new ArrayList<>();
        for (int id = 1; id <= 20; id++) {
            missingIds.add(id);
        }
        for (int id = 41; id <= 60; id++) {
            missingIds.add(id);
        }
        // The loop variable stays a boxed Integer so that List.remove(Object) is selected
        // rather than the index-based List.remove(int).
        for (Integer emitted : listSink.values) {
            missingIds.remove(emitted);
        }

        Assertions.assertThat(missingIds)
                .as("The following ID's where not found in the result list: " + missingIds)
                .isEmpty();
        Assertions.assertThat(listSink.values)
                .as("The sink emitted to many values: " + (listSink.values.size() - 40))
                .hasSize(40);
    }

    /**
     * Checks that the sink's collected values are exactly the integers from {@code i} to
     * {@code i2} inclusive, in any order; overrides the hook declared in
     * {@code WriteAheadSinkTestBase}.
     *
     * @param listSink the sink whose collected {@code values} are inspected
     * @param i first expected value (inclusive)
     * @param i2 last expected value (inclusive); if smaller than {@code i}, nothing is expected
     * @throws Exception declared by the overridden hook; nothing in this body throws a checked
     *     exception
     */
    @Override
    public void verifyResultsWhenReScaling(ListSink listSink, int i, int i2) throws Exception {
        List<Integer> expectedValues = new ArrayList<>();
        for (int value = i; value <= i2; value++) {
            expectedValues.add(value);
        }
        Assertions.assertThat(listSink.values).containsExactlyInAnyOrderElementsOf(expectedValues);
    }

    /**
     * Drives a {@link ListSink2}, whose {@link FailingCommitter} throws once from
     * {@code isCheckpointCommitted} and once from {@code commitCheckpoint}, through three
     * checkpoints of 10, 11 and 12 elements. After each completed-checkpoint notification it
     * checks how many values the sink has emitted so far: 0, then 10, then 43.
     *
     * @throws Exception if the harness or the sink fails unexpectedly
     */
    @Test
    void testCommitterException() throws Exception {
        ListSink2 sink = new ListSink2();
        OneInputStreamOperatorTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness =
                new OneInputStreamOperatorTestHarness<>(sink);
        harness.open();

        // Running id for generated records; it keeps increasing across all three checkpoints.
        int nextId = 1;

        for (int sent = 0; sent < 10; sent++, nextId++) {
            harness.processElement(new StreamRecord<>(generateValue(nextId, 0)));
        }
        harness.snapshot(0L, 0L);
        harness.notifyOfCompletedCheckpoint(0L);
        // Presumably the one-time isCheckpointCommitted failure prevents any sendValues call
        // here - confirm against GenericWriteAheadSink.
        Assertions.assertThat(sink.values).isEmpty();

        for (int sent = 0; sent < 11; sent++, nextId++) {
            harness.processElement(new StreamRecord<>(generateValue(nextId, 1)));
        }
        harness.snapshot(1L, 0L);
        harness.notifyOfCompletedCheckpoint(1L);
        // Only the 10 values of the first checkpoint are expected at this point; presumably the
        // one-time commitCheckpoint failure stops processing of the second checkpoint - confirm
        // against GenericWriteAheadSink.
        Assertions.assertThat(sink.values).hasSize(10);

        for (int sent = 0; sent < 12; sent++, nextId++) {
            harness.processElement(new StreamRecord<>(generateValue(nextId, 2)));
        }
        harness.snapshot(2L, 0L);
        harness.notifyOfCompletedCheckpoint(2L);
        // 43 is consistent with the first checkpoint having been sent twice:
        // 2 * 10 + 11 + 12 - confirm against GenericWriteAheadSink.
        Assertions.assertThat(sink.values).hasSize(43);
    }
}
