/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.transforms.Cast;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableDateTime;

class NestedPayloadKafkaTable
extends BeamKafkaTable {
    @Nullable
    private final @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized PayloadSerializer payloadSerializer;

    public NestedPayloadKafkaTable(@UnknownKeyFor @NonNull @Initialized Schema beamSchema, @UnknownKeyFor @NonNull @Initialized String bootstrapServers, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics, @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized PayloadSerializer> payloadSerializer) {
        this(beamSchema, bootstrapServers, topics, payloadSerializer, TimestampPolicyFactory.withLogAppendTime());
    }

    public NestedPayloadKafkaTable(@UnknownKeyFor @NonNull @Initialized Schema beamSchema, @UnknownKeyFor @NonNull @Initialized String bootstrapServers, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics, @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized PayloadSerializer> payloadSerializer, @UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory timestampPolicyFactory) {
        super(beamSchema, bootstrapServers, topics, timestampPolicyFactory);
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument((boolean)Schemas.isNestedSchema(this.schema));
        Schemas.validateNestedSchema(this.schema);
        if (payloadSerializer.isPresent()) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument((boolean)this.schema.getField("payload").getType().getTypeName().equals((Object)Schema.TypeName.ROW));
            this.payloadSerializer = payloadSerializer.get();
        } else {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument((boolean)this.schema.getField("payload").getType().equals((Object)Schema.FieldType.BYTES));
            this.payloadSerializer = null;
        }
    }

    @Override
    protected @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row>> getPTransformForInput() {
        return new PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollection<Row>>(){

            public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> input) {
                return (PCollection)input.apply((PTransform)MapElements.into((TypeDescriptor)new TypeDescriptor<Row>(){}).via((SerializableFunction & Serializable)record -> NestedPayloadKafkaTable.this.transformInput((KafkaRecord<byte[], byte[]>)record)));
            }
        };
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized Row transformInput(@UnknownKeyFor @NonNull @Initialized KafkaRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> record) {
        Headers recordHeaders;
        Row.FieldValueBuilder builder = Row.withSchema((Schema)this.getSchema()).withFieldValues((Map)ImmutableMap.of());
        if (this.schema.hasField("message_key")) {
            builder.withFieldValue("message_key", record.getKV().getKey());
        }
        if (this.schema.hasField("event_timestamp")) {
            builder.withFieldValue("event_timestamp", (Object)Instant.ofEpochMilli((long)record.getTimestamp()));
        }
        if (this.schema.hasField("headers") && (recordHeaders = record.getHeaders()) != null) {
            ImmutableListMultimap.Builder headersBuilder = ImmutableListMultimap.builder();
            recordHeaders.forEach(header -> headersBuilder.put((Object)header.key(), (Object)header.value()));
            ImmutableList.Builder listBuilder = ImmutableList.builder();
            headersBuilder.build().asMap().forEach((key, values) -> {
                Row entry = Row.withSchema((Schema)Schemas.HEADERS_ENTRY_SCHEMA).withFieldValue("key", key).withFieldValue("values", values).build();
                listBuilder.add((Object)entry);
            });
            builder.withFieldValue("headers", (Object)listBuilder.build());
        }
        if (this.payloadSerializer == null) {
            builder.withFieldValue("payload", record.getKV().getValue());
        } else {
            byte[] payload = (byte[])record.getKV().getValue();
            if (payload != null) {
                builder.withFieldValue("payload", (Object)this.payloadSerializer.deserialize((byte[])record.getKV().getValue()));
            }
        }
        return builder.build();
    }

    @Override
    protected @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized ProducerRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>> getPTransformForOutput() {
        return new PTransform<PCollection<Row>, PCollection<ProducerRecord<byte[], byte[]>>>(){

            public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized ProducerRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> input) {
                return (PCollection)input.apply((PTransform)MapElements.into((TypeDescriptor)new TypeDescriptor<ProducerRecord<byte[], byte[]>>(){}).via((SerializableFunction & Serializable)row -> NestedPayloadKafkaTable.this.transformOutput((Row)row)));
            }
        };
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized ProducerRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> transformOutput(@UnknownKeyFor @NonNull @Initialized Row row) {
        ReadableDateTime time;
        row = Cast.castRow((Row)row, (Schema)row.getSchema(), (Schema)this.schema);
        String topic = (String)Iterables.getOnlyElement(this.getTopics());
        byte[] key = null;
        ImmutableList headers = ImmutableList.of();
        Long timestampMillis = null;
        if (this.schema.hasField("message_key")) {
            key = row.getBytes("message_key");
        }
        if (this.schema.hasField("event_timestamp") && (time = row.getDateTime("event_timestamp")) != null) {
            timestampMillis = time.getMillis();
        }
        if (this.schema.hasField("headers")) {
            Collection headerRows = (Collection)Preconditions.checkArgumentNotNull((Object)row.getArray("headers"));
            ImmutableList.Builder headersBuilder = ImmutableList.builder();
            headerRows.forEach(entry -> {
                String headerKey = (String)Preconditions.checkArgumentNotNull((Object)entry.getString("key"));
                Collection values = (Collection)Preconditions.checkArgumentNotNull((Object)entry.getArray("values"));
                values.forEach(value -> headersBuilder.add((Object)new RecordHeader(headerKey, value)));
            });
            headers = headersBuilder.build();
        }
        byte[] payload = this.payloadSerializer == null ? row.getBytes("payload") : this.payloadSerializer.serialize((Row)Preconditions.checkArgumentNotNull((Object)row.getRow("payload")));
        return new ProducerRecord(topic, null, timestampMillis, (Object)key, (Object)payload, (Iterable)headers);
    }
}

