/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.iceberg.writer;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.iceberg.writer.IcebergMetadataWriter;
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GobblinMCEWriter
implements DataWriter<GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger(GobblinMCEWriter.class);
    public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
    public static final String FORCE_HIVE_DATABASE_NAME = "force.hive.database.name";
    public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
    public static final String METADATA_REGISTRATION_THREADS = "metadata.registration.threads";
    public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = "metadata.parallel.runner.timeout.mills";
    public static final String HIVE_PARTITION_NAME = "hive.partition.name";
    public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes";
    public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
    public static final String TABLE_NAME_DELIMITER = ".";
    List<MetadataWriter> metadataWriters;
    Map<String, OperationType> tableOperationTypeMap;
    Map<String, OperationType> datasetOperationTypeMap;
    Set<String> acceptedClusters;
    protected State state;
    private final ParallelRunner parallelRunner;
    private int parallelRunnerTimeoutMills;
    private Map<String, Cache<String, Collection<HiveSpec>>> oldSpecsMaps;
    private Map<String, Cache<String, Collection<HiveSpec>>> newSpecsMaps;
    private Closer closer = Closer.create();
    protected final AtomicLong recordCount = new AtomicLong(0L);

    GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
        this.newSpecsMaps = new HashMap<String, Cache<String, Collection<HiveSpec>>>();
        this.oldSpecsMaps = new HashMap<String, Cache<String, Collection<HiveSpec>>>();
        this.metadataWriters = new ArrayList<MetadataWriter>();
        this.acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
        this.state = properties;
        for (String className : this.state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
            this.metadataWriters.add((MetadataWriter)this.closer.register((Closeable)GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, (String)className, (Object[])new Object[]{this.state})));
        }
        this.tableOperationTypeMap = new HashMap<String, OperationType>();
        this.datasetOperationTypeMap = new HashMap<String, OperationType>();
        this.parallelRunner = (ParallelRunner)this.closer.register((Closeable)new ParallelRunner(this.state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20), FileSystem.get((Configuration)HadoopUtils.getConfFromState((State)properties))));
        this.parallelRunnerTimeoutMills = this.state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, 60000);
    }

    public void write(GenericRecord record) throws IOException {
    }

    private void computeSpecMap(List<String> files, final ConcurrentHashMap<String, Collection<HiveSpec>> specsMap, final Cache<String, Collection<HiveSpec>> cache, State registerState, final boolean isPrefix) throws IOException {
        final HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy((State)registerState);
        for (final String file : files) {
            this.parallelRunner.submitCallable((Callable)new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    try {
                        Path regPath = isPrefix ? new Path(file) : new Path(file).getParent();
                        Path rawPath = new Path(regPath.toUri().getRawPath());
                        specsMap.put(regPath.toString(), cache.get((Object)regPath.toString(), () -> policy.getHiveSpecs(rawPath)));
                    }
                    catch (Exception e) {
                        log.warn("Cannot get Hive Spec for {} using policy {}", (Object)file, (Object)policy.toString());
                    }
                    return null;
                }
            }, file);
        }
        this.parallelRunner.waitForTasks((long)this.parallelRunnerTimeoutMills);
    }

    public void commit() throws IOException {
        this.flush();
    }

    public void cleanup() throws IOException {
    }

    public long recordsWritten() {
        return this.recordCount.get();
    }

    public long bytesWritten() throws IOException {
        return 0L;
    }

    public Descriptor getDataDescriptor() {
        return null;
    }

    public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws IOException {
        State registerState;
        GenericRecord genericRecord = (GenericRecord)recordEnvelope.getRecord();
        if (!this.acceptedClusters.contains(genericRecord.get("cluster"))) {
            return;
        }
        GobblinMetadataChangeEvent gmce = (GobblinMetadataChangeEvent)SpecificData.get().deepCopy(genericRecord.getSchema(), (Object)genericRecord);
        String datasetName = gmce.getDatasetIdentifier().toString();
        if (!this.datasetOperationTypeMap.containsKey(datasetName)) {
            this.oldSpecsMaps.remove(datasetName);
        }
        if (this.datasetOperationTypeMap.containsKey(datasetName) && this.datasetOperationTypeMap.get(datasetName) != gmce.getOperationType()) {
            this.datasetOperationTypeMap.put(datasetName, gmce.getOperationType());
        }
        ConcurrentHashMap<String, Collection<HiveSpec>> newSpecsMap = new ConcurrentHashMap<String, Collection<HiveSpec>>();
        ConcurrentHashMap<String, Collection<HiveSpec>> oldSpecsMap = new ConcurrentHashMap<String, Collection<HiveSpec>>();
        if (gmce.getNewFiles() != null) {
            registerState = GobblinMCEWriter.setHiveRegProperties(this.state, gmce, true);
            this.computeSpecMap(Lists.newArrayList((Iterable)Iterables.transform((Iterable)gmce.getNewFiles(), DataFile::getFilePath)), newSpecsMap, (Cache<String, Collection<HiveSpec>>)this.newSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder.newBuilder().expireAfterAccess((long)this.state.getPropAsInt("GMCEWriter.cache.expiring.time.hours", 1), TimeUnit.HOURS).build()), registerState, false);
        }
        if (gmce.getOldFilePrefixes() != null) {
            registerState = GobblinMCEWriter.setHiveRegProperties(this.state, gmce, false);
            this.computeSpecMap(gmce.getOldFilePrefixes(), oldSpecsMap, (Cache<String, Collection<HiveSpec>>)this.oldSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder.newBuilder().expireAfterAccess((long)this.state.getPropAsInt("GMCEWriter.cache.expiring.time.hours", 1), TimeUnit.HOURS).build()), registerState, true);
        } else if (gmce.getOldFiles() != null) {
            registerState = GobblinMCEWriter.setHiveRegProperties(this.state, gmce, false);
            this.computeSpecMap(gmce.getOldFiles(), oldSpecsMap, (Cache<String, Collection<HiveSpec>>)this.oldSpecsMaps.computeIfAbsent(datasetName, t -> CacheBuilder.newBuilder().expireAfterAccess((long)this.state.getPropAsInt("GMCEWriter.cache.expiring.time.hours", 1), TimeUnit.HOURS).build()), registerState, false);
        }
        if (newSpecsMap.isEmpty() && oldSpecsMap.isEmpty()) {
            return;
        }
        Collection specs = newSpecsMap.isEmpty() ? (Collection)oldSpecsMap.values().iterator().next() : (Collection)newSpecsMap.values().iterator().next();
        for (HiveSpec spec : specs) {
            String dbName = spec.getTable().getDbName();
            String tableName = spec.getTable().getTableName();
            String tableString = Joiner.on((String)TABLE_NAME_DELIMITER).join((Object)dbName, (Object)tableName, new Object[0]);
            if (this.tableOperationTypeMap.containsKey(tableString) && this.tableOperationTypeMap.get(tableString) != gmce.getOperationType()) {
                for (MetadataWriter writer : this.metadataWriters) {
                    writer.flush(dbName, tableName);
                }
            }
            this.tableOperationTypeMap.put(tableString, gmce.getOperationType());
            for (MetadataWriter writer : this.metadataWriters) {
                writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
            }
        }
        this.recordCount.incrementAndGet();
    }

    public void flush() throws IOException {
        log.info(String.format("start to flushing %s records", String.valueOf(this.recordCount.get())));
        for (String tableString : this.tableOperationTypeMap.keySet()) {
            List tid = Splitter.on((String)TABLE_NAME_DELIMITER).splitToList((CharSequence)tableString);
            for (MetadataWriter writer : this.metadataWriters) {
                writer.flush((String)tid.get(0), (String)tid.get(1));
            }
        }
        this.tableOperationTypeMap.clear();
        this.recordCount.lazySet(0L);
    }

    public void close() throws IOException {
        this.flush();
        this.closer.close();
    }

    public static State setHiveRegProperties(State state, GobblinMetadataChangeEvent gmce, boolean forNew) {
        Preconditions.checkArgument((boolean)state.contains(DEFAULT_HIVE_REGISTRATION_POLICY_KEY), (Object)String.format("Missing required configuration %s", DEFAULT_HIVE_REGISTRATION_POLICY_KEY));
        String defaultPolicy = state.getProp(DEFAULT_HIVE_REGISTRATION_POLICY_KEY);
        State tmpState = new State(state);
        String policyClass = forNew ? (gmce.getRegistrationPolicy() != null ? gmce.getRegistrationPolicy() : defaultPolicy) : (gmce.getRegistrationPolicyForOldData() != null ? gmce.getRegistrationPolicyForOldData() : defaultPolicy);
        tmpState.setProp("hive.registration.policy", (Object)policyClass);
        if (!forNew) {
            tmpState.setProp("mapreduce.job.input.path.empty", (Object)true);
        }
        if (gmce.getPartitionColumns() != null && !gmce.getPartitionColumns().isEmpty()) {
            tmpState.setProp(HIVE_PARTITION_NAME, (Object)String.join((CharSequence)",", gmce.getPartitionColumns()));
        }
        if (gmce.getRegistrationProperties() != null) {
            for (Map.Entry entry : gmce.getRegistrationProperties().entrySet()) {
                tmpState.setProp((String)entry.getKey(), entry.getValue());
            }
        }
        if (state.contains(FORCE_HIVE_DATABASE_NAME)) {
            tmpState.setProp("hive.database.name", (Object)state.getProp(FORCE_HIVE_DATABASE_NAME));
        }
        if (gmce.getTableSchema() != null) {
            tmpState.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), (Object)gmce.getTableSchema());
        }
        return tmpState;
    }

    public List<MetadataWriter> getMetadataWriters() {
        return this.metadataWriters;
    }
}

