/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.examples.FileElement;
import org.apache.carbondata.examples.StreamData;
import org.apache.carbondata.examples.StreamingWithRowParserExample$;
import org.apache.carbondata.examples.StreamingWithRowParserExample$$anon$2$;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.StringOps;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

public final class StreamingWithRowParserExample$ {
    /**
     * Singleton instance of this class. It is not assigned at its declaration:
     * the private constructor stores {@code this} here, and the static
     * initializer below invokes that constructor once during class initialization.
     * NOTE(review): this looks like the compiled form of a Scala {@code object} — confirm.
     */
    public static StreamingWithRowParserExample$ MODULE$;

    static {
        // The constructor publishes the new instance through MODULE$, so the
        // result of this expression is intentionally not assigned here.
        new StreamingWithRowParserExample$();
    }

    /**
     * Runs the streaming example end to end.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Drops and recreates the streaming table {@code stream_table_with_row_parser}
     *       and loads {@code streamSample.csv} into it.</li>
     *   <li>Opens a server socket on port 7071 and starts three worker threads:
     *       the streaming ingest ({@link #startStreaming}), the socket data producer
     *       ({@link #writeSocket}) and a periodic row counter ({@link #showTableCount}).</li>
     *   <li>Blocks on {@code System.in} until input arrives, then interrupts the
     *       three threads and closes the server socket.</li>
     *   <li>Runs a few sample queries against the table and stops the Spark session.</li>
     * </ol>
     *
     * <p>The server socket is closed and the worker threads are interrupted even when
     * waiting for input fails, so a failure does not leave port 7071 bound.
     *
     * @param args command-line arguments; not read by this method
     */
    public void main(String[] args) {
        // The project root is resolved relative to the directory this class was loaded from.
        String rootPath = new File(this.getClass().getResource("/").getPath() + "../../../..").getCanonicalPath();
        SparkSession spark = ExampleUtils$.MODULE$.createSparkSession("StreamingWithRowParserExample", 4);
        String streamTableName = "stream_table_with_row_parser";
        boolean requireCreateTable = true;
        boolean useComplexDataType = false;
        if (requireCreateTable) {
            spark.sql("DROP TABLE IF EXISTS " + streamTableName);

            // Both variants share the same columns; the complex one adds a struct column "file".
            String createTableSql;
            if (useComplexDataType) {
                createTableSql = "\n             | CREATE TABLE " + streamTableName + "(\n"
                    + "             | id INT,\n"
                    + "             | name STRING,\n"
                    + "             | city STRING,\n"
                    + "             | salary FLOAT,\n"
                    + "             | file struct<school:array<string>, age:int>\n"
                    + "             | )\n"
                    + "             | STORED AS carbondata\n"
                    + "             | TBLPROPERTIES(\n"
                    + "             | 'streaming'='true', 'sort_columns'='name')\n"
                    + "             | ";
            } else {
                createTableSql = "\n             | CREATE TABLE " + streamTableName + "(\n"
                    + "             | id INT,\n"
                    + "             | name STRING,\n"
                    + "             | city STRING,\n"
                    + "             | salary FLOAT\n"
                    + "             | )\n"
                    + "             | STORED AS carbondata\n"
                    + "             | TBLPROPERTIES(\n"
                    + "             | 'streaming'='true', 'sort_columns'='name')\n"
                    + "             | ";
            }
            spark.sql(new StringOps(Predef$.MODULE$.augmentString(createTableSql)).stripMargin());

            CarbonTable carbonTable = CarbonEnv$.MODULE$.getCarbonTable((Option)new Some((Object)"default"), streamTableName, spark);
            String path = rootPath + "/examples/spark/src/main/resources/streamSample.csv";
            String loadSql = "\n           | LOAD DATA LOCAL INPATH '" + path + "'\n"
                + "           | INTO TABLE " + streamTableName + "\n"
                + "           | OPTIONS('HEADER'='true')\n"
                + "         ";
            spark.sql(new StringOps(Predef$.MODULE$.augmentString(loadSql)).stripMargin());

            // try-with-resources guarantees the listening port is released on every exit path.
            try (ServerSocket serverSocket = new ServerSocket(7071)) {
                Thread thread1 = this.startStreaming(spark, carbonTable.getTablePath());
                Thread thread2 = this.writeSocket(serverSocket);
                Thread thread3 = this.showTableCount(spark, streamTableName);
                try {
                    System.out.println("type enter to interrupt streaming");
                    System.in.read();
                } finally {
                    // Always signal the workers, even if reading from stdin failed.
                    thread1.interrupt();
                    thread2.interrupt();
                    thread3.interrupt();
                }
            }
        }
        spark.sql("select count(*) from " + streamTableName).show(100, false);
        spark.sql("select * from " + streamTableName).show(100, false);
        spark.sql("select * from " + streamTableName + " where id = 100000001 or id = 1 limit 100").show(100, false);
        spark.sql("select * from " + streamTableName + " where id < 10 limit 100").show(100, false);
        if (useComplexDataType) {
            // The original statement contained a duplicated "where" keyword, which is invalid SQL.
            spark.sql("select file.age, file.school from " + streamTableName + " where file.age = 30 ").show(100, false);
        }
        spark.stop();
        System.out.println("streaming finished");
    }

