/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaCSVTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.PayloadSerializerKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
 * {@link TableProvider} for tables of type {@code "kafka"}.
 *
 * <p>The brokers and topics of a table come from two sources that are merged together:
 *
 * <ul>
 *   <li>the table location, formatted as {@code <broker bootstrap location>/<topic>}; and
 *   <li>the {@code "bootstrap_servers"} and {@code "topics"} array properties of the table.
 * </ul>
 *
 * <p>The optional {@code "format"} property selects the payload format. When it is absent,
 * tables with a non-nested schema are read as CSV.
 */
@AutoService(TableProvider.class)
public class KafkaTableProvider extends InMemoryMetaTableProvider {

    /**
     * Splits a location string into its broker part and its topic part.
     *
     * <p>The topic is the text after the last {@code '/'}; everything before that slash
     * (which may itself contain slashes) is the broker location.
     *
     * @param location location of the form {@code <broker bootstrap location>/<topic>}
     * @return the parsed broker location and topic
     * @throws IllegalArgumentException if {@code location} contains no {@code '/'}
     */
    private static @UnknownKeyFor @NonNull @Initialized ParsedLocation parseLocation(
            @UnknownKeyFor @NonNull @Initialized String location) {
        List<String> split = Splitter.on('/').splitToList(location);
        Preconditions.checkArgument(
                split.size() >= 2,
                "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
                location);
        String topic = Iterables.getLast(split);
        // Re-join all leading segments so a broker location containing '/' survives intact.
        String brokerLocation = String.join("/", split.subList(0, split.size() - 1));
        return new ParsedLocation(brokerLocation, topic);
    }

    /**
     * Combines an optional leading value with the string form of each element of a list.
     *
     * @param initial value placed first in the result when present
     * @param toMerge elements appended (via {@code toString()}) after {@code initial}; may be
     *     {@code null}, in which case nothing is appended
     * @return an immutable list holding {@code initial} (if present) followed by the merged values
     */
    private static @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> mergeParam(
            @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized String> initial,
            @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized Object> toMerge) {
        ImmutableList.Builder<String> merged = ImmutableList.builder();
        initial.ifPresent(merged::add);
        if (toMerge != null) {
            toMerge.forEach(o -> merged.add(o.toString()));
        }
        return merged.build();
    }

    /**
     * Builds the Kafka-backed table described by {@code table}.
     *
     * <p>The concrete table type is chosen as follows:
     *
     * <ol>
     *   <li>a nested schema (per {@link Schemas#isNestedSchema}) yields a
     *       {@link NestedPayloadKafkaTable}, with a payload serializer only if a format is set;
     *   <li>otherwise a format of {@code "csv"}, or no format at all, yields a
     *       {@link BeamKafkaCSVTable};
     *   <li>otherwise a {@link PayloadSerializerKafkaTable} using the serializer registered for
     *       the format.
     * </ol>
     *
     * @param table table definition supplying schema, location and properties
     * @return the table implementation matching the schema and payload format
     * @throws IllegalArgumentException if a non-empty location lacks the {@code '/'} separator
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized BeamSqlTable buildBeamSqlTable(@UnknownKeyFor @NonNull @Initialized Table table) {
        Schema schema = table.getSchema();
        JSONObject properties = table.getProperties();

        // The location is optional; brokers and topics may come from the properties alone.
        Optional<ParsedLocation> parsedLocation = Optional.empty();
        if (!Strings.isNullOrEmpty(table.getLocation())) {
            parsedLocation =
                    Optional.of(
                            parseLocation(
                                    org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(
                                            table.getLocation())));
        }

        List<String> topics =
                mergeParam(parsedLocation.map(loc -> loc.topic), properties.getJSONArray("topics"));
        List<String> allBootstrapServers =
                mergeParam(
                        parsedLocation.map(loc -> loc.brokerLocation),
                        properties.getJSONArray("bootstrap_servers"));
        String bootstrapServers = String.join(",", allBootstrapServers);

        // getString yields null both for a missing key and for an explicit JSON null, so
        // ofNullable treats the two alike instead of failing with a NullPointerException.
        Optional<String> payloadFormat = Optional.ofNullable(properties.getString("format"));

        if (Schemas.isNestedSchema(schema)) {
            // For nested schemas the serializer applies to the row schema of the "payload" field.
            Optional<PayloadSerializer> serializer =
                    payloadFormat.map(
                            format ->
                                    PayloadSerializers.getSerializer(
                                            format,
                                            org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(
                                                    schema.getField("payload").getType().getRowSchema()),
                                            properties.getInnerMap()));
            return new NestedPayloadKafkaTable(schema, bootstrapServers, topics, serializer);
        }

        // CSV is the default format for flat schemas.
        String format = payloadFormat.orElse("csv");
        if (format.equals("csv")) {
            return new BeamKafkaCSVTable(schema, bootstrapServers, topics);
        }
        PayloadSerializer serializer =
                PayloadSerializers.getSerializer(format, schema, properties.getInnerMap());
        return new PayloadSerializerKafkaTable(schema, bootstrapServers, topics, serializer);
    }

    /**
     * Returns the table type handled by this provider.
     *
     * @return the constant {@code "kafka"}
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized String getTableType() {
        return "kafka";
    }

    /** Immutable result of {@link #parseLocation}: a broker location and a topic name. */
    private static final class ParsedLocation {
        final @UnknownKeyFor @NonNull @Initialized String brokerLocation;
        final @UnknownKeyFor @NonNull @Initialized String topic;

        private ParsedLocation(String brokerLocation, String topic) {
            this.brokerLocation = brokerLocation;
            this.topic = topic;
        }
    }
}

