/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.transform;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.operators.transform.TableInfo;
import org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilter;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilterProcessor;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjection;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

public class TransformDataOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
    private SchemaEvolutionClient schemaEvolutionClient;
    private final OperatorID schemaOperatorID;
    private final String timezone;
    private final List<Tuple3<String, String, String>> transformRules;
    private transient List<Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean>> transforms;
    private final Map<TableId, TableInfo> tableInfoMap;
    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor> transformProjectionProcessorMap;
    private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor> transformFilterProcessorMap;

    public static Builder newBuilder() {
        return new Builder();
    }

    private TransformDataOperator(List<Tuple3<String, String, String>> transformRules, OperatorID schemaOperatorID, String timezone) {
        this.transformRules = transformRules;
        this.schemaOperatorID = schemaOperatorID;
        this.timezone = timezone;
        this.tableInfoMap = new ConcurrentHashMap<TableId, TableInfo>();
        this.transformFilterProcessorMap = new ConcurrentHashMap<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>();
        this.transformProjectionProcessorMap = new ConcurrentHashMap<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>();
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.schemaEvolutionClient = new SchemaEvolutionClient(containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorID);
    }

    public void open() throws Exception {
        super.open();
        this.transforms = this.transformRules.stream().map(tuple3 -> {
            String tableInclusions = (String)tuple3.f0;
            String projection = (String)tuple3.f1;
            String filterExpression = (String)tuple3.f2;
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
            return new Tuple4((Object)selectors, TransformProjection.of(projection), TransformFilter.of(filterExpression), (Object)this.containFilteredComputedColumn(projection, filterExpression));
        }).collect(Collectors.toList());
        this.transformFilterProcessorMap = new ConcurrentHashMap<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>();
        this.transformProjectionProcessorMap = new ConcurrentHashMap<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>();
    }

    public void finish() throws Exception {
        super.finish();
        this.clearOperator();
    }

    public void close() throws Exception {
        super.close();
        this.clearOperator();
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        this.schemaEvolutionClient.registerSubtask(this.getRuntimeContext().getIndexOfThisSubtask());
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Optional<DataChangeEvent> dataChangeEventOptional;
        Event event = (Event)element.getValue();
        if (event instanceof SchemaChangeEvent) {
            event = this.cacheSchema((SchemaChangeEvent)event);
            this.output.collect((Object)new StreamRecord((Object)event));
        } else if (event instanceof DataChangeEvent && (dataChangeEventOptional = this.processDataChangeEvent((DataChangeEvent)event)).isPresent()) {
            this.output.collect((Object)new StreamRecord((Object)dataChangeEventOptional.get()));
        }
    }

    private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception {
        TableId tableId = event.tableId();
        Schema newSchema = event instanceof CreateTableEvent ? ((CreateTableEvent)event).getSchema() : SchemaUtils.applySchemaChangeEvent(this.getTableInfoFromSchemaEvolutionClient(tableId).getSchema(), event);
        this.transformSchema(tableId, newSchema);
        this.tableInfoMap.put(tableId, TableInfo.of(tableId, newSchema));
        return event;
    }

    private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws Exception {
        TableInfo tableInfo = this.tableInfoMap.get(tableId);
        if (tableInfo == null) {
            Optional<Schema> schemaOptional = this.schemaEvolutionClient.getLatestSchema(tableId);
            if (schemaOptional.isPresent()) {
                tableInfo = TableInfo.of(tableId, schemaOptional.get());
            } else {
                throw new RuntimeException("Could not find schema message from SchemaRegistry for " + tableId);
            }
        }
        return tableInfo;
    }

    private void transformSchema(TableId tableId, Schema schema) throws Exception {
        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean> transform : this.transforms) {
            TransformProjection transformProjection;
            Selectors selectors = (Selectors)transform.f0;
            if (!selectors.isMatch(tableId) || !((Optional)transform.f1).isPresent() || !(transformProjection = (TransformProjection)((Optional)transform.f1).get()).isValid()) continue;
            if (!this.transformProjectionProcessorMap.containsKey(Tuple2.of((Object)tableId, (Object)transformProjection))) {
                this.transformProjectionProcessorMap.put((Tuple2<TableId, TransformProjection>)Tuple2.of((Object)tableId, (Object)transformProjection), TransformProjectionProcessor.of(transformProjection));
            }
            TransformProjectionProcessor transformProjectionProcessor = this.transformProjectionProcessorMap.get(Tuple2.of((Object)tableId, (Object)transformProjection));
            transformProjectionProcessor.processSchemaChangeEvent(schema);
        }
    }

    private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        ArrayList<Optional<DataChangeEvent>> transformedDataChangeEventOptionalList = new ArrayList<Optional<DataChangeEvent>>();
        long epochTime = System.currentTimeMillis();
        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean> tuple4 : this.transforms) {
            Optional transformFilterOptional;
            Selectors selectors = (Selectors)tuple4.f0;
            Boolean isPreProjection = (Boolean)tuple4.f3;
            if (!selectors.isMatch(tableId)) continue;
            Optional<DataChangeEvent> dataChangeEventOptional = Optional.of(dataChangeEvent);
            Optional transformProjectionOptional = (Optional)tuple4.f1;
            if (isPreProjection.booleanValue() && transformProjectionOptional.isPresent() && ((TransformProjection)transformProjectionOptional.get()).isValid()) {
                TransformProjection transformProjection = (TransformProjection)transformProjectionOptional.get();
                if (!this.transformProjectionProcessorMap.containsKey(Tuple2.of((Object)tableId, (Object)transformProjection)) || !this.transformProjectionProcessorMap.get(Tuple2.of((Object)tableId, (Object)transformProjection)).hasTableInfo()) {
                    this.transformProjectionProcessorMap.put((Tuple2<TableId, TransformProjection>)Tuple2.of((Object)tableId, (Object)transformProjection), TransformProjectionProcessor.of(this.getTableInfoFromSchemaEvolutionClient(tableId), transformProjection, this.timezone));
                }
                TransformProjectionProcessor transformProjectionProcessor = this.transformProjectionProcessorMap.get(Tuple2.of((Object)tableId, (Object)transformProjection));
                dataChangeEventOptional = this.processProjection(transformProjectionProcessor, dataChangeEventOptional.get(), epochTime);
            }
            if ((transformFilterOptional = (Optional)tuple4.f2).isPresent() && ((TransformFilter)transformFilterOptional.get()).isVaild()) {
                TransformFilter transformFilter = (TransformFilter)transformFilterOptional.get();
                if (!this.transformFilterProcessorMap.containsKey(Tuple2.of((Object)tableId, (Object)transformFilter))) {
                    this.transformFilterProcessorMap.put((Tuple2<TableId, TransformFilter>)Tuple2.of((Object)tableId, (Object)transformFilter), TransformFilterProcessor.of(this.getTableInfoFromSchemaEvolutionClient(tableId), transformFilter, this.timezone));
                }
                TransformFilterProcessor transformFilterProcessor = this.transformFilterProcessorMap.get(Tuple2.of((Object)tableId, (Object)transformFilter));
                dataChangeEventOptional = this.processFilter(transformFilterProcessor, dataChangeEventOptional.get(), epochTime);
            }
            if (!isPreProjection.booleanValue() && dataChangeEventOptional.isPresent() && transformProjectionOptional.isPresent() && ((TransformProjection)transformProjectionOptional.get()).isValid()) {
                TransformProjection transformProjection = (TransformProjection)transformProjectionOptional.get();
                if (!this.transformProjectionProcessorMap.containsKey(Tuple2.of((Object)tableId, (Object)transformProjection)) || !this.transformProjectionProcessorMap.get(Tuple2.of((Object)tableId, (Object)transformProjection)).hasTableInfo()) {
                    this.transformProjectionProcessorMap.put((Tuple2<TableId, TransformProjection>)Tuple2.of((Object)tableId, (Object)transformProjection), TransformProjectionProcessor.of(this.getTableInfoFromSchemaEvolutionClient(tableId), transformProjection, this.timezone));
                }
                TransformProjectionProcessor transformProjectionProcessor = this.transformProjectionProcessorMap.get(Tuple2.of((Object)tableId, (Object)transformProjection));
                dataChangeEventOptional = this.processProjection(transformProjectionProcessor, dataChangeEventOptional.get(), epochTime);
            }
            transformedDataChangeEventOptionalList.add(dataChangeEventOptional);
        }
        if (transformedDataChangeEventOptionalList.isEmpty()) {
            return Optional.of(dataChangeEvent);
        }
        for (Optional optional : transformedDataChangeEventOptionalList) {
            if (!optional.isPresent()) continue;
            return optional;
        }
        return Optional.empty();
    }

    private Optional<DataChangeEvent> processFilter(TransformFilterProcessor transformFilterProcessor, DataChangeEvent dataChangeEvent, long epochTime) throws Exception {
        BinaryRecordData before = (BinaryRecordData)dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData)dataChangeEvent.after();
        if (after != null) {
            if (transformFilterProcessor.process(after, epochTime)) {
                return Optional.of(dataChangeEvent);
            }
            return Optional.empty();
        }
        if (before != null) {
            if (transformFilterProcessor.process(before, epochTime)) {
                return Optional.of(dataChangeEvent);
            }
            return Optional.empty();
        }
        return Optional.empty();
    }

    private Optional<DataChangeEvent> processProjection(TransformProjectionProcessor transformProjectionProcessor, DataChangeEvent dataChangeEvent, long epochTime) throws Exception {
        BinaryRecordData before = (BinaryRecordData)dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData)dataChangeEvent.after();
        if (before != null) {
            BinaryRecordData projectedBefore = transformProjectionProcessor.processData(before, epochTime);
            dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
        }
        if (after != null) {
            BinaryRecordData projectedAfter = transformProjectionProcessor.processData(after, epochTime);
            dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
        }
        return Optional.of(dataChangeEvent);
    }

    private boolean containFilteredComputedColumn(String projection, String filter) {
        boolean contain = false;
        if (StringUtils.isNullOrWhitespaceOnly(projection) || StringUtils.isNullOrWhitespaceOnly(filter)) {
            return contain;
        }
        List<String> computedColumnNames = TransformParser.parseComputedColumnNames(projection);
        List<String> filteredColumnNames = TransformParser.parseFilterColumnNameList(filter);
        for (String computedColumnName : computedColumnNames) {
            if (!filteredColumnNames.contains(computedColumnName)) continue;
            return true;
        }
        return contain;
    }

    private void clearOperator() {
        this.transforms = null;
        this.transformProjectionProcessorMap = null;
        this.transformFilterProcessorMap = null;
        TransformExpressionCompiler.cleanUp();
    }

    public static class Builder {
        private final List<Tuple3<String, String, String>> transformRules = new ArrayList<Tuple3<String, String, String>>();
        private OperatorID schemaOperatorID;
        private String timezone;

        public Builder addTransform(String tableInclusions, @Nullable String projection, @Nullable String filter) {
            this.transformRules.add((Tuple3<String, String, String>)Tuple3.of((Object)tableInclusions, (Object)projection, (Object)filter));
            return this;
        }

        public Builder addSchemaOperatorID(OperatorID schemaOperatorID) {
            this.schemaOperatorID = schemaOperatorID;
            return this;
        }

        public Builder addTimezone(String timezone) {
            this.timezone = PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(timezone) ? ZoneId.systemDefault().toString() : timezone;
            return this;
        }

        public TransformDataOperator build() {
            return new TransformDataOperator(this.transformRules, this.schemaOperatorID, this.timezone);
        }
    }
}

