/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.transform;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.PostTransformChangeInfo;
import org.apache.flink.cdc.runtime.operators.transform.PostTransformOperatorBuilder;
import org.apache.flink.cdc.runtime.operators.transform.PostTransformer;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import org.apache.flink.cdc.runtime.operators.transform.TransformContext;
import org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilter;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilterProcessor;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjection;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor;
import org.apache.flink.cdc.runtime.operators.transform.TransformRule;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters;
import org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Stream operator that applies the configured transform rules (projection, filter and an
 * optional post-transform converter) to the change events of every table a rule selects.
 *
 * <p>Events of tables that no rule matches are forwarded untouched. For matching tables the
 * operator rewrites {@link CreateTableEvent}s and other {@link SchemaChangeEvent}s so that
 * they describe the post-transform schema, and rewrites the records carried by
 * {@link DataChangeEvent}s accordingly.
 */
public class PostTransformOperator extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event>, Serializable {

    private static final long serialVersionUID = 1L;

    /** Timezone identifier handed to the projection and filter processors. */
    private final String timezone;

    /** Raw transform rules from which the {@link PostTransformer}s are built in {@link #open()}. */
    private final List<TransformRule> transformRules;

    /** Per table: whether any effective projection contained an asterisk at table creation. */
    private final Map<TableId, Boolean> hasAsteriskMap;

    /** Per table: upstream column names that were kept in the post-transform schema at creation. */
    private final Map<TableId, List<String>> projectedColumnsMap;

    /** Per table: the most recent pre- and post-transform schema information. */
    private final Map<TableId, PostTransformChangeInfo> postTransformInfoMap;

    /** UDF definitions, turned into {@link UserDefinedFunctionDescriptor}s in {@link #open()}. */
    private final List<Tuple3<String, String, Map<String, String>>> udfFunctions;

    private transient List<PostTransformer> transformers;
    private transient List<UserDefinedFunctionDescriptor> udfDescriptors;

    /** UDF instances, index-aligned with {@link #udfDescriptors}. */
    private transient List<Object> udfFunctionInstances;

    /** Projection processors cached per (table, transformer); dropped on schema changes. */
    private transient Table<TableId, PostTransformer, TransformProjectionProcessor>
            projectionProcessors;

    /** Filter processors cached per (table, transformer); dropped on schema changes. */
    private transient Table<TableId, PostTransformer, TransformFilterProcessor> filterProcessors;

    /** Creates a builder for this operator. */
    public static PostTransformOperatorBuilder newBuilder() {
        return new PostTransformOperatorBuilder();
    }

    /**
     * @param transformRules rules to apply, in evaluation order
     * @param timezone timezone identifier passed to the expression processors
     * @param udfFunctions UDF definitions consumed by {@link UserDefinedFunctionDescriptor}
     */
    PostTransformOperator(
            List<TransformRule> transformRules,
            String timezone,
            List<Tuple3<String, String, Map<String, String>>> udfFunctions) {
        this.timezone = timezone;
        this.transformRules = transformRules;
        this.hasAsteriskMap = new HashMap<>();
        this.projectedColumnsMap = new HashMap<>();
        this.postTransformInfoMap = new ConcurrentHashMap<>();
        this.udfFunctions = udfFunctions;
    }

