/*
 * Decompiled with CFR 0.152.
 */
package za.co.absa.abris.examples;

import java.io.Serializable;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.io.BufferedSource;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.SymbolLiteral;
import za.co.absa.abris.avro.format.SparkAvroConversions$;
import za.co.absa.abris.avro.functions$;
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$;
import za.co.absa.abris.avro.read.confluent.SchemaManager$;
import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator$;
import za.co.absa.abris.examples.utils.ExamplesUtils$;

public final class ConfluentKafkaAvroWriterWithKey$ {
    /** Singleton instance; the private constructor stores {@code this} here when the static initializer runs. */
    public static ConfluentKafkaAvroWriterWithKey$ MODULE$;
    // Property keys used to look up settings in the job configuration.
    // The literal key strings are assigned in the private constructor.

    // Passed to ExamplesUtils.getSparkSession when the Spark session is created.
    private final String PARAM_JOB_NAME;
    private final String PARAM_JOB_MASTER;
    // Locations of the value (payload) and key Avro schemas.
    private final String PARAM_PAYLOAD_AVRO_SCHEMA;
    private final String PARAM_KEY_AVRO_SCHEMA;
    // Record name / namespace forwarded into the schema registry configuration.
    private final String PARAM_AVRO_RECORD_NAME;
    private final String PARAM_AVRO_RECORD_NAMESPACE;
    // Boolean switch: when true, toAvro does not read the schema files.
    private final String PARAM_INFER_SCHEMA;
    // Passed to getSparkSession and also applied through SparkContext.setLogLevel.
    private final String PARAM_LOG_LEVEL;
    // Argument handed to createRows on every iteration of the main loop.
    private final String PARAM_TEST_DATA_ENTRIES;
    // Boolean loop condition of the main do/while loop.
    private final String PARAM_EXECUTION_REPEAT;
    // Slice count passed to SparkContext.parallelize.
    private final String PARAM_NUM_PARTITIONS;
    // Its value is used as the schema registry topic setting.
    private final String PARAM_TOPIC;

    static {
        // Instantiation alone is enough: the constructor publishes the new instance through MODULE$.
        new ConfluentKafkaAvroWriterWithKey$();
    }

    /** @return the property key under which the job name is configured */
    private String PARAM_JOB_NAME() {
        return PARAM_JOB_NAME;
    }

    /** @return the property key under which the job master is configured */
    private String PARAM_JOB_MASTER() {
        return PARAM_JOB_MASTER;
    }

    /** @return the property key pointing at the payload (value) Avro schema */
    private String PARAM_PAYLOAD_AVRO_SCHEMA() {
        return PARAM_PAYLOAD_AVRO_SCHEMA;
    }

    /** @return the property key pointing at the key Avro schema */
    private String PARAM_KEY_AVRO_SCHEMA() {
        return PARAM_KEY_AVRO_SCHEMA;
    }

    /** @return the property key holding the Avro record name */
    private String PARAM_AVRO_RECORD_NAME() {
        return PARAM_AVRO_RECORD_NAME;
    }

    /** @return the property key holding the Avro record namespace */
    private String PARAM_AVRO_RECORD_NAMESPACE() {
        return PARAM_AVRO_RECORD_NAMESPACE;
    }

    /** @return the property key of the infer-schema switch */
    private String PARAM_INFER_SCHEMA() {
        return PARAM_INFER_SCHEMA;
    }

    /** @return the property key holding the log level */
    private String PARAM_LOG_LEVEL() {
        return PARAM_LOG_LEVEL;
    }

    /** @return the property key holding the number of test data entries */
    private String PARAM_TEST_DATA_ENTRIES() {
        return PARAM_TEST_DATA_ENTRIES;
    }

    /** @return the property key of the execution-repeat flag */
    private String PARAM_EXECUTION_REPEAT() {
        return PARAM_EXECUTION_REPEAT;
    }

