/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.examples.DStreamData;
import org.apache.carbondata.examples.StreamingUsingBatchLoadExample$;
import org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anon$3$;
import org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anonfun$startStreaming$1$;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

public final class StreamingUsingBatchLoadExample$ {
    /**
     * The single instance of this class. It is not assigned here: the private
     * constructor stores {@code this} into the field when the static
     * initializer below instantiates the class.
     */
    public static final StreamingUsingBatchLoadExample$ MODULE$;

    static {
        // The created instance is not used directly; running the constructor
        // is what populates MODULE$.
        new StreamingUsingBatchLoadExample$();
    }

    /**
     * Runs the example end to end.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Resolve the project root relative to the classpath root and derive
     *       a checkpoint directory name that embeds the current time.</li>
     *   <li>Create a Spark session, (re)create the {@code dstream_batch_table}
     *       table and load {@code streamSample.csv} into it.</li>
     *   <li>Open a server socket on port 7071, start the data-producing thread
     *       ({@link #writeSocket}) and the monitoring thread
     *       ({@link #showTableCount}), then build, start and wait on the
     *       streaming context.</li>
     *   <li>After streaming terminates, print a few sample queries and the
     *       segment list, drop the table and stop Spark.</li>
     * </ol>
     *
     * <p>The helper threads are interrupted and the server socket is closed in
     * a {@code finally} block, and the Spark session is stopped in an outer
     * {@code finally} block, so a failure while streaming does not leave the
     * port bound or the session running.
     *
     * <p>This class is decompiled from Scala, so checked exceptions such as
     * the {@code IOException} from {@code getCanonicalPath()} or
     * {@code new ServerSocket(...)} are not declared on the signature.
     *
     * @param args command-line arguments; not read by this method
     */
    public void main(String[] args) {
        String rootPath = new File(this.getClass().getResource("/").getPath() + "../../../..").getCanonicalPath();
        String checkpointPath = rootPath + "/examples/spark/target/spark_streaming_cp_" + System.currentTimeMillis();
        String streamTableName = "dstream_batch_table";
        SparkSession spark = ExampleUtils$.MODULE$.createSparkSession("StreamingUsingBatchLoadExample", 4);
        try {
            boolean requireCreateTable = true;
            if (requireCreateTable) {
                spark.sql("DROP TABLE IF EXISTS " + streamTableName);

                String createTableSql = "\n           | CREATE TABLE " + streamTableName + "(\n"
                        + "           | id INT,\n"
                        + "           | name STRING,\n"
                        + "           | city STRING,\n"
                        + "           | salary FLOAT\n"
                        + "           | )\n"
                        + "           | STORED AS carbondata\n"
                        + "           | TBLPROPERTIES(\n"
                        + "           | 'sort_columns'='name',\n"
                        + "           | 'AUTO_LOAD_MERGE'='true',\n"
                        + "           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')\n"
                        + "           | ";
                spark.sql(new StringOps(Predef$.MODULE$.augmentString(createTableSql)).stripMargin());

                // The returned table is not used; the lookup is kept because the
                // original code performs it right after creating the table.
                CarbonEnv$.MODULE$.getCarbonTable((Option)new Some((Object)"default"), streamTableName, spark);

                String path = rootPath + "/examples/spark/src/main/resources/streamSample.csv";
                String loadSql = "\n           | LOAD DATA LOCAL INPATH '" + path + "'\n"
                        + "           | INTO TABLE " + streamTableName + "\n"
                        + "           | OPTIONS('HEADER'='true')\n"
                        + "         ";
                spark.sql(new StringOps(Predef$.MODULE$.augmentString(loadSql)).stripMargin());

                ServerSocket serverSocket = new ServerSocket(7071);
                Thread thread1 = null;
                Thread thread2 = null;
                try {
                    thread1 = this.writeSocket(serverSocket);
                    thread2 = this.showTableCount(spark, streamTableName);
                    StreamingContext ssc = this.startStreaming(spark, streamTableName, checkpointPath);
                    // startStreaming returns null when it could not even create the
                    // context (it has already printed the stack trace). Skip the
                    // streaming phase instead of failing with a NullPointerException.
                    if (ssc != null) {
                        this.waitForStopSignal(ssc);
                        ssc.start();
                        ssc.awaitTermination();
                    }
                } finally {
                    // Always release the helper threads and the listening port,
                    // even when the streaming phase throws.
                    if (thread1 != null) {
                        thread1.interrupt();
                    }
                    if (thread2 != null) {
                        thread2.interrupt();
                    }
                    serverSocket.close();
                }
            }
            spark.sql("select count(*) from " + streamTableName).show(100, false);
            spark.sql("select * from " + streamTableName).show(100, false);
            spark.sql("select * from " + streamTableName + " where id = 100000001 or id = 1 limit 100").show(100, false);
            spark.sql("select * from " + streamTableName + " where id < 10 limit 100").show(100, false);
            spark.sql("SHOW SEGMENTS FOR TABLE " + streamTableName).show(false);
            spark.sql("DROP TABLE IF EXISTS " + streamTableName);
        } finally {
            spark.stop();
        }
        System.out.println("streaming finished");
    }

