/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.examples;

import java.io.File;
import java.io.PrintWriter;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

public final class StructuredStreamingExample$ {
    public static StructuredStreamingExample$ MODULE$;

    static {
        new StructuredStreamingExample$();
    }

    public void main(String[] args) {
        String rootPath = new File(new StringBuilder(11).append(this.getClass().getResource("/").getPath()).append("../../../..").toString()).getCanonicalPath();
        SparkSession spark = ExampleUtils$.MODULE$.createSparkSession("StructuredStreamingExample", 4);
        String streamTableName = "stream_table";
        boolean requireCreateTable = true;
        boolean useComplexDataType = false;
        if (requireCreateTable) {
            spark.sql(new StringBuilder(21).append("DROP TABLE IF EXISTS ").append(streamTableName).toString());
            Dataset dataset = useComplexDataType ? spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(325).append("\n             | CREATE TABLE ").append(streamTableName).append("(\n             | id INT,\n             | name STRING,\n             | salary FLOAT,\n             | file struct<school:array<string>, age:int>\n             | )\n             | STORED AS carbondata\n             | TBLPROPERTIES(\n             | 'streaming'='true', 'sort_columns'='name')\n             | ").toString())).stripMargin()) : spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(266).append("\n             | CREATE TABLE ").append(streamTableName).append("(\n             | id INT,\n             | name STRING,\n             | salary FLOAT\n             | )\n             | STORED AS carbondata\n             | TBLPROPERTIES(\n             | 'streaming'='true', 'sort_columns'='name')\n             | ").toString())).stripMargin());
            CarbonTable carbonTable = CarbonEnv$.MODULE$.getCarbonTable((Option)new Some((Object)"default"), streamTableName, spark);
            String path = new StringBuilder(51).append(rootPath).append("/examples/spark/src/main/resources/streamSample.csv").toString();
            spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(112).append("\n           | LOAD DATA LOCAL INPATH '").append(path).append("'\n           | INTO TABLE ").append(streamTableName).append("\n           | OPTIONS('HEADER'='true')\n         ").toString())).stripMargin());
            ServerSocket serverSocket = new ServerSocket(7071);
            Thread thread1 = this.startStreaming(spark, carbonTable);
            Thread thread2 = this.writeSocket(serverSocket, this.writeSocket$default$2());
            Thread thread3 = this.showTableCount(spark, streamTableName);
            System.out.println("type enter to interrupt streaming");
            System.in.read();
            thread1.interrupt();
            thread2.interrupt();
            thread3.interrupt();
            serverSocket.close();
        }
        spark.sql(new StringBuilder(21).append("select count(*) from ").append(streamTableName).toString()).show(100, false);
        spark.sql(new StringBuilder(14).append("select * from ").append(streamTableName).toString()).show(100, false);
        spark.sql(new StringBuilder(49).append("select * ").append(new StringBuilder(6).append("from ").append(streamTableName).append(" ").toString()).append("where id = 100000001 or id = 1 limit 100").toString()).show(100, false);
        spark.sql(new StringBuilder(32).append("select * ").append(new StringBuilder(6).append("from ").append(streamTableName).append(" ").toString()).append("where id < 10 limit 100").toString()).show(100, false);
        if (useComplexDataType) {
            spark.sql(new StringBuilder(55).append("select file.age, file.school ").append(new StringBuilder(6).append("from ").append(streamTableName).append(" ").toString()).append("where where file.age = 30 ").toString()).show(100, false);
        }
        spark.stop();
        System.out.println("streaming finished");
    }

    /*
     * WARNING - void declaration
     */
    public Thread showTableCount(SparkSession spark, String tableName) {
        void var3_3;
        Thread thread = new Thread(spark, tableName){
            private final SparkSession spark$1;
            private final String tableName$1;

            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                    $this.spark$1.sql(new StringBuilder(21).append("select count(*) from ").append($this.tableName$1).toString()).show(false);
                    $this.spark$1.sql(new StringBuilder(24).append("show segments for table ").append($this.tableName$1).toString()).show();
                    Thread.sleep(3000L);
                });
            }
            {
                this.spark$1 = spark$1;
                this.tableName$1 = tableName$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.carbondata.examples.StructuredStreamingExample$$anon$1 int )}, serializedLambda);
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread startStreaming(SparkSession spark, CarbonTable carbonTable) {
        void var3_3;
        Thread thread = new Thread(spark, carbonTable){
            private final SparkSession spark$2;
            private final CarbonTable carbonTable$1;

            public void run() {
                StreamingQuery qry = null;
                try {
                    try {
                        Dataset readSocketDF = this.spark$2.readStream().format("socket").option("host", "localhost").option("port", 7071L).load();
                        qry = readSocketDF.writeStream().format("carbondata").trigger(Trigger.ProcessingTime((String)"5 seconds")).option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir((String)this.carbonTable$1.getTablePath())).option("dbName", "default").option("tableName", "stream_table").option("carbon.stream.parser", "org.apache.carbondata.streaming.parser.CSVStreamParserImp").start();
                        qry.awaitTermination();
                    }
                    catch (Exception ex) {
                        ex.printStackTrace();
                        Predef$.MODULE$.println((Object)"Done reading and writing streaming data");
                    }
                }
                finally {
                    qry.stop();
                }
            }
            {
                this.spark$2 = spark$2;
                this.carbonTable$1 = carbonTable$1;
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread writeSocket(ServerSocket serverSocket, String recordFormat) {
        void var3_3;
        Thread thread = new Thread(serverSocket, recordFormat){
            private final ServerSocket serverSocket$1;
            private final String recordFormat$1;

            public void run() {
                Socket clientSocket = this.serverSocket$1.accept();
                PrintWriter socketWriter = new PrintWriter(clientSocket.getOutputStream());
                IntRef index = IntRef.create((int)0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_2 -> {
                    RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                        ++index$1.elem;
                        String string = $this.recordFormat$1;
                        if ("csv".equals(string)) {
                            socketWriter.println(new StringBuilder(24).append(((Object)BoxesRunTime.boxToInteger((int)index$1.elem)).toString()).append(",name_").append(index$1.elem).append(",").append(((Object)BoxesRunTime.boxToDouble((double)((double)index$1.elem * 10000.0))).toString()).append(",school_").append(index$1.elem).append(":school_").append(index$1.elem).append(index$1.elem).append("$").append(index$1.elem).toString());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        } else if ("json".equals(string)) {
                            socketWriter.println(new StringBuilder(67).append("{\"id\":").append(index$1.elem).append(",\"name\":\"s\",\"salary\":4.3,\"file\":{\"school\":[\"a\",\"b\"],\"age\":6}}").toString());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        } else {
                            throw new MatchError((Object)string);
                        }
                    });
                    socketWriter.flush();
                    Thread.sleep(1000L);
                });
                socketWriter.close();
                System.out.println("Socket closed");
            }
            {
                this.serverSocket$1 = serverSocket$1;
                this.recordFormat$1 = recordFormat$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$2(org.apache.carbondata.examples.StructuredStreamingExample$$anon$3 scala.runtime.IntRef java.io.PrintWriter int ), $anonfun$run$3(org.apache.carbondata.examples.StructuredStreamingExample$$anon$3 scala.runtime.IntRef java.io.PrintWriter int )}, serializedLambda);
            }
        };
        thread.start();
        return var3_3;
    }

    public String writeSocket$default$2() {
        return "csv";
    }

    private StructuredStreamingExample$() {
        MODULE$ = this;
    }
}

