/*
 * Decompiled with CFR 0.152.
 */
package stream.net;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.annotations.Parameter;
import stream.io.Sink;
import stream.net.ConnectionHandler;

/**
 * A {@link Sink} that opens a {@link ServerSocket} on a configurable port and
 * hands every item written to it to a {@link ConnectionHandler}.
 *
 * <p>Life cycle: configure the sink through its setters, call {@link #init()}
 * once to bind the port and start the handler, call {@code write(...)} for each
 * item, and finally call {@link #close()}.
 *
 * <p>NOTE(review): the handler presumably distributes the items to connected
 * tap clients; that logic lives in {@code ConnectionHandler} and is not visible
 * from this class.
 */
public class DataTapSink
implements Sink {
    /** TCP port the server socket is bound to in {@link #init()}; defaults to 9100. */
    protected Integer port = 9100;
    /** Per-client buffer size (number of items) passed to the connection handler. */
    protected int clientBufferSize = 10;
    /** Whether GZIP compression is requested from the connection handler. */
    protected boolean gzip = false;
    /**
     * Whether client disconnects should be detected actively.
     * NOTE(review): this value is stored but not forwarded to the connection
     * handler anywhere in this class - confirm whether that is intended.
     */
    protected boolean detectClientClose = false;
    /** Whether clients with a completely filled buffer should be disconnected. */
    protected boolean disconnectSlowClients = false;
    /**
     * NOTE(review): has no setter and is never read in this class; possibly
     * used by subclasses - confirm before removing.
     */
    protected boolean logBufferFull = false;
    static final Logger log = LoggerFactory.getLogger(DataTapSink.class);
    /** Handler created by {@link #init()}; {@code null} until initialization succeeded. */
    protected ConnectionHandler connectionHandler;
    /** Identifier of this sink. */
    protected String id;

    /**
     * @return the identifier of this sink, may be {@code null} if none was set
     */
    public String getId() {
        return this.id;
    }

    /**
     * @param id the identifier to assign to this sink
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * @return the port the sink listens on (or will listen on after {@link #init()})
     */
    public Integer getPort() {
        return this.port;
    }

    /**
     * @param port the port to bind in {@link #init()}; must not be {@code null}
     *             by the time {@link #init()} is called
     */
    @Parameter(description="The port to listen on for incoming tap connections, defaults to 9100.")
    public void setPort(Integer port) {
        this.port = port;
    }

    /**
     * @return the buffer size (number of items) used for each client
     */
    public int getClientBufferSize() {
        return this.clientBufferSize;
    }

    /**
     * @param clientBufferSize the buffer size (number of items) used for each client
     */
    @Parameter(description="The buffer size (number of items) used for each client", defaultValue="10")
    public void setClientBufferSize(int clientBufferSize) {
        this.clientBufferSize = clientBufferSize;
    }

    /**
     * @return {@code true} if GZIP compression is enabled
     */
    public boolean isGzip() {
        return this.gzip;
    }

    /**
     * @param gzip {@code true} to enable GZIP compression on the TCP stream
     */
    @Parameter(description="This parameter allows for enabling GZIP compression on the TCP stream, default is no compression.")
    public void setGzip(boolean gzip) {
        this.gzip = gzip;
    }

    /**
     * Returns the value configured through {@link #setDetectClientClose(boolean)}.
     * Note that the getter name differs from the setter's property name.
     *
     * @return {@code true} if active client-close detection was requested
     */
    public boolean isActivelyDetectClientClose() {
        return this.detectClientClose;
    }

    /**
     * @param activelyDetectClientClose {@code true} to request active detection
     *                                  of client disconnects
     */
    @Parameter(required=false, defaultValue="false", description="Defines if this sink actively listens to client disconnect events (= client's input stream read-method returns '-1').If true, this sink will close the connection immediately if the event occurs.Otherwise a client disconnect will be detected (and the connection will also be closed)the next time, an item should be transferred to the client (and therefore the client's output stream write-method is unsuccessful).")
    public void setDetectClientClose(boolean activelyDetectClientClose) {
        this.detectClientClose = activelyDetectClientClose;
    }

    /**
     * @return {@code true} if slow clients are to be disconnected
     */
    public boolean isDisconnectSlowClients() {
        return this.disconnectSlowClients;
    }

    /**
     * @param disconnectSlowClients {@code true} to disconnect clients whose
     *                              buffer is completely filled
     */
    @Parameter(required=false, defaultValue="false", description="Defines if slow clients should be disconnected. A client is defined to be 'slow', if its buffer is completely filled.")
    public void setDisconnectSlowClients(boolean disconnectSlowClients) {
        this.disconnectSlowClients = disconnectSlowClients;
    }

    /**
     * Binds the server socket, creates and initializes the connection handler
     * and starts it on a freshly created cached thread pool.
     *
     * <p>If any step fails, the socket and the thread pool created by this call
     * are released again and {@link #connectionHandler} is left untouched, so a
     * failed initialization does not keep the port occupied.
     *
     * @throws IllegalStateException if no port is configured
     * @throws Exception             if the port cannot be bound or the handler
     *                               cannot be initialized or started
     */
    public void init() throws Exception {
        if (this.port == null) {
            // Fail with a clear message instead of an NPE from auto-unboxing.
            throw new IllegalStateException("DataTapSink requires a port, but parameter 'port' is null");
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        ServerSocket socket = null;
        boolean started = false;
        try {
            socket = new ServerSocket(this.port);
            ConnectionHandler handler = new ConnectionHandler(pool, socket);
            handler.init(this.clientBufferSize, this.gzip, this.disconnectSlowClients);
            pool.execute(handler);
            // Publish the handler only once it is fully initialized and running.
            this.connectionHandler = handler;
            started = true;
        } finally {
            if (!started) {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException closeFailure) {
                        // Keep the original failure as the one reported to the caller.
                        log.debug("Failed to close server socket after unsuccessful init", closeFailure);
                    }
                }
                pool.shutdownNow();
            }
        }
    }

    /**
     * Writes all items of the collection, in iteration order, via
     * {@link #write(Data)}.
     *
     * @param data the items to write; {@code null} is treated like an empty collection
     * @return always {@code true}
     * @throws Exception if writing one of the items fails
     */
    public boolean write(Collection<Data> data) throws Exception {
        if (data == null) {
            return true;
        }
        for (Data d : data) {
            this.write(d);
        }
        return true;
    }

    /**
     * Passes a single item to the connection handler. {@code null} items are
     * silently skipped.
     *
     * @param item the item to write, may be {@code null}
     * @return always {@code true}
     * @throws IllegalStateException if a non-null item is written before
     *                               {@link #init()} completed successfully
     * @throws Exception             if the connection handler fails to write the item
     */
    public boolean write(Data item) throws Exception {
        if (item == null) {
            return true;
        }
        ConnectionHandler handler = this.connectionHandler;
        if (handler == null) {
            throw new IllegalStateException("DataTapSink is not initialized, init() must be called before write()");
        }
        handler.write(item);
        return true;
    }

    /**
     * Closes the connection handler. Does nothing if the sink was never
     * (successfully) initialized.
     *
     * <p>NOTE(review): the thread pool created in {@link #init()} is handed to
     * the handler; presumably the handler shuts it down on close - confirm.
     *
     * @throws Exception if closing the connection handler fails
     */
    public void close() throws Exception {
        ConnectionHandler handler = this.connectionHandler;
        if (handler != null) {
            handler.close();
        }
    }
}