    /**
     * Initializes the transient state: processor caches, UDF instances and transformers.
     *
     * @throws Exception if the parent operator or UDF initialization fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        projectionProcessors = HashBasedTable.create();
        filterProcessors = HashBasedTable.create();
        // UDF descriptors must exist before the transformers are created, because filter
        // construction receives them.
        initializeUdf();
        transformers = createTransformers();
    }

    /**
     * Closes the parent operator, then releases compiled expressions and UDF instances.
     *
     * <p>The cleanup steps run even if an earlier step throws, so a failure while closing the
     * parent operator does not leave UDFs open.
     *
     * @throws Exception if any of the close steps fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                TransformExpressionCompiler.cleanUp();
            } finally {
                destroyUdf();
            }
        }
    }

    /**
     * Processes one stream record and wraps any failure into a {@link TransformException} that
     * carries the event, its table id and the cached schemas (when available).
     *
     * @param element the incoming record
     * @throws Exception a {@link TransformException} wrapping the original failure
     */
    @Override
    public void processElement(StreamRecord<Event> element) throws Exception {
        try {
            processElementInternal(element);
        } catch (Exception e) {
            Event event = element.getValue();
            TableId tableId = null;
            Schema schemaBefore = null;
            Schema schemaAfter = null;
            if (event instanceof ChangeEvent) {
                tableId = ((ChangeEvent) event).tableId();
                // ConcurrentHashMap rejects null keys; guard so that a secondary
                // NullPointerException cannot hide the original failure.
                PostTransformChangeInfo info =
                        tableId == null ? null : postTransformInfoMap.get(tableId);
                if (info != null) {
                    schemaBefore = info.getPreTransformedSchema();
                    schemaAfter = info.getPostTransformedSchema();
                }
            }
            throw new TransformException(
                    "post-transform", event, tableId, schemaBefore, schemaAfter, e);
        }
    }

    /**
     * Dispatches the event by type. Null events are ignored; events that are not
     * {@link ChangeEvent}s are rejected.
     */
    private void processElementInternal(StreamRecord<Event> element) {
        Event event = element.getValue();
        if (event == null) {
            return;
        }
        if (!(event instanceof ChangeEvent)) {
            throw new UnsupportedOperationException("Unexpected stream record event: " + event);
        }

        TableId tableId = ((ChangeEvent) event).tableId();
        List<PostTransformer> effectiveTransformers = getEffectiveTransformers(tableId);
        if (effectiveTransformers.isEmpty()) {
            // No rule selects this table: forward the record as-is.
            output.collect(element);
            return;
        }

        if (event instanceof CreateTableEvent) {
            emit(processCreateTableEvent((CreateTableEvent) event, effectiveTransformers));
            // The schema changed, so processors built for the previous schema are stale.
            invalidateCache(tableId);
        } else if (event instanceof SchemaChangeEvent) {
            emit(processSchemaChangeEvent((SchemaChangeEvent) event, effectiveTransformers));
            invalidateCache(tableId);
        } else if (event instanceof DataChangeEvent) {
            emit(processDataChangeEvent((DataChangeEvent) event, effectiveTransformers));
        } else {
            throw new UnsupportedOperationException("Unexpected stream record event: " + event);
        }
    }

    /** Emits the event downstream if present. */
    private void emit(Optional<Event> transformed) {
        transformed.map(StreamRecord::new).ifPresent(output::collect);
    }

    /**
     * Derives the post-transform schema for a newly created table, caches the schema
     * information and projection metadata, and returns the rewritten create-table event.
     */
    private Optional<Event> processCreateTableEvent(
            CreateTableEvent event, List<PostTransformer> effectiveTransformers) {
        TableId tableId = event.tableId();
        Schema preSchema = event.getSchema();

        List<Schema> schemas =
                effectiveTransformers.stream()
                        .map(trans -> transformSchema(preSchema, trans))
                        .collect(Collectors.toList());
        Schema postSchema =
                SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
        postTransformInfoMap.put(
                tableId, PostTransformChangeInfo.of(tableId, preSchema, postSchema));

        boolean wildcardMatched =
                effectiveTransformers.stream()
                        .map(PostTransformer::getProjection)
                        .flatMap(this::optionalToStream)
                        .map(TransformProjection::getProjection)
                        .anyMatch(TransformParser::hasAsterisk);
        hasAsteriskMap.put(tableId, wildcardMatched);

        // Remember which upstream columns survive into the post-transform schema.
        List<String> postColumnNames = postSchema.getColumnNames();
        projectedColumnsMap.put(
                tableId,
                preSchema.getColumnNames().stream()
                        .filter(postColumnNames::contains)
                        .collect(Collectors.toList()));

        return Optional.of(new CreateTableEvent(tableId, postSchema));
    }

