/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.codec;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;

/**
 * Base class for {@link Codec} implementations whose encoded form is a {@link Buffer}.
 *
 * <p>{@link #decode(Publisher)} and {@link #encode(Publisher)} delegate directly to the
 * {@link Codec} superclass. The two private barrier classes declared below are not referenced
 * by any other code in this class.
 *
 * @param <IN>  element type published by {@link #decode(Publisher)}
 * @param <OUT> element type consumed by {@link #encode(Publisher)}
 */
public abstract class BufferCodec<IN, OUT>
extends Codec<Buffer, IN, OUT> {
    /**
     * Creates a codec through the superclass's no-argument constructor.
     */
    protected BufferCodec() {
    }

    /**
     * Creates a codec with an explicit delimiter, forwarded unchanged to the superclass.
     *
     * @param delimiter delimiter byte handed to {@link Codec}; presumably may be {@code null}
     *                  (the decoder barrier in this class null-checks it) — confirm against
     *                  {@link Codec}
     */
    protected BufferCodec(Byte delimiter) {
        super(delimiter);
    }

    /**
     * Decodes a stream of buffers by delegating to {@link Codec#decode(Publisher)}.
     *
     * @param publisherToDecode source of buffers to decode
     * @return the publisher returned by the superclass implementation
     */
    @Override
    public Publisher<IN> decode(Publisher<? extends Buffer> publisherToDecode) {
        return super.decode(publisherToDecode);
    }

    /**
     * Encodes a stream of values by delegating to {@link Codec#encode(Publisher)}.
     *
     * @param publisherToEncode source of values to encode
     * @return the publisher returned by the superclass implementation
     */
    @Override
    public Publisher<Buffer> encode(Publisher<? extends OUT> publisherToEncode) {
        return super.encode(publisherToEncode);
    }

    /**
     * Encoder-side barrier that owns a scratch {@link Buffer}.
     *
     * <p>NOTE(review): {@link #doNext(Object)} has an empty body and nothing in this class
     * instantiates the barrier; it looks like unfinished work.
     */
    private class AggregatingEncoderBarrier
    extends SubscriberBarrier<OUT, Buffer> {
        /** Scratch buffer allocated once per barrier; not yet written to by this class. */
        final Buffer aggregate;

        /**
         * @param subscriber downstream subscriber handed to {@link SubscriberBarrier}
         */
        public AggregatingEncoderBarrier(Subscriber<? super Buffer> subscriber) {
            super(subscriber);
            this.aggregate = new Buffer();
        }

        /**
         * Receives a value to encode. The body is empty, so this override forwards nothing.
         *
         * @param src value received from upstream (currently ignored)
         */
        @Override
        protected void doNext(OUT src) {
            // Empty in the original; kept empty to preserve behaviour.
        }
    }

    /**
     * Decoder-side barrier that applies the codec's decoder function to incoming buffers and
     * tracks outstanding demand in {@code pendingDemand}.
     *
     * <p>The class also contains delimiter-based aggregation logic, but the constructor always
     * leaves {@link #aggregate} {@code null}, so only the pass-through path of
     * {@link #doNext(Buffer)} can execute.
     *
     * @param <IN> decoded element type emitted downstream
     */
    private static final class AggregatingDecoderBarrier<IN>
    extends SubscriberBarrier<Buffer, IN> {
        // A raw type is unavoidable here: a class literal cannot carry the type argument.
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<AggregatingDecoderBarrier> PENDING_UPDATER = AtomicLongFieldUpdater.newUpdater(AggregatingDecoderBarrier.class, "pendingDemand");

        /**
         * Demand counter: increased by {@link #doRequest(long)}, decreased by one per
         * {@link #doNext(Buffer)}. Only accessed through {@link #PENDING_UPDATER}; this class
         * never reads the value back.
         */
        private volatile long pendingDemand = 0L;

        /** Accumulation buffer for delimiter framing; always {@code null} at present. */
        final Buffer aggregate;
        /** Decoder function obtained from the owning codec. */
        final Function<Buffer, IN> codec;
        /** Delimiter copied from the owning codec. */
        final Byte delimiter;

        /**
         * @param codec      codec supplying the decoder function and the delimiter
         * @param subscriber downstream subscriber handed to {@link SubscriberBarrier}
         */
        public AggregatingDecoderBarrier(BufferCodec<IN, ?> codec, Subscriber<? super IN> subscriber) {
            super(subscriber);
            this.codec = codec.decoder();
            this.delimiter = codec.delimiter;
            // The original expression was "delimiter != null ? null : null", i.e. always null.
            // NOTE(review): aggregation therefore never activates. It is deliberately not
            // enabled here, because the dormant logic in doNext looks incomplete.
            this.aggregate = null;
        }

        /**
         * Decrements the pending-demand counter, then processes one incoming buffer.
         *
         * <p>When {@link #aggregate} is {@code null} (always, today) the buffer is decoded and
         * emitted downstream immediately. Otherwise the buffer is appended to the aggregate and,
         * if a delimiter is configured and present in the incoming buffer, the aggregate is split
         * on that delimiter and each view is decoded and emitted.
         *
         * @param buffer buffer received from upstream
         */
        @Override
        protected void doNext(Buffer buffer) {
            PENDING_UPDATER.decrementAndGet(this);

            if (this.aggregate == null) {
                // Pass-through path: decode and emit immediately.
                this.subscriber.onNext(this.codec.apply(buffer));
                return;
            }

            // Dormant aggregation path (unreachable while aggregate is always null).
            this.aggregate.append(buffer);
            buffer.position(0);
            if (this.delimiter == null) {
                return;
            }

            int index = buffer.indexOf(this.delimiter);
            if (index == -1) {
                // No delimiter in this chunk: keep accumulating.
                return;
            }

            Buffer aggregTmp = this.aggregate.duplicate();
            aggregTmp.position(this.aggregate.position()).flip();
            for (Buffer.View view : aggregTmp.split(this.delimiter.byteValue())) {
                if (view.getEnd() == aggregTmp.limit()) {
                    // A view ending at the limit stops processing without clearing the
                    // aggregate (presumably an unterminated trailing frame — confirm).
                    return;
                }
                this.subscriber.onNext(this.codec.apply(view.get()));
            }
            this.aggregate.clear();
        }

        /**
         * Adds {@code n} to the pending-demand counter and forwards the request to the
         * superclass.
         *
         * @param n number of elements requested by downstream
         */
        @Override
        protected void doRequest(long n) {
            PENDING_UPDATER.getAndAdd(this, n);
            super.doRequest(n);
        }
    }
}

