/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.grpc;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.examples.grpc.BrokerInfoRequest;
import com.hazelcast.jet.examples.grpc.BrokerServiceGrpc;
import com.hazelcast.jet.examples.grpc.BrokerServiceImpl;
import com.hazelcast.jet.examples.grpc.EventGenerator;
import com.hazelcast.jet.examples.grpc.ProductInfoRequest;
import com.hazelcast.jet.examples.grpc.ProductServiceGrpc;
import com.hazelcast.jet.examples.grpc.ProductServiceImpl;
import com.hazelcast.jet.examples.grpc.datamodel.Broker;
import com.hazelcast.jet.examples.grpc.datamodel.Product;
import com.hazelcast.jet.examples.grpc.datamodel.Trade;
import com.hazelcast.jet.grpc.GrpcServices;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.map.IMap;
import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class GRPCEnrichment {
    private static final String TRADES = "trades";
    private static final int PORT = 50051;
    private final JetInstance jet;

    private GRPCEnrichment(JetInstance jet) {
        this.jet = jet;
    }

    public static Pipeline enrichUsingGRPC() {
        Pipeline p = Pipeline.create();
        StreamStage trades = p.readFrom(Sources.mapJournal((String)TRADES, (JournalInitialPosition)JournalInitialPosition.START_FROM_CURRENT)).withoutTimestamps().map(Functions.entryValue());
        ServiceFactory productService = GrpcServices.unaryService((SupplierEx & Serializable)() -> ManagedChannelBuilder.forAddress((String)"localhost", (int)50051).useTransportSecurity().usePlaintext(), (FunctionEx & Serializable)channel -> ProductServiceGrpc.newStub((Channel)channel)::productInfo);
        ServiceFactory brokerService = GrpcServices.bidirectionalStreamingService((SupplierEx & Serializable)() -> ManagedChannelBuilder.forAddress((String)"localhost", (int)50051).usePlaintext(), (FunctionEx & Serializable)channel -> BrokerServiceGrpc.newStub((Channel)channel)::brokerInfo);
        trades.mapUsingServiceAsync(productService, (BiFunctionEx & Serializable)(service, trade) -> {
            ProductInfoRequest request = ProductInfoRequest.newBuilder().setId(trade.productId()).build();
            return service.call((Object)request).thenApply(productReply -> Tuple2.tuple2((Object)trade, (Object)productReply.getProductName()));
        }).mapUsingServiceAsync(brokerService, (BiFunctionEx & Serializable)(service, t) -> {
            BrokerInfoRequest request = BrokerInfoRequest.newBuilder().setId(((Trade)t.f0()).brokerId()).build();
            return service.call((Object)request).thenApply(brokerReply -> Tuple3.tuple3((Object)t.f0(), (Object)t.f1(), (Object)brokerReply.getBrokerName()));
        }).writeTo(Sinks.logger());
        return p;
    }

    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.bootstrappedInstance();
        new GRPCEnrichment(jet).go();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void go() throws Exception {
        EventGenerator eventGenerator = new EventGenerator((IMap<Object, Trade>)this.jet.getMap(TRADES));
        eventGenerator.start();
        try (Closeable server = GRPCEnrichment.startGRPCServer();){
            Pipeline p = GRPCEnrichment.enrichUsingGRPC();
            Job job = this.jet.newJob(p);
            eventGenerator.generateEventsForFiveSeconds();
            job.cancel();
            try {
                job.join();
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
        }
        finally {
            eventGenerator.shutdown();
            Jet.shutdownAll();
        }
    }

    private static Closeable startGRPCServer() throws IOException {
        Map<Integer, Product> productMap = GRPCEnrichment.readLines("products.txt").collect(Collectors.toMap(Map.Entry::getKey, e -> new Product((Integer)e.getKey(), (String)e.getValue())));
        Map<Integer, Broker> brokerMap = GRPCEnrichment.readLines("brokers.txt").collect(Collectors.toMap(Map.Entry::getKey, e -> new Broker((Integer)e.getKey(), (String)e.getValue())));
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Server server = ServerBuilder.forPort((int)50051).executor((Executor)executor).addService((BindableService)new ProductServiceImpl(productMap)).addService((BindableService)new BrokerServiceImpl(brokerMap)).build().start();
        System.out.println("*** Server started, listening on 50051");
        return () -> {
            server.shutdown();
            executor.shutdown();
        };
    }

    private static Stream<Map.Entry<Integer, String>> readLines(String file) {
        try {
            InputStream stream = GRPCEnrichment.class.getResourceAsStream("/" + file);
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
            return reader.lines().map(GRPCEnrichment::splitLine);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static Map.Entry<Integer, String> splitLine(String e) {
        int commaPos = e.indexOf(44);
        return Util.entry((Object)Integer.valueOf(e.substring(0, commaPos)), (Object)e.substring(commaPos + 1));
    }
}

