/*
 * Decompiled with CFR 0.152.
 */
package oracle.r2dbc.impl;

import io.r2dbc.spi.Blob;
import io.r2dbc.spi.Clob;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class OracleLargeObjects {
    private OracleLargeObjects() {
    }

    static Blob createBlob(Publisher<ByteBuffer> contentPublisher, final Publisher<Void> releasePublisher) {
        final Publisher<ByteBuffer> streamPublisher = OracleLargeObjects.createStreamPublisher(contentPublisher, releasePublisher);
        return new Blob(){

            public Publisher<ByteBuffer> stream() {
                return streamPublisher;
            }

            public Publisher<Void> discard() {
                return releasePublisher;
            }
        };
    }

    static Clob createClob(Publisher<? extends CharSequence> contentPublisher, final Publisher<Void> releasePublisher) {
        final Publisher<? extends CharSequence> streamPublisher = OracleLargeObjects.createStreamPublisher(contentPublisher, releasePublisher);
        return new Clob(){

            public Publisher<CharSequence> stream() {
                return streamPublisher;
            }

            public Publisher<Void> discard() {
                return releasePublisher;
            }
        };
    }

    private static <T> Publisher<T> createStreamPublisher(Publisher<? extends T> contentPublisher, Publisher<Void> releasePublisher) {
        AtomicBoolean isSubscribed = new AtomicBoolean(false);
        return subscriber -> {
            Objects.requireNonNull(subscriber, "Subscriber is null");
            if (!isSubscribed.compareAndSet(false, true)) {
                throw new IllegalStateException("A content stream can not be consumed more than once");
            }
            Flux.from((Publisher)contentPublisher).doFinally(signalType -> {
                switch (signalType) {
                    case CANCEL: 
                    case ON_COMPLETE: 
                    case ON_ERROR: {
                        Mono.from((Publisher)releasePublisher).subscribe();
                    }
                }
            }).subscribe(subscriber);
        };
    }
}

