/*
 * Decompiled with CFR 0.152.
 */
package org.mule;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.mule.AbstractBenchmark;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.EventContext;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.tck.TriggerableMessageSource;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import reactor.core.publisher.Mono;

/**
 * Base class for JMH benchmarks that push events through a {@link Flow} built with a configurable
 * {@link ProcessingStrategyFactory}.
 *
 * <p>Subclasses provide the processors that make up the flow ({@link #getMessageProcessors()}) and the
 * number of events sent per invocation of the streaming benchmark ({@link #getStreamIterations()}).
 * The processing strategy factory class and its tuning values are injected by JMH through the
 * {@code @Param} fields below.
 */
@State(value=Scope.Benchmark)
public abstract class AbstractFlowBenchmark
extends AbstractBenchmark {
    /** Processor that returns the incoming event untouched. */
    static final Processor nullProcessor = event -> event;

    /** Processor that consumes 25000 JMH CPU tokens before returning the event. */
    static final Processor cpuLightProcessor = event -> {
        Blackhole.consumeCPU(25000L);
        return event;
    };

    /** Processor that consumes 2500000 JMH CPU tokens and reports itself as {@code CPU_INTENSIVE}. */
    static final Processor cpuIntensiveProcessor = new Processor(){

        public Event process(Event event) throws MuleException {
            Blackhole.consumeCPU(2500000L);
            return event;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_INTENSIVE;
        }
    };

    /** Processor that sleeps for 20 ms and reports itself as {@code BLOCKING}. */
    static final Processor blockingProcessor = new Processor(){

        public Event process(Event event) throws MuleException {
            try {
                Thread.sleep(20L);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread pool can still observe the
                // interruption after the exception has been wrapped.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return event;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.BLOCKING;
        }
    };

    protected MuleContext muleContext;
    protected Flow flow;
    protected TriggerableMessageSource source;

    /** Fully-qualified class name of the processing strategy factory to instantiate in {@link #setup()}. */
    @Param(value={
        "org.mule.runtime.core.api.processor.strategy.DirectProcessingStrategyFactory",
        "org.mule.runtime.core.internal.processor.strategy.DirectStreamPerThreadProcessingStrategyFactory",
        "org.mule.runtime.core.internal.processor.strategy.ReactorProcessingStrategyFactory",
        "org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory",
        "org.mule.runtime.core.processor.strategy.DefaultFlowProcessingStrategyFactory",
        "org.mule.runtime.core.processor.strategy.TransactionAwareProactorStreamProcessingStrategyFactory",
        "org.mule.runtime.core.internal.processor.strategy.WorkQueueProcessingStrategyFactory"})
    public String processingStrategyFactory;

    /** Subscriber count; applied only when the factory is a {@link ReactorStreamProcessingStrategyFactory}. */
    @Param(value={"1"})
    public int subscribers;

    /** Buffer size; applied only when the factory is a {@link ReactorStreamProcessingStrategyFactory}. */
    @Param(value={"256"})
    public int bufferSize;

    /** Max concurrency; applied only when the factory is an {@link AbstractProcessingStrategyFactory}. */
    @Param(value={"10000"})
    public int maxConcurrency;

    /**
     * Creates and starts the Mule context, instantiates the configured processing strategy factory,
     * then builds the flow under test and registers it in the context registry.
     *
     * @throws Exception if the context cannot be created or started, the factory class cannot be
     *         loaded or instantiated, or the flow cannot be registered
     */
    @Setup
    public void setup() throws Exception {
        this.muleContext = this.createMuleContextWithServices();
        this.muleContext.start();

        // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(), which
        // propagates checked constructor exceptions without declaring them.
        ProcessingStrategyFactory factory = (ProcessingStrategyFactory)Class.forName(this.processingStrategyFactory)
            .getDeclaredConstructor()
            .newInstance();
        if (factory instanceof AbstractProcessingStrategyFactory) {
            ((AbstractProcessingStrategyFactory)factory).setMaxConcurrency(this.maxConcurrency);
        }
        if (factory instanceof ReactorStreamProcessingStrategyFactory) {
            ReactorStreamProcessingStrategyFactory streamFactory = (ReactorStreamProcessingStrategyFactory)factory;
            streamFactory.setBufferSize(this.bufferSize);
            streamFactory.setSubscriberCount(this.subscribers);
        }

        this.source = new TriggerableMessageSource();
        this.flow = Flow.builder("flow", this.muleContext)
            .processors(this.getMessageProcessors())
            .source((MessageSource)this.source)
            .processingStrategyFactory(factory)
            .build();
        this.muleContext.getRegistry().registerFlowConstruct((FlowConstruct)this.flow);
    }

    /**
     * @return the processors that make up the benchmarked flow
     */
    protected abstract List<Processor> getMessageProcessors();

    /**
     * @return how many events {@link #processSourceStream()} sends per invocation
     */
    protected abstract int getStreamIterations();

    /**
     * Disposes the Mule context and then stops the scheduler service if needed.
     *
     * @throws MuleException if the scheduler service lookup or stop fails
     */
    @TearDown
    public void teardown() throws MuleException {
        // Look the service up before disposing the context, as the original code does.
        SchedulerService schedulerService = (SchedulerService)this.muleContext.getRegistry().lookupObject(SchedulerService.class);
        try {
            this.muleContext.dispose();
        }
        finally {
            // Stop the scheduler even when dispose() fails so its threads are not leaked.
            LifecycleUtils.stopIfNeeded((Object)schedulerService);
        }
    }

    /**
     * Triggers the flow's source with a single event and returns the result of the trigger call.
     *
     * @return the event returned by the message source trigger
     * @throws MuleException if triggering the source fails
     */
    @Benchmark
    public Event processSourceBlocking() throws MuleException {
        return this.source.trigger(this.newEvent());
    }

    /**
     * Sends {@link #getStreamIterations()} events through the source listener as individual
     * {@link Mono} subscriptions and waits until every one of them has emitted a result.
     *
     * <p>NOTE(review): the latch is only counted down on {@code onNext}; an event that fails or
     * completes empty would leave this method waiting — confirm this is acceptable for the
     * benchmarked processors.
     *
     * @return the (released) latch, returned so the result is not eliminated by the JIT
     * @throws MuleException declared for parity with the blocking benchmark
     * @throws InterruptedException if the benchmark thread is interrupted while waiting
     */
    @Benchmark
    public CountDownLatch processSourceStream() throws MuleException, InterruptedException {
        // Read the iteration count once so the latch size and the loop bound can never disagree.
        final int iterations = this.getStreamIterations();
        CountDownLatch latch = new CountDownLatch(iterations);
        for (int i = 0; i < iterations; ++i) {
            Mono.just((Object)this.newEvent())
                .transform((Function)this.source.getListener())
                .doOnNext(event -> latch.countDown())
                .subscribe();
        }
        latch.await();
        return latch;
    }

    /** Builds a fresh event bound to the benchmarked flow, carrying the shared benchmark payload. */
    private Event newEvent() {
        return Event.builder((EventContext)DefaultEventContext.create((FlowConstruct)this.flow, (ComponentLocation)CONNECTOR_LOCATION))
            .message(Message.of((Object)PAYLOAD))
            .build();
    }
}

