/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.examples.DStreamData;
import org.apache.carbondata.examples.SparkStreamingExample$;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.carbondata.streaming.CarbonSparkStreamingListener;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.CarbonSparkStreamingFactory$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

public final class SparkStreamingExample$ {
    /**
     * Singleton instance of this class. It is assigned inside the private
     * constructor, which the static initializer below invokes once.
     */
    public static SparkStreamingExample$ MODULE$;

    static {
        // The new instance is not stored here; the constructor publishes it via MODULE$.
        new SparkStreamingExample$();
    }

    /**
     * Runs the DStream-to-CarbonData streaming example end to end.
     *
     * <p>Steps: drop and recreate the streaming table {@code dstream_stream_table},
     * batch-load {@code streamSample.csv} into it, start the socket data generator on
     * port 7071 and the periodic table reporter, run the streaming job until it
     * terminates, then print a few verification queries and stop the Spark session.
     *
     * <p>The generator socket and both helper threads are released in a
     * {@code finally} block so they are cleaned up even when the streaming job fails.
     *
     * <p>NOTE(review): {@code getCanonicalPath()} and the {@link ServerSocket} calls can
     * raise {@code IOException}; the signature is intentionally left without a
     * {@code throws} clause so that it matches the original declaration.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if {@code startStreaming} returns {@code null}
     */
    public void main(String[] args) {
        String rootPath = new File(this.getClass().getResource("/").getPath() + "../../../..").getCanonicalPath();
        // The timestamp suffix gives every run its own checkpoint directory.
        String checkpointPath = rootPath + "/examples/spark/target/spark_streaming_cp_" + System.currentTimeMillis();
        String streamTableName = "dstream_stream_table";
        SparkSession spark = ExampleUtils$.MODULE$.createSparkSession("SparkStreamingExample", 4);

        // Toggle kept from the original: set to false to skip table setup and streaming.
        boolean requireCreateTable = true;
        if (requireCreateTable) {
            spark.sql("DROP TABLE IF EXISTS " + streamTableName);
            spark.sql(new StringOps(Predef$.MODULE$.augmentString(
                    "\n           | CREATE TABLE " + streamTableName
                    + "(\n           | id INT,\n           | name STRING,\n           | city STRING,\n           | salary FLOAT\n           | )\n           | STORED AS carbondata\n           | TBLPROPERTIES(\n           | 'streaming'='true',\n           | 'sort_columns'='name')\n           | ")).stripMargin());

            // The result is unused; the lookup is kept because it may have side effects
            // (presumably it resolves/caches the table metadata) - TODO confirm.
            CarbonEnv$.MODULE$.getCarbonTable((Option)new Some((Object)"default"), streamTableName, spark);

            String path = rootPath + "/examples/spark/src/main/resources/streamSample.csv";
            spark.sql(new StringOps(Predef$.MODULE$.augmentString(
                    "\n           | LOAD DATA LOCAL INPATH '" + path
                    + "'\n           | INTO TABLE " + streamTableName
                    + "\n           | OPTIONS('HEADER'='true')\n         ")).stripMargin());

            ServerSocket serverSocket = new ServerSocket(7071);
            Thread thread1 = null;
            Thread thread2 = null;
            try {
                thread1 = this.writeSocket(serverSocket);
                thread2 = this.showTableCount(spark, streamTableName);
                StreamingContext ssc = this.startStreaming(spark, streamTableName, checkpointPath);
                if (ssc == null) {
                    // startStreaming swallows its exception and returns null when the
                    // context cannot be constructed; fail with a clear message instead
                    // of a NullPointerException on the next line.
                    throw new IllegalStateException(
                            "StreamingContext could not be created; see the stack trace printed by startStreaming");
                }
                ssc.sparkContext().addSparkListener((SparkListenerInterface)new CarbonSparkStreamingListener());
                this.waitForStopSignal(ssc);
                // Blocks the calling thread until the streaming context terminates.
                ssc.start();
                ssc.awaitTermination();
            } finally {
                // Same release order as before: helper threads first, then the socket.
                if (thread1 != null) {
                    thread1.interrupt();
                }
                if (thread2 != null) {
                    thread2.interrupt();
                }
                serverSocket.close();
            }
        }

        spark.sql("select count(*) from " + streamTableName).show(100, false);
        spark.sql("select * from " + streamTableName + " order by id desc").show(100, false);
        spark.sql("select * from " + streamTableName + " where id = 100000001 or id = 1 limit 100").show(100, false);
        spark.sql("select * from " + streamTableName + " where id < 10 limit 100").show(100, false);
        spark.sql("SHOW SEGMENTS FOR TABLE " + streamTableName).show(false);
        spark.stop();
        System.out.println("streaming finished");
    }