    /** @return the property key holding the number of partitions */
    private String PARAM_NUM_PARTITIONS() {
        return PARAM_NUM_PARTITIONS;
    }

    /** @return the property key holding the target topic */
    private String PARAM_TOPIC() {
        return PARAM_TOPIC;
    }

    /**
     * Runs the example: generates test rows, turns them into a two-column
     * (key, value) DataFrame, converts both columns to Confluent Avro and writes
     * the result with the {@code kafka} format. The cycle runs at least once and
     * repeats for as long as the {@code execution.repeat} setting is true.
     *
     * <p>The three scalar settings ({@code test.data.entries},
     * {@code num.partitions}, {@code execution.repeat}) are read once, before the
     * loop, and all of them are trimmed before parsing so that stray whitespace
     * in the properties file is tolerated uniformly.
     *
     * @param args command-line arguments; checked by {@code ExamplesUtils.checkArgs}
     *             and used by {@code ExamplesUtils.loadProperties} to load the job properties
     * @throws IllegalArgumentException if one of the three scalar settings is missing
     */
    public void main(String[] args) {
        ExamplesUtils$.MODULE$.checkArgs(args);
        Properties properties = ExamplesUtils$.MODULE$.loadProperties(args);
        SparkSession spark = ExamplesUtils$.MODULE$.getSparkSession(properties, this.PARAM_JOB_NAME(), this.PARAM_JOB_MASTER(), this.PARAM_LOG_LEVEL());
        spark.sparkContext().setLogLevel(properties.getProperty(this.PARAM_LOG_LEVEL()));
        Encoder<Row> encoder = this.getEncoder(properties);

        // Loop-invariant settings: parse them once instead of on every iteration.
        int testDataEntries = new StringOps(Predef$.MODULE$.augmentString(this.requiredProperty(properties, this.PARAM_TEST_DATA_ENTRIES()))).toInt();
        int numPartitions = new StringOps(Predef$.MODULE$.augmentString(this.requiredProperty(properties, this.PARAM_NUM_PARTITIONS()))).toInt();
        boolean repeat = new StringOps(Predef$.MODULE$.augmentString(this.requiredProperty(properties, this.PARAM_EXECUTION_REPEAT()))).toBoolean();

        do {
            List<Row> rows = this.createRows(testDataEntries);
            Dataset dataFrame = spark.implicits().rddToDatasetHolder(spark.sparkContext().parallelize(rows, numPartitions, ClassTag$.MODULE$.apply(Row.class)), encoder).toDF();
            dataFrame.show(false);
            // toAvro expects an immutable Scala map view of the Java properties.
            Map<String, String> scalaProperties = (Map<String, String>)((TraversableOnce)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(properties).asScala()).toMap(Predef$.MODULE$.$conforms());
            ExamplesUtils$.MODULE$.WriterRowOptions((DataFrameWriter<Row>)this.toAvro((Dataset<Row>)dataFrame, scalaProperties).write().format("kafka")).addOptions(properties).save();
        } while (repeat);
    }

