/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.examples;

import java.io.IOException;
import java.net.ServerSocket;
import org.apache.carbondata.examples.StructuredStreamingExample$;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.sql.SparkSession;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;

public final class StreamSQLExample$ {
    public static final StreamSQLExample$ MODULE$;

    static {
        new StreamSQLExample$();
    }

    public void main(String[] args) {
        BoxedUnit boxedUnit;
        SparkSession spark = ExampleUtils$.MODULE$.createSparkSession("StructuredStreamingExample", 4);
        boolean requireCreateTable = true;
        String recordFormat = "json";
        if (requireCreateTable) {
            spark.sql(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"DROP TABLE IF EXISTS sink"})).s((Seq)Nil$.MODULE$));
            spark.sql("DROP TABLE IF EXISTS source");
            boxedUnit = spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n           | CREATE TABLE sink(\n           | id INT,\n           | name STRING,\n           | salary FLOAT,\n           | file struct<school:array<string>, age:int>\n           | )\n           | STORED AS carbondata\n           | TBLPROPERTIES(\n           | 'streaming'='true', 'sort_columns'='')\n          "})).s((Seq)Nil$.MODULE$))).stripMargin());
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n        | CREATE TABLE source (\n        | id INT,\n        | name STRING,\n        | salary FLOAT,\n        | file struct<school:array<string>, age:int>\n        | )\n        | STORED AS carbondata\n        | TBLPROPERTIES(\n        | 'streaming'='source',\n        | 'format'='socket',\n        | 'host'='localhost',\n        | 'port'='7071',\n        | 'record_format'='", "'\n        | )\n      "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{recordFormat})))).stripMargin());
        ServerSocket serverSocket = new ServerSocket(7071);
        spark.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n        | CREATE STREAM ingest ON TABLE sink\n        | STMPROPERTIES(\n        | 'trigger' = 'ProcessingTime',\n        | 'interval' = '3 seconds')\n        | AS SELECT * FROM source\n      "})).s((Seq)Nil$.MODULE$))).stripMargin());
        Thread thread1 = StructuredStreamingExample$.MODULE$.writeSocket(serverSocket, recordFormat);
        Thread thread2 = StructuredStreamingExample$.MODULE$.showTableCount(spark, "sink");
        System.out.println("type enter to interrupt streaming");
        System.in.read();
        thread1.interrupt();
        thread2.interrupt();
        serverSocket.close();
        spark.sql("DROP STREAM ingest").show();
        spark.stop();
        System.out.println("streaming finished");
    }

    private StreamSQLExample$() {
        MODULE$ = this;
    }
}

