/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.EqualityFieldKeySelector;
import org.apache.iceberg.flink.sink.PartitionKeySelector;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
import org.apache.iceberg.flink.sink.dynamic.DynamicSinkUtil;
import org.apache.iceberg.flink.sink.dynamic.LRUCache;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class HashKeyGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(HashKeyGenerator.class);
    private final int maxWriteParallelism;
    private final Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;

    HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) {
        this.maxWriteParallelism = maxWriteParallelism;
        this.keySelectorCache = new LRUCache<SelectorKey, KeySelector<RowData, Integer>>(maxCacheSize);
    }

    int generateKey(DynamicRecord dynamicRecord) throws Exception {
        return this.generateKey(dynamicRecord, null, null, null);
    }

    int generateKey(DynamicRecord dynamicRecord, @Nullable Schema tableSchema, @Nullable PartitionSpec tableSpec, @Nullable RowData overrideRowData) {
        String tableIdent = dynamicRecord.tableIdentifier().toString();
        SelectorKey cacheKey = new SelectorKey(tableIdent, dynamicRecord.branch(), tableSchema != null ? Integer.valueOf(tableSchema.schemaId()) : null, tableSpec != null ? Integer.valueOf(tableSpec.specId()) : null, dynamicRecord.schema(), dynamicRecord.spec(), dynamicRecord.equalityFields());
        KeySelector keySelector = this.keySelectorCache.computeIfAbsent(cacheKey, k -> this.getKeySelector(tableIdent, MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()), MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()), MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE), MoreObjects.firstNonNull(dynamicRecord.equalityFields(), Collections.emptySet()), dynamicRecord.writeParallelism()));
        try {
            return (Integer)keySelector.getKey((Object)(overrideRowData != null ? overrideRowData : dynamicRecord.rowData()));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private KeySelector<RowData, Integer> getKeySelector(String tableName, Schema schema, PartitionSpec spec, DistributionMode mode, Set<String> equalityFields, int writeParallelism) {
        LOG.debug("Creating new KeySelector for table '{}' with distribution mode '{}'", (Object)tableName, (Object)mode);
        switch (mode) {
            case NONE: {
                if (equalityFields.isEmpty()) {
                    return HashKeyGenerator.tableKeySelector(tableName, writeParallelism, this.maxWriteParallelism);
                }
                LOG.info("{}: Distribute rows by equality fields, because there are equality fields set", (Object)tableName);
                return HashKeyGenerator.equalityFieldKeySelector(tableName, schema, equalityFields, writeParallelism, this.maxWriteParallelism);
            }
            case HASH: {
                if (equalityFields.isEmpty()) {
                    if (spec.isUnpartitioned()) {
                        LOG.warn("{}: Fallback to use 'none' distribution mode, because there are no equality fields set and table is unpartitioned", (Object)tableName);
                        return HashKeyGenerator.tableKeySelector(tableName, writeParallelism, this.maxWriteParallelism);
                    }
                    return HashKeyGenerator.partitionKeySelector(tableName, schema, spec, writeParallelism, this.maxWriteParallelism);
                }
                if (spec.isUnpartitioned()) {
                    LOG.info("{}: Distribute rows by equality fields, because there are equality fields set and table is unpartitioned", (Object)tableName);
                    return HashKeyGenerator.equalityFieldKeySelector(tableName, schema, equalityFields, writeParallelism, this.maxWriteParallelism);
                }
                for (PartitionField partitionField : spec.fields()) {
                    Preconditions.checkState(equalityFields.contains(partitionField.name()), "%s: In 'hash' distribution mode with equality fields set, partition field '%s' should be included in equality fields: '%s'", (Object)tableName, (Object)partitionField, schema.columns().stream().filter(c -> equalityFields.contains(c.name())).collect(Collectors.toList()));
                }
                return HashKeyGenerator.partitionKeySelector(tableName, schema, spec, writeParallelism, this.maxWriteParallelism);
            }
            case RANGE: {
                if (schema.identifierFieldIds().isEmpty()) {
                    LOG.warn("{}: Fallback to use 'none' distribution mode, because there are no equality fields set and {}='range' is not supported yet in flink", (Object)tableName, (Object)"write.distribution-mode");
                    return HashKeyGenerator.tableKeySelector(tableName, writeParallelism, this.maxWriteParallelism);
                }
                LOG.info("{}: Distribute rows by equality fields, because there are equality fields set and {}='range' is not supported yet in flink", (Object)tableName, (Object)"write.distribution-mode");
                return HashKeyGenerator.equalityFieldKeySelector(tableName, schema, equalityFields, writeParallelism, this.maxWriteParallelism);
            }
        }
        throw new IllegalArgumentException(tableName + ": Unrecognized write.distribution-mode: " + String.valueOf((Object)mode));
    }

    private static KeySelector<RowData, Integer> equalityFieldKeySelector(String tableName, Schema schema, Set<String> equalityFields, int writeParallelism, int maxWriteParallelism) {
        return new TargetLimitedKeySelector(new EqualityFieldKeySelector(schema, FlinkSchemaUtil.convert(schema), DynamicSinkUtil.getEqualityFieldIds(equalityFields, schema)), tableName, writeParallelism, maxWriteParallelism);
    }

    private static KeySelector<RowData, Integer> partitionKeySelector(String tableName, Schema schema, PartitionSpec spec, int writeParallelism, int maxWriteParallelism) {
        PartitionKeySelector inner = new PartitionKeySelector(spec, schema, FlinkSchemaUtil.convert(schema));
        return new TargetLimitedKeySelector((KeySelector<RowData, Integer>)(KeySelector & Serializable)in -> ((String)inner.getKey(in)).hashCode(), tableName, writeParallelism, maxWriteParallelism);
    }

    private static KeySelector<RowData, Integer> tableKeySelector(String tableName, int writeParallelism, int maxWriteParallelism) {
        return new TargetLimitedKeySelector(new RoundRobinKeySelector<RowData>(writeParallelism), tableName, writeParallelism, maxWriteParallelism);
    }

    @VisibleForTesting
    Map<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
        return this.keySelectorCache;
    }

    static class SelectorKey {
        private final String tableName;
        private final String branch;
        private final Integer schemaId;
        private final Integer specId;
        private final Schema schema;
        private final PartitionSpec spec;
        private final Set<String> equalityFields;

        SelectorKey(String tableName, String branch, @Nullable Integer tableSchemaId, @Nullable Integer tableSpecId, Schema schema, PartitionSpec spec, Set<String> equalityFields) {
            this.tableName = tableName;
            this.branch = branch;
            this.schemaId = tableSchemaId;
            this.specId = tableSpecId;
            this.schema = tableSchemaId == null ? schema : null;
            this.spec = tableSpecId == null ? spec : null;
            this.equalityFields = equalityFields;
        }

        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (other == null || this.getClass() != other.getClass()) {
                return false;
            }
            SelectorKey that = (SelectorKey)other;
            return Objects.equals(this.tableName, that.tableName) && Objects.equals(this.branch, that.branch) && Objects.equals(this.schemaId, that.schemaId) && Objects.equals(this.specId, that.specId) && Objects.equals(this.schema, that.schema) && Objects.equals(this.spec, that.spec) && Objects.equals(this.equalityFields, that.equalityFields);
        }

        public int hashCode() {
            return Objects.hash(this.tableName, this.branch, this.schemaId, this.specId, this.schema, this.spec, this.equalityFields);
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("tableName", this.tableName).add("branch", this.branch).add("schemaId", this.schemaId).add("specId", this.specId).add("schema", this.schema).add("spec", this.spec).add("equalityFields", this.equalityFields).toString();
        }
    }

    private static class TargetLimitedKeySelector
    implements KeySelector<RowData, Integer> {
        private final KeySelector<RowData, Integer> wrapped;
        private final int writeParallelism;
        private final int[] distinctKeys;

        TargetLimitedKeySelector(KeySelector<RowData, Integer> wrapped, String tableName, int writeParallelism, int maxWriteParallelism) {
            if (writeParallelism > maxWriteParallelism) {
                LOG.warn("{}: writeParallelism {} is greater than maxWriteParallelism {}. Capping writeParallelism at {}", new Object[]{tableName, writeParallelism, maxWriteParallelism, maxWriteParallelism});
                writeParallelism = maxWriteParallelism;
            }
            this.wrapped = wrapped;
            this.writeParallelism = writeParallelism;
            this.distinctKeys = new int[writeParallelism];
            HashSet<Integer> targetSlots = Sets.newHashSetWithExpectedSize(writeParallelism);
            int nextKey = tableName.hashCode();
            for (int i = 0; i < writeParallelism; ++i) {
                int subtaskId = TargetLimitedKeySelector.subtaskId(nextKey, writeParallelism, maxWriteParallelism);
                while (targetSlots.contains(subtaskId)) {
                    subtaskId = TargetLimitedKeySelector.subtaskId(++nextKey, writeParallelism, maxWriteParallelism);
                }
                targetSlots.add(subtaskId);
                this.distinctKeys[i] = nextKey++;
            }
        }

        public Integer getKey(RowData value) throws Exception {
            return this.distinctKeys[DynamicSinkUtil.safeAbs(((Integer)this.wrapped.getKey((Object)value)).hashCode()) % this.writeParallelism];
        }

        private static int subtaskId(int key, int writeParallelism, int maxWriteParallelism) {
            return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup((int)maxWriteParallelism, (int)writeParallelism, (int)KeyGroupRangeAssignment.computeKeyGroupForKeyHash((int)key, (int)maxWriteParallelism));
        }
    }

    private static class RoundRobinKeySelector<T>
    implements KeySelector<T, Integer> {
        private final int maxTarget;
        private int lastTarget = 0;

        RoundRobinKeySelector(int maxTarget) {
            this.maxTarget = maxTarget;
        }

        public Integer getKey(T value) {
            this.lastTarget = (this.lastTarget + 1) % this.maxTarget;
            return this.lastTarget;
        }
    }
}

