/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.transform;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.PostTransformChangeInfo;
import org.apache.flink.cdc.runtime.operators.transform.PostTransformer;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilter;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilterProcessor;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjection;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor;
import org.apache.flink.cdc.runtime.operators.transform.TransformRule;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

public class PostTransformOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>,
Serializable {
    private static final long serialVersionUID = 1L;
    /** Time zone identifier passed to every projection/filter processor this operator creates. */
    private final String timezone;
    /** Raw transform rules received from the builder; turned into {@code transforms} in open(). */
    private final List<TransformRule> transformRules;
    /** Runtime form of {@code transformRules}; built in open() and set to null by clearOperator(). */
    private transient List<PostTransformer> transforms;
    /** Per-table pre-/post-transform schema info, replaced in cacheSchema() on each schema change event. */
    private final Map<TableId, PostTransformChangeInfo> postTransformChangeInfoMap;
    /** UDF declarations as pairs; setup() passes (f0, f1) to the UserDefinedFunctionDescriptor constructor. */
    private final List<Tuple2<String, String>> udfFunctions;
    /** Descriptors derived from {@code udfFunctions}; assigned in setup(). */
    private List<UserDefinedFunctionDescriptor> udfDescriptors;
    /** UDF objects keyed by descriptor name; instantiated reflectively in open(). */
    private transient Map<String, Object> udfFunctionInstances;
    /** Cache of projection processors keyed by (table, projection); entries for a table are dropped on its schema change. */
    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor> transformProjectionProcessorMap;
    /** Cache of filter processors keyed by (table, filter); entries for a table are dropped on its schema change. */
    private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor> transformFilterProcessorMap;
    /**
     * Per-table flag recorded on CreateTableEvent: true when no rule matches the table, otherwise
     * whether TransformParser.hasAsterisk reports true for any matching rule's projection.
     */
    private final Map<TableId, Boolean> hasAsteriskMap;
    /**
     * Per-table list, recorded on CreateTableEvent, of the source column names (in schema order)
     * that also occur among the projection column names generated for the matching rules.
     */
    private final Map<TableId, List<String>> projectedColumnsMap;

    /**
     * Entry point for configuring a {@link PostTransformOperator}.
     *
     * @return a fresh, empty {@link Builder}
     */
    public static Builder newBuilder() {
        return new PostTransformOperator.Builder();
    }

    /**
     * Stores the supplied configuration and creates empty bookkeeping maps.
     *
     * @param transformRules rules to apply; kept by reference
     * @param timezone time zone identifier handed to the processors
     * @param udfFunctions UDF declarations; kept by reference
     */
    private PostTransformOperator(List<TransformRule> transformRules, String timezone, List<Tuple2<String, String>> udfFunctions) {
        // Configuration handed over by the builder.
        this.timezone = timezone;
        this.transformRules = transformRules;
        this.udfFunctions = udfFunctions;

        // Schema bookkeeping.
        this.postTransformChangeInfoMap = new ConcurrentHashMap<>();
        this.hasAsteriskMap = new HashMap<>();
        this.projectedColumnsMap = new HashMap<>();

        // Transient runtime caches; open() assigns fresh maps to these fields again.
        this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
        this.transformFilterProcessorMap = new ConcurrentHashMap<>();
        this.udfFunctionInstances = new ConcurrentHashMap<>();
    }

    /**
     * Delegates to the superclass setup and then converts every declared UDF pair into a
     * {@link UserDefinedFunctionDescriptor}.
     *
     * @param containingTask forwarded unchanged to the superclass
     * @param config forwarded unchanged to the superclass
     * @param output forwarded unchanged to the superclass
     */
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        List<UserDefinedFunctionDescriptor> descriptors = new ArrayList<>();
        for (Tuple2<String, String> declaration : this.udfFunctions) {
            descriptors.add(new UserDefinedFunctionDescriptor(declaration.f0, declaration.f1));
        }
        this.udfDescriptors = descriptors;
    }

    /**
     * Prepares the operator for processing: builds one {@link PostTransformer} per transform rule,
     * resets the processor caches, instantiates every declared UDF class through its no-arg
     * constructor and finally runs {@code initializeUdf()}.
     *
     * @throws RuntimeException if a UDF class cannot be loaded or instantiated; the underlying
     *     reflection failure is attached as the cause
     * @throws Exception if the superclass {@code open()} fails
     */
    public void open() throws Exception {
        super.open();
        this.transforms = this.transformRules.stream().map(rule -> {
            String tableInclusions = rule.getTableInclusions();
            String projection = rule.getProjection();
            String filterExpression = rule.getFilter();
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
            // A rule without projection and/or filter is represented by null in the transformer.
            return new PostTransformer(selectors, TransformProjection.of(projection).orElse(null), TransformFilter.of(filterExpression, this.udfDescriptors).orElse(null));
        }).collect(Collectors.toList());
        this.transformProjectionProcessorMap = new ConcurrentHashMap<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>();
        this.transformFilterProcessorMap = new ConcurrentHashMap<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>();
        this.udfFunctionInstances = new ConcurrentHashMap<String, Object>();
        this.udfDescriptors.forEach(udf -> {
            try {
                Class<?> clazz = Class.forName(udf.getClasspath());
                this.udfFunctionInstances.put(udf.getName(), clazz.newInstance());
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                // Keep the original failure as the cause so the real reason (missing class,
                // inaccessible or absent no-arg constructor, ...) is visible in the stack trace.
                throw new RuntimeException("Failed to instantiate UDF function " + udf, e);
            }
        });
        this.initializeUdf();
    }

    /**
     * Runs the superclass {@code finish()} and then releases the transformers and processor
     * caches through {@code clearOperator()}.
     *
     * @throws Exception if the superclass {@code finish()} fails
     */
    public void finish() throws Exception {
        super.finish();
        clearOperator();
    }

    /**
     * Runs the superclass {@code close()}, releases the transformers and processor caches, and
     * shuts down the UDF instances.
     *
     * <p>{@code udfFunctionInstances} is transient and is only assigned inside {@code open()}
     * after the transformers have been built. If {@code open()} failed before that point the
     * field is still null on a deserialized operator, so UDF teardown is skipped instead of
     * failing with a NullPointerException during cleanup.
     *
     * @throws Exception if the superclass {@code close()} fails
     */
    public void close() throws Exception {
        super.close();
        this.clearOperator();
        if (this.udfFunctionInstances != null) {
            this.destroyUdf();
            this.udfFunctionInstances.clear();
        }
    }

    /**
     * Routes one incoming event.
     *
     * <p>For a schema change event, the cached processors of the affected table are dropped, the
     * schema bookkeeping is updated, and the resulting event (if any) is emitted. For a data
     * change event, the transformed record is emitted unless it was filtered out. Any other
     * event type is ignored.
     *
     * @param element stream record wrapping the event
     * @throws Exception if schema caching or data transformation fails
     */
    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = element.getValue();

        if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChange = (SchemaChangeEvent) event;
            TableId changedTable = schemaChange.tableId();
            // Processors of this table were created for the previous schema; drop them.
            this.transformProjectionProcessorMap.keySet().removeIf(key -> Objects.equals(key.f0, changedTable));
            this.transformFilterProcessorMap.keySet().removeIf(key -> Objects.equals(key.f0, changedTable));
            Optional<SchemaChangeEvent> transformedSchemaChange = this.cacheSchema(schemaChange);
            if (transformedSchemaChange.isPresent()) {
                this.output.collect(new StreamRecord<Event>(transformedSchemaChange.get()));
            }
            return;
        }

        if (event instanceof DataChangeEvent) {
            Optional<DataChangeEvent> transformedDataChange = this.processDataChangeEvent((DataChangeEvent) event);
            if (transformedDataChange.isPresent()) {
                this.output.collect(new StreamRecord<Event>(transformedDataChange.get()));
            }
        }
    }

    /**
     * Updates the per-table schema bookkeeping for a schema change and produces the event to
     * forward downstream.
     *
     * <p>On a {@link CreateTableEvent} the asterisk flag and the projected column list of the
     * table are recorded and a new create-table event carrying the transformed schema is
     * returned. For any other schema change the previously cached pre-transform schema is
     * evolved with the event, and the event is passed through
     * {@code SchemaUtils.transformSchemaChangeEvent}.
     *
     * @param event the incoming schema change
     * @return the schema change event to emit, if any
     * @throws RuntimeException if a non-create event arrives for a table with no cached schema
     * @throws Exception if transforming the schema fails
     */
    private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
        TableId tableId = event.tableId();
        boolean isCreateTable = event instanceof CreateTableEvent;

        List<String> columnNamesBeforeChange;
        Schema schema;
        if (isCreateTable) {
            CreateTableEvent createTableEvent = (CreateTableEvent) event;

            // Names of all columns produced by the projections of the rules matching this table.
            Set<String> projectedNames = this.transforms.stream()
                    .filter(transform -> transform.getSelectors().isMatch(tableId))
                    .flatMap(transform -> TransformParser.generateProjectionColumns(
                            transform.getProjection().map(TransformProjection::getProjection).orElse(null),
                            createTableEvent.getSchema().getColumns(),
                            this.udfDescriptors).stream())
                    .map(ProjectionColumn::getColumnName)
                    .collect(Collectors.toSet());

            // A table no rule touches is flagged true, exactly like one with an asterisk projection.
            boolean matchedByAnyRule = this.transforms.stream()
                    .anyMatch(transform -> transform.getSelectors().isMatch(tableId));
            boolean asteriskFlag;
            if (matchedByAnyRule) {
                asteriskFlag = this.transforms.stream()
                        .filter(transform -> transform.getSelectors().isMatch(tableId))
                        .anyMatch(transform -> TransformParser.hasAsterisk(
                                transform.getProjection().map(TransformProjection::getProjection).orElse(null)));
            } else {
                asteriskFlag = true;
            }
            this.hasAsteriskMap.put(tableId, asteriskFlag);

            // Source columns, in schema order, that also occur among the projected names.
            List<String> retainedColumns = new ArrayList<>();
            for (String columnName : createTableEvent.getSchema().getColumnNames()) {
                if (projectedNames.contains(columnName)) {
                    retainedColumns.add(columnName);
                }
            }
            this.projectedColumnsMap.put(tableId, retainedColumns);

            columnNamesBeforeChange = Collections.emptyList();
            schema = createTableEvent.getSchema();
        } else {
            Schema previousSchema = this.getPostTransformChangeInfo(tableId).getPreTransformedSchema();
            columnNamesBeforeChange = previousSchema.getColumnNames();
            schema = SchemaUtils.applySchemaChangeEvent(previousSchema, event);
        }

        Schema projectedSchema = this.transformSchema(tableId, schema);
        this.postTransformChangeInfoMap.put(tableId, PostTransformChangeInfo.of(tableId, projectedSchema, schema));

        if (isCreateTable) {
            return Optional.of(new CreateTableEvent(tableId, projectedSchema));
        }
        boolean hasAsterisk = this.hasAsteriskMap.getOrDefault(tableId, true);
        List<String> referencedColumns = hasAsterisk ? columnNamesBeforeChange : this.projectedColumnsMap.get(tableId);
        return SchemaUtils.transformSchemaChangeEvent(hasAsterisk, referencedColumns, event);
    }

    /**
     * Looks up the cached schema information of a table.
     *
     * @param tableId table to look up
     * @return the cached info, never null
     * @throws RuntimeException if nothing has been cached for the table
     */
    private PostTransformChangeInfo getPostTransformChangeInfo(TableId tableId) {
        PostTransformChangeInfo cached = this.postTransformChangeInfoMap.get(tableId);
        if (cached != null) {
            return cached;
        }
        throw new RuntimeException("Schema for " + tableId + " not found. This shouldn't happen.");
    }

    /**
     * Applies the projection of every rule matching the table to the given schema.
     *
     * <p>Rules without a projection, or whose projection is not valid, are skipped. Projection
     * processors are created on demand and cached per (table, projection).
     *
     * @param tableId table the schema belongs to
     * @param schema pre-transform schema
     * @return {@code schema} itself when no projection applied, otherwise the result of
     *     {@code SchemaUtils.inferWiderSchema} over all projected schemas
     * @throws Exception if processor creation or schema processing fails
     */
    private Schema transformSchema(TableId tableId, Schema schema) throws Exception {
        List<Schema> projectedSchemas = new ArrayList<>();
        for (PostTransformer transform : this.transforms) {
            if (!transform.getSelectors().isMatch(tableId)) {
                continue;
            }
            Optional<TransformProjection> declaredProjection = transform.getProjection();
            if (!declaredProjection.isPresent()) {
                continue;
            }
            TransformProjection projection = declaredProjection.get();
            if (!projection.isValid()) {
                continue;
            }

            Tuple2<TableId, TransformProjection> cacheKey = Tuple2.of(tableId, projection);
            TransformProjectionProcessor processor = this.transformProjectionProcessorMap.get(cacheKey);
            if (processor == null) {
                processor = TransformProjectionProcessor.of(projection, this.timezone, this.udfDescriptors, this.getUdfFunctionInstances());
                this.transformProjectionProcessorMap.put(cacheKey, processor);
            }
            projectedSchemas.add(processor.processSchemaChangeEvent(schema));
        }
        return projectedSchemas.isEmpty() ? schema : SchemaUtils.inferWiderSchema(projectedSchemas);
    }

    /**
     * Lists the UDF instances in the same order as {@code udfDescriptors}.
     *
     * @return one entry per descriptor; an entry is null when no instance is registered under
     *     the descriptor's name
     */
    private List<Object> getUdfFunctionInstances() {
        List<Object> instances = new ArrayList<>();
        for (UserDefinedFunctionDescriptor descriptor : this.udfDescriptors) {
            instances.add(this.udfFunctionInstances.get(descriptor.getName()));
        }
        return instances;
    }

    /**
     * Runs a data change event through every rule matching its table.
     *
     * <p>For each matching rule the filter (when present and valid) is evaluated first, then the
     * projection (when present and valid) is applied to a surviving event. Every matching rule
     * is evaluated; afterwards the first surviving result is passed through the final
     * post-projection. If no rule matches the table, the original event is post-projected
     * directly.
     *
     * @param dataChangeEvent the incoming event
     * @return the transformed event, or empty when every matching rule filtered it out
     * @throws RuntimeException if no schema has been cached for the event's table
     * @throws Exception if filter or projection processing fails
     */
    private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        PostTransformChangeInfo tableInfo = this.getPostTransformChangeInfo(tableId);
        List<Optional<DataChangeEvent>> perRuleResults = new ArrayList<>();
        // One timestamp shared by all rules evaluated for this event.
        long epochTime = System.currentTimeMillis();

        for (PostTransformer rule : this.transforms) {
            if (!rule.getSelectors().isMatch(tableId)) {
                continue;
            }
            Optional<DataChangeEvent> current = Optional.of(dataChangeEvent);
            Optional<TransformProjection> declaredProjection = rule.getProjection();
            Optional<TransformFilter> declaredFilter = rule.getFilter();

            if (declaredFilter.isPresent() && declaredFilter.get().isVaild()) {
                TransformFilter filter = declaredFilter.get();
                Tuple2<TableId, TransformFilter> filterKey = Tuple2.of(tableId, filter);
                TransformFilterProcessor filterProcessor = this.transformFilterProcessorMap.get(filterKey);
                if (filterProcessor == null) {
                    filterProcessor = TransformFilterProcessor.of(tableInfo, filter, this.timezone, this.udfDescriptors, this.getUdfFunctionInstances());
                    this.transformFilterProcessorMap.put(filterKey, filterProcessor);
                }
                current = this.processFilter(filterProcessor, current.get(), epochTime);
            }

            if (current.isPresent() && declaredProjection.isPresent() && declaredProjection.get().isValid()) {
                TransformProjection projection = declaredProjection.get();
                Tuple2<TableId, TransformProjection> projectionKey = Tuple2.of(tableId, projection);
                TransformProjectionProcessor projectionProcessor = this.transformProjectionProcessorMap.get(projectionKey);
                // A processor cached without table info is replaced by one that carries it.
                if (projectionProcessor == null || !projectionProcessor.hasTableInfo()) {
                    projectionProcessor = TransformProjectionProcessor.of(tableInfo, projection, this.timezone, this.udfDescriptors, this.getUdfFunctionInstances());
                    this.transformProjectionProcessorMap.put(projectionKey, projectionProcessor);
                }
                current = this.processProjection(projectionProcessor, current.get(), epochTime);
            }

            perRuleResults.add(current);
        }

        if (perRuleResults.isEmpty()) {
            return this.processPostProjection(tableInfo, dataChangeEvent);
        }
        for (Optional<DataChangeEvent> result : perRuleResults) {
            if (result.isPresent()) {
                return this.processPostProjection(tableInfo, result.get());
            }
        }
        return Optional.empty();
    }

    /**
     * Evaluates a filter against a data change event.
     *
     * <p>The "after" image is tested when it exists; otherwise the "before" image is tested. An
     * event carrying neither image is dropped.
     *
     * @param transformFilterProcessor processor evaluating the filter
     * @param dataChangeEvent event to test
     * @param epochTime timestamp forwarded to the processor
     * @return the unchanged event when the filter accepts it, otherwise empty
     * @throws Exception if filter evaluation fails
     */
    private Optional<DataChangeEvent> processFilter(TransformFilterProcessor transformFilterProcessor, DataChangeEvent dataChangeEvent, long epochTime) throws Exception {
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();

        BinaryRecordData candidate;
        char imageMarker;
        if (after != null) {
            candidate = after;
            imageMarker = '+';
        } else if (before != null) {
            candidate = before;
            imageMarker = '-';
        } else {
            return Optional.empty();
        }

        boolean accepted = transformFilterProcessor.process(candidate, epochTime, this.opTypeToRowKind(dataChangeEvent.op(), imageMarker));
        return accepted ? Optional.of(dataChangeEvent) : Optional.empty();
    }

    /**
     * Applies a projection to the "before" and "after" images of an event, whichever exist.
     *
     * @param postTransformProcessor processor computing the projected record
     * @param dataChangeEvent event to project
     * @param epochTime timestamp forwarded to the processor
     * @return the event with its existing images replaced by their projected versions; never empty
     */
    private Optional<DataChangeEvent> processProjection(TransformProjectionProcessor postTransformProcessor, DataChangeEvent dataChangeEvent, long epochTime) {
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();

        DataChangeEvent projected = dataChangeEvent;
        if (before != null) {
            String rowKind = this.opTypeToRowKind(projected.op(), '-');
            projected = DataChangeEvent.projectBefore(projected, postTransformProcessor.processData(before, epochTime, rowKind));
        }
        if (after != null) {
            String rowKind = this.opTypeToRowKind(projected.op(), '+');
            projected = DataChangeEvent.projectAfter(projected, postTransformProcessor.processData(after, epochTime, rowKind));
        }
        return Optional.of(projected);
    }

    /**
     * Rebuilds the "before" and "after" images of an event, whichever exist, through
     * {@code projectRecord}.
     *
     * @param tableInfo cached schema info of the event's table
     * @param dataChangeEvent event whose images are rebuilt
     * @return the event with its existing images replaced; never empty
     * @throws Exception declared for callers; propagated from record projection
     */
    private Optional<DataChangeEvent> processPostProjection(PostTransformChangeInfo tableInfo, DataChangeEvent dataChangeEvent) throws Exception {
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();

        DataChangeEvent rebuilt = dataChangeEvent;
        if (before != null) {
            rebuilt = DataChangeEvent.projectBefore(rebuilt, this.projectRecord(tableInfo, before));
        }
        if (after != null) {
            rebuilt = DataChangeEvent.projectAfter(rebuilt, this.projectRecord(tableInfo, after));
        }
        return Optional.of(rebuilt);
    }

    /**
     * Reads every post-transform field from a record and assembles the values into a new record
     * with the table's record data generator.
     *
     * @param tableInfo supplies the field getters and the record generator
     * @param recordData record to read from
     * @return the newly generated record
     */
    private BinaryRecordData projectRecord(PostTransformChangeInfo tableInfo, BinaryRecordData recordData) {
        RecordData.FieldGetter[] getters = tableInfo.getPostTransformedFieldGetters();
        Object[] values = new Object[getters.length];
        for (int i = 0; i < getters.length; i++) {
            values[i] = getters[i].getFieldOrNull(recordData);
        }
        return tableInfo.getRecordDataGenerator().generate(values);
    }

    /**
     * Drops the references to the transformers and both processor caches, then asks
     * {@link TransformExpressionCompiler} to clean up.
     */
    private void clearOperator() {
        transformFilterProcessorMap = null;
        transformProjectionProcessorMap = null;
        transforms = null;
        TransformExpressionCompiler.cleanUp();
    }

    /**
     * Reflectively invokes the no-arg {@code open} method on the instance of every descriptor
     * that reports {@code isCdcPipelineUdf()}.
     *
     * @throws RuntimeException if the method is missing, inaccessible, or throws
     */
    private void initializeUdf() {
        for (UserDefinedFunctionDescriptor udf : this.udfDescriptors) {
            try {
                if (!udf.isCdcPipelineUdf()) {
                    continue;
                }
                Object instance = this.udfFunctionInstances.get(udf.getName());
                instance.getClass().getMethod("open").invoke(instance);
            } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
                throw new RuntimeException("Failed to initialize UDF " + udf, ex);
            }
        }
    }

    /**
     * Reflectively invokes the no-arg {@code close} method on the instance of every descriptor
     * that reports {@code isCdcPipelineUdf()}.
     *
     * <p>Descriptors without a registered instance are skipped. This happens when {@code open()}
     * failed before instantiating every UDF; cleanup must not fail with a NullPointerException
     * on top of the original error.
     *
     * @throws RuntimeException if the method is missing, inaccessible, or throws
     */
    private void destroyUdf() {
        this.udfDescriptors.forEach(udf -> {
            try {
                if (udf.isCdcPipelineUdf()) {
                    Object udfInstance = this.udfFunctionInstances.get(udf.getName());
                    if (udfInstance == null) {
                        // Never instantiated, so there is nothing to close.
                        return;
                    }
                    udfInstance.getClass().getMethod("close", new Class[0]).invoke(udfInstance, new Object[0]);
                }
            }
            catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
                throw new RuntimeException("Failed to destroy UDF " + udf, ex);
            }
        });
    }

    /**
     * Builds a two-character row kind: the given marker followed by the first letter of the
     * operation type's name.
     *
     * @param opType operation type of the event
     * @param beforeOrAfter marker character; callers in this class pass '-' or '+'
     * @return the two-character string
     */
    private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
        char[] rowKind = {beforeOrAfter, opType.name().charAt(0)};
        return new String(rowKind);
    }

    /**
     * Fluent builder collecting transform rules, the time zone and UDF declarations for a
     * {@link PostTransformOperator}.
     */
    public static class Builder {
        private final List<TransformRule> transformRules = new ArrayList<>();
        private String timezone;
        private final List<Tuple2<String, String>> udfFunctions = new ArrayList<>();

        /**
         * Registers a transform rule with all attributes.
         *
         * @param tableInclusions table selection expression of the rule
         * @param projection projection expression, may be null
         * @param filter filter expression, may be null
         * @param primaryKey primary key setting passed to the rule
         * @param partitionKey partition key setting passed to the rule
         * @param tableOptions table options passed to the rule
         * @return this builder
         */
        public Builder addTransform(String tableInclusions, @Nullable String projection, @Nullable String filter, String primaryKey, String partitionKey, String tableOptions) {
            TransformRule rule = new TransformRule(tableInclusions, projection, filter, primaryKey, partitionKey, tableOptions);
            this.transformRules.add(rule);
            return this;
        }

        /**
         * Registers a transform rule with empty primary key, partition key and table options.
         *
         * @param tableInclusions table selection expression of the rule
         * @param projection projection expression, may be null
         * @param filter filter expression, may be null
         * @return this builder
         */
        public Builder addTransform(String tableInclusions, @Nullable String projection, @Nullable String filter) {
            TransformRule rule = new TransformRule(tableInclusions, projection, filter, "", "", "");
            this.transformRules.add(rule);
            return this;
        }

        /**
         * Sets the time zone. A value equal to the default of
         * {@code PipelineOptions.PIPELINE_LOCAL_TIME_ZONE} is replaced by the id of the JVM's
         * system default zone.
         *
         * @param timezone configured time zone
         * @return this builder
         */
        public Builder addTimezone(String timezone) {
            boolean isPipelineDefault = PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(timezone);
            if (isPipelineDefault) {
                this.timezone = ZoneId.systemDefault().toString();
            } else {
                this.timezone = timezone;
            }
            return this;
        }

        /**
         * Appends UDF declarations.
         *
         * @param udfFunctions declarations to add
         * @return this builder
         */
        public Builder addUdfFunctions(List<Tuple2<String, String>> udfFunctions) {
            this.udfFunctions.addAll(udfFunctions);
            return this;
        }

        /**
         * Creates the operator. The builder's own rule and UDF lists are handed over without
         * copying.
         *
         * @return the configured operator
         */
        public PostTransformOperator build() {
            return new PostTransformOperator(this.transformRules, this.timezone, this.udfFunctions);
        }
    }
}

