/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.AssumptionViolatedException;
import org.junit.jupiter.api.Test;

class SocketClientSinkTest {
    private static final String TEST_MESSAGE = "testSocketSinkInvoke";
    private static final String EXCEPTION_MESSGAE = "Failed to send message 'testSocketSinkInvoke\n'";
    private static final String host = "127.0.0.1";
    private SerializationSchema<String> simpleSchema = new SerializationSchema<String>(){

        public byte[] serialize(String element) {
            return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
        }
    };

    SocketClientSinkTest() {
    }

    @Test
    void testSocketSink() throws Exception {
        ServerSocket server = new ServerSocket(0);
        final int port = server.getLocalPort();
        CheckedThread sinkRunner = new CheckedThread("Test sink runner"){

            public void go() throws Exception {
                SocketClientSink simpleSink = new SocketClientSink(SocketClientSinkTest.host, port, SocketClientSinkTest.this.simpleSchema, 0);
                simpleSink.open(DefaultOpenContext.INSTANCE);
                simpleSink.invoke((Object)"testSocketSinkInvoke\n", SinkContextUtil.forTimestamp((long)0L));
                simpleSink.close();
            }
        };
        sinkRunner.start();
        Socket sk = NetUtils.acceptWithoutTimeout((ServerSocket)server);
        BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
        String value = rdr.readLine();
        sinkRunner.sync();
        server.close();
        Assertions.assertThat((String)value).isEqualTo(TEST_MESSAGE);
    }

    @Test
    void testSinkAutoFlush() throws Exception {
        ServerSocket server = new ServerSocket(0);
        int port = server.getLocalPort();
        final SocketClientSink simpleSink = new SocketClientSink(host, port, this.simpleSchema, 0, true);
        simpleSink.open(DefaultOpenContext.INSTANCE);
        CheckedThread sinkRunner = new CheckedThread("Test sink runner"){

            public void go() throws Exception {
                simpleSink.invoke((Object)"testSocketSinkInvoke\n", SinkContextUtil.forTimestamp((long)0L));
            }
        };
        sinkRunner.start();
        Socket sk = NetUtils.acceptWithoutTimeout((ServerSocket)server);
        BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
        String value = rdr.readLine();
        sinkRunner.sync();
        simpleSink.close();
        server.close();
        Assertions.assertThat((String)value).isEqualTo(TEST_MESSAGE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSocketSinkNoRetry() throws Exception {
        final ServerSocket server = new ServerSocket(0);
        int port = server.getLocalPort();
        try {
            CheckedThread serverRunner = new CheckedThread("Test server runner"){

                public void go() throws Exception {
                    Socket sk = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                    sk.close();
                }
            };
            serverRunner.start();
            SocketClientSink simpleSink = new SocketClientSink(host, port, this.simpleSchema, 0, true);
            simpleSink.open(DefaultOpenContext.INSTANCE);
            serverRunner.sync();
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
                while (true) {
                    simpleSink.invoke((Object)"testSocketSinkInvoke\n", SinkContextUtil.forTimestamp((long)0L));
                }
            }).isInstanceOf(IOException.class)).hasMessageContaining(EXCEPTION_MESSGAE);
            Assertions.assertThat((int)simpleSink.getCurrentNumberOfRetries()).isZero();
        }
        finally {
            IOUtils.closeQuietly((ServerSocket)server);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testRetry() throws Exception {
        final ServerSocket[] serverSocket = new ServerSocket[1];
        ExecutorService[] executor = new ExecutorService[1];
        try {
            serverSocket[0] = new ServerSocket(0);
            executor[0] = Executors.newCachedThreadPool();
            int port = serverSocket[0].getLocalPort();
            Callable<Void> serverTask = new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    Socket socket = NetUtils.acceptWithoutTimeout((ServerSocket)serverSocket[0]);
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String value = reader.readLine();
                    Assertions.assertThat((String)value).isEqualTo("0");
                    socket.close();
                    return null;
                }
            };
            Future<Void> serverFuture = executor[0].submit(serverTask);
            final SocketClientSink sink = new SocketClientSink(host, serverSocket[0].getLocalPort(), this.simpleSchema, -1, true);
            sink.open(DefaultOpenContext.INSTANCE);
            sink.invoke((Object)"0\n", SinkContextUtil.forTimestamp((long)0L));
            serverFuture.get();
            serverSocket[0].close();
            Assertions.assertThat((boolean)serverSocket[0].isClosed()).isTrue();
            Assertions.assertThat((int)sink.getCurrentNumberOfRetries()).isZero();
            final CountDownLatch retryLatch = new CountDownLatch(1);
            CountDownLatch again = new CountDownLatch(1);
            Callable<Void> sinkTask = new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    while (retryLatch.getCount() != 0L) {
                        sink.invoke((Object)"1\n");
                    }
                    return null;
                }
            };
            Future<Void> sinkFuture = executor[0].submit(sinkTask);
            while (sink.getCurrentNumberOfRetries() == 0) {
                Thread.sleep(100L);
            }
            retryLatch.countDown();
            try {
                serverSocket[0] = new ServerSocket(port);
            }
            catch (BindException be) {
                throw new AssumptionViolatedException("Could not bind server to previous port.", (Throwable)be);
            }
            Socket socket = NetUtils.acceptWithoutTimeout((ServerSocket)serverSocket[0]);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String value = reader.readLine();
            Assertions.assertThat((String)value).isEqualTo("1");
        }
        finally {
            if (serverSocket[0] != null) {
                serverSocket[0].close();
            }
            if (executor[0] != null) {
                executor[0].shutdown();
            }
        }
    }
}

