/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.ConnectionResources;
import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.PostgresqlResult;
import io.r2dbc.postgresql.api.CopyInBuilder;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CopyData;
import io.r2dbc.postgresql.message.frontend.CopyDone;
import io.r2dbc.postgresql.message.frontend.CopyFail;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Query;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/**
 * Runs a copy-in conversation over the connection's {@link Client}: a {@link Query} is sent first,
 * followed by one {@link CopyData} frame per inner publisher and a final {@link CopyDone}. If the
 * data stream fails or the operation is cancelled, a {@link CopyFail} frame is sent instead.
 */
final class PostgresqlCopyIn {

    private final ConnectionResources context;

    /**
     * @param resources connection resources providing the {@link Client}
     * @throws IllegalArgumentException presumably, if {@code resources} is {@code null}
     *                                  (exact exception type is decided by {@code Assert.requireNonNull})
     */
    PostgresqlCopyIn(ConnectionResources resources) {
        this.context = Assert.requireNonNull(resources, "resources must not be null");
    }

    /**
     * Builds the frontend message stream for the given statement and hands it to the exchange.
     *
     * @param sql   the statement that is wrapped into the initial {@link Query}
     * @param stdin a publisher of publishers; all buffers of one inner publisher are merged into a
     *              single {@link CopyData} frame, inner publishers are processed in order
     * @return the updated-rows count reported by the resulting {@link PostgresqlResult}
     */
    Mono<Long> copy(String sql, Publisher<? extends Publisher<ByteBuf>> stdin) {
        ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);

        return Flux.from(stdin)
            .<FrontendMessage>concatMap(data -> {
                CompositeByteBuf composite = this.context.getClient().getByteBufAllocator().compositeBuffer();

                return Flux.from(data)
                    // 'true' advances the composite's writer index so the added bytes become readable.
                    .reduce(composite, (l, r) -> l.addComponent(true, r))
                    .map(CopyData::new)
                    // Release reference-counted values that are dropped instead of being delivered downstream.
                    .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
            })
            .concatWithValues(CopyDone.INSTANCE)
            .startWith(new Query(sql))
            .as(messages -> copyIn(exceptionFactory, messages));
    }

    /**
     * Sends the given messages through {@link Client#exchange} and converts the responses into the
     * updated-rows count.
     *
     * <p>An additional unicast sink is merged into the request stream so that a {@link CopyFail} frame
     * can be injected out-of-band. The {@code stop} flag drops any further data messages once the
     * exchange has completed, failed, been cancelled, or produced an {@link ErrorResponse}.
     *
     * @param exceptionFactory factory passed on to {@link PostgresqlResult#toResult}
     * @param copyDataMessages the complete frontend message sequence (query, data frames, done)
     * @return the updated-rows count of the resulting {@link PostgresqlResult}
     */
    private Mono<Long> copyIn(ExceptionFactory exceptionFactory, Flux<FrontendMessage> copyDataMessages) {
        Client client = this.context.getClient();
        AtomicBoolean stop = new AtomicBoolean();
        Sinks.Many<FrontendMessage> sink = Sinks.many().unicast().onBackpressureBuffer();

        Flux<FrontendMessage> requestMessages = sink.asFlux()
            .mergeWith(copyDataMessages
                // The merged stream only completes once the sink is completed as well.
                .doOnComplete(() -> sink.tryEmitComplete())
                .filter(it -> !stop.get())
                // A failing data stream is turned into a CopyFail frame rather than an error on the request stream.
                .onErrorResume(e -> {
                    copyFail(sink, stop, "Copy operation failed: " + e.getMessage());
                    return Mono.empty();
                }));

        // The predicate matches ReadyForQuery; presumably this marks the end of the exchange (see Client#exchange).
        return client.exchange(backendMessage -> backendMessage instanceof ReadyForQuery, requestMessages)
            .doOnNext(it -> {
                if (it instanceof ErrorResponse) {
                    // Stop forwarding data frames and close the sink once an error response is seen.
                    stop.set(true);
                    sink.tryEmitComplete();
                }
            })
            .doOnComplete(() -> {
                stop.set(true);
                sink.tryEmitComplete();
            })
            .doOnError(e -> copyFail(sink, stop, "Copy operation failed: " + e.getMessage()))
            .doOnCancel(() -> copyFail(sink, stop, "Copy operation failed: Cancelled"))
            .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
            .as(Operators::discardOnCancel)
            // NOTE(review): the cancel hook is registered on both sides of discardOnCancel, presumably because
            // that operator changes how cancellation reaches the upstream hook; confirm before removing either.
            .doOnCancel(() -> copyFail(sink, stop, "Copy operation failed: Cancelled"))
            .as(messages -> PostgresqlResult.toResult(this.context, messages, exceptionFactory).getRowsUpdated());
    }

    /**
     * Emits a {@link CopyFail} frame carrying {@code e} on the sink, completes the sink and raises the
     * stop flag. Emission results are ignored, so calling this on an already completed sink has no
     * effect other than setting the flag.
     *
     * @param sink the out-of-band sink merged into the request stream
     * @param stop flag that suppresses further data messages
     * @param e    the failure description placed into the {@link CopyFail} frame
     */
    private void copyFail(Sinks.Many<FrontendMessage> sink, AtomicBoolean stop, String e) {
        sink.tryEmitNext(new CopyFail(e));
        sink.tryEmitComplete();
        stop.set(true);
    }

    @Override
    public String toString() {
        return "PostgresqlCopyIn{context=" + this.context + '}';
    }

    /**
     * {@link CopyInBuilder} implementation that collects the input publisher and creates the copy
     * operation on {@link #build()}.
     */
    static final class Builder implements CopyInBuilder {

        private final ConnectionResources resources;

        private final String sql;

        @Nullable
        private Publisher<? extends Publisher<ByteBuf>> stdin;

        /**
         * @param resources connection resources used to create the {@link PostgresqlCopyIn}
         * @param sql       the statement to execute
         */
        Builder(ConnectionResources resources, String sql) {
            // Validate eagerly, consistent with the PostgresqlCopyIn constructor and fromMany.
            this.resources = Assert.requireNonNull(resources, "resources must not be null");
            this.sql = Assert.requireNonNull(sql, "sql must not be null");
        }

        /**
         * Sets the input data; a later call replaces the previously configured publisher.
         *
         * @param stdin publisher of publishers providing the buffers to send, must not be {@code null}
         * @return this builder
         */
        @Override
        public CopyInBuilder fromMany(Publisher<? extends Publisher<ByteBuf>> stdin) {
            this.stdin = Assert.requireNonNull(stdin, "stdin must not be null");
            return this;
        }

        /**
         * Creates the copy operation.
         *
         * @return the updated-rows count produced by {@link PostgresqlCopyIn#copy}
         * @throws IllegalArgumentException if no input was configured via {@link #fromMany}
         */
        @Override
        public Mono<Long> build() {
            if (this.stdin == null) {
                throw new IllegalArgumentException("No stdin configured for COPY IN");
            }
            return new PostgresqlCopyIn(this.resources).copy(this.sql, this.stdin);
        }
    }
}

