/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
import org.apache.iceberg.flink.sink.dynamic.DataConverter;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordInternal;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordInternalType;
import org.apache.iceberg.flink.sink.dynamic.DynamicSinkUtil;
import org.apache.iceberg.flink.sink.dynamic.HashKeyGenerator;
import org.apache.iceberg.flink.sink.dynamic.TableMetadataCache;
import org.apache.iceberg.flink.sink.dynamic.TableUpdater;

/**
 * Process function that turns user elements of type {@code T} into {@link DynamicRecordInternal}s.
 *
 * <p>Each input element is handed to the configured {@link DynamicRecordGenerator}; this processor
 * passes itself as the {@link Collector} that receives the generated {@link DynamicRecord}s. For
 * every generated record the table metadata cache is consulted:
 *
 * <ul>
 *   <li>If the table, branch and partition spec are all found and no schema update is needed, the
 *       row is converted with the resolved schema's converter and emitted on the main output.
 *   <li>Otherwise, with {@code immediateUpdate} enabled, the {@link TableUpdater} is invoked and
 *       the record is emitted on the main output using the schema and spec it returns.
 *   <li>Otherwise the unconverted record is sent to the side output tagged
 *       {@link #DYNAMIC_TABLE_UPDATE_STREAM}.
 * </ul>
 *
 * @param <T> type of the incoming stream elements
 */
