/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.examples.java.connectors;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;

public final class SocketSourceFunction
extends RichSourceFunction<RowData>
implements ResultTypeQueryable<RowData> {
    private final String hostname;
    private final int port;
    private final byte byteDelimiter;
    private final DeserializationSchema<RowData> deserializer;
    private volatile boolean isRunning = true;
    private Socket currentSocket;

    public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
        this.hostname = hostname;
        this.port = port;
        this.byteDelimiter = byteDelimiter;
        this.deserializer = deserializer;
    }

    public TypeInformation<RowData> getProducedType() {
        return this.deserializer.getProducedType();
    }

    public void open(Configuration parameters) throws Exception {
        this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter((RuntimeContext)this.getRuntimeContext()));
    }

    public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
        while (this.isRunning) {
            try (Socket socket = new Socket();){
                this.currentSocket = socket;
                socket.connect(new InetSocketAddress(this.hostname, this.port), 0);
                try (InputStream stream = socket.getInputStream();){
                    int b;
                    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                    while ((b = stream.read()) >= 0) {
                        if (b != this.byteDelimiter) {
                            buffer.write(b);
                            continue;
                        }
                        ctx.collect(this.deserializer.deserialize(buffer.toByteArray()));
                        buffer.reset();
                    }
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
            Thread.sleep(1000L);
        }
    }

    public void cancel() {
        this.isRunning = false;
        try {
            this.currentSocket.close();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }
}

