package org.apache.flink.table.examples.java.connectors;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/* loaded from: input_file:org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.class */
/**
 * Example job that declares a table backed by the {@code socket} connector with the
 * {@code changelog-csv} format, aggregates it with SQL, and prints the result.
 *
 * <p>Two optional parameters are read from the command line through {@link ParameterTool}:
 *
 * <ul>
 *   <li>{@code hostname} &ndash; defaults to {@code localhost}
 *   <li>{@code port} &ndash; defaults to {@code 9999}
 * </ul>
 *
 * <p>The table {@code UserScores (name STRING, score INT)} is queried with
 * {@code SELECT name, SUM(score) ... GROUP BY name}; the query result is converted to a
 * retract stream of {@link Row} and printed.
 */
public final class ChangelogSocketExample {

    /**
     * Entry point: registers the socket-backed table, runs the aggregation and prints it.
     *
     * @param strArr command-line arguments, parsed with {@link ParameterTool#fromArgs}
     * @throws Exception propagated from argument parsing or from executing the job
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(strArr);
        final String hostname = params.get("hostname", "localhost");
        final String port = params.get("port", "9999");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The whole pipeline runs with a parallelism of one.
        env.setParallelism(1);

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(userScoresDdl(hostname, port));

        // Sum the scores per name and print the resulting retract stream.
        tableEnv
                .toRetractStream(
                        tableEnv.sqlQuery("SELECT name, SUM(score) FROM UserScores GROUP BY name"),
                        Row.class)
                .print();

        env.execute();
    }

    /**
     * Assembles the {@code CREATE TABLE} statement for {@code UserScores}.
     *
     * <p>The host and port are spliced into the statement verbatim (no escaping), so they
     * must not contain single quotes.
     *
     * @param hostname value for the connector's {@code hostname} option
     * @param port value for the connector's {@code port} option
     * @return the complete DDL string
     */
    private static String userScoresDdl(String hostname, String port) {
        return "CREATE TABLE UserScores (name STRING, score INT)\n"
                + "WITH (\n"
                + "  'connector' = 'socket',\n"
                + "  'hostname' = '" + hostname + "',\n"
                + "  'port' = '" + port + "',\n"
                // NOTE(review): 10 is presumably the byte value of '\n' used to split records - confirm
                + "  'byte-delimiter' = '10',\n"
                + "  'format' = 'changelog-csv',\n"
                + "  'changelog-csv.column-delimiter' = '|'\n"
                + ")";
    }
}
