/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.web.reactive.socket.adapter;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.adapter.NettyWebSocketSessionSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;

public class RxNettyWebSocketSession
extends NettyWebSocketSessionSupport<WebSocketConnection> {
    public static final String FRAME_AGGREGATOR_NAME = "websocket-frame-aggregator";

    public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info, NettyDataBufferFactory factory) {
        super(conn, info, factory);
    }

    public RxNettyWebSocketSession aggregateFrames(Channel channel, String frameDecoderName) {
        ChannelPipeline pipeline = channel.pipeline();
        if (pipeline.context(FRAME_AGGREGATOR_NAME) != null) {
            this.logger.trace((Object)"WebSocketFrameAggregator already registered.");
            return this;
        }
        ChannelHandlerContext frameDecoder = pipeline.context(frameDecoderName);
        Assert.notNull((Object)frameDecoder, (String)("WebSocketFrameDecoder not found: " + frameDecoderName));
        WebSocketFrameAggregator frameAggregator = new WebSocketFrameAggregator(65536);
        pipeline.addAfter(frameDecoder.name(), FRAME_AGGREGATOR_NAME, (ChannelHandler)frameAggregator);
        return this;
    }

    @Override
    public Flux<WebSocketMessage> receive() {
        Observable messages = ((WebSocketConnection)this.getDelegate()).getInput().filter(frame -> !(frame instanceof CloseWebSocketFrame)).map(x$0 -> super.toMessage((WebSocketFrame)x$0));
        return Flux.from((Publisher)RxReactiveStreams.toPublisher((Observable)messages));
    }

    @Override
    public Mono<Void> send(Publisher<WebSocketMessage> messages) {
        Observable frames = RxReactiveStreams.toObservable(messages).map(this::toFrame);
        Observable completion = ((WebSocketConnection)this.getDelegate()).writeAndFlushOnEach(frames);
        return Mono.from((Publisher)RxReactiveStreams.toPublisher((Observable)completion));
    }

    @Override
    public Mono<Void> close(CloseStatus status) {
        Observable completion = ((WebSocketConnection)this.getDelegate()).close();
        return Mono.from((Publisher)RxReactiveStreams.toPublisher((Observable)completion));
    }
}

