package org.apache.flink.streaming.api.functions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest.class */
class FromElementsFunctionTest {
    private static final String[] STRING_ARRAY_DATA = {"Oh", "boy", "what", "a", "show", "!"};
    private static final List<String> STRING_LIST_DATA = Arrays.asList(STRING_ARRAY_DATA);

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest$DeserializeTooMuchType.class */
    private static class DeserializeTooMuchType implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        private DeserializeTooMuchType() {
        }

        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeInt(42);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readLong();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest$MyPojo.class */
    private static class MyPojo {
        public long val1;
        public int val2;

        public MyPojo() {
        }

        public MyPojo(long j, int i) {
            this.val1 = j;
            this.val2 = i;
        }

        public int hashCode() {
            return this.val2;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof MyPojo)) {
                return false;
            }
            MyPojo myPojo = (MyPojo) obj;
            return this.val1 == myPojo.val1 && this.val2 == myPojo.val2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/FromElementsFunctionTest$SerializationErrorType.class */
    public static class SerializationErrorType implements Value {
        private static final long serialVersionUID = -6037206294939421807L;

        private SerializationErrorType() {
        }

        public void write(DataOutputView dataOutputView) throws IOException {
            throw new IOException("test exception");
        }

        public void read(DataInputView dataInputView) throws IOException {
            throw new IOException("test exception");
        }
    }

    FromElementsFunctionTest() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> List<T> runSource(FromElementsFunction<T> fromElementsFunction) throws Exception {
        ArrayList arrayList = new ArrayList();
        InstantiationUtil.clone(fromElementsFunction).run(new ListSourceContext(arrayList));
        return arrayList;
    }

