/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sortpartition;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link SortPartitionOperator}.
 *
 * <p>Each scenario feeds two records in descending order into the operator through a
 * {@link OneInputStreamOperatorTestHarness} and checks that the harness output contains them in
 * ascending order.
 */
class SortPartitionOperatorTest {

    /** Failure message shared by all sort-order assertions. */
    private static final String WRONG_RESULT_MESSAGE = "The sort partition result is not correct.";

    /**
     * Verifies ascending sorting for the three ways a sort key is specified in this test: a tuple
     * field position, a POJO field name, and a key selector.
     */
    @Test
    void testSortPartition() throws Exception {
        // 1. Sort tuples by field position 0.
        TestHarnessUtil.assertOutputEquals(
                WRONG_RESULT_MESSAGE,
                expectedRecords(Tuple2.of(1, "1"), Tuple2.of(3, "3")),
                sortPartition(
                        createSortPartitionOperatorWithPositionField(),
                        Tuple2.of(3, "3"),
                        Tuple2.of(1, "1")));

        // 2. Sort POJOs by the field named "value".
        TestHarnessUtil.assertOutputEquals(
                WRONG_RESULT_MESSAGE,
                expectedRecords(new TestPojo("1", 1), new TestPojo("3", 3)),
                sortPartition(
                        createSortPartitionOperatorWithStringField(),
                        new TestPojo("3", 3),
                        new TestPojo("1", 1)));

        // 3. Sort POJOs by the key extracted with TestPojo::getValue.
        TestHarnessUtil.assertOutputEquals(
                WRONG_RESULT_MESSAGE,
                expectedRecords(new TestPojo("1", 1), new TestPojo("3", 3)),
                sortPartition(
                        createSortPartitionOperatorWithKeySelector(),
                        new TestPojo("3", 3),
                        new TestPojo("1", 1)));
    }

    /**
     * Verifies that the operator can be opened, fed a single record, finished and closed, and that
     * it has produced output afterwards.
     */
    @Test
    void testOpenClose() throws Exception {
        SortPartitionOperator<Tuple2<Integer, String>> sortPartitionOperator =
                createSortPartitionOperatorWithPositionField();
        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Tuple2<Integer, String>>
                testHarness = new OneInputStreamOperatorTestHarness<>(sortPartitionOperator);
        testHarness.open();
        testHarness.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
        testHarness.endInput();
        testHarness.close();
        Assertions.assertThat(testHarness.getOutput()).isNotEmpty();
    }

    /**
     * Runs {@code operator} in a fresh test harness: sets the harness up, feeds {@code records} in
     * the given order, ends the input, closes the harness and returns its output queue.
     *
     * @param operator the operator under test
     * @param records the input records, in the order they are fed to the operator
     * @return the output queue of the harness after it has been closed
     * @throws Exception if any harness call fails
     */
    @SafeVarargs // the varargs array is only iterated, never stored or exposed
    private static <T> Queue<Object> sortPartition(
            SortPartitionOperator<T> operator, T... records) throws Exception {
        OneInputStreamOperatorTestHarness<T, T> testHarness =
                new OneInputStreamOperatorTestHarness<>(operator);
        testHarness.setup();
        for (T record : records) {
            testHarness.processElement(new StreamRecord<>(record));
        }
        testHarness.endInput();
        testHarness.close();
        return testHarness.getOutput();
    }

    /**
     * Wraps each value in a {@link StreamRecord} and collects the records, in order, into a queue
     * suitable for comparison with the harness output.
     *
     * @param values the expected record payloads, in expected output order
     * @return a queue holding one {@link StreamRecord} per value
     */
    @SafeVarargs // the varargs array is only iterated, never stored or exposed
    private static <T> Queue<Object> expectedRecords(T... values) {
        Queue<Object> expected = new LinkedList<>();
        for (T value : values) {
            expected.add(new StreamRecord<>(value));
        }
        return expected;
    }

    /** Creates an operator that sorts {@code Tuple2<Integer, String>} by field position 0, ascending. */
    private SortPartitionOperator<Tuple2<Integer, String>>
            createSortPartitionOperatorWithPositionField() {
        TypeInformation<Tuple2<Integer, String>> inputType =
                Types.TUPLE(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        int positionSortField = 0;
        Order sortOrder = Order.ASCENDING;
        return new SortPartitionOperator<>(inputType, positionSortField, sortOrder);
    }

    /** Creates an operator that sorts {@link TestPojo} by its field named {@code "value"}, ascending. */
    private SortPartitionOperator<TestPojo> createSortPartitionOperatorWithStringField() {
        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
        String stringSortField = "value";
        Order sortOrder = Order.ASCENDING;
        return new SortPartitionOperator<>(inputType, stringSortField, sortOrder);
    }

    /** Creates an operator that sorts {@link TestPojo} by the key from {@link TestPojo#getValue()}, ascending. */
    private SortPartitionOperator<TestPojo> createSortPartitionOperatorWithKeySelector() {
        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
        Order sortOrder = Order.ASCENDING;
        return new SortPartitionOperator<>(inputType, TestPojo::getValue, sortOrder);
    }

    /**
     * Simple serializable test record with a string key and an integer value.
     *
     * <p>The public fields, public no-argument constructor and accessors are kept as they are;
     * the class is passed to {@code Types.POJO} by the factory methods above.
     */
    public static class TestPojo
    implements Serializable {
        public String key;
        public Integer value;

        /** Creates an instance with both fields {@code null}. */
        public TestPojo() {
        }

        /**
         * Creates an instance with the given key and value.
         *
         * @param key the record key
         * @param value the record value
         */
        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return this.value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return this.key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        /**
         * Two instances are equal when both their keys and their values are equal; {@code null}
         * fields are compared null-safely.
         */
        @Override
        public boolean equals(Object object) {
            if (this == object) {
                return true;
            }
            if (!(object instanceof TestPojo)) {
                return false;
            }
            TestPojo other = (TestPojo) object;
            return Objects.equals(other.getKey(), this.getKey())
                    && Objects.equals(other.getValue(), this.getValue());
        }

        /** Hash code consistent with {@link #equals(Object)}: derived from key and value. */
        @Override
        public int hashCode() {
            return Objects.hash(this.getKey(), this.getValue());
        }

        /** Readable representation so that assertion failures show the record contents. */
        @Override
        public String toString() {
            return "TestPojo{key=" + this.key + ", value=" + this.value + "}";
        }
    }
}