    /**
     * Applies a schema change to the cached pre-transform schema, recomputes the
     * post-transform schema, and returns the schema change event rewritten for downstream
     * (empty when {@link SchemaUtils#transformSchemaChangeEvent} yields nothing).
     *
     * @throws NullPointerException if no schema information is cached for the table
     */
    private Optional<Event> processSchemaChangeEvent(
            SchemaChangeEvent event, List<PostTransformer> effectiveTransformers) {
        TableId tableId = event.tableId();
        PostTransformChangeInfo info =
                Preconditions.checkNotNull(postTransformInfoMap.get(tableId));

        Schema nextPreSchema =
                SchemaUtils.applySchemaChangeEvent(info.getPreTransformedSchema(), event);
        List<Schema> schemas =
                effectiveTransformers.stream()
                        .map(trans -> transformSchema(nextPreSchema, trans))
                        .collect(Collectors.toList());
        Schema nextPostSchema =
                SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
        postTransformInfoMap.put(
                tableId, PostTransformChangeInfo.of(tableId, nextPreSchema, nextPostSchema));

        // 'info' still refers to the state before this change. With an asterisk projection the
        // previous post-transform columns are used as reference; otherwise the columns
        // recorded when the table was created.
        boolean hasAsterisk = hasAsteriskMap.getOrDefault(tableId, true);
        List<String> referencedColumns =
                hasAsterisk
                        ? info.getPostTransformedSchema().getColumnNames()
                        : projectedColumnsMap.get(tableId);
        return SchemaUtils.transformSchemaChangeEvent(hasAsterisk, referencedColumns, event)
                .map(Event.class::cast);
    }

    /**
     * Runs the data change event through the effective transformers in order and returns the
     * result of the first transformer whose filter accepts it, or empty if none does.
     *
     * @throws NullPointerException if no schema information is cached for the table
     */
    private Optional<Event> processDataChangeEvent(
            DataChangeEvent event, List<PostTransformer> effectiveTransformers) {
        TableId tableId = event.tableId();
        PostTransformChangeInfo info =
                Preconditions.checkNotNull(postTransformInfoMap.get(tableId));

        TransformContext context = new TransformContext();
        context.epochTime = System.currentTimeMillis();
        context.meta = event.meta();

        String beforeOp = event.opTypeString(false);
        String afterOp = event.opTypeString(true);

        for (PostTransformer transformer : effectiveTransformers) {
            TransformProjectionProcessor projectionProcessor =
                    getProjectionProcessor(tableId, transformer);
            TransformFilterProcessor filterProcessor = getFilterProcessor(tableId, transformer);

            RecordData beforeRow = null;
            RecordData afterRow = null;
            boolean filterPassed = true;

            if (event.before() != null) {
                context.opType = beforeOp;
                Tuple2<BinaryRecordData, Boolean> result =
                        transformRecord(
                                event.before(),
                                info,
                                projectionProcessor,
                                filterProcessor,
                                context);
                beforeRow = result.f0;
                filterPassed = result.f1;
            }
            if (event.after() != null) {
                context.opType = afterOp;
                Tuple2<BinaryRecordData, Boolean> result =
                        transformRecord(
                                event.after(),
                                info,
                                projectionProcessor,
                                filterProcessor,
                                context);
                afterRow = result.f0;
                // NOTE(review): when both images exist, the after-image result replaces the
                // before-image result - confirm this is the intended filter semantics.
                filterPassed = result.f1;
            }

            if (!filterPassed) {
                continue;
            }

            DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
            if (transformer.getPostTransformConverter().isPresent()) {
                return transformer
                        .getPostTransformConverter()
                        .get()
                        .convert(finalEvent)
                        .map(Event.class::cast);
            }
            return Optional.of(finalEvent);
        }
        return Optional.empty();
    }

    /** Builds the schema a single transformer produces from the given pre-transform schema. */
    private Schema transformSchema(Schema preSchema, PostTransformer transformer) {
        List<ProjectionColumn> projectionColumns =
                TransformParser.generateProjectionColumns(
                        transformer
                                .getProjection()
                                .map(TransformProjection::getProjection)
                                .orElse(null),
                        preSchema.getColumns(),
                        udfDescriptors,
                        transformer.getSupportedMetadataColumns());
        return preSchema.copy(
                projectionColumns.stream()
                        .map(ProjectionColumn::getColumn)
                        .collect(Collectors.toList()));
    }

