/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.reactivegrpc.jmh;

import com.salesforce.reactivegrpc.jmh.BenchmarkReactorServerServiceImpl;
import com.salesforce.reactivegrpc.jmh.PerfSubscriber;
import com.salesforce.reactivegrpc.jmh.proto.Messages;
import com.salesforce.reactivegrpc.jmh.proto.ReactorBenchmarkServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@BenchmarkMode(value={Mode.Throughput})
@Warmup(iterations=10)
@Measurement(iterations=10, time=10, timeUnit=TimeUnit.SECONDS)
@OutputTimeUnit(value=TimeUnit.SECONDS)
@Fork(value=1)
@State(value=Scope.Benchmark)
public class ReactorGRpcBenchmark {
    private static final Mono<Messages.SimpleRequest> MONO_REQUEST = Mono.just(Messages.SimpleRequest.getDefaultInstance());
    private static final Flux<Messages.SimpleRequest> FLUX_REQUEST;
    private Server reactiveServer;
    private ManagedChannel reactiveChannel;
    private ReactorBenchmarkServiceGrpc.ReactorBenchmarkServiceStub reactiveClient;

    @Setup
    public void setup() throws IOException {
        System.out.println("---------- SETUP ONCE -------------");
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        this.reactiveServer = ((InProcessServerBuilder)InProcessServerBuilder.forName("benchmark-reactiveServer").scheduledExecutorService(scheduledExecutorService).addService(new BenchmarkReactorServerServiceImpl(100000))).build().start();
        this.reactiveChannel = InProcessChannelBuilder.forName("benchmark-reactiveServer").build();
        this.reactiveClient = ReactorBenchmarkServiceGrpc.newReactorStub(this.reactiveChannel);
    }

    @TearDown
    public void tearDown() throws InterruptedException {
        System.out.println("---------- TEAR DOWN ONCE -------------");
        this.reactiveServer.shutdownNow();
        this.reactiveChannel.shutdownNow();
        this.reactiveServer.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        this.reactiveChannel.awaitTermination(1000L, TimeUnit.MILLISECONDS);
    }

    @Benchmark
    public Object reactiveUnaryCall(Blackhole blackhole) throws InterruptedException {
        PerfSubscriber actual = new PerfSubscriber(blackhole);
        this.reactiveClient.unaryCall(MONO_REQUEST).subscribe(actual);
        actual.latch.await();
        return actual;
    }

    @Benchmark
    public Object reactiveServerStreamingCall(Blackhole blackhole) throws InterruptedException {
        PerfSubscriber actual = new PerfSubscriber(blackhole);
        this.reactiveClient.streamingFromServer(MONO_REQUEST).subscribe(actual);
        actual.latch.await();
        return actual;
    }

    @Benchmark
    public Object reactiveClientStreamingCall(Blackhole blackhole) throws InterruptedException {
        PerfSubscriber actual = new PerfSubscriber(blackhole);
        this.reactiveClient.streamingFromClient(FLUX_REQUEST).subscribe(actual);
        actual.latch.await();
        return actual;
    }

    @Benchmark
    public Object reactiveBothWaysStreamingCall(Blackhole blackhole) throws InterruptedException {
        PerfSubscriber actual = new PerfSubscriber(blackhole);
        this.reactiveClient.streamingBothWays(FLUX_REQUEST).subscribe(actual);
        actual.latch.await();
        return actual;
    }

    static {
        Object[] array = new Messages.SimpleRequest[100000];
        Arrays.fill(array, Messages.SimpleRequest.getDefaultInstance());
        FLUX_REQUEST = Flux.fromArray(array);
    }
}

