/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.examples.StructuredStreamingExample$;
import org.apache.carbondata.examples.StructuredStreamingExample$$anon$3$;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.ProcessingTime$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

public final class StructuredStreamingExample$ {
    public static final StructuredStreamingExample$ MODULE$;

    static {
        new StructuredStreamingExample$();
    }

    public void main(String[] args) {
        String rootPath = new File(new StringBuilder().append((Object)this.getClass().getResource("/").getPath()).append((Object)"../../../..").toString()).getCanonicalPath();
        SparkSession spark = ExampleUtils$.MODULE$.createSparkSession("StructuredStreamingExample", 4);
        String streamTableName = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"stream_table"})).s((Seq)Nil$.MODULE$);
        boolean requireCreateTable = true;
        boolean useComplexDataType = false;
        if (requireCreateTable) {
            spark.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"DROP TABLE IF EXISTS ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName})));
            Dataset dataset = useComplexDataType ? spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n             | CREATE TABLE ", "(\n             | id INT,\n             | name STRING,\n             | salary FLOAT,\n             | file struct<school:array<string>, age:int>\n             | )\n             | STORED AS carbondata\n             | TBLPROPERTIES(\n             | 'streaming'='true', 'sort_columns'='name')\n             | "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName})))).stripMargin()) : spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n             | CREATE TABLE ", "(\n             | id INT,\n             | name STRING,\n             | salary FLOAT\n             | )\n             | STORED AS carbondata\n             | TBLPROPERTIES(\n             | 'streaming'='true', 'sort_columns'='name')\n             | "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName})))).stripMargin());
            CarbonTable carbonTable = CarbonEnv$.MODULE$.getCarbonTable((Option)new Some((Object)"default"), streamTableName, spark);
            String path = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/examples/spark/src/main/resources/streamSample.csv"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{rootPath}));
            spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n           | LOAD DATA LOCAL INPATH '", "'\n           | INTO TABLE ", "\n           | OPTIONS('HEADER'='true')\n         "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{path, streamTableName})))).stripMargin());
            ServerSocket serverSocket = new ServerSocket(7071);
            Thread thread1 = this.startStreaming(spark, carbonTable);
            Thread thread2 = this.writeSocket(serverSocket, this.writeSocket$default$2());
            Thread thread3 = this.showTableCount(spark, streamTableName);
            System.out.println("type enter to interrupt streaming");
            System.in.read();
            thread1.interrupt();
            thread2.interrupt();
            thread3.interrupt();
            serverSocket.close();
        }
        spark.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select count(*) from ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName}))).show(100, false);
        spark.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select * from ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName}))).show(100, false);
        spark.sql(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select * "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"from ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"where id = 100000001 or id = 1 limit 100"})).s((Seq)Nil$.MODULE$)).toString()).show(100, false);
        spark.sql(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select * "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"from ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"where id < 10 limit 100"})).s((Seq)Nil$.MODULE$)).toString()).show(100, false);
        if (useComplexDataType) {
            spark.sql(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select file.age, file.school "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"from ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{streamTableName}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"where where file.age = 30 "})).s((Seq)Nil$.MODULE$)).toString()).show(100, false);
        }
        spark.stop();
        System.out.println("streaming finished");
    }

    /*
     * WARNING - void declaration
     */
    public Thread showTableCount(SparkSession spark, String tableName) {
        void var3_3;
        Thread thread = new Thread(spark, tableName){
            public final SparkSession spark$1;
            public final String tableName$1;

            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ anon.1 $outer;

                    public final void apply(int _) {
                        this.apply$mcVI$sp(_);
                    }

                    public void apply$mcVI$sp(int _) {
                        this.$outer.spark$1.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"select count(*) from ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.tableName$1}))).show(false);
                        this.$outer.spark$1.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"show segments for table ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.tableName$1}))).show();
                        Thread.sleep(3000L);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
            }
            {
                this.spark$1 = spark$1;
                this.tableName$1 = tableName$1;
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread startStreaming(SparkSession spark, CarbonTable carbonTable) {
        void var3_3;
        Thread thread = new Thread(spark, carbonTable){
            private final SparkSession spark$2;
            private final CarbonTable carbonTable$1;

            public void run() {
                StreamingQuery qry = null;
                try {
                    try {
                        Dataset readSocketDF = this.spark$2.readStream().format("socket").option("host", "localhost").option("port", 7071L).load();
                        qry = readSocketDF.writeStream().format("carbondata").trigger((Trigger)ProcessingTime$.MODULE$.apply("5 seconds")).option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir((String)this.carbonTable$1.getTablePath())).option("dbName", "default").option("tableName", "stream_table").option("carbon.stream.parser", "org.apache.carbondata.streaming.parser.CSVStreamParserImp").start();
                        qry.awaitTermination();
                    }
                    catch (Exception exception) {
                        exception.printStackTrace();
                        Predef$.MODULE$.println((Object)"Done reading and writing streaming data");
                    }
                }
                finally {
                    qry.stop();
                }
            }
            {
                this.spark$2 = spark$2;
                this.carbonTable$1 = carbonTable$1;
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread writeSocket(ServerSocket serverSocket, String recordFormat) {
        void var3_3;
        Thread thread = new Thread(serverSocket, recordFormat){
            private final ServerSocket serverSocket$1;
            public final String recordFormat$1;

            public void run() {
                Socket clientSocket = this.serverSocket$1.accept();
                PrintWriter socketWriter = new PrintWriter(clientSocket.getOutputStream());
                IntRef index = IntRef.create((int)0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp((Function1)new Serializable(this, socketWriter, index){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ anon.3 $outer;
                    public final PrintWriter socketWriter$1;
                    public final IntRef index$1;

                    public final void apply(int _) {
                        this.apply$mcVI$sp(_);
                    }

                    public void apply$mcVI$sp(int _) {
                        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ anon$3$$anonfun$run$2 $outer;

                            public final void apply(int _) {
                                this.apply$mcVI$sp(_);
                            }

                            public void apply$mcVI$sp(int _) {
                                String string;
                                block4: {
                                    block3: {
                                        block2: {
                                            ++this.$outer.index$1.elem;
                                            string = this.$outer.org$apache$carbondata$examples$StructuredStreamingExample$$anon$$anonfun$$$outer().recordFormat$1;
                                            if (!"csv".equals(string)) break block2;
                                            this.$outer.socketWriter$1.println(new StringBuilder().append((Object)((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).toString()).append((Object)",name_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)",").append((Object)((Object)BoxesRunTime.boxToDouble((double)((double)this.$outer.index$1.elem * 10000.0))).toString()).append((Object)",school_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)":school_").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).append((Object)"$").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)).toString());
                                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                            break block3;
                                        }
                                        if (!"json".equals(string)) break block4;
                                        this.$outer.socketWriter$1.println(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"{\"id\":", ",\"name\":\"s\",\"salary\":4.3,\"file\":{\"school\":[\"a\",\"b\"],\"age\":6}}"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.index$1.elem)})));
                                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    }
                                    return;
                                }
                                throw new MatchError((Object)string);
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                            }
                        });
                        this.socketWriter$1.flush();
                        Thread.sleep(1000L);
                    }

                    public /* synthetic */ anon.3 org$apache$carbondata$examples$StructuredStreamingExample$$anon$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.socketWriter$1 = socketWriter$1;
                        this.index$1 = index$1;
                    }
                });
                socketWriter.close();
                System.out.println("Socket closed");
            }
            {
                this.serverSocket$1 = serverSocket$1;
                this.recordFormat$1 = recordFormat$1;
            }
        };
        thread.start();
        return var3_3;
    }

    public String writeSocket$default$2() {
        return "csv";
    }

    private StructuredStreamingExample$() {
        MODULE$ = this;
    }
}