    /*
     * WARNING - void declaration
     */
    public Thread showTableCount(SparkSession spark, String tableName) {
        void var3_3;
        Thread thread = new Thread(spark, tableName){
            private final SparkSession spark$1;
            private final String tableName$1;

            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                    Predef$.MODULE$.println((Object)BoxesRunTime.boxToLong((long)System.currentTimeMillis()));
                    $this.spark$1.sql(new StringBuilder(21).append("select count(*) from ").append($this.tableName$1).toString()).show(false);
                    $this.spark$1.sql(new StringBuilder(24).append("SHOW SEGMENTS FOR TABLE ").append($this.tableName$1).toString()).show(false);
                    Thread.sleep(5000L);
                });
            }
            {
                this.spark$1 = spark$1;
                this.tableName$1 = tableName$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.carbondata.examples.SparkStreamingExample$$anon$1 int )}, serializedLambda);
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread waitForStopSignal(StreamingContext ssc) {
        void var2_2;
        Thread thread = new Thread(ssc){
            private final StreamingContext ssc$1;

            public void run() {
                new ServerSocket(7072).accept();
                this.ssc$1.stop(false, true);
            }
            {
                this.ssc$1 = ssc$1;
            }
        };
        thread.start();
        return var2_2;
    }

    /**
     * Builds the streaming pipeline that reads comma-separated text lines from
     * {@code localhost:7071}, converts each line into a {@code DStreamData} and
     * registers a per-batch callback that hands every RDD to
     * {@code $anonfun$startStreaming$3}.
     *
     * <p>The context is only created and wired here; this method does not call
     * {@code start()} on it.
     *
     * <p>Any {@code Exception} raised during setup is caught: its stack trace is
     * printed and whatever {@code ssc} holds at that point is returned. That value is
     * {@code null} when the {@code StreamingContext} constructor itself failed.
     *
     * @param spark          session whose SparkContext backs the streaming context
     * @param tableName      table name forwarded to the per-batch callback
     * @param checkpointPath directory passed to {@code StreamingContext.checkpoint}
     * @return the configured streaming context, or {@code null} if it could not be created
     */
    public StreamingContext startStreaming(SparkSession spark, String tableName, String checkpointPath) {
        StreamingContext ssc = null;
        try {
            // 30-second batch interval.
            ssc = new StreamingContext(spark.sparkContext(), Seconds$.MODULE$.apply(30L));
            ssc.checkpoint(checkpointPath);
            StreamingContext qual$1 = ssc;
            String x$12 = "localhost";
            int x$2 = 7071;
            // Third argument comes from the method's own default-value accessor.
            StorageLevel x$3 = qual$1.socketTextStream$default$3();
            ReceiverInputDStream readSocketDF = qual$1.socketTextStream(x$12, x$2, x$3);
            // Split each line on "," and map fields 0..3 to (int, String, String, float).
            // Any further fields in the line are ignored.
            DStream batchData = readSocketDF.map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.split(","), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(String.class))).map((Function1 & Serializable & scala.Serializable)fields -> new DStreamData(new StringOps(Predef$.MODULE$.augmentString(fields[0])).toInt(), fields[1], fields[2], new StringOps(Predef$.MODULE$.augmentString(fields[3])).toFloat()), ClassTag$.MODULE$.apply(DStreamData.class));
            Predef$.MODULE$.println((Object)"init carbon table info");
            // Per-batch callback: delegates to the static helper and returns Scala's Unit.
            batchData.foreachRDD((Function2 & Serializable & scala.Serializable)(rdd, time) -> {
                SparkStreamingExample$.$anonfun$startStreaming$3(spark, tableName, rdd, time);
                return BoxedUnit.UNIT;
            });
        }
        catch (Exception ex) {
            ex.printStackTrace();
            // NOTE(review): this message is printed only on the failure path, which
            // makes it read like a success message - confirm the intended wording.
            Predef$.MODULE$.println((Object)"Done reading and writing streaming data");
        }
        return ssc;
    }

    /*
     * WARNING - void declaration
     */
    public Thread writeSocket(ServerSocket serverSocket) {
        void var2_2;
        Thread thread = new Thread(serverSocket){
            private final ServerSocket serverSocket$1;

            public void run() {
                Socket clientSocket = this.serverSocket$1.accept();
                PrintWriter socketWriter = new PrintWriter(clientSocket.getOutputStream());
                IntRef index = IntRef.create((int)0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_2 -> {
                    RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 100).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                        ++index$1.elem;
                        socketWriter.println(new StringBuilder(30).append(((Object)BoxesRunTime.boxToInteger((int)index$1.elem)).toString()).append(",name_").append(index$1.elem).append(",city_").append(index$1.elem).append(",").append(((Object)BoxesRunTime.boxToDouble((double)((double)index$1.elem * 10000.0))).toString()).append(",school_").append(index$1.elem).append(":school_").append(index$1.elem).append(index$1.elem).append("$").append(index$1.elem).toString());
                    });
                    socketWriter.flush();
                    Thread.sleep(2000L);
                });
                socketWriter.close();
                System.out.println("Socket closed");
            }
            {
                this.serverSocket$1 = serverSocket$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$2(scala.runtime.IntRef java.io.PrintWriter int ), $anonfun$run$3(scala.runtime.IntRef java.io.PrintWriter int )}, serializedLambda);
            }
        };
        thread.start();
        return var2_2;
    }

    /**
     * Per-batch callback body used by {@code startStreaming}: converts the batch RDD
     * into a DataFrame, logs how many rows it holds, and appends it to the target
     * table through the CarbonData streaming writer for database {@code "default"}.
     *
     * @param spark$2     session used to build the DataFrame and obtain the writer
     * @param tableName$2 name of the table the batch is written to
     * @param rdd         RDD of the current batch; elements are described to Spark as
     *                    {@code DStreamData} via the type tag built below
     * @param time        batch time, logged and passed on to the writer
     */
    public static final /* synthetic */ void $anonfun$startStreaming$3(SparkSession spark$2, String tableName$2, RDD rdd, Time time) {
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(MODULE$.getClass().getClassLoader());
        // Local TypeCreator that resolves the DStreamData class by name; it feeds the
        // TypeTag that createDataFrame needs for the element type.
        public final class Org_apache_carbondata_examples_SparkStreamingExample$$typecreator1$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("org.apache.carbondata.examples.DStreamData").asType().toTypeConstructor();
            }

            public Org_apache_carbondata_examples_SparkStreamingExample$$typecreator1$1() {
            }
        }
        Dataset df = spark$2.createDataFrame(rdd, ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_carbondata_examples_SparkStreamingExample$$typecreator1$1())).toDF();
        // df.count() is evaluated here only for this log line.
        Predef$.MODULE$.println((Object)new StringBuilder(45).append(((Object)BoxesRunTime.boxToLong((long)System.currentTimeMillis())).toString()).append(" at batch time: ").append(time.toString()).append(" the count of received data: ").append(df.count()).toString());
        // Append the batch to the table.
        CarbonSparkStreamingFactory$.MODULE$.getStreamSparkStreamingWriter(spark$2, "default", tableName$2).mode(SaveMode.Append).writeStreamData(df, time);
    }

    /**
     * Private constructor; publishes the newly created instance through
     * {@code MODULE$}. It is invoked from the static initializer of this class.
     */
    private SparkStreamingExample$() {
        MODULE$ = this;
    }
}

