/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.commons.csv.CSVFormat;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BeamKafkaCSVTable
extends BeamKafkaTable {
    private final CSVFormat csvFormat;

    public BeamKafkaCSVTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
        this(beamSchema, bootstrapServers, topics, CSVFormat.DEFAULT);
    }

    public BeamKafkaCSVTable(Schema beamSchema, String bootstrapServers, List<String> topics, CSVFormat format) {
        super(beamSchema, bootstrapServers, topics);
        this.csvFormat = format;
    }

    @Override
    protected PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
        return new CsvRecorderDecoder(this.schema, this.csvFormat);
    }

    @Override
    protected PTransform<PCollection<Row>, PCollection<ProducerRecord<byte[], byte[]>>> getPTransformForOutput() {
        return new CsvRecorderEncoder(this.csvFormat, (String)Iterables.getOnlyElement(this.getTopics()));
    }

    private static class CsvRecorderEncoder
    extends PTransform<PCollection<Row>, PCollection<ProducerRecord<byte[], byte[]>>> {
        private final CSVFormat format;
        private final String topic;

        CsvRecorderEncoder(CSVFormat format, String topic) {
            this.format = format;
            this.topic = topic;
        }

        public PCollection<ProducerRecord<byte[], byte[]>> expand(PCollection<Row> input) {
            return (PCollection)input.apply("encodeCsvRecord", (PTransform)ParDo.of((DoFn)new DoFn<Row, ProducerRecord<byte[], byte[]>>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    Row in = (Row)Preconditions.checkArgumentNotNull((Object)((Row)c.element()));
                    c.output((Object)new ProducerRecord(topic, (Object)new byte[0], (Object)BeamTableUtils.beamRow2CsvLine(in, format).getBytes(StandardCharsets.UTF_8)));
                }
            }));
        }
    }

    private static class CsvRecorderDecoder
    extends PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollection<Row>> {
        private final Schema schema;
        private final CSVFormat format;

        CsvRecorderDecoder(Schema schema, CSVFormat format) {
            this.schema = schema;
            this.format = format;
        }

        public PCollection<Row> expand(PCollection<KafkaRecord<byte[], byte[]>> input) {
            return ((PCollection)input.apply("decodeCsvRecord", (PTransform)ParDo.of((DoFn)new DoFn<KafkaRecord<byte[], byte[]>, Row>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    KV kv = ((KafkaRecord)Preconditions.checkArgumentNotNull((Object)((KafkaRecord)c.element()))).getKV();
                    String rowInString = new String((byte[])Preconditions.checkArgumentNotNull((Object)((byte[])kv.getValue())), StandardCharsets.UTF_8);
                    for (Row row : BeamTableUtils.csvLines2BeamRows(format, rowInString, schema)) {
                        c.output((Object)row);
                    }
                }
            }))).setRowSchema(this.schema);
        }
    }
}

