/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SchemaOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>,
Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1L);
    private final List<RouteRule> routingRules;
    private transient List<Tuple3<Selectors, String, String>> routes;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private transient LoadingCache<TableId, Schema> originalSchema;
    private transient LoadingCache<TableId, Schema> evolvedSchema;
    private transient LoadingCache<TableId, Boolean> schemaDivergesMap;
    private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;
    private final long rpcTimeOutInMillis;
    private final SchemaChangeBehavior schemaChangeBehavior;
    private transient SchemaOperatorMetrics schemaOperatorMetrics;
    private transient int subTaskId;

    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules) {
        this.routingRules = routingRules;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeOutInMillis = PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
        this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
    }

    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
        this.routingRules = routingRules;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
        this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
    }

    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior) {
        this.routingRules = routingRules;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
        this.schemaChangeBehavior = schemaChangeBehavior;
    }

    public void open() throws Exception {
        super.open();
        this.schemaOperatorMetrics = new SchemaOperatorMetrics((MetricGroup)this.getRuntimeContext().getMetricGroup(), this.schemaChangeBehavior);
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
        this.routes = this.routingRules.stream().map(rule -> {
            String tableInclusions = rule.sourceTable;
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
            return new Tuple3((Object)selectors, (Object)rule.sinkTable, (Object)rule.replaceSymbol);
        }).collect(Collectors.toList());
        this.schemaEvolutionClient = new SchemaEvolutionClient(this.toCoordinator, this.getOperatorID());
        this.evolvedSchema = CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build((CacheLoader)new CacheLoader<TableId, Schema>(){

            public Schema load(TableId tableId) {
                return SchemaOperator.this.getLatestEvolvedSchema(tableId);
            }
        });
        this.originalSchema = CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build((CacheLoader)new CacheLoader<TableId, Schema>(){

            public Schema load(TableId tableId) throws Exception {
                return SchemaOperator.this.getLatestOriginalSchema(tableId);
            }
        });
        this.schemaDivergesMap = CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build((CacheLoader)new CacheLoader<TableId, Boolean>(){

            public Boolean load(TableId tableId) throws Exception {
                return SchemaOperator.this.checkSchemaDiverges(tableId);
            }
        });
        this.tableIdMappingCache = CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build((CacheLoader)new CacheLoader<TableId, List<TableId>>(){

            public List<TableId> load(TableId tableId) {
                return SchemaOperator.this.getRoutedTables(tableId);
            }
        });
    }

    public void processElement(StreamRecord<Event> streamRecord) throws InterruptedException, TimeoutException, ExecutionException {
        Event event = (Event)streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            this.processSchemaChangeEvents((SchemaChangeEvent)event);
        } else if (event instanceof DataChangeEvent) {
            this.processDataChangeEvents(streamRecord, (DataChangeEvent)event);
        } else {
            throw new RuntimeException("Unknown event type in Stream record: " + event);
        }
    }

    private void processSchemaChangeEvents(SchemaChangeEvent event) throws InterruptedException, TimeoutException, ExecutionException {
        TableId tableId = event.tableId();
        LOG.info("{}> Table {} received SchemaChangeEvent {} and start to be blocked.", new Object[]{this.subTaskId, tableId, event});
        this.handleSchemaChangeEvent(tableId, event);
        this.originalSchema.put((Object)tableId, (Object)this.getLatestOriginalSchema(tableId));
        this.schemaDivergesMap.put((Object)tableId, (Object)this.checkSchemaDiverges(tableId));
        List<TableId> optionalRoutedTable = this.getRoutedTables(tableId);
        if (!optionalRoutedTable.isEmpty()) {
            ((List)this.tableIdMappingCache.get((Object)tableId)).forEach(routed -> this.evolvedSchema.put(routed, (Object)this.getLatestEvolvedSchema((TableId)routed)));
        } else {
            this.evolvedSchema.put((Object)tableId, (Object)this.getLatestEvolvedSchema(tableId));
        }
    }

    private void processDataChangeEvents(StreamRecord<Event> streamRecord, DataChangeEvent event) {
        TableId tableId = event.tableId();
        List<TableId> optionalRoutedTable = this.getRoutedTables(tableId);
        if (!optionalRoutedTable.isEmpty()) {
            optionalRoutedTable.forEach(evolvedTableId -> this.output.collect((Object)new StreamRecord((Object)this.normalizeSchemaChangeEvents(event, (TableId)evolvedTableId, false))));
        } else if (Boolean.FALSE.equals(this.schemaDivergesMap.getIfPresent((Object)tableId))) {
            this.output.collect((Object)new StreamRecord((Object)this.normalizeSchemaChangeEvents(event, true)));
        } else {
            this.output.collect(streamRecord);
        }
    }

    private DataChangeEvent normalizeSchemaChangeEvents(DataChangeEvent event, boolean tolerantMode) {
        return this.normalizeSchemaChangeEvents(event, event.tableId(), tolerantMode);
    }

    private DataChangeEvent normalizeSchemaChangeEvents(DataChangeEvent event, TableId renamedTableId, boolean tolerantMode) {
        try {
            Schema originalSchema = (Schema)this.originalSchema.get((Object)event.tableId());
            Schema evolvedTableSchema = (Schema)this.evolvedSchema.get((Object)renamedTableId);
            if (originalSchema.equals((Object)evolvedTableSchema)) {
                return ChangeEventUtils.recreateDataChangeEvent((DataChangeEvent)event, (TableId)renamedTableId);
            }
            switch (event.op()) {
                case INSERT: {
                    return DataChangeEvent.insertEvent((TableId)renamedTableId, (RecordData)this.regenerateRecordData(event.after(), originalSchema, evolvedTableSchema, tolerantMode), (Map)event.meta());
                }
                case UPDATE: {
                    return DataChangeEvent.updateEvent((TableId)renamedTableId, (RecordData)this.regenerateRecordData(event.before(), originalSchema, evolvedTableSchema, tolerantMode), (RecordData)this.regenerateRecordData(event.after(), originalSchema, evolvedTableSchema, tolerantMode), (Map)event.meta());
                }
                case DELETE: {
                    return DataChangeEvent.deleteEvent((TableId)renamedTableId, (RecordData)this.regenerateRecordData(event.before(), originalSchema, evolvedTableSchema, tolerantMode), (Map)event.meta());
                }
                case REPLACE: {
                    return DataChangeEvent.replaceEvent((TableId)renamedTableId, (RecordData)this.regenerateRecordData(event.after(), originalSchema, evolvedTableSchema, tolerantMode), (Map)event.meta());
                }
            }
            throw new IllegalArgumentException(String.format("Unrecognized operation type \"%s\"", event.op()));
        }
        catch (Exception e) {
            throw new IllegalStateException("Unable to fill null for empty columns", e);
        }
    }

    private RecordData regenerateRecordData(RecordData recordData, Schema originalSchema, Schema routedTableSchema, boolean tolerantMode) {
        ArrayList<RecordData.FieldGetter> fieldGetters = new ArrayList<RecordData.FieldGetter>();
        for (Column column : routedTableSchema.getColumns()) {
            String columnName = column.getName();
            int columnIndex = originalSchema.getColumnNames().indexOf(columnName);
            if (columnIndex == -1) {
                fieldGetters.add(new NullFieldGetter());
                continue;
            }
            RecordData.FieldGetter fieldGetter2 = RecordData.createFieldGetter((DataType)((Column)originalSchema.getColumn(columnName).get()).getType(), (int)columnIndex);
            if (((Column)originalSchema.getColumn(columnName).get()).getType().nullable().equals((Object)column.getType().nullable())) {
                fieldGetters.add(fieldGetter2);
                continue;
            }
            fieldGetters.add(new TypeCoercionFieldGetter(column.getType(), fieldGetter2, tolerantMode));
        }
        BinaryRecordDataGenerator recordDataGenerator = new BinaryRecordDataGenerator(routedTableSchema.getColumnDataTypes().toArray(new DataType[0]));
        return recordDataGenerator.generate(fieldGetters.stream().map(fieldGetter -> fieldGetter.getFieldOrNull(recordData)).toArray());
    }

    private List<TableId> getRoutedTables(TableId originalTableId) {
        return this.routes.stream().filter(route -> ((Selectors)route.f0).isMatch(originalTableId)).map(route -> this.resolveReplacement(originalTableId, (Tuple3<Selectors, String, String>)route)).collect(Collectors.toList());
    }

    private TableId resolveReplacement(TableId originalTable, Tuple3<Selectors, String, String> route) {
        if (route.f2 != null) {
            return TableId.parse((String)((String)route.f1).replace((CharSequence)route.f2, originalTable.getTableName()));
        }
        return TableId.parse((String)((String)route.f1));
    }

    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException {
        if (this.schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
            throw new RuntimeException(String.format("Refused to apply schema change event %s in EXCEPTION mode.", schemaChangeEvent));
        }
        SchemaChangeResponse response = this.requestSchemaChange(tableId, schemaChangeEvent);
        if (response.isAccepted()) {
            LOG.info("{}> Sending the FlushEvent for table {}.", (Object)this.subTaskId, (Object)tableId);
            this.output.collect((Object)new StreamRecord((Object)new FlushEvent(tableId)));
            List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
            this.schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
            SchemaChangeResultResponse schemaEvolveResponse = this.requestSchemaChangeResult();
            List<SchemaChangeEvent> finishedSchemaChangeEvents = schemaEvolveResponse.getFinishedSchemaChangeEvents();
            finishedSchemaChangeEvents.forEach(e -> this.output.collect((Object)new StreamRecord(e)));
        } else if (response.isDuplicate()) {
            LOG.info("{}> Schema change event {} has been handled in another subTask already.", (Object)this.subTaskId, (Object)schemaChangeEvent);
        } else if (response.isIgnored()) {
            LOG.info("{}> Schema change event {} has been ignored. No schema evolution needed.", (Object)this.subTaskId, (Object)schemaChangeEvent);
        } else {
            throw new IllegalStateException("Unexpected response status " + response);
        }
    }

    private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException {
        SchemaChangeResponse response;
        long schemaEvolveTimeOutMillis = System.currentTimeMillis() + this.rpcTimeOutInMillis;
        while ((response = (SchemaChangeResponse)this.sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent, this.subTaskId))).isRegistryBusy()) {
            if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
                LOG.info("{}> Schema Registry is busy now, waiting for next request...", (Object)this.subTaskId);
                Thread.sleep(1000L);
                continue;
            }
            throw new TimeoutException("TimeOut when requesting schema change");
        }
        return response;
    }

    private SchemaChangeResultResponse requestSchemaChangeResult() throws InterruptedException, TimeoutException {
        Object coordinationResponse = this.sendRequestToCoordinator(new SchemaChangeResultRequest());
        long nextRpcTimeOutMillis = System.currentTimeMillis() + this.rpcTimeOutInMillis;
        while (coordinationResponse instanceof SchemaChangeProcessingResponse) {
            if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
                Thread.sleep(1000L);
                coordinationResponse = this.sendRequestToCoordinator(new SchemaChangeResultRequest());
                continue;
            }
            throw new TimeoutException("TimeOut when requesting release upstream");
        }
        return (SchemaChangeResultResponse)coordinationResponse;
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            CompletableFuture responseFuture = this.toCoordinator.sendRequestToCoordinator(this.getOperatorID(), new SerializedValue(request));
            return (RESPONSE)CoordinationResponseUtils.unwrap((CoordinationResponse)responseFuture.get());
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }

    private Schema getLatestEvolvedSchema(TableId tableId) {
        try {
            Optional<Schema> optionalSchema = this.schemaEvolutionClient.getLatestEvolvedSchema(tableId);
            if (!optionalSchema.isPresent()) {
                throw new IllegalStateException(String.format("Schema doesn't exist for table \"%s\"", tableId));
            }
            return optionalSchema.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Unable to get latest schema for table \"%s\"", tableId), e);
        }
    }

    private Schema getLatestOriginalSchema(TableId tableId) {
        try {
            Optional<Schema> optionalSchema = this.schemaEvolutionClient.getLatestOriginalSchema(tableId);
            if (!optionalSchema.isPresent()) {
                throw new IllegalStateException(String.format("Schema doesn't exist for table \"%s\"", tableId));
            }
            return optionalSchema.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Unable to get latest schema for table \"%s\"", tableId), e);
        }
    }

    private Boolean checkSchemaDiverges(TableId tableId) {
        try {
            return this.getLatestEvolvedSchema(tableId).equals((Object)this.getLatestOriginalSchema(tableId));
        }
        catch (IllegalStateException e) {
            return true;
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
    }

    private static class TypeCoercionFieldGetter
    implements RecordData.FieldGetter {
        private final DataType destinationType;
        private final RecordData.FieldGetter originalFieldGetter;
        private final boolean tolerantMode;

        public TypeCoercionFieldGetter(DataType destinationType, RecordData.FieldGetter originalFieldGetter, boolean tolerantMode) {
            this.destinationType = destinationType;
            this.originalFieldGetter = originalFieldGetter;
            this.tolerantMode = tolerantMode;
        }

        private Object fail(IllegalArgumentException e) throws IllegalArgumentException {
            if (this.tolerantMode) {
                return null;
            }
            throw e;
        }

        @Nullable
        public Object getFieldOrNull(RecordData recordData) {
            Object originalField = this.originalFieldGetter.getFieldOrNull(recordData);
            if (originalField == null) {
                return null;
            }
            if (this.destinationType.is(DataTypeRoot.BIGINT)) {
                if (originalField instanceof Byte) {
                    return ((Byte)originalField).longValue();
                }
                if (originalField instanceof Short) {
                    return ((Short)originalField).longValue();
                }
                if (originalField instanceof Integer) {
                    return ((Integer)originalField).longValue();
                }
                return this.fail(new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a BIGINT column. Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column", originalField.getClass())));
            }
            if (this.destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
                if (originalField instanceof Float) {
                    return ((Float)originalField).doubleValue();
                }
                return this.fail(new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a DOUBLE column. Currently only FLOAT can be accepted by a DOUBLE column", originalField.getClass())));
            }
            if (this.destinationType.is(DataTypeRoot.VARCHAR)) {
                if (originalField instanceof StringData) {
                    return originalField;
                }
                return this.fail(new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a STRING column. Currently only CHAR / VARCHAR can be accepted by a STRING column", originalField.getClass())));
            }
            return this.fail(new IllegalArgumentException(String.format("Column type \"%s\" doesn't support type coercion", this.destinationType)));
        }
    }

    private static class NullFieldGetter
    implements RecordData.FieldGetter {
        private NullFieldGetter() {
        }

        @Nullable
        public Object getFieldOrNull(RecordData recordData) {
            return null;
        }
    }
}