    /**
     * Projects and filters a single record.
     *
     * @param recordData record laid out according to the pre-transform schema
     * @param info cached schema information for the record's table
     * @param projectionProcessor projection to apply, or {@code null} to keep the row as-is
     * @param filterProcessor filter to evaluate, or {@code null} to accept every row
     * @param context evaluation context shared by projection and filter
     * @return the record in post-transform layout and whether the filter accepted it
     */
    private Tuple2<BinaryRecordData, Boolean> transformRecord(
            RecordData recordData,
            PostTransformChangeInfo info,
            @Nullable TransformProjectionProcessor projectionProcessor,
            @Nullable TransformFilterProcessor filterProcessor,
            TransformContext context) {
        RecordData.FieldGetter[] preFieldGetters = info.getPreTransformedFieldGetters();
        Schema postSchema = info.getPostTransformedSchema();
        BinaryRecordDataGenerator postGenerator = info.getPostTransformedRecordDataGenerator();

        // Fetch the type lists once instead of once per column.
        List<DataType> preTypes = info.getPreTransformedSchema().getColumnDataTypes();
        List<DataType> postTypes = postSchema.getColumnDataTypes();

        Object[] preRow = new Object[preFieldGetters.length];
        for (int i = 0; i < preFieldGetters.length; i++) {
            preRow[i] =
                    DataTypeConverter.convertToOriginal(
                            preFieldGetters[i].getFieldOrNull(recordData), preTypes.get(i));
        }

        Object[] postRow =
                projectionProcessor != null ? projectionProcessor.project(preRow, context) : preRow;
        boolean filterPassed =
                filterProcessor == null || filterProcessor.test(preRow, postRow, context);

        Object[] postRowBinary = new Object[postSchema.getColumnCount()];
        for (int i = 0; i < postRow.length; i++) {
            postRowBinary[i] = DataTypeConverter.convert(postRow[i], postTypes.get(i));
        }
        return Tuple2.of(postGenerator.generate(postRowBinary), filterPassed);
    }

    /** Returns the transformers whose selectors match the table, in rule order. */
    private List<PostTransformer> getEffectiveTransformers(TableId tableId) {
        return transformers.stream()
                .filter(trans -> trans.getSelectors().isMatch(tableId))
                .collect(Collectors.toList());
    }

    /** Returns the cached projection processor for (table, transformer), creating it on a miss. */
    private TransformProjectionProcessor getProjectionProcessor(
            TableId tableId, PostTransformer postTransformer) {
        TransformProjectionProcessor processor =
                projectionProcessors.get(tableId, postTransformer);
        if (processor == null) {
            processor =
                    new TransformProjectionProcessor(
                            postTransformInfoMap.get(tableId),
                            postTransformer
                                    .getProjection()
                                    .map(TransformProjection::getProjection)
                                    .orElse(null),
                            timezone,
                            udfDescriptors,
                            udfFunctionInstances,
                            postTransformer.getSupportedMetadataColumns());
            projectionProcessors.put(tableId, postTransformer, processor);
        }
        return processor;
    }

    /**
     * Returns the cached filter processor for (table, transformer), creating it on a miss. A
     * transformer without a filter gets the no-op processor.
     */
    private TransformFilterProcessor getFilterProcessor(
            TableId tableId, PostTransformer postTransformer) {
        TransformFilterProcessor processor = filterProcessors.get(tableId, postTransformer);
        if (processor == null) {
            if (postTransformer.getFilter().isPresent()) {
                processor =
                        TransformFilterProcessor.of(
                                postTransformInfoMap.get(tableId),
                                postTransformer.getFilter().orElse(null),
                                timezone,
                                udfDescriptors,
                                udfFunctionInstances,
                                postTransformer.getSupportedMetadataColumns());
            } else {
                processor = TransformFilterProcessor.ofNoOp();
            }
            filterProcessors.put(tableId, postTransformer, processor);
        }
        return processor;
    }

