/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.extensions.sql.TableUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaCSVTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.PayloadSerializerKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormat;

@AutoService(value={TableProvider.class})
public class KafkaTableProvider
extends InMemoryMetaTableProvider {
    private static @UnknownKeyFor @NonNull @Initialized ParsedLocation parseLocation(@UnknownKeyFor @NonNull @Initialized String location) {
        ParsedLocation parsed = new ParsedLocation();
        List split = Splitter.on((char)'/').splitToList((CharSequence)location);
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument((split.size() >= 2 ? 1 : 0) != 0, (String)"Location string `%s` invalid: must be <broker bootstrap location>/<topic>.", (Object)location);
        parsed.topic = (String)Iterables.getLast((Iterable)split);
        parsed.brokerLocation = String.join((CharSequence)"/", split.subList(0, split.size() - 1));
        return parsed;
    }

    private static @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> mergeParam(@UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized String> initial, @Nullable @UnknownKeyFor @Initialized ArrayNode toMerge) {
        ImmutableList.Builder merged = ImmutableList.builder();
        initial.ifPresent(arg_0 -> ((ImmutableList.Builder)merged).add(arg_0));
        if (toMerge != null) {
            toMerge.forEach(o -> merged.add((Object)o.asText()));
        }
        return merged.build();
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized BeamSqlTable buildBeamSqlTable(@UnknownKeyFor @NonNull @Initialized Table table) {
        PayloadSerializer serializer;
        Schema schema = table.getSchema();
        ObjectNode properties = table.getProperties();
        Optional<Object> parsedLocation = Optional.empty();
        if (!Strings.isNullOrEmpty((String)table.getLocation())) {
            parsedLocation = Optional.of(KafkaTableProvider.parseLocation((String)Preconditions.checkArgumentNotNull((Object)table.getLocation())));
        }
        List<String> topics = KafkaTableProvider.mergeParam(parsedLocation.map(loc -> loc.topic), (ArrayNode)properties.get("topics"));
        List<String> allBootstrapServers = KafkaTableProvider.mergeParam(parsedLocation.map(loc -> loc.brokerLocation), (ArrayNode)properties.get("bootstrap_servers"));
        String bootstrapServers = String.join((CharSequence)",", allBootstrapServers);
        Optional<String> payloadFormat = properties.has("format") ? Optional.of(properties.get("format").asText()) : Optional.empty();
        TimestampPolicyFactory timestampPolicyFactory = TimestampPolicyFactory.withProcessingTime();
        if (properties.has("watermark.type")) {
            String type;
            switch (type = properties.get("watermark.type").asText().toUpperCase()) {
                case "PROCESSINGTIME": {
                    timestampPolicyFactory = TimestampPolicyFactory.withProcessingTime();
                    break;
                }
                case "LOGAPPENDTIME": {
                    timestampPolicyFactory = TimestampPolicyFactory.withLogAppendTime();
                    break;
                }
                case "CREATETIME": {
                    Duration delay = Duration.ZERO;
                    if (properties.has("watermark.delay")) {
                        String delayStr = properties.get("watermark.delay").asText();
                        delay = PeriodFormat.getDefault().parsePeriod(delayStr).toStandardDuration();
                    }
                    timestampPolicyFactory = TimestampPolicyFactory.withCreateTime((Duration)delay);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown watermark type: " + type + ". Supported types are ProcessingTime, LogAppendTime, CreateTime.");
                }
            }
        }
        BeamKafkaTable kafkaTable = null;
        if (Schemas.isNestedSchema(schema)) {
            serializer = payloadFormat.map(format -> PayloadSerializers.getSerializer((String)format, (Schema)((Schema)Preconditions.checkArgumentNotNull((Object)schema.getField("payload").getType().getRowSchema())), TableUtils.convertNode2Map((JsonNode)properties)));
            kafkaTable = new NestedPayloadKafkaTable(schema, bootstrapServers, topics, (Optional<PayloadSerializer>)serializer, timestampPolicyFactory);
        } else if (payloadFormat.orElse("csv").equals("csv")) {
            kafkaTable = new BeamKafkaCSVTable(schema, bootstrapServers, topics, timestampPolicyFactory);
        } else {
            serializer = PayloadSerializers.getSerializer((String)((String)payloadFormat.get()), (Schema)schema, TableUtils.convertNode2Map((JsonNode)properties));
            kafkaTable = new PayloadSerializerKafkaTable(schema, bootstrapServers, topics, serializer, timestampPolicyFactory);
        }
        HashMap<String, Object> configUpdates = new HashMap<String, Object>();
        Iterator tableProperties = properties.fields();
        while (tableProperties.hasNext()) {
            Map.Entry field = (Map.Entry)tableProperties.next();
            if (!((String)field.getKey()).startsWith("properties.")) continue;
            configUpdates.put(((String)field.getKey()).replace("properties.", ""), ((JsonNode)field.getValue()).textValue());
        }
        if (!configUpdates.isEmpty()) {
            kafkaTable.updateConsumerProperties(configUpdates);
        }
        return kafkaTable;
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized String getTableType() {
        return "kafka";
    }

    private static class ParsedLocation {
        @UnknownKeyFor @NonNull @Initialized String brokerLocation = "";
        @UnknownKeyFor @NonNull @Initialized String topic = "";

        private ParsedLocation() {
        }
    }
}