@Internal
class DynamicRecordProcessor<T>
        extends ProcessFunction<T, DynamicRecordInternal>
        implements Collector<DynamicRecord> {
    @VisibleForTesting
    static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream";

    private final DynamicRecordGenerator<T> generator;
    private final CatalogLoader catalogLoader;
    private final boolean immediateUpdate;
    private final int cacheMaximumSize;
    private final long cacheRefreshMs;
    private final int inputSchemasPerTableCacheMaximumSize;

    // Runtime state, created in open(); transient so it is not part of the serialized function.
    private transient TableMetadataCache tableCache;
    private transient HashKeyGenerator hashKeyGenerator;
    // Only set when immediateUpdate is true.
    private transient TableUpdater updater;
    // Only set when immediateUpdate is false.
    private transient OutputTag<DynamicRecordInternal> updateStream;
    // Captured on every processElement() call so that collect() can reach the current outputs.
    private transient Collector<DynamicRecordInternal> collector;
    private transient ProcessFunction<T, DynamicRecordInternal>.Context context;

    /**
     * @param generator produces {@link DynamicRecord}s from input elements
     * @param catalogLoader loads the catalog used by the metadata cache and the table updater
     * @param immediateUpdate if {@code true}, missing or outdated table metadata is updated inside
     *     this operator; if {@code false}, affected records go to the side output instead
     * @param cacheMaximumSize size passed to the table metadata cache, the hash key generator and
     *     the side-output type information
     * @param cacheRefreshMs refresh interval passed to the table metadata cache
     * @param inputSchemasPerTableCacheMaximumSize per-table input schema cache size passed to the
     *     table metadata cache
     */
    DynamicRecordProcessor(
            DynamicRecordGenerator<T> generator,
            CatalogLoader catalogLoader,
            boolean immediateUpdate,
            int cacheMaximumSize,
            long cacheRefreshMs,
            int inputSchemasPerTableCacheMaximumSize) {
        this.generator = generator;
        this.catalogLoader = catalogLoader;
        this.immediateUpdate = immediateUpdate;
        this.cacheMaximumSize = cacheMaximumSize;
        this.cacheRefreshMs = cacheRefreshMs;
        this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
    }

    /**
     * Loads the catalog, builds the metadata cache and hash key generator, prepares either the
     * table updater or the side-output tag (depending on {@code immediateUpdate}) and finally
     * opens the generator.
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        Catalog catalog = this.catalogLoader.loadCatalog();
        this.tableCache =
                new TableMetadataCache(
                        catalog,
                        this.cacheMaximumSize,
                        this.cacheRefreshMs,
                        this.inputSchemasPerTableCacheMaximumSize);
        this.hashKeyGenerator =
                new HashKeyGenerator(
                        this.cacheMaximumSize,
                        this.getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
        if (this.immediateUpdate) {
            this.updater = new TableUpdater(this.tableCache, catalog);
        } else {
            // Anonymous subclass ("{}") keeps the tag's generic type available at runtime.
            this.updateStream =
                    new OutputTag<DynamicRecordInternal>(
                            DYNAMIC_TABLE_UPDATE_STREAM,
                            new DynamicRecordInternalType(
                                    this.catalogLoader, true, this.cacheMaximumSize)) {};
        }
        this.generator.open(openContext);
    }

    /**
     * Remembers the current context and main-output collector, then lets the generator emit
     * records into {@link #collect(DynamicRecord)}.
     */
    @Override
    public void processElement(
            T element,
            ProcessFunction<T, DynamicRecordInternal>.Context ctx,
            Collector<DynamicRecordInternal> out)
            throws Exception {
        this.context = ctx;
        this.collector = out;
        this.generator.generate(element, this);
    }

    /**
     * Receives one generated record and routes it to the main output or the update side output.
     *
     * <p>Uses the context and collector stored by the most recent
     * {@code processElement} call.
     *
     * @param data record produced by the generator
     */
    @Override
    public void collect(DynamicRecord data) {
        boolean exists = this.tableCache.exists(data.tableIdentifier()).f0;
        // Branch, schema and spec are only looked up when the table itself is known.
        String foundBranch =
                exists ? this.tableCache.branch(data.tableIdentifier(), data.branch()) : null;
        TableMetadataCache.ResolvedSchemaInfo foundSchema =
                exists
                        ? this.tableCache.schema(data.tableIdentifier(), data.schema())
                        : TableMetadataCache.NOT_FOUND;
        PartitionSpec foundSpec =
                exists ? this.tableCache.spec(data.tableIdentifier(), data.spec()) : null;

        boolean updateNeeded =
                !exists
                        || foundBranch == null
                        || foundSpec == null
                        || foundSchema.compareResult()
                                == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED;

        if (!updateNeeded) {
            this.emit(
                    this.collector,
                    data,
                    foundSchema.resolvedTableSchema(),
                    foundSchema.recordConverter(),
                    foundSpec);
            return;
        }

        if (this.immediateUpdate) {
            Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> newData =
                    this.updater.update(
                            data.tableIdentifier(), data.branch(), data.schema(), data.spec());
            this.emit(
                    this.collector,
                    data,
                    newData.f0.resolvedTableSchema(),
                    newData.f0.recordConverter(),
                    newData.f1);
        } else {
            // The writer key is computed against whatever table metadata was resolved, falling
            // back to the record's own schema/spec; the record itself is forwarded unconverted.
            int writerKey =
                    this.hashKeyGenerator.generateKey(
                            data,
                            foundSchema.resolvedTableSchema() != null
                                    ? foundSchema.resolvedTableSchema()
                                    : data.schema(),
                            foundSpec != null ? foundSpec : data.spec(),
                            data.rowData());
            this.context.output(
                    this.updateStream,
                    new DynamicRecordInternal(
                            data.tableIdentifier().toString(),
                            data.branch(),
                            data.schema(),
                            data.rowData(),
                            data.spec(),
                            writerKey,
                            data.upsertMode(),
                            DynamicSinkUtil.getEqualityFieldIds(
                                    data.equalityFields(), data.schema())));
        }
    }

    /**
     * Converts the record's row with {@code recordConverter}, computes its writer key and emits
     * the resulting {@link DynamicRecordInternal} on {@code out}.
     */
    private void emit(
            Collector<DynamicRecordInternal> out,
            DynamicRecord data,
            Schema schema,
            DataConverter recordConverter,
            PartitionSpec spec) {
        RowData rowData = (RowData) recordConverter.convert(data.rowData());
        int writerKey = this.hashKeyGenerator.generateKey(data, schema, spec, rowData);
        String tableName = data.tableIdentifier().toString();
        out.collect(
                new DynamicRecordInternal(
                        tableName,
                        data.branch(),
                        schema,
                        rowData,
                        spec,
                        writerKey,
                        data.upsertMode(),
                        DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema)));
    }

    /**
     * Delegates to the superclass {@code close()}.
     *
     * <p>This method declares no checked exceptions, so anything thrown by the superclass is
     * rethrown wrapped in a {@link RuntimeException} with the original exception as its cause.
     */
    @Override
    public void close() {
        try {
            super.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

