/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.iceberg.writer;

import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.AutoCloseableHiveLock;
import org.apache.gobblin.hive.HiveLock;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.False;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.shaded.org.apache.avro.Schema;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergMetadataWriter
implements MetadataWriter {
    private static final Logger log = LoggerFactory.getLogger(IcebergMetadataWriter.class);
    // Config key: when true, createTable derives the table location from the dataset path.
    public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
    // Suffix appended to the dataset native name to form the table location; %s is the database name.
    public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
    // Table property key formats for the GMCE watermarks; %s is the topic partition.
    public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
    public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
    // Hadoop conf key (and default) for the period subtracted from "now" when expiring snapshots.
    private static final String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
    private static final String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
    // State keys fed to the WhitelistBlacklist built in the constructor.
    private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
    private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
    // State key for the permission applied when creating the table location directory.
    private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
    // Name of the metric tag carrying the cluster name.
    private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
    // Name of the timer that wraps catalog.createTable.
    private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
    // Table property key read as the last schema version in computeCandidateSchema.
    private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
    // NOTE(review): not referenced in the visible part of this file; presumably configures
    // the expiry of the added-files cache -- confirm against TableMetadata.
    private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time";
    private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1;
    // Prefix / format of the table property keys that hold offset ranges.
    private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range.";
    private static final String OFFSET_RANGE_KEY_FORMAT = "offset.range.%s";
    // Schema version used when a schema carries no creation time.
    private static final String DEFAULT_CREATION_TIME = "0";
    // State key for the ParallelRunner thread count (default 20 in the constructor).
    private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads";
    // Watermark value meaning "no watermark found".
    private static final long DEFAULT_WATERMARK = -1L;
    // Name of Iceberg's file-path metadata column, used to build startsWith filters.
    private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
    protected final MetricContext metricContext;
    protected EventSubmitter eventSubmitter;
    private final WhitelistBlacklist whitelistBlacklist;
    // Owns the closeable resources registered in the constructor (metric context, parallel runner).
    private final Closer closer = Closer.create();
    // Per-table watermark; its value is written as the high watermark property on flush.
    private final Map<TableIdentifier, Long> tableCurrentWatermarkMap;
    // Per-table topic partition, used to format the watermark property keys on flush.
    private final Map<TableIdentifier, String> tableTopicPartitionMap;
    private final KafkaSchemaRegistry schemaRegistry;
    // Per-table pending state (loaded table, transaction, properties, offsets, ...).
    private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
    protected HiveCatalog catalog;
    protected final Configuration conf;
    // flush() holds the write lock for its whole duration.
    protected final ReadWriteLock readWriteLock;
    // Source of the per-table Hive lock taken around commitTransaction in flush().
    private final HiveLock locks;
    // Runs the snapshot-expiration tasks submitted by dropFiles.
    private final ParallelRunner parallelRunner;
    private final boolean useDataLocationAsTableLocation;
    // Only assigned when useDataLocationAsTableLocation is true.
    private FsPermission permission;

    /**
     * Builds the writer from job state: resolves the schema registry and Hadoop
     * configuration, loads the catalog, creates the per-table bookkeeping maps, and
     * registers the metric context and parallel runner with the internal closer.
     *
     * @param state job state supplying all configuration values
     * @throws IOException if the file system for the parallel runner cannot be obtained
     */
    public IcebergMetadataWriter(State state) throws IOException {
        this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
        this.conf = HadoopUtils.getConfFromState(state);
        this.initializeCatalog();

        this.tableTopicPartitionMap = new HashMap<>();
        this.tableMetadataMap = new HashMap<>();
        this.tableCurrentWatermarkMap = new HashMap<>();

        // Metrics are tagged with the cluster name.
        List<Tag<?>> metricTags = new ArrayList<>();
        String clusterName = ClustersNames.getInstance().getClusterName();
        metricTags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterName));
        MetricContext context = GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, metricTags);
        this.metricContext = this.closer.register(context);
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "IcebergWriter").build();

        String whitelist = state.getProp(ICEBERG_REGISTRATION_WHITELIST, "");
        String blacklist = state.getProp(ICEBERG_REGISTRATION_BLACKLIST, "");
        this.whitelistBlacklist = new WhitelistBlacklist(whitelist, blacklist);

        this.readWriteLock = new ReentrantReadWriteLock();
        this.locks = new HiveLock(state.getProperties());

        int expireThreads = state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20);
        FileSystem runnerFs = FileSystem.get(HadoopUtils.getConfFromState(state));
        this.parallelRunner = this.closer.register(new ParallelRunner(expireThreads, runnerFs));

        this.useDataLocationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
        // The permission is only needed when this writer creates the table directory itself.
        if (this.useDataLocationAsTableLocation) {
            this.permission = HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION, FsPermission.getDefault());
        }
    }

    /**
     * Loads the Hive-backed Iceberg catalog from {@link #conf} into {@link #catalog}.
     * Called from the constructor right after {@code conf} is assigned.
     */
    protected void initializeCatalog() {
        HiveCatalog loaded = HiveCatalogs.loadCatalog(this.conf);
        this.catalog = loaded;
    }

    /**
     * Returns the Iceberg table for {@code tid}, loading it from the catalog on first
     * use and caching it in the table's {@code TableMetadata} entry.
     *
     * @param tid identifier of the table to fetch
     * @return the cached or freshly loaded table
     * @throws NoSuchTableException propagated from the catalog when the table is missing
     */
    private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws NoSuchTableException {
        TableMetadata entry = this.tableMetadataMap.computeIfAbsent(tid, key -> new TableMetadata());
        if (entry.table.isPresent()) {
            return entry.table.get();
        }
        org.apache.iceberg.Table loaded = this.catalog.loadTable(tid);
        entry.table = Optional.of(loaded);
        return loaded;
    }

    private Long getAndPersistCurrentWatermark(TableIdentifier tid, String topicPartition) {
        org.apache.iceberg.Table icebergTable;
        if (this.tableCurrentWatermarkMap.containsKey(tid)) {
            return this.tableCurrentWatermarkMap.get(tid);
        }
        Long currentWatermark = -1L;
        try {
            icebergTable = this.getIcebergTable(tid);
        }
        catch (NoSuchTableException e) {
            return currentWatermark;
        }
        currentWatermark = icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition)) ? Long.parseLong((String)icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition))) : -1L;
        if (currentWatermark != -1L) {
            this.tableMetadataMap.computeIfAbsent((TableIdentifier)tid, (Function<TableIdentifier, TableMetadata>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$getAndPersistCurrentWatermark$1(org.apache.iceberg.catalog.TableIdentifier ), (Lorg/apache/iceberg/catalog/TableIdentifier;)Lorg/apache/gobblin/iceberg/writer/IcebergMetadataWriter$TableMetadata;)((IcebergMetadataWriter)this)).lowWatermark = Optional.of((Object)currentWatermark);
        }
        return currentWatermark;
    }

    /**
     * Applies one metadata change event to the pending state of the target table.
     *
     * <p>The table is loaded (or created for events other than {@code drop_files} and
     * {@code change_property}), the candidate schema is computed, the transaction is
     * initialised, and the event is dispatched by operation type. If the table does not
     * exist and cannot be created, the event is skipped and the failure is logged.
     *
     * @param gmce the change event to process
     * @param newSpecsMap Hive specs for new files, keyed by parent directory path
     * @param oldSpecsMap Hive specs for old files, keyed by parent directory path
     * @param tableSpec Hive spec describing the target table
     * @throws IOException propagated from the file operations
     */
    public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
        org.apache.iceberg.Table table;
        TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
        TableMetadata tableMetadata = this.tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
        try {
            table = this.getIcebergTable(tid);
        }
        catch (NoSuchTableException e) {
            try {
                if (gmce.getOperationType() == OperationType.drop_files || gmce.getOperationType() == OperationType.change_property) {
                    log.warn("Table {} does not exist, skip processing this {} event", (Object)tid.toString(), (Object)gmce.getOperationType());
                    return;
                }
                table = this.createTable(gmce, tableSpec);
                tableMetadata.table = Optional.of(table);
            }
            catch (Exception e1) {
                // Pass the exception as the trailing argument so the stack trace is kept;
                // previously only e1.toString() was emitted, and only at debug level.
                log.error("skip processing {} for table {}.{} due to error when creating table", gmce.toString(), tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName(), e1);
                return;
            }
        }
        this.computeCandidateSchema(gmce, tid, tableSpec);
        tableMetadata.ensureTxnInit();
        // Track the earliest emit time among the events seen for this table.
        tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
        switch (gmce.getOperationType()) {
            case add_files: {
                this.updateTableProperty(tableSpec, tid);
                this.addFiles(gmce, newSpecsMap, table, tableMetadata);
                if (gmce.getTopicPartitionOffsetsRange() == null) break;
                this.mergeOffsets(gmce, tid);
                break;
            }
            case rewrite_files: {
                this.updateTableProperty(tableSpec, tid);
                this.rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
                break;
            }
            case drop_files: {
                this.dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
                break;
            }
            case change_property: {
                this.updateTableProperty(tableSpec, tid);
                if (gmce.getTopicPartitionOffsetsRange() != null) {
                    this.mergeOffsets(gmce, tid);
                }
                log.info("No file operation need to be performed by Iceberg Metadata Writer at this point.");
                break;
            }
            default: {
                log.error("unsupported operation {}", (Object)gmce.getOperationType().toString());
                return;
            }
        }
    }

    /**
     * Parses the offset ranges stored in the table's last known properties.
     *
     * <p>Every property whose key starts with {@link #OFFSET_RANGE_KEY_PREFIX} is read as
     * a comma-separated list of {@code low-high} pairs; each pair becomes an open-closed
     * range. The map is keyed by the property key with the prefix removed.
     *
     * @param tableMetadata pending metadata whose {@code lastProperties} are inspected
     * @return the parsed ranges; empty when no properties are available
     */
    private HashMap<String, List<Range>> getLastOffset(TableMetadata tableMetadata) {
        HashMap<String, List<Range>> rangesByKey = new HashMap<>();
        if (!tableMetadata.lastProperties.isPresent()) {
            return rangesByKey;
        }
        for (Map.Entry<String, String> property : tableMetadata.lastProperties.get().entrySet()) {
            String propertyKey = property.getKey();
            if (!propertyKey.startsWith(OFFSET_RANGE_KEY_PREFIX)) {
                continue;
            }
            List<Range> parsed = new ArrayList<>();
            for (String token : property.getValue().split(",")) {
                List<String> bounds = Splitter.on("-").splitToList(token);
                long lower = Long.parseLong(bounds.get(0));
                long upper = Long.parseLong(bounds.get(1));
                parsed.add(Range.openClosed(lower, upper));
            }
            rangesByKey.put(propertyKey.substring(OFFSET_RANGE_KEY_PREFIX.length()), parsed);
        }
        return rangesByKey;
    }

    /**
     * Merges the offset ranges carried by {@code gmce} into the ranges tracked for the table.
     *
     * <p>The tracked ranges are initialised from the last table properties on first use.
     * For each topic partition, the incoming range absorbs every existing range it is
     * connected to; the remaining ranges plus the merged one are stored sorted by lower
     * endpoint. Incoming ranges whose endpoints are equal are ignored.
     *
     * @param gmce event whose topic-partition offset ranges are merged
     * @param tid identifier of the table whose pending metadata is updated
     */
    private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) {
        TableMetadata metadata = this.tableMetadataMap.computeIfAbsent(tid, key -> new TableMetadata());
        Map<String, List<Range>> tracked = metadata.dataOffsetRange.or(() -> this.getLastOffset(metadata));
        metadata.dataOffsetRange = Optional.of(tracked);

        for (Map.Entry<String, String> incoming : gmce.getTopicPartitionOffsetsRange().entrySet()) {
            List<String> bounds = Splitter.on("-").splitToList(incoming.getValue());
            Range merged = Range.openClosed(Long.parseLong(bounds.get(0)), Long.parseLong(bounds.get(1)));
            if (merged.lowerEndpoint().equals(merged.upperEndpoint())) {
                continue;
            }
            List<Range> previous = tracked.getOrDefault(incoming.getKey(), new ArrayList<>());
            List<Range> updated = new ArrayList<>();
            for (Range existing : previous) {
                if (!merged.isConnected(existing)) {
                    updated.add(existing);
                } else {
                    merged = merged.span(existing);
                }
            }
            updated.add(merged);
            updated.sort((left, right) -> left.lowerEndpoint().compareTo(right.lowerEndpoint()));
            tracked.put(incoming.getKey(), updated);
        }
    }

    private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
        Table table = HiveMetaStoreUtils.getTable((HiveTable)tableSpec.getTable());
        this.tableMetadataMap.computeIfAbsent((TableIdentifier)tid, (Function<TableIdentifier, TableMetadata>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$updateTableProperty$6(org.apache.iceberg.catalog.TableIdentifier ), (Lorg/apache/iceberg/catalog/TableIdentifier;)Lorg/apache/gobblin/iceberg/writer/IcebergMetadataWriter$TableMetadata;)((IcebergMetadataWriter)this)).newProperties = Optional.of(IcebergUtils.getTableProperties(table));
    }

    /**
     * Registers the schema carried by {@code gmce} as a candidate schema for the table.
     *
     * <p>Lazily initialises the pending metadata's last properties, last schema version
     * and candidate-schema cache. Depending on the event's schema source, the Iceberg
     * schema derived from the event is then cached under the schema's creation time or
     * under {@link #DEFAULT_CREATION_TIME}. Failures while deriving the schema are logged
     * and not propagated.
     *
     * @param gmce event carrying the table schema and schema source
     * @param tid identifier of the table
     * @param spec Hive spec used to resolve the Hive table
     */
    private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec) {
        org.apache.iceberg.Table table = this.getIcebergTable(tid);
        TableMetadata tableMetadata = this.tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
        Table hiveTable = HiveMetaStoreUtils.getTable(spec.getTable());
        tableMetadata.lastProperties = Optional.of(tableMetadata.lastProperties.or(() -> table.properties()));
        Map<String, String> props = tableMetadata.lastProperties.get();
        tableMetadata.lastSchemaVersion = Optional.of(tableMetadata.lastSchemaVersion.or(() -> props.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME)));
        String lastSchemaVersion = tableMetadata.lastSchemaVersion.get();
        tableMetadata.candidateSchemas = Optional.of(tableMetadata.candidateSchemas.or(() -> CacheBuilder.newBuilder().expireAfterAccess((long)this.conf.getInt("GMCEWriter.cache.expiring.time.hours", 1), TimeUnit.HOURS).build()));
        Cache<String, org.apache.iceberg.Schema> candidate = tableMetadata.candidateSchemas.get();
        try {
            switch (gmce.getSchemaSource()) {
                case SCHEMAREGISTRY: {
                    // Fully qualified: the simple name Schema is ambiguous in this file
                    // (both the Avro and the Iceberg-shaded Avro Schema are imported).
                    org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(gmce.getTableSchema());
                    String createdOn = AvroUtils.getSchemaCreationTime(schema);
                    if (createdOn == null) {
                        candidate.put(DEFAULT_CREATION_TIME, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
                    } else if (!createdOn.equals(lastSchemaVersion)) {
                        // Only a schema version different from the last committed one is a candidate.
                        candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
                    }
                    break;
                }
                case EVENT: {
                    candidate.put(DEFAULT_CREATION_TIME, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
                    break;
                }
                case NONE: {
                    log.debug("Schema source set to be none, will ignore the schema");
                    break;
                }
                default: {
                    throw new IOException(String.format("unsupported schema source %s", gmce.getSchemaSource()));
                }
            }
        }
        catch (Exception e) {
            log.error("Cannot get candidate schema from event due to", (Throwable)e);
        }
    }

    /**
     * Creates the Iceberg table described by {@code spec} using the schema carried by
     * {@code gmce}.
     *
     * <p>When the writer is configured to use the data path as table location, the
     * location directory is created first with the configured permission. If the table
     * already exists (for example because another process created it concurrently), the
     * existing table is loaded and returned instead of {@code null}.
     *
     * @param gmce event providing the table schema and dataset identifier
     * @param spec Hive spec describing the table to create
     * @return the created table, or the existing table when it was already present
     * @throws IOException if the table location directory cannot be created
     * @throws IllegalStateException if no table schema can be derived
     */
    protected org.apache.iceberg.Table createTable(GobblinMetadataChangeEvent gmce, HiveSpec spec) throws IOException {
        String schema = gmce.getTableSchema();
        Table table = HiveMetaStoreUtils.getTable(spec.getTable());
        IcebergUtils.IcebergDataAndPartitionSchema schemas = IcebergUtils.getIcebergSchema(schema, table);
        TableIdentifier tid = TableIdentifier.of(table.getDbName(), table.getTableName());
        org.apache.iceberg.Schema tableSchema = schemas.tableSchema;
        Preconditions.checkState(tableSchema != null, "Table schema cannot be null when creating a table");
        PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(tableSchema, schemas.partitionSchema);
        // A null location is passed through to the catalog unchanged.
        String tableLocation = null;
        if (this.useDataLocationAsTableLocation) {
            tableLocation = gmce.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
            Path tablePath = new Path(tableLocation);
            WriterUtils.mkdirsWithRecursivePermission(tablePath.getFileSystem(this.conf), tablePath, this.permission);
        }
        try (Timer.Context context = this.metricContext.timer(CREATE_TABLE_TIME).time()) {
            org.apache.iceberg.Table icebergTable = this.catalog.createTable(tid, tableSchema, partitionSpec, tableLocation, IcebergUtils.getTableProperties(table));
            log.info("Created table {}, schema: {} partition spec: {}", new Object[]{tid, tableSchema, partitionSpec});
            return icebergTable;
        }
        catch (AlreadyExistsException e) {
            log.warn("table {} already exist, there may be some other process try to create table concurrently", (Object)tid);
            // Returning null here made the caller fail in Optional.of(null) and drop the
            // event; use the table that now exists instead.
            return this.catalog.loadTable(tid);
        }
    }

    /**
     * Handles a rewrite event: replaces the old data files with the new ones.
     *
     * <p>New files are recorded in the added-files cache. When no old file is resolved
     * but new files exist, one new file is looked up in the table by path prefix: if it
     * is already there the event is ignored, otherwise the new files are queued on the
     * pending append. In every other case a rewrite is committed on the table's
     * transaction.
     *
     * @param gmce the rewrite event
     * @param newSpecsMap Hive specs for new files, keyed by parent directory path
     * @param oldSpecsMap Hive specs for old files, keyed by parent directory path
     * @param table the target Iceberg table
     * @param tableMetadata pending metadata of the target table
     * @throws IOException propagated from resolving the files to delete
     */
    protected void rewriteFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap, org.apache.iceberg.Table table, TableMetadata tableMetadata) throws IOException {
        PartitionSpec spec = table.spec();
        tableMetadata.ensureTxnInit();

        Set<DataFile> filesToAdd = new HashSet<>();
        this.getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata).forEach(added -> {
            filesToAdd.add(added);
            tableMetadata.addedFiles.put(added.path(), "");
        });
        Set<DataFile> filesToRemove = this.getIcebergDataFilesToBeDeleted(gmce, table, newSpecsMap, oldSpecsMap, spec);

        boolean nothingToRemove = filesToRemove.isEmpty();
        if (!nothingToRemove || filesToAdd.isEmpty()) {
            tableMetadata.transaction.get().newRewrite().rewriteFiles(filesToRemove, filesToAdd).commit();
            return;
        }

        // Probe the table with one of the new files to see whether it is already present.
        DataFile probe = filesToAdd.iterator().next();
        Expression alreadyPresent = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, probe.path().toString());
        boolean found = FindFiles.in(table).withMetadataMatching(alreadyPresent).collect().iterator().hasNext();
        if (found) {
            return;
        }
        // Queue the new files on the pending append; no commit happens here.
        AppendFiles pendingAppend = tableMetadata.getOrInitAppendFiles();
        for (DataFile file : filesToAdd) {
            pendingAppend.appendFile(file);
        }
    }

    /**
     * Converts the Avro schema annotated with Iceberg schema IDs, if the event carries
     * one, into an Iceberg schema.
     *
     * @param gmce event that may carry an Avro schema with Iceberg schema IDs
     * @return the converted Iceberg schema, or {@code null} when the event has none
     */
    private org.apache.iceberg.Schema getSchemaWithOriginId(GobblinMetadataChangeEvent gmce) {
        if (gmce.getAvroSchemaWithIcebergSchemaID() == null) {
            return null;
        }
        // Fully qualified because the simple name Schema is ambiguous in this file.
        // NOTE(review): the shaded Avro type is assumed to be what AvroSchemaUtil.toIceberg
        // accepts in the Iceberg version in use -- confirm.
        org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema = new org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(gmce.getAvroSchemaWithIcebergSchemaID());
        return AvroSchemaUtil.toIceberg(avroSchema);
    }

    /**
     * Handles a drop event: queues the old data files on the pending delete and submits
     * a snapshot-expiration task for the table to the parallel runner.
     *
     * <p>The expiration task only deletes files located under the table's location, and
     * logs (rather than propagates) any failure.
     *
     * @param gmce the drop event
     * @param oldSpecsMap Hive specs for old files, keyed by parent directory path
     * @param table the target Iceberg table
     * @param tableMetadata pending metadata of the target table
     * @param tid identifier of the target table, also used as the task name
     * @throws IOException propagated from resolving the files to delete
     */
    protected void dropFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> oldSpecsMap, final org.apache.iceberg.Table table, TableMetadata tableMetadata, final TableIdentifier tid) throws IOException {
        PartitionSpec spec = table.spec();
        DeleteFiles pendingDelete = tableMetadata.getOrInitDeleteFiles();
        Set<DataFile> filesToRemove = this.getIcebergDataFilesToBeDeleted(gmce, table, new HashMap<String, Collection<HiveSpec>>(), oldSpecsMap, spec);
        for (DataFile stale : filesToRemove) {
            pendingDelete.deleteFile(stale);
        }

        Callable<Void> expireTask = () -> {
            try {
                long olderThan = this.getExpireSnapshotTime();
                long start = System.currentTimeMillis();
                ExpireSnapshots expireSnapshots = table.expireSnapshots();
                Consumer<String> deleteUnderTableLocation = file -> {
                    if (file.startsWith(table.location())) {
                        table.io().deleteFile(file);
                    }
                };
                expireSnapshots.deleteWith(deleteUnderTableLocation).expireOlderThan(olderThan).commit();
                log.info("Spent {} ms to expire snapshots older than {} ({}) in table {}", new Object[]{System.currentTimeMillis() - start, new DateTime(olderThan).toString(), olderThan, tid.toString()});
            }
            catch (Exception e) {
                log.error(String.format("Fail to expire snapshots for table %s due to exception ", tid.toString()), (Throwable)e);
            }
            return null;
        };
        this.parallelRunner.submitCallable(expireTask, tid.toString());
    }

    /**
     * Computes the cut-off timestamp for snapshot expiration: the current time minus
     * the look-back period configured under {@link #EXPIRE_SNAPSHOTS_LOOKBACK_TIME}.
     *
     * <p>The period is parsed from a compact form using the suffixes {@code y},
     * {@code M}, {@code d}, {@code h} and {@code m}.
     *
     * @return the cut-off time in epoch milliseconds
     */
    private long getExpireSnapshotTime() {
        PeriodFormatterBuilder builder = new PeriodFormatterBuilder();
        builder.appendYears().appendSuffix("y");
        builder.appendMonths().appendSuffix("M");
        builder.appendDays().appendSuffix("d");
        builder.appendHours().appendSuffix("h");
        builder.appendMinutes().appendSuffix("m");
        PeriodFormatter lookBackFormat = builder.toFormatter();

        String configured = this.conf.get(EXPIRE_SNAPSHOTS_LOOKBACK_TIME, DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME);
        ReadablePeriod lookBack = lookBackFormat.parsePeriod(configured);
        return DateTime.now().minus(lookBack).getMillis();
    }

    /**
     * Handles an add event: queues every not-yet-seen new data file on the pending
     * append and records its path in the added-files cache.
     *
     * @param gmce the add event
     * @param newSpecsMap Hive specs for new files, keyed by parent directory path
     * @param table the target Iceberg table
     * @param tableMetadata pending metadata of the target table
     */
    protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, org.apache.iceberg.Table table, TableMetadata tableMetadata) {
        AppendFiles pendingAppend = tableMetadata.getOrInitAppendFiles();
        Stream<DataFile> unseenFiles = this.getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata);
        unseenFiles.forEach(added -> {
            pendingAppend.appendFile(added);
            tableMetadata.addedFiles.put(added.path(), "");
        });
    }

    /**
     * Builds the Iceberg data files for the event's new files and filters out those
     * whose path is already present in the added-files cache.
     *
     * @param gmce event listing the new files
     * @param table the target Iceberg table (source of partition spec and schema)
     * @param newSpecsMap Hive specs for new files, keyed by parent directory path
     * @param tableMetadata pending metadata holding the added-files cache
     * @return a lazily filtered stream of data files not yet recorded as added
     */
    private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, org.apache.iceberg.Table table, Map<String, Collection<HiveSpec>> newSpecsMap, TableMetadata tableMetadata) {
        List<org.apache.gobblin.metadata.DataFile> newFiles = gmce.getNewFiles();
        PartitionSpec spec = table.spec();
        Map<Integer, Integer> schemaIdMap = IcebergUtils.getSchemaIdMap(this.getSchemaWithOriginId(gmce), table.schema());
        Set<DataFile> candidates = this.getIcebergDataFilesToBeAdded(newFiles, spec, newSpecsMap, schemaIdMap);
        return candidates.stream().filter(candidate -> tableMetadata.addedFiles.getIfPresent(candidate.path()) == null);
    }

    /**
     * Resolves the Iceberg data files an event asks to remove.
     *
     * <p>When the event carries old file prefixes, the table is queried for all files
     * whose path starts with any prefix, matching both the prefix as given and its raw
     * URI path. Otherwise each listed old file is turned into a metric-less data file
     * using the partition value from the matching Hive spec (old specs first, then new
     * specs).
     *
     * @param gmce event listing old file prefixes or old files
     * @param table the target Iceberg table
     * @param newSpecsMap Hive specs for new files, keyed by parent directory path
     * @param oldSpecsMap Hive specs for old files, keyed by parent directory path
     * @param partitionSpec partition spec of the target table
     * @return the data files to delete; possibly empty
     * @throws IOException if no Hive spec is available for a listed old file
     */
    private Set<DataFile> getIcebergDataFilesToBeDeleted(GobblinMetadataChangeEvent gmce, org.apache.iceberg.Table table, Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap, PartitionSpec partitionSpec) throws IOException {
        Set<DataFile> oldDataFiles = new HashSet<DataFile>();
        if (gmce.getOldFilePrefixes() != null) {
            // Declared as Expression: Expressions.or(...) does not return the False type
            // that Expressions.alwaysFalse() yields, so a False-typed variable cannot hold it.
            Expression exp = Expressions.alwaysFalse();
            for (String prefix : gmce.getOldFilePrefixes()) {
                exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, prefix));
                // Also match the raw URI path form of the prefix.
                String rawPathPrefix = new Path(prefix).toUri().getRawPath();
                exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, rawPathPrefix));
            }
            long start = System.currentTimeMillis();
            oldDataFiles.addAll(Sets.newHashSet(FindFiles.in(table).withMetadataMatching(exp).collect().iterator()));
            log.info("Spent {}ms to query all old files in iceberg.", (Object)(System.currentTimeMillis() - start));
        } else {
            for (String file : gmce.getOldFiles()) {
                String specPath = new Path(file).getParent().toString();
                StructLike partitionVal = this.getIcebergPartitionVal(oldSpecsMap.containsKey(specPath) ? oldSpecsMap.get(specPath) : newSpecsMap.get(specPath), file, partitionSpec);
                oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partitionVal));
            }
        }
        return oldDataFiles;
    }

    /**
     * Converts Gobblin data file descriptors into Iceberg data files with metrics.
     *
     * <p>The partition value for each file comes from the Hive specs registered under
     * the file's parent directory. Files that cannot be converted are logged at warn
     * level and left out of the result.
     *
     * @param files Gobblin descriptors of the new files
     * @param partitionSpec partition spec of the target table
     * @param newSpecsMap Hive specs for new files, keyed by parent directory path
     * @param schemaIdMap schema ID mapping passed through to the conversion
     * @return the successfully converted data files
     */
    private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files, PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
        Set<DataFile> converted = new HashSet<>();
        for (org.apache.gobblin.metadata.DataFile descriptor : files) {
            try {
                String parentDir = new Path(descriptor.getFilePath()).getParent().toString();
                Collection<HiveSpec> specsForDir = newSpecsMap.get(parentDir);
                StructLike partitionValue = this.getIcebergPartitionVal(specsForDir, descriptor.getFilePath(), partitionSpec);
                DataFile icebergFile = IcebergUtils.getIcebergDataFileWithMetric(descriptor, partitionSpec, partitionValue, this.conf, schemaIdMap);
                converted.add(icebergFile);
            }
            catch (Exception e) {
                log.warn("Cannot get DataFile for {} dur to {}", (Object)descriptor.getFilePath(), (Object)e);
            }
        }
        return converted;
    }

    /**
     * Derives the Iceberg partition value for a file from the first Hive spec given.
     *
     * @param specs Hive specs registered for the file's directory
     * @param filePath path of the file, used only in the error message
     * @param partitionSpec partition spec providing the partition type
     * @return the partition value, or {@code null} when the first spec has no partition
     * @throws IOException if {@code specs} is null or empty
     */
    private StructLike getIcebergPartitionVal(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec) throws IOException {
        boolean hasSpecs = specs != null && !specs.isEmpty();
        if (!hasSpecs) {
            throw new IOException("Cannot get hive spec for " + filePath);
        }
        HiveSpec firstSpec = specs.iterator().next();
        HivePartition partition = firstSpec.getPartition().orNull();
        if (partition == null) {
            return null;
        }
        return IcebergUtils.getPartition(partitionSpec.partitionType(), partition.getValues());
    }

    /**
     * Commits everything pending for one table as a single Iceberg transaction.
     *
     * <p>Runs under the write lock. When a transaction exists for the table, the pending
     * append and delete are committed, watermark / metadata-retention / offset-range
     * properties and the schema are updated, and the transaction is committed while
     * holding the table's Hive lock. Afterwards a snapshot commit event is submitted and
     * the pending metadata is reset. When no transaction exists, this only logs.
     *
     * @param dbName database name of the table to flush
     * @param tableName name of the table to flush
     * @throws IOException wrapping any checked failure during the flush
     * @throws RuntimeException wrapping any unchecked failure during the flush
     */
    public void flush(String dbName, String tableName) throws IOException {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            TableIdentifier tid = TableIdentifier.of(dbName, tableName);
            TableMetadata tableMetadata = this.tableMetadataMap.getOrDefault(tid, new TableMetadata());
            if (!tableMetadata.transaction.isPresent()) {
                log.info("There's no transaction initiated for the table {}", tid.toString());
                return;
            }

            Transaction transaction = tableMetadata.transaction.get();
            if (tableMetadata.appendFiles.isPresent()) {
                tableMetadata.appendFiles.get().commit();
            }
            if (tableMetadata.deleteFiles.isPresent()) {
                tableMetadata.deleteFiles.get().commit();
            }

            // Start from the new properties if any, else from a copy of the last known ones.
            Map<String, String> props = tableMetadata.newProperties.or(Maps.newHashMap(tableMetadata.lastProperties.or(this.getIcebergTable(tid).properties())));
            Long highWatermark = this.tableCurrentWatermarkMap.get(tid);
            String topicPartition = this.tableTopicPartitionMap.get(tid);
            props.put(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition), highWatermark.toString());
            props.put(String.format(GMCE_LOW_WATERMARK_KEY, this.tableTopicPartitionMap.get(tid)), tableMetadata.lowWatermark.get().toString());
            props.put("write.metadata.delete-after-commit.enabled", Boolean.toString(this.conf.getBoolean("write.metadata.delete-after-commit.enabled", false)));
            props.put("write.metadata.previous-versions-max", Integer.toString(this.conf.getInt("write.metadata.previous-versions-max", 100)));

            String topicName = tableName;
            if (this.setDatasetOffsetRange(tableMetadata, props)) {
                // Topic name is the first tracked topic-partition key up to its last '-'.
                String firstTopicPartition = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
                topicName = firstTopicPartition.substring(0, firstTopicPartition.lastIndexOf('-'));
            }
            this.updateSchema(tableMetadata, props, topicName);

            UpdateProperties updateProperties = transaction.updateProperties();
            props.forEach(updateProperties::set);
            updateProperties.commit();

            try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
                transaction.commitTransaction();
            }

            org.apache.iceberg.Table committedTable = tableMetadata.table.get();
            Snapshot snapshot = committedTable.currentSnapshot();
            Map<String, String> currentProps = tableMetadata.table.get().properties();
            this.submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
            tableMetadata.reset(currentProps, highWatermark);
            log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
        }
        catch (RuntimeException e) {
            throw new RuntimeException(String.format("Fail to flush table %s %s", dbName, tableName), e);
        }
        catch (Exception e) {
            throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Builds an {@code IcebergMetadataCommitEvent} describing the snapshot that was just committed
     * and hands it to {@code eventSubmitter}.
     *
     * <p>The GMCE topic name and partition id are derived from the string stored in
     * {@code tableTopicPartitionMap} for this table by splitting on the LAST {@code '-'}.
     * Splitting on the first hyphen would truncate topic names that themselves contain hyphens.
     * If the string contains no hyphen at all, the whole string is reported as the topic name and
     * the partition is reported as an empty string.
     *
     * @param snapshot the snapshot whose id, manifest list location and summary are reported
     * @param tableMetadata supplies the low watermark, the lowest GMCE emitted time and the dataset name
     * @param dbName reported as {@code icebergDatabaseName}
     * @param tableName reported as {@code icebergTableName}
     * @param props table properties; only entries whose key starts with
     *        {@code OFFSET_RANGE_KEY_PREFIX} are copied into the event
     * @param highWaterMark reported as {@code gmceHighWatermark}
     */
    private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName, String tableName, Map<String, String> props, Long highWaterMark) {
        GobblinEventBuilder gobblinTrackingEvent = new GobblinEventBuilder("IcebergMetadataCommitEvent");
        long currentSnapshotID = snapshot.snapshotId();
        long endToEndLag = System.currentTimeMillis() - tableMetadata.lowestGMCEEmittedTime;
        TableIdentifier tid = TableIdentifier.of(dbName, tableName);
        String gmceTopicPartition = this.tableTopicPartitionMap.get(tid);
        // The partition id follows the last '-'; everything before it is the topic name.
        int separatorIndex = gmceTopicPartition.lastIndexOf('-');
        String gmceTopicName = separatorIndex < 0 ? gmceTopicPartition : gmceTopicPartition.substring(0, separatorIndex);
        String gmcePartitionId = separatorIndex < 0 ? "" : gmceTopicPartition.substring(separatorIndex + 1);
        gobblinTrackingEvent.addMetadata("gmceTopicName", gmceTopicName);
        gobblinTrackingEvent.addMetadata("gmceTopicPartition", gmcePartitionId);
        gobblinTrackingEvent.addMetadata("gmceHighWatermark", highWaterMark.toString());
        gobblinTrackingEvent.addMetadata("gmceLowWatermark", tableMetadata.lowWatermark.get().toString());
        gobblinTrackingEvent.addMetadata("endToEndLag", Long.toString(endToEndLag));
        gobblinTrackingEvent.addMetadata("currentSnapshotId", Long.toString(currentSnapshotID));
        gobblinTrackingEvent.addMetadata("currentManifestLocation", snapshot.manifestListLocation());
        gobblinTrackingEvent.addMetadata("currentSnapshotDetailedInformation", Joiner.on(",").withKeyValueSeparator("=").join(snapshot.summary()));
        gobblinTrackingEvent.addMetadata("icebergTableName", tableName);
        gobblinTrackingEvent.addMetadata("icebergDatabaseName", dbName);
        gobblinTrackingEvent.addMetadata("datasetHdfsPath", tableMetadata.datasetName);
        // Forward the per-topic-partition offset ranges that were stored as table properties.
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) {
                gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue());
            }
        }
        this.eventSubmitter.submit(gobblinTrackingEvent);
    }

    /**
     * Serializes the data offset ranges recorded in {@code tableMetadata} into {@code props}.
     *
     * <p>For every entry of {@code tableMetadata.dataOffsetRange}, a property is written whose key is
     * {@code String.format(OFFSET_RANGE_KEY_FORMAT, entryKey)} and whose value is the comma-separated
     * list of {@code lowerEndpoint-upperEndpoint} pairs of that entry's ranges.
     *
     * @param tableMetadata holder of the optional offset-range map
     * @param props property map that receives the serialized ranges
     * @return {@code true} if an offset-range map was present and non-empty, {@code false} otherwise
     *         (in which case {@code props} is left untouched)
     */
    private boolean setDatasetOffsetRange(TableMetadata tableMetadata, Map<String, String> props) {
        if (!tableMetadata.dataOffsetRange.isPresent()) {
            return false;
        }
        Map<String, List<Range>> offsetRanges = tableMetadata.dataOffsetRange.get();
        if (offsetRanges.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, List<Range>> offsets : offsetRanges.entrySet()) {
            String rangeString = offsets.getValue().stream()
                .map(r -> Joiner.on("-").join(r.lowerEndpoint(), r.upperEndpoint()))
                .collect(Collectors.joining(","));
            props.put(String.format(OFFSET_RANGE_KEY_FORMAT, offsets.getKey()), rangeString);
        }
        return true;
    }

    /**
     * Records the schema creation time in {@code props} and, when candidate schemas are available,
     * tries to evolve the table schema to the appropriate candidate.
     *
     * <p>{@code SCHEMA_CREATION_TIME_KEY} is first set to the last known schema version (or
     * {@code DEFAULT_CREATION_TIME} when none is known). Then:
     * <ul>
     *   <li>if the only candidate is the one keyed by {@code DEFAULT_CREATION_TIME}, that candidate is applied;</li>
     *   <li>otherwise the latest schema for {@code topicName} is fetched from the schema registry and the
     *       candidate keyed by its creation time, if any, is applied.</li>
     * </ul>
     * A {@code SchemaRegistryException} is logged and otherwise ignored, leaving the schema unchanged.
     *
     * <p>Each candidate is looked up in the cache exactly once and the result is reused. Looking it up
     * a second time could return {@code null} if the cache dropped the entry in between
     * (NOTE(review): depends on how the candidate cache is configured — confirm).
     *
     * @param tableMetadata holder of the candidate schemas, last schema version and table handle
     * @param props property map that receives {@code SCHEMA_CREATION_TIME_KEY}
     * @param topicName topic used to query the schema registry for the latest schema
     */
    private void updateSchema(TableMetadata tableMetadata, Map<String, String> props, String topicName) {
        props.put(SCHEMA_CREATION_TIME_KEY, tableMetadata.lastSchemaVersion.or(DEFAULT_CREATION_TIME));
        try {
            if (!tableMetadata.candidateSchemas.isPresent()) {
                return;
            }
            Cache<String, org.apache.iceberg.Schema> candidates = tableMetadata.candidateSchemas.get();
            if (candidates.size() <= 0L) {
                return;
            }
            // Only consult the default-keyed entry when it could be the sole candidate.
            org.apache.iceberg.Schema defaultCandidate = candidates.size() == 1L ? candidates.getIfPresent(DEFAULT_CREATION_TIME) : null;
            if (defaultCandidate != null) {
                this.updateSchemaHelper(DEFAULT_CREATION_TIME, defaultCandidate, props, tableMetadata.table.get());
                return;
            }
            org.apache.avro.Schema latestSchema = (org.apache.avro.Schema)this.schemaRegistry.getLatestSchemaByTopic(topicName);
            String creationTime = AvroUtils.getSchemaCreationTime(latestSchema);
            if (creationTime == null) {
                log.warn("Schema from schema registry does not contain creation time, check config for schema registry class");
                return;
            }
            org.apache.iceberg.Schema matchingCandidate = candidates.getIfPresent(creationTime);
            if (matchingCandidate != null) {
                this.updateSchemaHelper(creationTime, matchingCandidate, props, tableMetadata.table.get());
            }
        }
        catch (SchemaRegistryException e) {
            log.error("Cannot get schema form schema registry, will not update this schema", (Throwable)e);
        }
    }

    /**
     * Best-effort schema evolution: unions {@code schema} into the table's schema by column name,
     * commits the change, and on success records {@code schemaCreationTime} under
     * {@code SCHEMA_CREATION_TIME_KEY} in {@code props}.
     *
     * <p>Any exception raised while doing so is logged and not rethrown, so {@code props} keeps
     * whatever creation time it held before the call.
     *
     * @param schemaCreationTime creation time to record when the update succeeds
     * @param schema schema to merge into the table schema
     * @param props property map updated on success
     * @param table table whose schema is updated
     */
    private void updateSchemaHelper(String schemaCreationTime, org.apache.iceberg.Schema schema, Map<String, String> props, org.apache.iceberg.Table table) {
        try {
            table.updateSchema()
                .unionByNameWith(schema)
                .commit();
            props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
        }
        catch (Exception e) {
            StringBuilder failure = new StringBuilder("Cannot update schema to ");
            failure.append(schema.toString());
            failure.append("for table ");
            failure.append(table.location());
            log.error(failure.toString(), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
        Lock readLock = this.readWriteLock.readLock();
        readLock.lock();
        try {
            GenericRecord genericRecord = (GenericRecord)recordEnvelope.getRecord();
            GobblinMetadataChangeEvent gmce = (GobblinMetadataChangeEvent)SpecificData.get().deepCopy(genericRecord.getSchema(), (Object)genericRecord);
            if (this.whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
                TableIdentifier tid = TableIdentifier.of((String[])new String[]{tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName()});
                String topicPartition = this.tableTopicPartitionMap.computeIfAbsent(tid, t -> ((KafkaStreamingExtractor.KafkaWatermark)recordEnvelope.getWatermark()).getTopicPartition().toString());
                Long currentWatermark = this.getAndPersistCurrentWatermark(tid, topicPartition);
                Long currentOffset = ((KafkaStreamingExtractor.KafkaWatermark)recordEnvelope.getWatermark()).getLwm().getValue();
                if (currentOffset > currentWatermark) {
                    if (currentWatermark == -1L) {
                        this.tableMetadataMap.computeIfAbsent((TableIdentifier)tid, (Function<TableIdentifier, TableMetadata>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$writeEnvelope$16(org.apache.iceberg.catalog.TableIdentifier ), (Lorg/apache/iceberg/catalog/TableIdentifier;)Lorg/apache/gobblin/iceberg/writer/IcebergMetadataWriter$TableMetadata;)((IcebergMetadataWriter)this)).lowWatermark = Optional.of((Object)(currentOffset - 1L));
                    }
                    this.tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
                    this.write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
                    this.tableCurrentWatermarkMap.put(tid, currentOffset);
                } else {
                    log.warn(String.format("Skip processing record %s since it has lower watermark", genericRecord.toString()));
                }
            } else {
                log.info(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName()));
            }
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Closes this writer by delegating to {@code closer.close()}.
     *
     * @throws IOException if {@code closer.close()} throws it
     */
    public void close() throws IOException {
        this.closer.close();
    }

    /**
     * Returns the schema registry held by this writer.
     *
     * @return the {@code schemaRegistry} field, as is
     */
    public KafkaSchemaRegistry getSchemaRegistry() {
        return this.schemaRegistry;
    }

    /**
     * Replaces the {@code HiveCatalog} held by this writer.
     *
     * @param catalog the catalog to store in the {@code catalog} field
     */
    public void setCatalog(HiveCatalog catalog) {
        this.catalog = catalog;
    }

    // Decompiler-emitted body of a lambda that, per its synthetic name, originates in writeEnvelope.
    // Ignores its argument and returns a freshly constructed TableMetadata.
    private /* synthetic */ TableMetadata lambda$writeEnvelope$16(TableIdentifier t) {
        return new TableMetadata();
    }

    // Decompiler-emitted body of a lambda that, per its synthetic name, originates in updateTableProperty.
    // Ignores its argument and returns a freshly constructed TableMetadata.
    private /* synthetic */ TableMetadata lambda$updateTableProperty$6(TableIdentifier t) {
        return new TableMetadata();
    }

    // Decompiler-emitted body of a lambda that, per its synthetic name, originates in
    // getAndPersistCurrentWatermark. Ignores its argument and returns a freshly constructed TableMetadata.
    private /* synthetic */ TableMetadata lambda$getAndPersistCurrentWatermark$1(TableIdentifier t) {
        return new TableMetadata();
    }

    /**
     * Mutable per-table state accumulated between two commits: the Iceberg table handle, the open
     * transaction with its pending append/delete operations, property snapshots, candidate schemas,
     * data offset ranges and watermarks.
     *
     * <p>This is a non-static inner class because its constructor reads the enclosing writer's
     * {@code conf}.
     */
    private class TableMetadata {
        Optional<org.apache.iceberg.Table> table = Optional.absent();
        Optional<Transaction> transaction = Optional.absent();
        private Optional<AppendFiles> appendFiles = Optional.absent();
        private Optional<DeleteFiles> deleteFiles = Optional.absent();
        Optional<Map<String, String>> lastProperties = Optional.absent();
        Optional<Map<String, String>> newProperties = Optional.absent();
        Optional<Cache<String, org.apache.iceberg.Schema>> candidateSchemas = Optional.absent();
        Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
        Optional<String> lastSchemaVersion = Optional.absent();
        Optional<Long> lowWatermark = Optional.absent();
        String datasetName;
        // Entries expire a configurable number of hours (default 1) after their last access.
        Cache<CharSequence, String> addedFiles;
        // Long.MAX_VALUE means "nothing recorded yet" for the current commit cycle.
        long lowestGMCEEmittedTime;

        private TableMetadata() {
            int expiringHours = IcebergMetadataWriter.this.conf.getInt(IcebergMetadataWriter.ADDED_FILES_CACHE_EXPIRING_TIME, 1);
            this.addedFiles = CacheBuilder.newBuilder().expireAfterAccess(expiringHours, TimeUnit.HOURS).build();
            this.lowestGMCEEmittedTime = Long.MAX_VALUE;
        }

        /**
         * Returns the pending append operation of the current transaction, creating the transaction
         * and/or the append operation first if they do not exist yet.
         */
        AppendFiles getOrInitAppendFiles() {
            this.ensureTxnInit();
            if (!this.appendFiles.isPresent()) {
                this.appendFiles = Optional.of(this.transaction.get().newAppend());
            }
            return this.appendFiles.get();
        }

        /**
         * Returns the pending delete operation of the current transaction, creating the transaction
         * and/or the delete operation first if they do not exist yet.
         */
        DeleteFiles getOrInitDeleteFiles() {
            this.ensureTxnInit();
            if (!this.deleteFiles.isPresent()) {
                this.deleteFiles = Optional.of(this.transaction.get().newDelete());
            }
            return this.deleteFiles.get();
        }

        /**
         * Starts a new transaction on {@code table} unless one is already open.
         * {@code table} must be present; {@code Optional.get()} fails otherwise.
         */
        void ensureTxnInit() {
            if (!this.transaction.isPresent()) {
                this.transaction = Optional.of(this.table.get().newTransaction());
            }
        }

        /**
         * Clears all per-commit state after a commit and seeds the next cycle.
         *
         * <p>The schema version is read with {@code Optional.fromNullable}, so a property map lacking
         * {@code SCHEMA_CREATION_TIME_KEY} yields an absent version instead of a
         * {@code NullPointerException}.
         *
         * @param props table properties after the commit; stored as {@code lastProperties}
         * @param lowWaterMark becomes the low watermark of the next commit cycle
         */
        void reset(Map<String, String> props, Long lowWaterMark) {
            this.lastProperties = Optional.of(props);
            this.lastSchemaVersion = Optional.fromNullable(props.get(IcebergMetadataWriter.SCHEMA_CREATION_TIME_KEY));
            this.transaction = Optional.absent();
            this.deleteFiles = Optional.absent();
            this.appendFiles = Optional.absent();
            if (this.candidateSchemas.isPresent()) {
                this.candidateSchemas.get().cleanUp();
            }
            this.candidateSchemas = Optional.absent();
            this.dataOffsetRange = Optional.absent();
            this.newProperties = Optional.absent();
            this.lowestGMCEEmittedTime = Long.MAX_VALUE;
            this.lowWatermark = Optional.of(lowWaterMark);
        }

        /** Sets the dataset name associated with this table. */
        public void setDatasetName(String datasetName) {
            this.datasetName = datasetName;
        }
    }
}

