/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.reactor;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.ExternalEngine;
import io.confluent.parallelconsumer.internal.UserFunctions;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Engine adapter for user functions that return a Reactive Streams {@link Publisher}.
 *
 * <p>Each user-supplied publisher is subscribed to through Project Reactor. When the publisher
 * completes or errors, every work container of the originating poll context is marked as
 * succeeded or failed respectively and handed to {@code addToMailbox}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ReactorProcessor<K, V> extends ExternalEngine<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ReactorProcessor.class);

    /** Work-type label set on every work container before the user function is invoked. */
    private static final String REACTOR_TYPE = "reactor.x-type";

    /** Source of the {@link Scheduler} used for both {@code publishOn} and {@code subscribeOn}. */
    private final Supplier<Scheduler> schedulerSupplier;

    /** Fallback used when the caller does not provide a scheduler supplier. */
    private final Supplier<Scheduler> defaultSchedulerSupplier = Schedulers::boundedElastic;

    /**
     * Creates a processor that runs reactive work on schedulers obtained from the given supplier.
     *
     * @param options              consumer options, passed straight to the superclass
     * @param newSchedulerSupplier supplier of the scheduler to use; when {@code null},
     *                             {@link Schedulers#boundedElastic()} is used instead
     */
    public ReactorProcessor(ParallelConsumerOptions<K, V> options, Supplier<Scheduler> newSchedulerSupplier) {
        super(options);
        this.schedulerSupplier = newSchedulerSupplier == null ? this.defaultSchedulerSupplier : newSchedulerSupplier;
    }

    /**
     * Creates a processor that uses the default ({@link Schedulers#boundedElastic()}) scheduler.
     *
     * @param options consumer options, passed straight to the superclass
     */
    public ReactorProcessor(ParallelConsumerOptions<K, V> options) {
        this(options, null);
    }

    /**
     * Tells whether the results of a wrapped user function represent asynchronous work.
     *
     * <p>Only the first element is inspected; {@link #react(Function)} always returns a
     * single-element list holding the subscription's {@link Disposable}.
     *
     * @param resultsFromUserFunction results returned by the wrapped user function
     * @return {@code true} if the list is non-empty and its first element is a {@link Disposable},
     *         {@code false} otherwise (including for an empty list)
     */
    protected boolean isAsyncFutureWork(List<?> resultsFromUserFunction) {
        Iterator<?> results = resultsFromUserFunction.iterator();
        return results.hasNext() && results.next() instanceof Disposable;
    }

    /**
     * Closes this processor by delegating to the superclass implementation.
     *
     * @param timeout   passed through unchanged to the superclass
     * @param drainMode passed through unchanged to the superclass
     */
    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        super.close(timeout, drainMode);
    }

    /**
     * Registers the reactive user function and hands it to {@code supervisorLoop}.
     *
     * <p>For every poll context the function is invoked via {@code UserFunctions.carefullyRun},
     * and the resulting publisher is subscribed to immediately. The outcome is reported when the
     * publisher terminates:
     * <ul>
     *   <li>on completion, every work container of the context is marked successful;</li>
     *   <li>on error, every work container of the context is marked failed with that error.</li>
     * </ul>
     * In both cases each container is then passed to {@code addToMailbox}.
     *
     * @param reactorFunction user function mapping a poll context to the publisher that
     *                        processes it
     */
    public void react(Function<PollContext<K, V>, Publisher<?>> reactorFunction) {
        Function<PollContextInternal<K, V>, List<Object>> wrappedUserFunc = pollContext -> {
            // Guarded so the offset list is only built when it will actually be logged.
            if (log.isTraceEnabled()) {
                log.trace("Record list ({}), executing void function...",
                        pollContext.streamConsumerRecords()
                                .map(ConsumerRecord::offset)
                                .collect(Collectors.toList()));
            }

            // Tag the work before invoking user code.
            pollContext.streamWorkContainers().forEach(wc -> wc.setWorkType(REACTOR_TYPE));

            Publisher<?> publisher = UserFunctions.carefullyRun(reactorFunction, pollContext.getPollContext());

            // The scheduler supplier is consulted twice: once for publishOn (where the callbacks
            // below observe signals) and once for subscribeOn (where the subscription to the
            // user's publisher is performed).
            Disposable subscription = Flux.from(publisher)
                    .publishOn(getScheduler())
                    .doOnNext(signal -> log.trace("doOnNext {}", signal))
                    .doOnComplete(() -> {
                        log.debug("Reactor success (doOnComplete)");
                        pollContext.streamWorkContainers().forEach(wc -> {
                            wc.onUserFunctionSuccess();
                            addToMailbox(wc);
                        });
                    })
                    .doOnError(throwable -> {
                        log.error("Reactor fail signal", throwable);
                        pollContext.streamWorkContainers().forEach(wc -> {
                            wc.onUserFunctionFailure(throwable);
                            addToMailbox(wc);
                        });
                    })
                    .subscribeOn(getScheduler())
                    .subscribe();

            log.trace("asyncPoll - user function finished ok.");
            // Return the subscription handle; isAsyncFutureWork recognises it as a Disposable.
            return UniLists.of(subscription);
        };

        // Results are reported from the Reactor callbacks above, so this callback only logs.
        Consumer<Object> voidCallBack = ignore -> log.trace("Void callback applied.");
        supervisorLoop(wrappedUserFunc, voidCallBack);
    }

    /**
     * Obtains a scheduler from the configured supplier.
     *
     * @return whatever the supplier returns for this call; the supplier is invoked every time
     */
    private Scheduler getScheduler() {
        return this.schedulerSupplier.get();
    }
}

