/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.vertx;

import io.confluent.csid.utils.Java8StreamUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.vertx.JStreamVertxParallelStreamProcessor;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JStreamVertxParallelEoSStreamProcessor<K, V>
extends VertxParallelEoSStreamProcessor<K, V>
implements JStreamVertxParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(JStreamVertxParallelEoSStreamProcessor.class);
    private final Stream<VertxCPResult<K, V>> stream;
    private final ConcurrentLinkedDeque<VertxCPResult<K, V>> userProcessResultsStream = new ConcurrentLinkedDeque();

    public JStreamVertxParallelEoSStreamProcessor(Consumer<K, V> consumer, Producer<K, V> producer, Vertx vertx, WebClient webClient, ParallelConsumerOptions options) {
        super(consumer, producer, vertx, webClient, options);
        this.stream = Java8StreamUtils.setupStreamFromDeque(this.userProcessResultsStream);
    }

    public JStreamVertxParallelEoSStreamProcessor(Consumer<K, V> consumer, Producer<K, V> producer, ParallelConsumerOptions options) {
        this(consumer, producer, null, null, options);
    }

    @Override
    public Stream<VertxCPResult<K, V>> vertxHttpReqInfoStream(Function<ConsumerRecord<K, V>, VertxParallelEoSStreamProcessor.RequestInfo> requestInfoFunction) {
        VertxCPResult.VertxCPResultBuilder result = VertxCPResult.builder();
        Function requestInfoFunctionWrapped = x -> {
            result.in(x);
            VertxParallelEoSStreamProcessor.RequestInfo apply = (VertxParallelEoSStreamProcessor.RequestInfo)requestInfoFunction.apply((ConsumerRecord)x);
            result.requestInfo(Optional.of(apply));
            return apply;
        };
        java.util.function.Consumer<Future<HttpResponse<Buffer>>> onSendCallBack = future -> {
            result.asr((Future<HttpResponse<Buffer>>)future);
            VertxCPResult build = result.build();
            this.userProcessResultsStream.add(build);
        };
        super.vertxHttpReqInfo(requestInfoFunctionWrapped, onSendCallBack, ignore -> {});
        return this.stream;
    }

    @Override
    public Stream<VertxCPResult<K, V>> vertxHttpRequestStream(BiFunction<WebClient, ConsumerRecord<K, V>, HttpRequest<Buffer>> webClientRequestFunction) {
        VertxCPResult.VertxCPResultBuilder result = VertxCPResult.builder();
        BiFunction requestInfoFunctionWrapped = (wc, x) -> {
            result.in(x);
            HttpRequest apply = (HttpRequest)webClientRequestFunction.apply((WebClient)wc, (ConsumerRecord)x);
            result.httpReq(Optional.of(apply));
            return apply;
        };
        java.util.function.Consumer<Future<HttpResponse<Buffer>>> onSendCallBack = future -> {
            result.asr((Future<HttpResponse<Buffer>>)future);
            VertxCPResult build = result.build();
            this.userProcessResultsStream.add(build);
        };
        super.vertxHttpRequest(requestInfoFunctionWrapped, onSendCallBack, ignore -> {});
        return this.stream;
    }

    @Override
    public Stream<VertxCPResult<K, V>> vertxHttpWebClientStream(BiFunction<WebClient, ConsumerRecord<K, V>, Future<HttpResponse<Buffer>>> webClientRequestFunction) {
        VertxCPResult.VertxCPResultBuilder result = VertxCPResult.builder();
        BiFunction wrappedFunc = (x, y) -> {
            result.in(y);
            Future apply = (Future)webClientRequestFunction.apply((WebClient)x, (ConsumerRecord)y);
            result.asr((Future<HttpResponse<Buffer>>)apply);
            return apply;
        };
        java.util.function.Consumer<Future<HttpResponse<Buffer>>> onSendCallBack = future -> {
            result.asr((Future<HttpResponse<Buffer>>)future);
            VertxCPResult build = result.build();
            this.userProcessResultsStream.add(build);
        };
        super.vertxHttpWebClient(wrappedFunc, onSendCallBack);
        return this.stream;
    }

    public static class VertxCPResult<K, V> {
        private final ConsumerRecord<K, V> in;
        private final Future<HttpResponse<Buffer>> asr;
        private final Optional<VertxParallelEoSStreamProcessor.RequestInfo> requestInfo;
        private final Optional<HttpRequest<Buffer>> httpReq;

        private static <K, V> Optional<VertxParallelEoSStreamProcessor.RequestInfo> $default$requestInfo() {
            return Optional.empty();
        }

        private static <K, V> Optional<HttpRequest<Buffer>> $default$httpReq() {
            return Optional.empty();
        }

        VertxCPResult(ConsumerRecord<K, V> in, Future<HttpResponse<Buffer>> asr, Optional<VertxParallelEoSStreamProcessor.RequestInfo> requestInfo, Optional<HttpRequest<Buffer>> httpReq) {
            this.in = in;
            this.asr = asr;
            this.requestInfo = requestInfo;
            this.httpReq = httpReq;
        }

        public static <K, V> VertxCPResultBuilder<K, V> builder() {
            return new VertxCPResultBuilder();
        }

        public ConsumerRecord<K, V> getIn() {
            return this.in;
        }

        public Future<HttpResponse<Buffer>> getAsr() {
            return this.asr;
        }

        public Optional<VertxParallelEoSStreamProcessor.RequestInfo> getRequestInfo() {
            return this.requestInfo;
        }

        public Optional<HttpRequest<Buffer>> getHttpReq() {
            return this.httpReq;
        }

        public static class VertxCPResultBuilder<K, V> {
            private ConsumerRecord<K, V> in;
            private Future<HttpResponse<Buffer>> asr;
            private boolean requestInfo$set;
            private Optional<VertxParallelEoSStreamProcessor.RequestInfo> requestInfo$value;
            private boolean httpReq$set;
            private Optional<HttpRequest<Buffer>> httpReq$value;

            VertxCPResultBuilder() {
            }

            public VertxCPResultBuilder<K, V> in(ConsumerRecord<K, V> in) {
                this.in = in;
                return this;
            }

            public VertxCPResultBuilder<K, V> asr(Future<HttpResponse<Buffer>> asr) {
                this.asr = asr;
                return this;
            }

            public VertxCPResultBuilder<K, V> requestInfo(Optional<VertxParallelEoSStreamProcessor.RequestInfo> requestInfo) {
                this.requestInfo$value = requestInfo;
                this.requestInfo$set = true;
                return this;
            }

            public VertxCPResultBuilder<K, V> httpReq(Optional<HttpRequest<Buffer>> httpReq) {
                this.httpReq$value = httpReq;
                this.httpReq$set = true;
                return this;
            }

            public VertxCPResult<K, V> build() {
                Optional requestInfo$value = this.requestInfo$value;
                if (!this.requestInfo$set) {
                    requestInfo$value = VertxCPResult.$default$requestInfo();
                }
                Optional httpReq$value = this.httpReq$value;
                if (!this.httpReq$set) {
                    httpReq$value = VertxCPResult.$default$httpReq();
                }
                return new VertxCPResult<K, V>(this.in, this.asr, requestInfo$value, httpReq$value);
            }

            public String toString() {
                return "JStreamVertxParallelEoSStreamProcessor.VertxCPResult.VertxCPResultBuilder(in=" + this.in + ", asr=" + this.asr + ", requestInfo$value=" + this.requestInfo$value + ", httpReq$value=" + this.httpReq$value + ")";
            }
        }
    }
}

