/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.util;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;

/**
 * An {@link Observable.Transformer} that gathers every {@link ByteBuf} emitted by the
 * upstream into a single {@link CompositeByteBuf}, enforcing an upper bound on the total
 * number of readable bytes collected.
 *
 * <p>Instances are created through {@link #all()} or {@link #upTo(int)}. When the limit
 * would be exceeded, the buffers collected so far and the offending buffer are released
 * and a {@link TooMuchDataException} is thrown from the collecting action.
 *
 * <p>NOTE(review): if the upstream terminates with an error or the subscriber
 * unsubscribes mid-collection, nothing in this class releases the partially filled
 * composite buffer — confirm whether callers need to handle that.
 */
public class CollectBytes
implements Observable.Transformer<ByteBuf, ByteBuf> {
    /** Maximum total number of readable bytes that may be collected; never negative. */
    private final int maxBytes;

    /**
     * Creates a transformer with the largest limit expressible as an {@code int}.
     *
     * @return a transformer equivalent to {@code upTo(Integer.MAX_VALUE)}
     */
    public static CollectBytes all() {
        return CollectBytes.upTo(Integer.MAX_VALUE);
    }

    /**
     * Creates a transformer that collects at most {@code maxBytes} readable bytes.
     *
     * @param maxBytes the inclusive upper bound on the collected size, in bytes
     * @return a new transformer enforcing the given bound
     * @throws IllegalArgumentException if {@code maxBytes} is negative
     */
    public static CollectBytes upTo(int maxBytes) {
        return new CollectBytes(maxBytes);
    }

    /**
     * @param maxBytes the inclusive upper bound on the collected size, in bytes
     * @throws IllegalArgumentException if {@code maxBytes} is negative
     */
    private CollectBytes(int maxBytes) {
        if (maxBytes < 0) {
            throw new IllegalArgumentException("maxBytes must not be negative");
        }
        this.maxBytes = maxBytes;
    }

    /**
     * Collects all buffers emitted by {@code upstream} into one composite buffer.
     *
     * @param upstream the source of buffers to aggregate
     * @return an observable produced by {@code collect(...)}, cast to {@link ByteBuf},
     *         carrying the composite buffer that holds every collected component
     */
    @Override
    public Observable<ByteBuf> call(Observable<ByteBuf> upstream) {
        return upstream.collect(new Func0<CompositeByteBuf>(){

            @Override
            public CompositeByteBuf call() {
                // A fresh collector per invocation of the state factory.
                return Unpooled.compositeBuffer();
            }
        }, new Action2<CompositeByteBuf, ByteBuf>(){

            @Override
            public void call(CompositeByteBuf collector, ByteBuf buf) {
                // Widen BEFORE adding: an int + int sum can wrap to a negative value,
                // which would slip past the limit check below (notably for all()).
                long newLength = (long)collector.readableBytes() + (long)buf.readableBytes();
                if (newLength > (long)CollectBytes.this.maxBytes) {
                    // Neither buffer will be handed downstream, so drop both references
                    // here before signalling the failure.
                    collector.release();
                    buf.release();
                    throw new TooMuchDataException("More than " + CollectBytes.this.maxBytes + "B received");
                }
                // 'true' also advances the composite's writer index so the newly added
                // bytes become readable.
                collector.addComponent(true, buf);
            }
        }).cast(ByteBuf.class);
    }

    /**
     * Thrown when the upstream delivers more bytes than the configured limit allows.
     */
    public static class TooMuchDataException
    extends RuntimeException {
        /**
         * @param message description of the exceeded limit
         */
        public TooMuchDataException(String message) {
            super(message);
        }
    }
}

