package org.apache.flink.table.examples.java.connectors;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;

/* loaded from: input_file:org/apache/flink/table/examples/java/connectors/SocketSourceFunction.class */
public final class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private final String hostname;
    private final int port;
    private final byte byteDelimiter;
    private final DeserializationSchema<RowData> deserializer;
    private volatile boolean isRunning = true;
    private Socket currentSocket;

    public SocketSourceFunction(String str, int i, byte b, DeserializationSchema<RowData> deserializationSchema) {
        this.hostname = str;
        this.port = i;
        this.byteDelimiter = b;
        this.deserializer = deserializationSchema;
    }

    public TypeInformation<RowData> getProducedType() {
        return this.deserializer.getProducedType();
    }

    public void open(Configuration configuration) throws Exception {
        this.deserializer.open(() -> {
            return getRuntimeContext().getMetricGroup();
        });
    }

    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        Socket socket;
        Throwable th;
        while (this.isRunning) {
            try {
                socket = new Socket();
                th = null;
            } catch (Throwable th2) {
                th2.printStackTrace();
            }
            try {
                try {
                    this.currentSocket = socket;
                    socket.connect(new InetSocketAddress(this.hostname, this.port), 0);
                    InputStream inputStream = socket.getInputStream();
                    Throwable th3 = null;
                    try {
                        try {
                            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                            while (true) {
                                int read = inputStream.read();
                                if (read < 0) {
                                    break;
                                }
                                if (read != this.byteDelimiter) {
                                    byteArrayOutputStream.write(read);
                                } else {
                                    sourceContext.collect(this.deserializer.deserialize(byteArrayOutputStream.toByteArray()));
                                    byteArrayOutputStream.reset();
                                }
                            }
                            if (inputStream != null) {
                                if (0 != 0) {
                                    try {
                                        inputStream.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                } else {
                                    inputStream.close();
                                }
                            }
                            if (socket != null) {
                                if (0 != 0) {
                                    try {
                                        socket.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    socket.close();
                                }
                            }
                            Thread.sleep(1000L);
                        } catch (Throwable th6) {
                            th3 = th6;
                            throw th6;
                            break;
                        }
                    } catch (Throwable th7) {
                        if (inputStream != null) {
                            if (th3 != null) {
                                try {
                                    inputStream.close();
                                } catch (Throwable th8) {
                                    th3.addSuppressed(th8);
                                }
                            } else {
                                inputStream.close();
                            }
                        }
                        throw th7;
                        break;
                    }
                } catch (Throwable th9) {
                    th = th9;
                    throw th9;
                    break;
                }
            } finally {
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
        try {
            this.currentSocket.close();
        } catch (Throwable th) {
        }
    }
}