    /** Returns the trimmed value of a mandatory property, failing with a descriptive message when it is absent. */
    private String requiredProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    /**
     * Converts the {@code key} and {@code value} columns of the given DataFrame
     * to Confluent Avro, keeping the column names.
     *
     * <p>Both columns share a registry configuration (topic, registry URL, record
     * name and namespace); the value column additionally gets the value naming
     * strategy and the key column the key naming strategy. When the
     * {@code infer.schema} setting is true the conversion is called without an
     * explicit schema; otherwise the key and payload schemas are read from the
     * files named by the corresponding settings and passed along.
     *
     * @param dataFrame  DataFrame with a {@code key} and a {@code value} column
     * @param properties job settings as a Scala map
     * @return a DataFrame with the two converted columns, aliased {@code key} and {@code value}
     */
    private Dataset<Row> toAvro(Dataset<Row> dataFrame, Map<String, String> properties) {
        Map commonRegistryConfig = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SchemaManager$.MODULE$.PARAM_SCHEMA_REGISTRY_TOPIC()), properties.apply((Object)this.PARAM_TOPIC())),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SchemaManager$.MODULE$.PARAM_SCHEMA_REGISTRY_URL()), properties.apply((Object)SchemaManager$.MODULE$.PARAM_SCHEMA_REGISTRY_URL())),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SchemaManager$.MODULE$.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY()), properties.apply((Object)this.PARAM_AVRO_RECORD_NAME())),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SchemaManager$.MODULE$.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY()), properties.apply((Object)this.PARAM_AVRO_RECORD_NAMESPACE()))
        }));
        Map valueRegistryConfig = commonRegistryConfig.$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SchemaManager$.MODULE$.PARAM_VALUE_SCHEMA_NAMING_STRATEGY()), properties.apply((Object)SchemaManager$.MODULE$.PARAM_VALUE_SCHEMA_NAMING_STRATEGY())));
        Map keyRegistryConfig = commonRegistryConfig.$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SchemaManager$.MODULE$.PARAM_KEY_SCHEMA_NAMING_STRATEGY()), properties.apply((Object)SchemaManager$.MODULE$.PARAM_KEY_SCHEMA_NAMING_STRATEGY())));
        boolean inferSchema = new StringOps(Predef$.MODULE$.augmentString(((String)properties.apply((Object)this.PARAM_INFER_SCHEMA())).trim())).toBoolean();

        // The columns are aliased by plain String name. The decompiler had rendered the
        // original symbol literals as direct calls to the invokedynamic bootstrap method,
        // which is not callable that way; Column.as(String) yields the same alias.
        if (inferSchema) {
            return dataFrame.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{
                functions$.MODULE$.to_confluent_avro(org.apache.spark.sql.functions$.MODULE$.col("key"), (Map<String, String>)keyRegistryConfig).as("key"),
                functions$.MODULE$.to_confluent_avro(org.apache.spark.sql.functions$.MODULE$.col("value"), (Map<String, String>)valueRegistryConfig).as("value")
            }));
        }

        String valueSchema = this.loadSchemaFromFile((String)properties.apply((Object)this.PARAM_PAYLOAD_AVRO_SCHEMA()));
        String keySchema = this.loadSchemaFromFile((String)properties.apply((Object)this.PARAM_KEY_AVRO_SCHEMA()));
        return dataFrame.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{
            functions$.MODULE$.to_confluent_avro(org.apache.spark.sql.functions$.MODULE$.col("key"), keySchema, (Map<String, String>)keyRegistryConfig).as("key"),
            functions$.MODULE$.to_confluent_avro(org.apache.spark.sql.functions$.MODULE$.col("value"), valueSchema, (Map<String, String>)valueRegistryConfig).as("value")
        }));
    }

    /**
     * Builds the test rows written by the example. Every row produced by
     * {@code ComplexRecordsGenerator.generateUnparsedRows(howMany)} becomes the
     * value of a two-field row whose key is itself a row made of a running
     * counter (starting at 1) and the text {@code "whatever string <counter>"}.
     *
     * @param howMany amount passed on to the record generator
     * @return the list of (key, value) rows, in generator order
     */
    private List<Row> createRows(int howMany) {
        // Mutable holder so the lambda can advance the counter; the reference itself is never reassigned.
        IntRef count = IntRef.create((int)0);
        return (List)ComplexRecordsGenerator$.MODULE$.generateUnparsedRows(howMany).map((Function1 & Serializable & scala.Serializable)row -> {
            // The lambda must use the captured local `count`; `count$1` was only the
            // compiler's synthetic parameter name and is not declared in this source.
            ++count.elem;
            Row key = Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)count.elem), "whatever string " + count.elem}));
            return Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{key, row}));
        }, List$.MODULE$.canBuildFrom());
    }

    /**
     * Creates the row encoder for the (key, value) rows written by the example.
     *
     * <p>The {@code value} field type is derived from the Avro schema used by
     * {@code ComplexRecordsGenerator}; the {@code key} field type is derived from
     * the key schema loaded via {@link #getKeyAndPayloadSchemas(Properties)}. The
     * payload schema loaded by that method is not consulted here. Both fields are
     * created with the nullable flag set to {@code false}.
     *
     * @param properties job settings used to locate the key schema
     * @return an encoder for rows with the fields {@code key} and {@code value}
     */
    private Encoder<Row> getEncoder(Properties properties) {
        SparkAvroConversions$ conversions = SparkAvroConversions$.MODULE$;

        Schema generatorSchema = AvroSchemaUtils$.MODULE$.parse(ComplexRecordsGenerator$.MODULE$.usedAvroSchema());
        StructType valueType = conversions.toSqlType(generatorSchema);

        Schema keyAvroSchema = (Schema)this.getKeyAndPayloadSchemas(properties)._1();
        StructType keyType = conversions.toSqlType(keyAvroSchema);

        StructField keyField = new StructField("key", (DataType)keyType, false, StructField$.MODULE$.apply$default$4());
        StructField valueField = new StructField("value", (DataType)valueType, false, StructField$.MODULE$.apply$default$4());
        return RowEncoder$.MODULE$.apply(new StructType(new StructField[]{keyField, valueField}));
    }

    /**
     * Loads the key and payload Avro schemas from the locations configured under
     * the key-schema and payload-schema settings.
     *
     * @param properties job settings holding both schema locations
     * @return a pair of (key schema, payload schema)
     */
    private Tuple2<Schema, Schema> getKeyAndPayloadSchemas(Properties properties) {
        String keySchemaLocation = properties.getProperty(this.PARAM_KEY_AVRO_SCHEMA());
        Schema keySchema = AvroSchemaUtils$.MODULE$.load(keySchemaLocation);
        String payloadSchemaLocation = properties.getProperty(this.PARAM_PAYLOAD_AVRO_SCHEMA());
        Schema payloadSchema = AvroSchemaUtils$.MODULE$.load(payloadSchemaLocation);
        return new Tuple2<Schema, Schema>(keySchema, payloadSchema);
    }

    /**
     * Reads the whole file at {@code path} into a string, decoding it with
     * Scala's fallback system codec. The source is closed before returning.
     *
     * @param path location of the schema file
     * @return the complete file content
     */
    private String loadSchemaFromFile(String path) {
        try (BufferedSource schemaFile = Source$.MODULE$.fromFile(path, Codec$.MODULE$.fallbackSystemCodec())) {
            return schemaFile.mkString();
        }
    }

    /**
     * Private constructor, invoked only from the static initializer. It publishes
     * the instance through {@code MODULE$} and then assigns the literal property
     * keys that the rest of the class looks up in the job configuration.
     */
    private ConfluentKafkaAvroWriterWithKey$() {
        // Publish the singleton before the fields are populated.
        MODULE$ = this;
        this.PARAM_JOB_NAME = "job.name";
        this.PARAM_JOB_MASTER = "job.master";
        this.PARAM_PAYLOAD_AVRO_SCHEMA = "payload.avro.schema";
        this.PARAM_KEY_AVRO_SCHEMA = "key.avro.schema";
        this.PARAM_AVRO_RECORD_NAME = "avro.record.name";
        this.PARAM_AVRO_RECORD_NAMESPACE = "avro.record.namespace";
        this.PARAM_INFER_SCHEMA = "infer.schema";
        this.PARAM_LOG_LEVEL = "log.level";
        this.PARAM_TEST_DATA_ENTRIES = "test.data.entries";
        this.PARAM_EXECUTION_REPEAT = "execution.repeat";
        this.PARAM_NUM_PARTITIONS = "num.partitions";
        // Note the "option." prefix: this key differs in shape from the other settings.
        this.PARAM_TOPIC = "option.topic";
    }
}

