/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.iceberg.publisher;

import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.iceberg.GobblinMCEProducer;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.shaded.org.apache.avro.Schema;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GobblinMCEPublisher
extends DataPublisher {
    private static final Logger log = LoggerFactory.getLogger(GobblinMCEPublisher.class);
    public static final String OFFSET_RANGE_KEY = "offset.range";
    public static final String MAP_DELIMITER_KEY = ":";
    public static final String NEW_FILES_LIST = "new.files.list";
    public static final String AVRO_SCHEMA_WITH_ICEBERG_ID = "avro.schema.with.iceberg.id";
    private final GobblinMCEProducer producer;
    private final Closer closer = Closer.create();
    private final Configuration conf;
    private static final PathFilter HIDDEN_FILES_FILTER = new HiddenFilter();

    public GobblinMCEPublisher(State state) throws IOException {
        this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
    }

    public GobblinMCEPublisher(State state, GobblinMCEProducer producer) {
        super(state);
        this.producer = (GobblinMCEProducer)this.closer.register((Closeable)producer);
        this.conf = HadoopUtils.getConfFromState((State)state);
    }

    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
        for (State state : states) {
            Map<Path, Metrics> newFiles = this.computeFileMetrics(state);
            Map<String, String> offsetRange = this.getPartitionOffsetRange(OFFSET_RANGE_KEY);
            if (newFiles.isEmpty()) {
                newFiles = this.computeDummyFile(state);
                if (newFiles.isEmpty()) continue;
                this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
                continue;
            }
            this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
        }
    }

    private Map<String, String> getPartitionOffsetRange(String offsetKey) {
        return this.state.getPropAsList(offsetKey).stream().collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
    }

    private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
        HashMap<Path, Metrics> newFiles = new HashMap<Path, Metrics>();
        NameMapping mapping = this.getNameMapping();
        FileSystem fs = FileSystem.get((Configuration)this.conf);
        for (String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
            Path path = new Path(pathString);
            LinkedList<FileStatus> fileStatuses = new LinkedList<FileStatus>();
            fileStatuses.add(fs.getFileStatus(path));
            while (!fileStatuses.isEmpty()) {
                FileStatus fileStatus = (FileStatus)fileStatuses.pollFirst();
                if (fileStatus.isDirectory()) {
                    fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
                    continue;
                }
                Path filePath = fileStatus.getPath();
                Metrics metrics = GobblinMCEPublisher.getMetrics(state, filePath, this.conf, mapping);
                newFiles.put(filePath, metrics);
            }
        }
        return newFiles;
    }

    private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
        HashMap<Path, Metrics> newFiles = new HashMap<Path, Metrics>();
        FileSystem fs = FileSystem.get((Configuration)this.conf);
        for (String pathString : state.getPropAsList("data.publisher.dataset.dir", "")) {
            Path path = new Path(pathString);
            PriorityQueue<FileStatus> fileStatuses = new PriorityQueue<FileStatus>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
            if (fs.exists(path)) {
                fileStatuses.add(fs.getFileStatus(path));
            }
            while (!fileStatuses.isEmpty()) {
                FileStatus fileStatus = (FileStatus)fileStatuses.poll();
                if (fileStatus.isDirectory()) {
                    fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
                    continue;
                }
                Path filePath = fileStatus.getPath();
                newFiles.put(filePath, null);
                return newFiles;
            }
        }
        return newFiles;
    }

    protected NameMapping getNameMapping() {
        String writerSchema = this.state.getProp("writer.latest.schema");
        if (writerSchema == null) {
            return null;
        }
        try {
            org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema = new Schema.Parser().parse(writerSchema);
            Schema icebergSchema = AvroSchemaUtil.toIceberg((org.apache.iceberg.shaded.org.apache.avro.Schema)avroSchema);
            this.state.setProp(AVRO_SCHEMA_WITH_ICEBERG_ID, (Object)AvroSchemaUtil.convert((Type)icebergSchema.asStruct()).toString());
            return MappingUtil.create((Schema)icebergSchema);
        }
        catch (Exception e) {
            log.warn("Dataset {} contains schema that does not compatible with iceberg, will not emit file metrics for it", (Object)this.state.getProp("data.publisher.dataset.dir"));
            return null;
        }
    }

    public static Metrics getMetrics(State state, Path path, Configuration conf, NameMapping mapping) {
        switch (IcebergUtils.getIcebergFormat(state)) {
            case ORC: {
                if (mapping == null) {
                    return new Metrics(Long.valueOf(100000000L), null, null, null);
                }
                return OrcMetrics.fromInputFile((InputFile)HadoopInputFile.fromPath((Path)path, (Configuration)conf), (MetricsConfig)MetricsConfig.getDefault(), (NameMapping)mapping);
            }
            case AVRO: {
                try {
                    return new Metrics(Long.valueOf(100000000L), null, null, null);
                }
                catch (Exception e) {
                    throw new RuntimeException("Cannot get file information for file " + path.toString(), e);
                }
            }
        }
        throw new IllegalArgumentException("Unsupported data format for file " + path);
    }

    public void publishMetadata(Collection<? extends WorkUnitState> states) {
    }

    @Deprecated
    public void initialize() {
    }

    public void close() throws IOException {
        this.closer.close();
    }
}

