/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.partitioning;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class DistributedPrePartitionOperator
extends AbstractStreamOperator<PartitioningEvent>
implements OneInputStreamOperator<Event, PartitioningEvent>,
Serializable {
    private static final long serialVersionUID = 1L;
    private final int downstreamParallelism;
    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
    private transient Map<TableId, Schema> schemaMap;
    private transient Map<TableId, HashFunction<DataChangeEvent>> hashFunctionMap;
    private transient int subTaskId;

    public DistributedPrePartitionOperator(int downstreamParallelism, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.downstreamParallelism = downstreamParallelism;
        this.hashFunctionProvider = hashFunctionProvider;
    }

    public void open() throws Exception {
        super.open();
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.schemaMap = new HashMap<TableId, Schema>();
        this.hashFunctionMap = new HashMap<TableId, HashFunction<DataChangeEvent>>();
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = (Event)element.getValue();
        if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent)event;
            TableId tableId = schemaChangeEvent.tableId();
            this.schemaMap.compute(tableId, (tId, oldSchema) -> SchemaUtils.applySchemaChangeEvent(oldSchema, schemaChangeEvent));
            this.hashFunctionMap.put(tableId, this.recreateHashFunction(tableId));
            this.broadcastEvent(event);
        } else if (event instanceof DataChangeEvent) {
            this.partitionBy((DataChangeEvent)event);
        } else {
            throw new IllegalStateException(this.subTaskId + "> PrePartition operator received an unexpected event: " + event);
        }
    }

    private void partitionBy(DataChangeEvent dataChangeEvent) {
        this.output.collect((Object)new StreamRecord((Object)PartitioningEvent.ofDistributed(dataChangeEvent, this.subTaskId, this.hashFunctionMap.get(dataChangeEvent.tableId()).hashcode(dataChangeEvent) % this.downstreamParallelism)));
    }

    private void broadcastEvent(Event toBroadcast) {
        for (int i = 0; i < this.downstreamParallelism; ++i) {
            Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
            this.output.collect((Object)new StreamRecord((Object)PartitioningEvent.ofDistributed(copiedEvent, this.subTaskId, i)));
        }
    }

    private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
        return this.hashFunctionProvider.getHashFunction(tableId, this.schemaMap.get(tableId));
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
    }
}

