/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.sourcebuilder;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.examples.sourcebuilder.support.MemoryUsageMetric;
import com.hazelcast.jet.examples.sourcebuilder.support.SystemMonitorGui;
import com.hazelcast.jet.examples.sourcebuilder.support.SystemMonitorHttpService;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.WindowDefinition;
import io.undertow.Undertow;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.stream.Stream;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class HttpSource {
    private static final String MAP_NAME = "system-monitor";

    private static Pipeline buildPipeline() {
        StreamSource usedMemorySource = SourceBuilder.timestampedStream((String)"used-memory", (FunctionEx & Serializable)x -> new PollHttp()).fillBufferFn(PollHttp::fillBuffer).destroyFn(PollHttp::close).build();
        Pipeline p = Pipeline.create();
        p.drawFrom(usedMemorySource).withNativeTimestamps(0L).window((WindowDefinition)WindowDefinition.sliding((long)100L, (long)20L)).aggregate(AggregateOperations.linearTrend(MemoryUsageMetric::timestamp, MemoryUsageMetric::memoryUsage)).map((FunctionEx & Serializable)wr -> Util.entry((Object)wr.end(), (Object)wr.result())).drainTo(Sinks.map((String)MAP_NAME));
        return p;
    }

    public static void main(String[] args) {
        Undertow server = new SystemMonitorHttpService().httpServer();
        server.start();
        try {
            JetInstance jet = HttpSource.startJet();
            new SystemMonitorGui((IMap<Long, Double>)jet.getMap(MAP_NAME));
            HttpSource.runPipeline(jet);
        }
        finally {
            server.stop();
            Jet.shutdownAll();
        }
    }

    private static JetInstance startJet() {
        System.out.println("Creating Jet instance 1");
        Jet.newJetInstance();
        System.out.println("Creating Jet instance 2");
        return Jet.newJetInstance();
    }

    private static void runPipeline(JetInstance jet) {
        System.out.println("\nRunning the pipeline ");
        Pipeline p = HttpSource.buildPipeline();
        jet.newJob(p).join();
    }

    private static class PollHttp {
        private final CloseableHttpClient httpc = HttpClients.createDefault();
        private final long pollIntervalMillis = 20L;
        private long lastPolled;

        private PollHttp() {
        }

        void fillBuffer(SourceBuilder.TimestampedSourceBuffer<MemoryUsageMetric> buf) throws IOException {
            if (!this.readyToPoll()) {
                return;
            }
            try (Stream<String> lines = new BufferedReader(new InputStreamReader(this.httpc.execute((HttpUriRequest)new HttpGet("http://localhost:8008")).getEntity().getContent())).lines();){
                lines.forEach(line -> {
                    int splitPoint = line.indexOf(32);
                    long timestamp = Long.valueOf(line.substring(0, splitPoint));
                    long value = Long.valueOf(line.substring(splitPoint + 1));
                    buf.add((Object)new MemoryUsageMetric(timestamp, value), timestamp);
                });
            }
        }

        private boolean readyToPoll() {
            long now = System.currentTimeMillis();
            if (now - this.lastPolled < 20L) {
                return false;
            }
            this.lastPolled = now;
            return true;
        }

        void close() throws IOException {
            this.httpc.close();
        }
    }
}

