/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/**
 * Tests that drive an operator built by {@link CommitterOperatorFactory} around a
 * {@link GlobalCommitter} through a {@link OneInputStreamOperatorTestHarness}.
 *
 * <p>NOTE(review): the class name and the {@code true} flag passed to the factory suggest that
 * the batch global-committer code path is exercised here; confirm against
 * {@code CommitterOperatorFactory}.
 */
public class GlobalBatchCommitterHandlerTest extends TestLogger {

    /**
     * Building and initializing the harness without a global committer must fail with an
     * {@link IllegalStateException}.
     *
     * <p>No cleanup is attempted: the test expects the harness to never become usable, and it is
     * not visible from here which of the two calls raises the exception.
     */
    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness = createTestHarness(null);
        testHarness.initializeEmptyState();
    }

    /**
     * Feeds two committables, ends the input and closes the harness, then checks that the
     * committer reports exactly one committed value, {@code "hotel|motel"}.
     *
     * <p>NOTE(review): presumably {@code RetryOnceGlobalCommitter} rejects the first commit
     * attempt and accepts the retry; confirm against {@code TestSink}.
     */
    @Test
    public void supportRetry() throws Exception {
        TestSink.RetryOnceGlobalCommitter globalCommitter = new TestSink.RetryOnceGlobalCommitter();

        // try-with-resources closes the harness even when one of the steps below throws.
        try (OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
                createTestHarness(globalCommitter)) {
            testHarness.initializeEmptyState();
            testHarness.open();
            testHarness.processElement(SinkTestUtil.committableRecord("hotel"));
            testHarness.processElement(SinkTestUtil.committableRecord("motel"));
            testHarness.endInput();
        }

        // Asserted only after the harness has been closed, exactly as before.
        MatcherAssert.assertThat(
                globalCommitter.getCommittedData(), Matchers.contains("hotel|motel"));
    }

    /**
     * After the input has ended, the committed data must consist of the combined inputs plus the
     * {@code "end of input"} marker, in any order.
     */
    @Test
    public void endOfInput() throws Exception {
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter();
        List<String> inputs = Arrays.asList("compete", "swear", "shallow");

        // The assertion runs while the harness is still open; try-with-resources guarantees the
        // harness is closed even if that assertion fails.
        try (OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
                createTestHarness(globalCommitter)) {
            testHarness.initializeEmptyState();
            testHarness.open();
            testHarness.processElements(SinkTestUtil.committableRecords(inputs));
            testHarness.endInput();

            List<String> expectedCommittedData =
                    Arrays.asList(globalCommitter.combine(inputs), "end of input");
            MatcherAssert.assertThat(
                    globalCommitter.getCommittedData(),
                    Matchers.containsInAnyOrder(expectedCommittedData.toArray()));
        }
    }

    /** Closing the harness must leave the global committer reporting itself as closed. */
    @Test
    public void close() throws Exception {
        TestSink.DefaultGlobalCommitter globalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
                createTestHarness(globalCommitter);
        testHarness.initializeEmptyState();
        testHarness.open();
        // close() is the action under test, so it stays an explicit call.
        testHarness.close();
        MatcherAssert.assertThat(globalCommitter.isClosed(), Matchers.is(true));
    }

    /**
     * Builds a test harness around a committer operator factory for a {@link TestSink} that uses
     * the given global committer and the string committable serializer for both committables and
     * global committables.
     *
     * @param globalCommitter the global committer to install in the sink; tests pass
     *     {@code null} to provoke the failure path
     * @return a harness that has not yet been initialized or opened
     * @throws Exception if building the sink, the factory or the harness fails
     */
    private OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness(
            GlobalCommitter<String, String> globalCommitter) throws Exception {
        // NOTE(review): the trailing 'true' presumably selects batch mode - confirm.
        OneInputStreamOperatorFactory<byte[], byte[]> committerFactory =
                new CommitterOperatorFactory<>(
                        TestSink.newBuilder()
                                .setCommittableSerializer(
                                        TestSink.StringCommittableSerializer.INSTANCE)
                                .setGlobalCommitter(globalCommitter)
                                .setGlobalCommittableSerializer(
                                        TestSink.StringCommittableSerializer.INSTANCE)
                                .build(),
                        true);
        return new OneInputStreamOperatorTestHarness<>(committerFactory);
    }
}

