/*
 * Decompiled with CFR 0.152.
 */
package cn.iflow.sdk.transport;

import cn.iflow.sdk.exceptions.ConnectionException;
import cn.iflow.sdk.exceptions.TimeoutException;
import cn.iflow.sdk.exceptions.TransportException;
import cn.iflow.sdk.types.config.IFlowOptions;
import cn.iflow.sdk.util.GsonFactory;
import com.google.gson.Gson;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.exceptions.WebsocketNotConnectedException;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;

public class WebSocketTransport
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WebSocketTransport.class);
    private final String url;
    private final Duration timeout;
    private final Gson gson;
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicReference<WebSocketClient> websocket = new AtomicReference();
    private final Sinks.Many<String> messageSink = Sinks.many().multicast().onBackpressureBuffer();
    private final Flux<String> messageFlux = this.messageSink.asFlux().publish().autoConnect();

    public WebSocketTransport(String url, Duration timeout) {
        this.url = url;
        this.timeout = timeout != null ? timeout : Duration.ofMinutes(5L);
        this.gson = GsonFactory.getInstance();
    }

    public WebSocketTransport(IFlowOptions options) {
        this(WebSocketTransport.determineWebSocketUrl(options), options.getTimeout());
    }

    public Mono<Void> connect() {
        if (this.connected.get()) {
            log.warn("Already connected to {}", (Object)this.url);
            return Mono.empty();
        }
        return Mono.create(sink -> {
            try {
                log.info("Connecting to {}", (Object)this.url);
                URI serverUri = URI.create(this.url);
                WebSocketClient wsClient = new WebSocketClient(serverUri, (MonoSink)sink){
                    final /* synthetic */ MonoSink val$sink;
                    {
                        this.val$sink = monoSink;
                        super(serverUri);
                    }

                    public void onOpen(ServerHandshake handshake) {
                        log.info("Connected to {}", (Object)WebSocketTransport.this.url);
                        WebSocketTransport.this.connected.set(true);
                        this.val$sink.success();
                    }

                    public void onMessage(String message) {
                        log.debug("Received message: {}", message.length() > 200 ? message.substring(0, 200) + "..." : message);
                        WebSocketTransport.this.messageSink.tryEmitNext((Object)message);
                    }

                    public void onClose(int code, String reason, boolean remote) {
                        log.info("Connection closed: {} - {}", (Object)code, (Object)reason);
                        WebSocketTransport.this.connected.set(false);
                        WebSocketTransport.this.messageSink.tryEmitComplete();
                    }

                    public void onError(Exception ex) {
                        log.error("WebSocket error: {}", (Object)ex.getMessage());
                        WebSocketTransport.this.connected.set(false);
                        ConnectionException connEx = new ConnectionException("WebSocket connection failed: " + ex.getMessage(), ex);
                        WebSocketTransport.this.messageSink.tryEmitError((Throwable)connEx);
                        this.val$sink.error((Throwable)connEx);
                    }
                };
                this.websocket.set(wsClient);
                wsClient.setConnectionLostTimeout(30);
                wsClient.connect();
            }
            catch (Exception e) {
                this.connected.set(false);
                ConnectionException connEx = new ConnectionException("Failed to connect to " + this.url + ": " + e.getMessage(), e);
                sink.error((Throwable)connEx);
            }
        }).timeout(this.timeout, Mono.error((Throwable)new TimeoutException("Connection timeout after " + this.timeout.toSeconds() + "s"))).cast(Void.class).doOnError(error -> {
            this.connected.set(false);
            WebSocketClient wsClient = this.websocket.get();
            if (wsClient != null) {
                try {
                    wsClient.close();
                }
                catch (Exception closeEx) {
                    log.warn("Error closing WebSocket after connection failure: {}", (Object)closeEx.getMessage());
                }
            }
        });
    }

    public Mono<Void> send(Object message) {
        if (!this.connected.get() || this.websocket.get() == null) {
            return Mono.error((Throwable)new ConnectionException("Not connected"));
        }
        return Mono.fromRunnable(() -> {
            try {
                String data = message instanceof String ? (String)message : this.gson.toJson(message);
                this.websocket.get().send(data);
                log.debug("Sent message: {}", (Object)data);
            }
            catch (Exception e) {
                if (e.getCause() instanceof WebsocketNotConnectedException) {
                    this.connected.set(false);
                    throw new ConnectionException("Connection lost: " + e.getMessage());
                }
                throw new TransportException("Failed to send message: " + e.getMessage(), e);
            }
        });
    }

    public Flux<String> receive() {
        if (!this.connected.get() || this.websocket.get() == null) {
            return Flux.error((Throwable)new ConnectionException("Not connected"));
        }
        return this.messageFlux.doOnSubscribe(subscription -> log.debug("Started receiving messages")).doOnComplete(() -> log.debug("Message stream completed")).doOnError(error -> log.error("Message stream error: {}", (Object)error.getMessage()));
    }

    @Override
    public void close() {
        WebSocketClient wsClient = this.websocket.get();
        if (wsClient != null && this.connected.get()) {
            try {
                wsClient.close();
                log.info("WebSocket connection closed");
            }
            catch (Exception e) {
                log.warn("Error closing WebSocket: {}", (Object)e.getMessage());
            }
            finally {
                this.connected.set(false);
                this.websocket.set(null);
            }
        }
    }

    public boolean isConnected() {
        return this.connected.get() && this.websocket.get() != null;
    }

    private static String determineWebSocketUrl(IFlowOptions options) {
        if (options.getUrl() != null && !options.getUrl().isEmpty()) {
            return options.getUrl();
        }
        int port = options.getProcessStartPort();
        if (port > 0) {
            return "ws://localhost:" + port + "/acp";
        }
        return "ws://localhost:8090/acp";
    }
}

