/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

public final class StreamSocketP
extends AbstractProcessor {
    private final String host;
    private final int port;
    private final Charset charset;
    private CompletableFuture<Void> jobFuture;

    private StreamSocketP(String host, int port, Charset charset) {
        this.host = host;
        this.port = port;
        this.charset = charset;
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        super.init(context);
        this.jobFuture = context.jobFuture();
    }

    @Override
    public boolean complete() {
        try {
            this.getLogger().info("Connecting to socket " + this.hostAndPort());
            try (Socket socket = new Socket(this.host, this.port);
                 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), this.charset));){
                String line;
                this.getLogger().info("Connected to socket " + this.hostAndPort());
                while (!this.jobFuture.isDone() && (line = bufferedReader.readLine()) != null) {
                    this.emit(line);
                }
                this.getLogger().info("Closing socket " + this.hostAndPort());
            }
            return true;
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    private String hostAndPort() {
        return this.host + ':' + this.port;
    }

    public static DistributedSupplier<Processor> supplier(String host, int port, @Nonnull String charset) {
        return () -> new StreamSocketP(host, port, Charset.forName(charset));
    }
}

