/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sort;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.sort.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.sort.CollectionDataInput;
import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class SortingDataInputTest {
    @Test
    public void simpleFixedLengthKeySorting() throws Exception {
        DataInputStatus inputStatus;
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        CollectionDataInput input = new CollectionDataInput(Arrays.asList(new StreamRecord((Object)1, 3L), new StreamRecord((Object)1, 1L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 3L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)2, 2L)));
        MockEnvironment environment = MockEnvironment.builder().build();
        SortingDataInput sortingDataInput = new SortingDataInput(input, (TypeSerializer)new IntSerializer(), (TypeSerializer)new IntSerializer(), (KeySelector & Serializable)value -> value, environment.getMemoryManager(), environment.getIOManager(), true, 1.0, new Configuration(), (TaskInvokable)new DummyInvokable(), new ExecutionConfig());
        while ((inputStatus = sortingDataInput.emitNext(collectingDataOutput)) != DataInputStatus.END_OF_INPUT) {
        }
        Assert.assertThat(collectingDataOutput.events, (Matcher)CoreMatchers.equalTo(Arrays.asList(new StreamRecord((Object)1, 1L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 3L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 2L), new StreamRecord((Object)2, 3L))));
    }

    @Test
    public void watermarkPropagation() throws Exception {
        DataInputStatus inputStatus;
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        CollectionDataInput input = new CollectionDataInput(Arrays.asList(new StreamRecord((Object)1, 3L), new Watermark(1L), new StreamRecord((Object)1, 1L), new Watermark(2L), new StreamRecord((Object)2, 1L), new Watermark(3L), new StreamRecord((Object)2, 3L), new Watermark(4L), new StreamRecord((Object)1, 2L), new Watermark(5L), new StreamRecord((Object)2, 2L), new Watermark(6L)));
        MockEnvironment environment = MockEnvironment.builder().build();
        SortingDataInput sortingDataInput = new SortingDataInput(input, (TypeSerializer)new IntSerializer(), (TypeSerializer)new IntSerializer(), (KeySelector & Serializable)value -> value, environment.getMemoryManager(), environment.getIOManager(), true, 1.0, new Configuration(), (TaskInvokable)new DummyInvokable(), new ExecutionConfig());
        while ((inputStatus = sortingDataInput.emitNext(collectingDataOutput)) != DataInputStatus.END_OF_INPUT) {
        }
        Assert.assertThat(collectingDataOutput.events, (Matcher)CoreMatchers.equalTo(Arrays.asList(new StreamRecord((Object)1, 1L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 3L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 2L), new StreamRecord((Object)2, 3L), new Watermark(6L))));
    }

    @Test
    public void simpleVariableLengthKeySorting() throws Exception {
        DataInputStatus inputStatus;
        CollectingDataOutput collectingDataOutput = new CollectingDataOutput();
        CollectionDataInput input = new CollectionDataInput(Arrays.asList(new StreamRecord((Object)1, 3L), new StreamRecord((Object)1, 1L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 3L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)2, 2L)));
        MockEnvironment environment = MockEnvironment.builder().build();
        SortingDataInput sortingDataInput = new SortingDataInput(input, (TypeSerializer)new IntSerializer(), (TypeSerializer)new StringSerializer(), (KeySelector & Serializable)value -> "" + value, environment.getMemoryManager(), environment.getIOManager(), true, 1.0, new Configuration(), (TaskInvokable)new DummyInvokable(), new ExecutionConfig());
        while ((inputStatus = sortingDataInput.emitNext(collectingDataOutput)) != DataInputStatus.END_OF_INPUT) {
        }
        Assert.assertThat(collectingDataOutput.events, (Matcher)CoreMatchers.equalTo(Arrays.asList(new StreamRecord((Object)1, 1L), new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 3L), new StreamRecord((Object)2, 1L), new StreamRecord((Object)2, 2L), new StreamRecord((Object)2, 3L))));
    }
}

