package io.helidon.media.multipart;

import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.SubscriptionHelper;
import io.helidon.media.common.MessageBodyWriterContext;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Function;

/* loaded from: input_file:io/helidon/media/multipart/MultiPartEncoder.class */
public class MultiPartEncoder implements Flow.Processor<WriteableBodyPart, DataChunk> {
    private final MessageBodyWriterContext context;
    private final String boundary;
    private final CompletableFuture<BufferedEmittingPublisher<Flow.Publisher<DataChunk>>> initFuture;
    private BufferedEmittingPublisher<Flow.Publisher<DataChunk>> emitter;
    private Flow.Subscriber<? super DataChunk> downstream;
    private Flow.Subscription upstream;

    MultiPartEncoder(String str, MessageBodyWriterContext messageBodyWriterContext) {
        Objects.requireNonNull(str, "boundary cannot be null!");
        Objects.requireNonNull(messageBodyWriterContext, "context cannot be null!");
        this.context = messageBodyWriterContext;
        this.boundary = str;
        this.initFuture = new CompletableFuture<>();
    }

    public static MultiPartEncoder create(String str, MessageBodyWriterContext messageBodyWriterContext) {
        return new MultiPartEncoder(str, messageBodyWriterContext);
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
        Objects.requireNonNull(subscriber);
        if (this.emitter == null && this.downstream == null) {
            this.downstream = subscriber;
            deferredInit();
        } else {
            subscriber.onSubscribe(SubscriptionHelper.CANCELED);
            subscriber.onError(new IllegalStateException("Only one Subscriber allowed"));
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        SubscriptionHelper.validate(this.upstream, subscription);
        this.upstream = subscription;
        deferredInit();
    }

    private void deferredInit() {
        if (this.upstream == null || this.downstream == null) {
            return;
        }
        this.emitter = BufferedEmittingPublisher.create();
        this.emitter.onRequest((l, l2) -> {
            this.upstream.request(l.longValue());
        });
        Multi.create(this.emitter).flatMap(Function.identity()).onCompleteResume(DataChunk.create(("--" + this.boundary + "--").getBytes(StandardCharsets.UTF_8))).subscribe(this.downstream);
        this.initFuture.complete(this.emitter);
        this.downstream = null;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(WriteableBodyPart writeableBodyPart) {
        this.emitter.emit(createBodyPartPublisher(writeableBodyPart));
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        Objects.requireNonNull(th);
        this.initFuture.whenComplete((bufferedEmittingPublisher, th2) -> {
            bufferedEmittingPublisher.fail(th);
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        this.initFuture.whenComplete((bufferedEmittingPublisher, th) -> {
            bufferedEmittingPublisher.complete();
        });
    }

    private Flow.Publisher<DataChunk> createBodyPartPublisher(WriteableBodyPart writeableBodyPart) {
        StringBuilder append = new StringBuilder("--").append(this.boundary).append("\r\n");
        for (Map.Entry entry : writeableBodyPart.headers().toMap().entrySet()) {
            String str = (String) entry.getKey();
            Iterator it = ((List) entry.getValue()).iterator();
            while (it.hasNext()) {
                append.append(str).append(":").append((String) it.next()).append("\r\n");
            }
        }
        append.append("\r\n");
        return Multi.concat(Multi.concat(Single.just(DataChunk.create(append.toString().getBytes(StandardCharsets.UTF_8))), writeableBodyPart.mo12content().init(this.context)), Single.just(DataChunk.create("\r\n".getBytes(StandardCharsets.UTF_8))));
    }
}