    @Test
    void testStrings() throws Exception {
        String[] strArr = {"Oh", "boy", "what", "a", "show", "!"};
        FromElementsFunction fromElementsFunction = new FromElementsFunction(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()), strArr);
        ArrayList arrayList = new ArrayList();
        fromElementsFunction.run(new ListSourceContext(arrayList));
        Assertions.assertThat(arrayList).containsExactly(strArr);
    }

    @Test
    void testNullElement() {
        Assertions.assertThatThrownBy(() -> {
            new FromElementsFunction(new String[]{"a", null, "b"});
        }).hasMessageContaining("contains a null element").isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testSetOutputTypeWithNoSerializer() throws Exception {
        FromElementsFunction fromElementsFunction = new FromElementsFunction(STRING_ARRAY_DATA);
        Assertions.assertThat(fromElementsFunction.getSerializer()).isNull();
        fromElementsFunction.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        Assertions.assertThat(fromElementsFunction.getSerializer()).isNotNull().isEqualTo(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()));
        Assertions.assertThat(runSource(fromElementsFunction)).containsExactly(STRING_ARRAY_DATA);
    }

    @Test
    void testSetOutputTypeWithSameSerializer() throws Exception {
        FromElementsFunction fromElementsFunction = new FromElementsFunction(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()), STRING_LIST_DATA);
        TypeSerializer serializer = fromElementsFunction.getSerializer();
        fromElementsFunction.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        Assertions.assertThat(fromElementsFunction.getSerializer()).isEqualTo(serializer);
        Assertions.assertThat(runSource(fromElementsFunction)).containsExactly(STRING_ARRAY_DATA);
    }

    @Test
    void testSetOutputTypeWithIncompatibleType() {
        FromElementsFunction fromElementsFunction = new FromElementsFunction(STRING_LIST_DATA);
        Assertions.assertThatThrownBy(() -> {
            fromElementsFunction.setOutputType(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        }).hasMessageContaining("not all subclasses of java.lang.Integer").isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testSetOutputTypeWithExistingBrokenSerializer() throws Exception {
        FromElementsFunction fromElementsFunction = new FromElementsFunction(new ValueTypeInfo(DeserializeTooMuchType.class).createSerializer(new SerializerConfigImpl()), new DeserializeTooMuchType[]{new DeserializeTooMuchType()});
        TypeSerializer serializer = fromElementsFunction.getSerializer();
        fromElementsFunction.setOutputType(new GenericTypeInfo(DeserializeTooMuchType.class), new ExecutionConfig());
        Assertions.assertThat(fromElementsFunction.getSerializer()).isNotEqualTo(serializer);
        Assertions.assertThat(runSource(fromElementsFunction)).hasSize(1).first().isInstanceOf(DeserializeTooMuchType.class);
    }

    @Test
    void testSetOutputTypeAfterTransferred() throws Exception {
        FromElementsFunction clone = InstantiationUtil.clone(new FromElementsFunction(STRING_LIST_DATA));
        Assertions.assertThatThrownBy(() -> {
            clone.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        }).hasMessageContaining("The output type should've been specified before shipping the graph to the cluster").isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testNoSerializer() {
        FromElementsFunction fromElementsFunction = new FromElementsFunction(STRING_LIST_DATA);
        Assertions.assertThatThrownBy(() -> {
            runSource(fromElementsFunction);
        }).hasMessageContaining("serializer not configured").isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testNonJavaSerializableType() throws Exception {
        MyPojo[] myPojoArr = {new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};
        Assertions.assertThat(runSource(new FromElementsFunction(TypeExtractor.getForClass(MyPojo.class).createSerializer(new SerializerConfigImpl()), myPojoArr))).containsExactly(myPojoArr);
    }

    @Test
    void testNonJavaSerializableTypeWithSetOutputType() throws Exception {
        MyPojo[] myPojoArr = {new MyPojo(1L, 2), new MyPojo(3L, 4), new MyPojo(5L, 6)};
        FromElementsFunction fromElementsFunction = new FromElementsFunction(myPojoArr);
        fromElementsFunction.setOutputType(TypeExtractor.getForClass(MyPojo.class), new ExecutionConfig());
        Assertions.assertThat(runSource(fromElementsFunction)).containsExactly(myPojoArr);
    }

    @Test
    void testSerializationError() {
        ValueTypeInfo valueTypeInfo = new ValueTypeInfo(SerializationErrorType.class);
        Assertions.assertThatThrownBy(() -> {
            new FromElementsFunction(valueTypeInfo.createSerializer(new SerializerConfigImpl()), new SerializationErrorType[]{new SerializationErrorType()});
        }).isInstanceOf(IOException.class).hasMessageContaining("test exception");
    }

    @Test
    void testDeSerializationError() throws Exception {
        FromElementsFunction fromElementsFunction = new FromElementsFunction(new ValueTypeInfo(DeserializeTooMuchType.class).createSerializer(new SerializerConfigImpl()), new DeserializeTooMuchType[]{new DeserializeTooMuchType()});
        Assertions.assertThatThrownBy(() -> {
            fromElementsFunction.run(new ListSourceContext(new ArrayList()));
        }).hasMessageContaining("user-defined serialization").isInstanceOf(IOException.class);
    }

    @Test
    void testCheckpointAndRestore() throws Exception {
        OperatorSubtaskState snapshot;
        ArrayList arrayList = new ArrayList(10000);
        ArrayList arrayList2 = new ArrayList(10000);
        for (int i = 0; i < 10000; i++) {
            arrayList.add(Integer.valueOf(i));
        }
        final FromElementsFunction fromElementsFunction = new FromElementsFunction(IntSerializer.INSTANCE, arrayList);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(fromElementsFunction), 1, 1, 0);
        abstractStreamOperatorTestHarness.open();
        final ListSourceContext listSourceContext = new ListSourceContext(arrayList2, 2L);
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.streaming.api.functions.FromElementsFunctionTest.1
            public void go() throws Exception {
                fromElementsFunction.run(listSourceContext);
            }
        };
        checkedThread.start();
        Thread.sleep(1000L);
        ArrayList arrayList3 = new ArrayList(10000);
        synchronized (listSourceContext.getCheckpointLock()) {
            snapshot = abstractStreamOperatorTestHarness.snapshot(566L, System.currentTimeMillis());
            arrayList3.addAll(arrayList2);
        }
        fromElementsFunction.cancel();
        checkedThread.sync();
        FromElementsFunction fromElementsFunction2 = new FromElementsFunction(IntSerializer.INSTANCE, arrayList);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(fromElementsFunction2), 1, 1, 0);
        abstractStreamOperatorTestHarness2.setup();
        abstractStreamOperatorTestHarness2.initializeState(snapshot);
        abstractStreamOperatorTestHarness2.open();
        fromElementsFunction2.run(new ListSourceContext(arrayList3));
        Assertions.assertThat(arrayList3).isEqualTo(arrayList);
    }
}
