/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.partitioning;

import java.io.Serializable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class BatchRegularPrePartitionOperator
extends AbstractStreamOperator<PartitioningEvent>
implements OneInputStreamOperator<Event, PartitioningEvent>,
Serializable {
    private static final long serialVersionUID = 1L;
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1L);
    private final int downstreamParallelism;
    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
    private transient Map<TableId, HashFunction<DataChangeEvent>> cachedHashFunctions;
    private volatile transient Map<TableId, Schema> originalSchemaMap;

    public BatchRegularPrePartitionOperator(int downstreamParallelism, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.downstreamParallelism = downstreamParallelism;
        this.hashFunctionProvider = hashFunctionProvider;
    }

    public void open() throws Exception {
        super.open();
        this.cachedHashFunctions = new HashMap<TableId, HashFunction<DataChangeEvent>>();
        this.originalSchemaMap = new HashMap<TableId, Schema>();
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = (Event)element.getValue();
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTableEvent = (CreateTableEvent)event;
            TableId tableId = createTableEvent.tableId();
            this.originalSchemaMap.put(tableId, createTableEvent.getSchema());
            this.cachedHashFunctions.put(tableId, this.recreateHashFunction(tableId));
            this.broadcastEvent(event);
        } else if (event instanceof DataChangeEvent) {
            this.partitionBy((DataChangeEvent)event);
        }
    }

    private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
        this.output.collect((Object)new StreamRecord((Object)PartitioningEvent.ofRegular(dataChangeEvent, this.cachedHashFunctions.get(dataChangeEvent.tableId()).hashcode(dataChangeEvent) % this.downstreamParallelism)));
    }

    private void broadcastEvent(Event toBroadcast) {
        for (int i = 0; i < this.downstreamParallelism; ++i) {
            Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
            this.output.collect((Object)new StreamRecord((Object)PartitioningEvent.ofRegular(copiedEvent, i)));
        }
    }

    private Schema loadSchemaFromCache(TableId tableId) {
        Optional<Schema> schema;
        try {
            schema = Optional.ofNullable(this.originalSchemaMap.get(tableId));
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Failed to request latest schema for table \"%s\"", tableId), e);
        }
        if (!schema.isPresent()) {
            throw new IllegalStateException(String.format("Schema is never registered or outdated for table \"%s\"", tableId));
        }
        return schema.get();
    }

    private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
        return this.hashFunctionProvider.getHashFunction(tableId, this.loadSchemaFromCache(tableId));
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
    }
}