    /*
     * WARNING - void declaration
     */
    public Thread showTableCount(SparkSession spark, String tableName) {
        void var3_3;
        Thread thread = new Thread(spark, tableName){
            public final SparkSession spark$1;
            public final String tableName$1;

            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ anon.1 $outer;

                    public final void apply(int _) {
                        this.apply$mcVI$sp(_);
                    }

                    public void apply$mcVI$sp(int _) {
                        this.$outer.spark$1.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select count(*) from ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.tableName$1}))).show(false);
                        this.$outer.spark$1.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"SHOW SEGMENTS FOR TABLE ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.tableName$1}))).show(false);
                        Thread.sleep(5000L);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
            }
            {
                this.spark$1 = spark$1;
                this.tableName$1 = tableName$1;
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread waitForStopSignal(StreamingContext ssc) {
        void var2_2;
        Thread thread = new Thread(ssc){
            private final StreamingContext ssc$1;

            public void run() {
                new ServerSocket(7072).accept();
                this.ssc$1.stop(false, true);
            }
            {
                this.ssc$1 = ssc$1;
            }
        };
        thread.start();
        return var2_2;
    }

    /**
     * Builds a streaming pipeline that reads comma-separated text lines from
     * {@code localhost:7071} and appends each received batch to a carbondata
     * table. The context is configured here but {@code start()} is not called
     * in this method.
     *
     * <p>Any exception thrown while building the pipeline is caught: its stack
     * trace is printed, a message is written to standard output, and whatever
     * is in {@code ssc} at that point is returned.
     *
     * @param spark          session whose Spark context backs the streaming
     *                       context and which creates the per-batch DataFrames
     * @param tableName      value passed as the {@code tableName} write option
     * @param checkpointPath directory passed to {@code ssc.checkpoint}
     * @return the streaming context; {@code null} if constructing it threw, or
     *         a partially configured context if a later step threw
     */
    public StreamingContext startStreaming(SparkSession spark, String tableName, String checkpointPath) {
        StreamingContext ssc = null;
        try {
            // Context created with a duration of Seconds(15).
            ssc = new StreamingContext(spark.sparkContext(), Seconds$.MODULE$.apply(15L));
            ssc.checkpoint(checkpointPath);
            // The x$N temporaries are decompiler output for a Scala call that
            // uses the default value of socketTextStream's third argument.
            StreamingContext qual$1 = ssc;
            String x$2 = "localhost";
            int x$3 = 7071;
            StorageLevel x$4 = qual$1.socketTextStream$default$3();
            ReceiverInputDStream readSocketDF = qual$1.socketTextStream(x$2, x$3, x$4);
            // Stage 1: split every line on ",". Stage 2: build a DStreamData from
            // fields 0..3 (int, string, string, float); further fields are ignored.
            DStream batchData = readSocketDF.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String[] apply(String x$1) {
                    return x$1.split(",");
                }
            }, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(String.class))).map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final DStreamData apply(String[] fields) {
                    return new DStreamData(new StringOps(Predef$.MODULE$.augmentString(fields[0])).toInt(), fields[1], fields[2], new StringOps(Predef$.MODULE$.augmentString(fields[3])).toFloat());
                }
            }, ClassTag$.MODULE$.apply(DStreamData.class));
            // For every batch RDD: convert to a DataFrame, print its size, append it to the table.
            batchData.foreachRDD((Function2)new Serializable(spark, tableName){
                public static final long serialVersionUID = 0L;
                private final SparkSession spark$2;
                private final String tableName$2;

                public final void apply(RDD<DStreamData> rdd, Time time) {
                    // Decompiled Scala reflection: the local TypeCreator below supplies
                    // the TypeTag for DStreamData that createDataFrame is called with.
                    JavaUniverse $u = package$.MODULE$.universe();
                    JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(StreamingUsingBatchLoadExample$.MODULE$.getClass().getClassLoader());
                    public final class Org_apache_carbondata_examples_StreamingUsingBatchLoadExample$$anonfun$startStreaming$1$$typecreator1$1
                    extends TypeCreator {
                        public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                            Universe $u = $m$untyped.universe();
                            Mirror<U> $m = $m$untyped;
                            return $m.staticClass("org.apache.carbondata.examples.DStreamData").asType().toTypeConstructor();
                        }

                        public Org_apache_carbondata_examples_StreamingUsingBatchLoadExample$$anonfun$startStreaming$1$$typecreator1$1(anonfun.startStreaming.1 $outer) {
                        }
                    }
                    // Columns are renamed to id, name, city, salary.
                    Dataset df = this.spark$2.createDataFrame(rdd, ((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_carbondata_examples_StreamingUsingBatchLoadExample$$anonfun$startStreaming$1$$typecreator1$1(this))).toDF((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"id", "name", "city", "salary"}));
                    Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"at time: ").append((Object)time.toString()).append((Object)" the count of received data: ").append((Object)BoxesRunTime.boxToLong((long)df.count())).toString());
                    // Append the batch using the "carbondata" format.
                    df.write().format("carbondata").option("tableName", this.tableName$2).mode(SaveMode.Append).save();
                }
                {
                    this.spark$2 = spark$2;
                    this.tableName$2 = tableName$2;
                }
            });
        }
        catch (Exception exception) {
            // NOTE(review): the "Done ..." message is printed only on this failure
            // path, and the method still returns normally afterwards.
            exception.printStackTrace();
            Predef$.MODULE$.println((Object)"Done reading and writing streaming data");
        }
        return ssc;
    }

    /*
     * WARNING - void declaration
     */
    public Thread writeSocket(ServerSocket serverSocket) {
        void var2_2;
        Thread thread = new Thread(serverSocket){
            private final ServerSocket serverSocket$1;

            public void run() {
                Socket clientSocket = this.serverSocket$1.accept();
                PrintWriter socketWriter = new PrintWriter(clientSocket.getOutputStream());
                IntRef index = IntRef.create((int)0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp((Function1)new Serializable(this, socketWriter, index){
                    public static final long serialVersionUID = 0L;
                    public final PrintWriter socketWriter$1;
                    public final IntRef index$1;

                    public final void apply(int _) {
                        this.apply$mcVI$sp(_);
                    }

                    public void apply$mcVI$sp(int _) {
                        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ anon$3$$anonfun$run$2 $outer;

                            public final void apply(int _) {
                                this.apply$mcVI$sp(_);
                            }

                            public void apply$mcVI$sp(int _) {
                                ++this.$outer.index$1.elem;
                                this.$outer.socketWriter$1.println(new StringBuilder().append((Object)((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).toString()).append((Object)",name_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)",city_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)",").append((Object)((Object)BoxesRunTime.boxToDouble((double)((double)this.$outer.index$1.elem * 10000.0))).toString()).append((Object)",school_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)":school_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)"$").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).toString());
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                            }
                        });
                        this.socketWriter$1.flush();
                        Thread.sleep(1000L);
                    }
                    {
                        this.socketWriter$1 = socketWriter$1;
                        this.index$1 = index$1;
                    }
                });
                socketWriter.close();
                System.out.println("Socket closed");
            }
            {
                this.serverSocket$1 = serverSocket$1;
            }
        };
        thread.start();
        return var2_2;
    }

    /**
     * Private constructor, invoked from the static initializer; it publishes
     * the new instance through {@code MODULE$}.
     */
    private StreamingUsingBatchLoadExample$() {
        MODULE$ = this;
    }
}

