/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.common.multipart;

import io.opentelemetry.testing.internal.armeria.common.HttpData;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaders;
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
import io.opentelemetry.testing.internal.armeria.common.HttpRequest;
import io.opentelemetry.testing.internal.armeria.common.HttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpStatus;
import io.opentelemetry.testing.internal.armeria.common.MediaType;
import io.opentelemetry.testing.internal.armeria.common.RequestHeaders;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeaders;
import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.common.multipart.AggregatedBodyPart;
import io.opentelemetry.testing.internal.armeria.common.multipart.AggregatedMultipart;
import io.opentelemetry.testing.internal.armeria.common.multipart.BodyPart;
import io.opentelemetry.testing.internal.armeria.common.multipart.Multipart;
import io.opentelemetry.testing.internal.armeria.common.multipart.MultipartEncoder;
import io.opentelemetry.testing.internal.armeria.common.stream.ByteStreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.StreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.SubscriptionOption;
import io.opentelemetry.testing.internal.armeria.common.util.UnmodifiableFuture;
import io.opentelemetry.testing.internal.armeria.internal.shaded.futures.CompletableFutures;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.base.MoreObjects;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.base.Preconditions;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.io.BaseEncoding;
import io.opentelemetry.testing.internal.io.netty.buffer.ByteBufAllocator;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.EventExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Package-private {@link Multipart} implementation backed by a stream of {@link BodyPart}s.
 *
 * <p>The instance is itself a {@code StreamMessage<HttpData>}: subscribing to it hands the body
 * parts and the boundary to a new {@link MultipartEncoder}, which publishes to the subscriber.
 * Stream-state queries ({@link #isOpen()}, {@link #demand()}, {@link #whenComplete()}) and
 * {@link #abort()} are delegated to the underlying body-part stream.
 */
final class DefaultMultipart implements Multipart, StreamMessage<HttpData> {
    /** Base64 encoder (padding omitted) used to render the random part of a boundary. */
    private static final BaseEncoding base64 = BaseEncoding.base64().omitPadding();
    /** Name of the {@code Content-Type} parameter that carries the boundary. */
    private static final String BOUNDARY_PARAMETER = "boundary";
    /** Content type used when the caller supplies no {@code Content-Type} of their own. */
    private static final MediaType DEFAULT_MULTIPART_TYPE = MediaType.MULTIPART_FORM_DATA;

    private final String boundary;
    private final StreamMessage<BodyPart> parts;

    /**
     * Creates a random boundary string.
     *
     * @return {@code "ArmeriaBoundary"} followed by the unpadded base64 encoding of 12 random bytes
     */
    static String randomBoundary() {
        final byte[] randomBytes = new byte[12];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        return "ArmeriaBoundary" + base64.encode(randomBytes);
    }

    /**
     * Creates a new instance.
     *
     * @param boundary the boundary string placed in the {@code Content-Type} header and passed to
     *                 the encoder
     * @param parts the stream of body parts that makes up this multipart message
     */
    DefaultMultipart(String boundary, StreamMessage<? extends BodyPart> parts) {
        this.boundary = boundary;
        // A StreamMessage<? extends BodyPart> cannot be assigned to StreamMessage<BodyPart> without
        // an explicit cast. NOTE(review): presumably safe because this class only consumes
        // elements from the stream and never feeds elements into it - confirm.
        @SuppressWarnings("unchecked")
        final StreamMessage<BodyPart> cast = (StreamMessage<BodyPart>) parts;
        this.parts = cast;
    }

    /** Returns the boundary string given at construction time. */
    @Override
    public String boundary() {
        return this.boundary;
    }

    /** Returns the underlying stream of body parts. */
    @Override
    public StreamMessage<BodyPart> bodyParts() {
        return this.parts;
    }

    /**
     * Subscribes {@code subscriber} to the encoded form of this multipart message by delegating to
     * a newly created {@link MultipartEncoder}.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super HttpData> subscriber, EventExecutor executor, SubscriptionOption ... options) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(options, "options");
        final MultipartEncoder encoder = new MultipartEncoder(this.parts, this.boundary);
        encoder.subscribe(subscriber, executor, options);
    }

    /** Aggregates all body parts using {@code defaultSubscriberExecutor()}. */
    @Override
    public CompletableFuture<AggregatedMultipart> aggregate() {
        return this.aggregate(this.defaultSubscriberExecutor());
    }

    /**
     * Aggregates all body parts, subscribing to the body-part stream with {@code executor}.
     *
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    @Override
    public CompletableFuture<AggregatedMultipart> aggregate(EventExecutor executor) {
        Objects.requireNonNull(executor, "executor");
        return this.aggregate0(executor, null);
    }

    /**
     * Aggregates all body parts using {@code defaultSubscriberExecutor()}, passing {@code alloc}
     * to each body part's {@code aggregateWithPooledObjects}.
     *
     * @throws NullPointerException if {@code alloc} is {@code null}
     */
    @Override
    public CompletableFuture<AggregatedMultipart> aggregateWithPooledObjects(ByteBufAllocator alloc) {
        Objects.requireNonNull(alloc, "alloc");
        return this.aggregateWithPooledObjects(this.defaultSubscriberExecutor(), alloc);
    }

    /**
     * Aggregates all body parts, subscribing with {@code executor} and passing {@code alloc} to
     * each body part's {@code aggregateWithPooledObjects}.
     *
     * @throws NullPointerException if {@code executor} or {@code alloc} is {@code null}
     */
    @Override
    public CompletableFuture<AggregatedMultipart> aggregateWithPooledObjects(EventExecutor executor, ByteBufAllocator alloc) {
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(alloc, "alloc");
        return this.aggregate0(executor, alloc);
    }

    /**
     * Shared implementation of the {@code aggregate*} methods.
     *
     * @param executor the executor to subscribe with, or {@code null} to subscribe without one
     * @param alloc the allocator forwarded to each body part, or {@code null} to use the plain
     *              {@code BodyPart.aggregate()}
     * @return an unmodifiable future completed with the aggregated multipart, or completed
     *         exceptionally if the stream or any body-part aggregation fails
     */
    private CompletableFuture<AggregatedMultipart> aggregate0(@Nullable EventExecutor executor, @Nullable ByteBufAllocator alloc) {
        final BodyPartAggregator aggregator = new BodyPartAggregator(alloc);
        if (executor == null) {
            this.parts.subscribe(aggregator);
        } else {
            this.parts.subscribe(aggregator, executor);
        }
        final CompletableFuture<AggregatedMultipart> aggregated = aggregator.completionFuture
                .thenApply(aggregatedParts -> AggregatedMultipart.of(this.boundary, aggregatedParts));
        return UnmodifiableFuture.wrap(aggregated);
    }

    /**
     * Converts this multipart into an {@link HttpRequest} whose headers are {@code requestHeaders}
     * with the boundary injected into the {@code Content-Type}.
     *
     * @throws NullPointerException if {@code requestHeaders} is {@code null}
     * @throws IllegalArgumentException if {@code requestHeaders} carries a non-multipart content
     *                                  type (raised by {@code Preconditions.checkArgument})
     */
    @Override
    public HttpRequest toHttpRequest(RequestHeaders requestHeaders) {
        final RequestHeaders requestHeadersWithBoundary = DefaultMultipart.injectBoundary(this.boundary, requestHeaders);
        return HttpRequest.of(requestHeadersWithBoundary, this);
    }

    /**
     * Converts this multipart into a {@code POST} {@link HttpRequest} to {@code path} with the
     * default multipart content type and this instance's boundary.
     *
     * @throws NullPointerException if {@code path} is {@code null}
     */
    @Override
    public HttpRequest toHttpRequest(String path) {
        Objects.requireNonNull(path, "path");
        final RequestHeaders requestHeaders = RequestHeaders.builder(HttpMethod.POST, path)
                .contentType(DefaultMultipart.defaultContentType(this.boundary))
                .build();
        return HttpRequest.of(requestHeaders, this);
    }

    /**
     * Converts this multipart into an {@link HttpResponse} whose headers are
     * {@code responseHeaders} with the boundary injected into the {@code Content-Type}.
     *
     * @throws NullPointerException if {@code responseHeaders} is {@code null}
     * @throws IllegalArgumentException if {@code responseHeaders} carries a non-multipart content
     *                                  type (raised by {@code Preconditions.checkArgument})
     */
    @Override
    public HttpResponse toHttpResponse(ResponseHeaders responseHeaders) {
        final ResponseHeaders responseHeadersWithBoundary = DefaultMultipart.injectBoundary(this.boundary, responseHeaders);
        return HttpResponse.of(responseHeadersWithBoundary, this);
    }

    /**
     * Converts this multipart into an {@link HttpResponse} with the given status, the default
     * multipart content type and this instance's boundary.
     *
     * @throws NullPointerException if {@code status} is {@code null}
     */
    @Override
    public HttpResponse toHttpResponse(HttpStatus status) {
        Objects.requireNonNull(status, "status");
        final ResponseHeaders responseHeaders = ResponseHeaders.builder(status)
                .contentType(DefaultMultipart.defaultContentType(this.boundary))
                .build();
        return HttpResponse.of(responseHeaders, this);
    }

    /** Wraps this instance with {@code ByteStreamMessage.of}. */
    @Override
    public ByteStreamMessage toStreamMessage() {
        return ByteStreamMessage.of(this);
    }

    /** Delegates to the body-part stream. */
    @Override
    public boolean isOpen() {
        return this.parts.isOpen();
    }

    /**
     * Always returns {@code false}, regardless of the state of the body-part stream.
     * NOTE(review): presumably because the encoded output is never empty - confirm against
     * {@code MultipartEncoder}.
     */
    @Override
    public boolean isEmpty() {
        return false;
    }

    /** Delegates to the body-part stream. */
    @Override
    public long demand() {
        return this.parts.demand();
    }

    /** Delegates to the body-part stream. */
    @Override
    public CompletableFuture<Void> whenComplete() {
        return this.parts.whenComplete();
    }

    /** Aborts the body-part stream. */
    @Override
    public void abort() {
        this.parts.abort();
    }

    /** Aborts the body-part stream with the given cause. */
    @Override
    public void abort(Throwable cause) {
        this.parts.abort(cause);
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add(BOUNDARY_PARAMETER, this.boundary)
                .add("parts", this.parts)
                .toString();
    }

    /** Returns the default multipart content type carrying {@code boundary} as its parameter. */
    private static MediaType defaultContentType(String boundary) {
        return DEFAULT_MULTIPART_TYPE.withParameter(BOUNDARY_PARAMETER, boundary);
    }

    /**
     * Returns a copy of {@code headers} whose content type carries {@code boundary}.
     *
     * <p>If {@code headers} already has a content type it must be a multipart type, and the
     * boundary parameter is set on it; otherwise the default multipart content type is used.
     *
     * @throws NullPointerException if {@code headers} is {@code null}
     * @throws IllegalArgumentException if the existing content type is not a multipart type
     */
    @SuppressWarnings("unchecked")
    private static <T extends HttpHeaders> T injectBoundary(String boundary, T headers) {
        Objects.requireNonNull(headers, "headers");
        // contentType() yields null when the headers carry no Content-Type.
        final MediaType existing = headers.contentType();
        final MediaType contentTypeWithBoundary;
        if (existing == null) {
            contentTypeWithBoundary = DefaultMultipart.defaultContentType(boundary);
        } else {
            Preconditions.checkArgument(existing.isMultipart(), "Content-Type: %s (expected: multipart content type)", existing);
            contentTypeWithBoundary = existing.withParameter(BOUNDARY_PARAMETER, boundary);
        }
        // The result of withMutations has to be cast back to the caller's concrete header type.
        return (T) headers.withMutations(builder -> builder.contentType(contentTypeWithBoundary));
    }

    /**
     * Subscriber that starts aggregation of every received {@link BodyPart} and completes
     * {@link #completionFuture} with the aggregated parts once the stream has completed and all
     * per-part aggregations have finished.
     */
    private static final class BodyPartAggregator implements Subscriber<BodyPart> {
        /** Completed with all aggregated parts, or exceptionally on the first failure observed. */
        private final CompletableFuture<List<AggregatedBodyPart>> completionFuture = new CompletableFuture<>();
        /** One pending aggregation per body part, in arrival order. */
        private final List<CompletableFuture<AggregatedBodyPart>> bodyPartFutures = new ArrayList<>();
        /** Allocator forwarded to each body part; {@code null} selects plain aggregation. */
        @Nullable
        private final ByteBufAllocator alloc;

        BodyPartAggregator(@Nullable ByteBufAllocator alloc) {
            this.alloc = alloc;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "subscription");
            // Request everything up front; each part is aggregated as it arrives.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(BodyPart bodyPart) {
            Objects.requireNonNull(bodyPart, "bodyPart");
            final CompletableFuture<AggregatedBodyPart> pending = this.alloc != null
                    ? bodyPart.aggregateWithPooledObjects(this.alloc)
                    : bodyPart.aggregate();
            this.bodyPartFutures.add(pending);
        }

        @Override
        public void onError(Throwable ex) {
            Objects.requireNonNull(ex, "ex");
            this.completionFuture.completeExceptionally(ex);
        }

        @Override
        public void onComplete() {
            // Wait for every per-part aggregation before publishing the combined result.
            CompletableFutures.allAsList(this.bodyPartFutures).handle((aggregated, cause) -> {
                if (cause != null) {
                    this.completionFuture.completeExceptionally(cause);
                } else {
                    this.completionFuture.complete(aggregated);
                }
                return null;
            });
        }
    }
}

