/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.runtime.partitioning;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.OperationType;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.runtime.operators.sink.SchemaEvolutionClient;
import com.ververica.cdc.runtime.partitioning.PartitioningEvent;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Operator that wraps every incoming {@link Event} into a {@link PartitioningEvent} carrying an
 * index in the range {@code [0, downstreamParallelism)}.
 *
 * <ul>
 *   <li>{@link SchemaChangeEvent} and {@link FlushEvent} are emitted once for every index, i.e.
 *       they are broadcast.
 *   <li>{@link DataChangeEvent} is emitted exactly once; its index is a hash of the table
 *       identifier and the primary key values of the record, modulo the downstream parallelism.
 *   <li>Any other event type is silently dropped.
 * </ul>
 */
@Internal
public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
        implements OneInputStreamOperator<Event, PartitioningEvent> {

    /** Cached hash functions are evicted when they have not been accessed for this long. */
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1L);

    private final OperatorID schemaOperatorId;
    private final int downstreamParallelism;

    // Both fields are created in open(), hence transient.
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private transient LoadingCache<TableId, HashFunction> cachedHashFunctions;

    /**
     * @param schemaOperatorId ID of the operator that the {@link SchemaEvolutionClient} is bound to
     * @param downstreamParallelism number of indexes events are distributed over
     */
    public PrePartitionOperator(OperatorID schemaOperatorId, int downstreamParallelism) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.schemaOperatorId = schemaOperatorId;
        this.downstreamParallelism = downstreamParallelism;
    }

    /** Creates the schema evolution client and the per-table hash function cache. */
    public void open() throws Exception {
        super.open();
        TaskOperatorEventGateway toCoordinator =
                this.getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway();
        this.schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, this.schemaOperatorId);
        this.cachedHashFunctions = this.createCache();
    }

    /**
     * Routes a single event: schema change and flush events are broadcast, data change events are
     * partitioned by primary key. Events of any other type are ignored.
     */
    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = element.getValue();
        if (event instanceof SchemaChangeEvent) {
            // The schema of this table changed, so the cached hash function (which is built from
            // the schema's primary key columns) has to be rebuilt before the event is forwarded.
            TableId tableId = ((SchemaChangeEvent) event).tableId();
            this.cachedHashFunctions.put(tableId, this.recreateHashFunction(tableId));
            this.broadcastEvent(event);
        } else if (event instanceof FlushEvent) {
            this.broadcastEvent(event);
        } else if (event instanceof DataChangeEvent) {
            this.partitionBy((DataChangeEvent) event);
        }
    }

    /**
     * Emits the data change event once, tagged with {@code hash % downstreamParallelism}.
     *
     * @throws Exception if the hash function for the event's table cannot be loaded
     */
    private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
        HashFunction hashFunction = this.cachedHashFunctions.get(dataChangeEvent.tableId());
        // HashFunction never returns a negative value, so the remainder is non-negative as well.
        int partition = hashFunction.apply(dataChangeEvent) % this.downstreamParallelism;
        this.output.collect(new StreamRecord<>(new PartitioningEvent(dataChangeEvent, partition)));
    }

    /** Emits one copy of the event for every index in {@code [0, downstreamParallelism)}. */
    private void broadcastEvent(Event toBroadcast) {
        for (int i = 0; i < this.downstreamParallelism; ++i) {
            this.output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i)));
        }
    }

    /**
     * Requests the latest schema of the table through the {@link SchemaEvolutionClient}.
     *
     * @throws RuntimeException if the request itself fails
     * @throws IllegalStateException if the client returns no schema for the table
     */
    private Schema loadLatestSchemaFromRegistry(TableId tableId) {
        Optional<Schema> schema;
        try {
            schema = this.schemaEvolutionClient.getLatestSchema(tableId);
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format("Failed to request latest schema for table \"%s\"", tableId), e);
        }
        if (!schema.isPresent()) {
            throw new IllegalStateException(
                    String.format(
                            "Schema is never registered or outdated for table \"%s\"", tableId));
        }
        return schema.get();
    }

    /** Builds a new hash function from the latest schema of the given table. */
    private HashFunction recreateHashFunction(TableId tableId) {
        return new HashFunction(this.loadLatestSchemaFromRegistry(tableId));
    }

    /** Creates the cache that lazily builds a {@link HashFunction} per table on first access. */
    private LoadingCache<TableId, HashFunction> createCache() {
        return CacheBuilder.newBuilder()
                .expireAfterAccess(CACHE_EXPIRE_DURATION)
                .build(
                        new CacheLoader<TableId, HashFunction>() {

                            public HashFunction load(TableId key) {
                                return PrePartitionOperator.this.recreateHashFunction(key);
                            }
                        });
    }

    /**
     * Computes a non-negative hash for a {@link DataChangeEvent} from its table identifier and
     * the values of the primary key columns of the schema this function was created with.
     */
    @VisibleForTesting
    static class HashFunction implements Function<DataChangeEvent, Integer> {
        private final List<RecordData.FieldGetter> primaryKeyGetters;

        /**
         * @param schema schema whose primary key columns are hashed
         * @throws IllegalStateException if a primary key does not name a column of the schema
         */
        public HashFunction(Schema schema) {
            this.primaryKeyGetters = this.createFieldGetters(schema);
        }

        /**
         * Hashes the namespace and schema name (when non-null), the table name and the primary key
         * values of the event's record.
         *
         * @return a value in the range {@code [0, Integer.MAX_VALUE]}
         */
        @Override
        public Integer apply(DataChangeEvent event) {
            List<Object> objectsToHash = new ArrayList<>();
            TableId tableId = event.tableId();
            Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
            Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
            objectsToHash.add(tableId.getTableName());
            // For DELETE the primary key is read from the "before" record, otherwise from "after".
            RecordData data =
                    event.op().equals(OperationType.DELETE) ? event.before() : event.after();
            for (RecordData.FieldGetter primaryKeyGetter : this.primaryKeyGetters) {
                objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
            }
            // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
            return (Objects.hash(objectsToHash.toArray()) * 31) & Integer.MAX_VALUE;
        }

        /**
         * Creates one field getter per primary key of the schema, in primary key order.
         *
         * @throws IllegalStateException if a primary key does not name a column of the schema
         */
        private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
            int[] primaryKeyPositions =
                    schema.primaryKeys().stream()
                            .mapToInt(
                                    pk -> {
                                        // The search must be bounded by the column count;
                                        // otherwise a primary key without a matching column
                                        // runs past the end of the column list instead of
                                        // reporting the error below.
                                        int columnCount = schema.getColumnCount();
                                        for (int i = 0; i < columnCount; ++i) {
                                            if (((Column) schema.getColumns().get(i))
                                                    .getName()
                                                    .equals(pk)) {
                                                return i;
                                            }
                                        }
                                        throw new IllegalStateException(
                                                String.format(
                                                        "Unable to find column \"%s\" which is defined as primary key",
                                                        pk));
                                    })
                            .toArray();

            List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(primaryKeyPositions.length);
            for (int primaryKeyPosition : primaryKeyPositions) {
                Column column = (Column) schema.getColumns().get(primaryKeyPosition);
                fieldGetters.add(
                        RecordData.createFieldGetter(
                                (DataType) column.getType(), primaryKeyPosition));
            }
            return fieldGetters;
        }
    }
}