    /*
     * WARNING - void declaration
     */
    public Thread showTableCount(SparkSession spark, String tableName) {
        void var3_3;
        Thread thread = new Thread(spark, tableName){
            private final SparkSession spark$1;
            private final String tableName$1;

            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                    $this.spark$1.sql(new StringBuilder(21).append("select count(*) from ").append($this.tableName$1).toString()).show(false);
                    Thread.sleep(3000L);
                });
            }
            {
                this.spark$1 = spark$1;
                this.tableName$1 = tableName$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.carbondata.examples.StreamingWithRowParserExample$$anon$1 int )}, serializedLambda);
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread startStreaming(SparkSession spark, String tablePath) {
        void var3_3;
        Thread thread = new Thread(spark, tablePath){
            private final SparkSession spark$2;
            private final String tablePath$1;

            public void run() {
                StreamingQuery qry = null;
                try {
                    try {
                        public final class Org_apache_carbondata_examples_StreamingWithRowParserExample$$anon$2$$typecreator11$1
                        extends TypeCreator {
                            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                                Universe $u = $m$untyped.universe();
                                Mirror<U> $m = $m$untyped;
                                return $m.staticClass("org.apache.carbondata.examples.StreamData").asType().toTypeConstructor();
                            }

                            public Org_apache_carbondata_examples_StreamingWithRowParserExample$$anon$2$$typecreator11$1(anon.2 $outer) {
                            }
                        }
                        JavaUniverse $u = package$.MODULE$.universe();
                        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(anon.2.class.getClassLoader());
                        Dataset readSocketDF = this.spark$2.readStream().format("socket").option("host", "localhost").option("port", 7071L).load().as(this.spark$2.implicits().newStringEncoder()).map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.split(","), this.spark$2.implicits().newStringArrayEncoder()).map((Function1 & Serializable & scala.Serializable)fields -> {
                            String[] tmp = fields[4].split("\\$");
                            FileElement file = new FileElement(tmp[0].split(":"), new StringOps(Predef$.MODULE$.augmentString(tmp[1])).toInt());
                            return new StringOps(Predef$.MODULE$.augmentString(fields[0])).toInt() % 2 == 0 ? new StreamData(new StringOps(Predef$.MODULE$.augmentString(fields[0])).toInt(), null, fields[2], new StringOps(Predef$.MODULE$.augmentString(fields[3])).toFloat(), file) : new StreamData(new StringOps(Predef$.MODULE$.augmentString(fields[0])).toInt(), fields[1], fields[2], new StringOps(Predef$.MODULE$.augmentString(fields[3])).toFloat(), file);
                        }, this.spark$2.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Org_apache_carbondata_examples_StreamingWithRowParserExample$$anon$2$$typecreator11$1(null))));
                        qry = readSocketDF.writeStream().format("carbondata").trigger(Trigger.ProcessingTime((String)"5 seconds")).option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir((String)this.tablePath$1)).option("dbName", "default").option("tableName", "stream_table_with_row_parser").start();
                        qry.awaitTermination();
                    }
                    catch (Exception ex) {
                        ex.printStackTrace();
                        Predef$.MODULE$.println((Object)"Done reading and writing streaming data");
                    }
                }
                finally {
                    qry.stop();
                }
            }
            {
                this.spark$2 = spark$2;
                this.tablePath$1 = tablePath$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$2(java.lang.String ), $anonfun$run$3(java.lang.String[] )}, serializedLambda);
            }
        };
        thread.start();
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Thread writeSocket(ServerSocket serverSocket) {
        void var2_2;
        Thread thread = new Thread(serverSocket){
            private final ServerSocket serverSocket$1;

            public void run() {
                Socket clientSocket = this.serverSocket$1.accept();
                PrintWriter socketWriter = new PrintWriter(clientSocket.getOutputStream());
                IntRef index = IntRef.create((int)0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_2 -> {
                    RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                        ++index$1.elem;
                        socketWriter.println(new StringBuilder(30).append(((Object)BoxesRunTime.boxToInteger((int)index$1.elem)).toString()).append(",name_").append(index$1.elem).append(",city_").append(index$1.elem).append(",").append(((Object)BoxesRunTime.boxToDouble((double)((double)index$1.elem * 10000.0))).toString()).append(",school_").append(index$1.elem).append(":school_").append(index$1.elem).append(index$1.elem).append("$").append(index$1.elem).toString());
                    });
                    socketWriter.flush();
                    Thread.sleep(1000L);
                });
                socketWriter.close();
                System.out.println("Socket closed");
            }
            {
                this.serverSocket$1 = serverSocket$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$4(scala.runtime.IntRef java.io.PrintWriter int ), $anonfun$run$5(scala.runtime.IntRef java.io.PrintWriter int )}, serializedLambda);
            }
        };
        thread.start();
        return var2_2;
    }

    /**
     * Private constructor: stores the newly created instance in the static
     * {@code MODULE$} field. Being private, it can only be invoked from within
     * this class.
     */
    private StreamingWithRowParserExample$() {
        MODULE$ = this;
    }
}

