package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.class */
class CommittableCollectorSerializerTest {
    private static final int SUBTASK_ID = 1;
    private static final int NUMBER_OF_SUBTASKS = 1;
    private static final SimpleVersionedSerializer<Integer> COMMITTABLE_SERIALIZER = new IntegerSerializer();
    private static final SinkCommitterMetricGroup METRIC_GROUP = MetricsGroupTestUtils.mockCommitterMetricGroup();
    private static final CommittableCollectorSerializer<Integer> SERIALIZER = new CommittableCollectorSerializer<>(COMMITTABLE_SERIALIZER, 1, 1, METRIC_GROUP);

    CommittableCollectorSerializerTest() {
    }

    @Test
    void testCommittableCollectorV1SerDe() throws IOException {
        List asList = Arrays.asList(1, 2, 3);
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
        dataOutputSerializer.writeInt(-1189141204);
        SimpleVersionedSerialization.writeVersionAndSerializeList(COMMITTABLE_SERIALIZER, asList, dataOutputSerializer);
        CommittableCollector deserialize = SERIALIZER.deserialize(1, dataOutputSerializer.getCopyOfBuffer());
        Assertions.assertThat(deserialize.getNumberOfSubtasks()).isEqualTo(1);
        Assertions.assertThat(deserialize.isFinished()).isFalse();
        Assertions.assertThat(deserialize.getSubtaskId()).isEqualTo(0);
        Collection checkpointCommittables = deserialize.getCheckpointCommittables();
        Assertions.assertThat(checkpointCommittables).hasSize(1);
        Assertions.assertThat((List) ((CheckpointCommittableManagerImpl) checkpointCommittables.iterator().next()).getSubtaskCommittableManager(0).getPendingRequests().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{1, 2, 3});
    }

    @Test
    void testCommittableCollectorV2SerDe() throws IOException {
        CommittableCollectorSerializer committableCollectorSerializer = new CommittableCollectorSerializer(COMMITTABLE_SERIALIZER, 2, 3, METRIC_GROUP);
        CommittableCollector<Integer> committableCollector = new CommittableCollector<>(2, 3, METRIC_GROUP);
        committableCollector.addMessage(new CommittableSummary(2, 3, 1L, 1, 1, 0));
        committableCollector.addMessage(new CommittableSummary(2, 3, 2L, 1, 1, 0));
        committableCollector.addMessage(new CommittableWithLineage(1, 1L, 2));
        committableCollector.addMessage(new CommittableWithLineage(2, 2L, 2));
        CommittableCollector<Integer> deserialize = committableCollectorSerializer.deserialize(2, SERIALIZER.serialize(committableCollector));
        Assertions.assertThat(deserialize.getSubtaskId()).isEqualTo(2);
        Assertions.assertThat(deserialize.isFinished()).isFalse();
        Assertions.assertThat(deserialize.getNumberOfSubtasks()).isEqualTo(3);
        assertCommittableCollector("Original CommittableCollector", 2, 3, committableCollector, Arrays.asList(Collections.singletonList(1), Collections.singletonList(2)));
        assertCommittableCollector("Deserialized CommittableCollector", 2, 3, deserialize, Arrays.asList(Collections.singletonList(1), Collections.singletonList(2)));
    }

    @Test
    public void testCommittablesForSameSubtaskIdV2SerDe() throws IOException {
        CommittableCollectorSerializer committableCollectorSerializer = new CommittableCollectorSerializer(COMMITTABLE_SERIALIZER, 1, 3, METRIC_GROUP);
        CommittableCollector<Integer> committableCollector = new CommittableCollector<>(1, 3, METRIC_GROUP);
        committableCollector.addMessage(new CommittableSummary(1, 3, 1L, 1, 1, 0));
        committableCollector.addMessage(new CommittableSummary(1 + 1, 3, 1L, 1, 1, 0));
        committableCollector.addMessage(new CommittableWithLineage(1, 1L, 1));
        committableCollector.addMessage(new CommittableWithLineage(1, 1L, 1 + 1));
        CommittableCollector<Integer> deserialize = committableCollectorSerializer.deserialize(2, SERIALIZER.serialize(committableCollector));
        Assertions.assertThat(deserialize.getSubtaskId()).isEqualTo(1);
        Assertions.assertThat(deserialize.isFinished()).isFalse();
        Assertions.assertThat(deserialize.getNumberOfSubtasks()).isEqualTo(3);
        assertCommittableCollector("Original CommittableCollector", 1, 3, committableCollector, Collections.singletonList(Collections.singletonList(1)));
        assertCommittableCollector("Deserialized CommittableCollector", 1, 3, deserialize, Collections.singletonList(Arrays.asList(1, 1)));
    }

    @Test
    void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManagerCheckpointId() throws IOException {
        CommittableCollector committableCollector = new CommittableCollector(1, 1, METRIC_GROUP);
        committableCollector.addMessage(new CommittableSummary(1, 1, 2L, 1, 1, 0));
        committableCollector.addMessage(new CommittableWithLineage(1, 2L, 1));
        Collection checkpointCommittables = SERIALIZER.deserialize(2, SERIALIZER.serialize(committableCollector)).getCheckpointCommittables();
        Assertions.assertThat(checkpointCommittables).hasSize(1);
        CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = (CheckpointCommittableManagerImpl) checkpointCommittables.iterator().next();
        Assertions.assertThat(checkpointCommittableManagerImpl.getSubtaskCommittableManager(1).getCheckpointId()).isEqualTo(checkpointCommittableManagerImpl.getCheckpointId());
    }

    private void assertCommittableCollector(String str, int i, int i2, CommittableCollector<Integer> committableCollector, List<List<Integer>> list) {
        org.junit.jupiter.api.Assertions.assertAll(str, new Executable[]{() -> {
            Collection checkpointCommittables = committableCollector.getCheckpointCommittables();
            Assertions.assertThat(checkpointCommittables).hasSize(list.size());
            Streams.zip(checkpointCommittables.stream(), list.stream(), (v0, v1) -> {
                return Pair.of(v0, v1);
            }).forEach(pair -> {
                CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = (CheckpointCommittableManagerImpl) pair.getKey();
                List<Integer> list2 = (List) pair.getValue();
                SubtaskCommittableManager<Integer> subtaskCommittableManager = checkpointCommittableManagerImpl.getSubtaskCommittableManager(i);
                SinkV2Assertions.assertThat((CommittableSummary<?>) checkpointCommittableManagerImpl.getSummary()).hasSubtaskId(i).hasNumberOfSubtasks(i2);
                assertPendingRequests(subtaskCommittableManager, list2);
                Assertions.assertThat(subtaskCommittableManager.getSubtaskId()).isEqualTo(i);
            });
        }});
    }

    private void assertPendingRequests(SubtaskCommittableManager<Integer> subtaskCommittableManager, List<Integer> list) {
        Assertions.assertThat((List) subtaskCommittableManager.getPendingRequests().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList())).containsExactlyElementsOf(list);
    }
}