    /** Drops every cached processor of the given table. */
    private void invalidateCache(TableId tableId) {
        projectionProcessors.row(tableId).clear();
        filterProcessors.row(tableId).clear();
    }

    /** Builds one {@link PostTransformer} per transform rule, preserving rule order. */
    private List<PostTransformer> createTransformers() {
        List<PostTransformer> result = new ArrayList<>();
        for (TransformRule rule : transformRules) {
            Selectors selectors =
                    new Selectors.SelectorsBuilder()
                            .includeTables(rule.getTableInclusions())
                            .build();
            result.add(
                    new PostTransformer(
                            selectors,
                            TransformProjection.of(rule.getProjection()).orElse(null),
                            TransformFilter.of(rule.getFilter(), udfDescriptors).orElse(null),
                            PostTransformConverters.of(rule.getPostTransformConverter())
                                    .orElse(null),
                            rule.getSupportedMetadataColumns()));
        }
        return result;
    }

    /**
     * Instantiates every configured UDF through its no-arg constructor and, for CDC pipeline
     * UDFs, reflectively invokes {@code open(UserDefinedFunctionContext)}.
     *
     * @throws RuntimeException if a UDF cannot be loaded, instantiated or opened
     */
    private void initializeUdf() {
        udfDescriptors =
                udfFunctions.stream()
                        .map(UserDefinedFunctionDescriptor::new)
                        .collect(Collectors.toList());
        udfFunctionInstances = new ArrayList<>();
        for (UserDefinedFunctionDescriptor udf : udfDescriptors) {
            try {
                Class<?> clazz = Class.forName(udf.getClasspath());
                Object udfInstance = clazz.getDeclaredConstructor().newInstance();
                // Register the instance before opening it so that it stays index-aligned with
                // its descriptor.
                udfFunctionInstances.add(udfInstance);
                if (udf.isCdcPipelineUdf()) {
                    UserDefinedFunctionContext userDefinedFunctionContext =
                            () -> Configuration.fromMap(udf.getParameters());
                    udfInstance
                            .getClass()
                            .getMethod("open", UserDefinedFunctionContext.class)
                            .invoke(udfInstance, userDefinedFunctionContext);
                }
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Failed to instantiate UDF function " + udf, e);
            }
        }
    }

    /**
     * Reflectively invokes {@code close()} on every instantiated CDC pipeline UDF and clears
     * the UDF lists.
     *
     * <p>All UDFs are attempted even if one fails; the first failure is rethrown afterwards
     * with later failures attached as suppressed exceptions. Calling this method when the
     * UDFs were never (or only partially) initialized is safe.
     *
     * @throws RuntimeException if closing at least one UDF failed
     */
    private void destroyUdf() {
        if (udfDescriptors == null || udfFunctionInstances == null) {
            return;
        }
        // initializeUdf() may have failed part-way, leaving fewer instances than descriptors.
        int instantiated = Math.min(udfDescriptors.size(), udfFunctionInstances.size());
        RuntimeException firstFailure = null;
        for (int i = 0; i < instantiated; i++) {
            UserDefinedFunctionDescriptor udf = udfDescriptors.get(i);
            if (!udf.isCdcPipelineUdf()) {
                continue;
            }
            Object udfInstance = udfFunctionInstances.get(i);
            try {
                udfInstance.getClass().getMethod("close").invoke(udfInstance);
            } catch (ReflectiveOperationException e) {
                RuntimeException failure = new RuntimeException("Failed to destroy UDF " + udf, e);
                if (firstFailure == null) {
                    firstFailure = failure;
                } else {
                    firstFailure.addSuppressed(failure);
                }
            }
        }
        udfDescriptors.clear();
        udfFunctionInstances.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Converts an {@link Optional} into a stream of zero or one element. */
    private <T> Stream<T> optionalToStream(Optional<T> optional) {
        return optional.map(Stream::of).orElseGet(Stream::empty);
    }
}

