/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinator-side handler that serializes schema change requests and applies the derived
 * schema changes to the external system through a {@link MetadataApplier}.
 *
 * <p>Only one schema change is processed at a time. The handler walks through the states of
 * {@link RequestStatus}:
 *
 * <ol>
 *   <li>{@code IDLE -> WAITING_FOR_FLUSH} when a request is accepted in {@link
 *       #handleSchemaChangeRequest};
 *   <li>{@code WAITING_FOR_FLUSH -> APPLYING} when every registered sink writer has reported a
 *       flush in {@link #flushSuccess};
 *   <li>{@code APPLYING -> FINISHED} when the asynchronous apply task completes;
 *   <li>{@code FINISHED -> IDLE} when the result is collected in {@link #getSchemaChangeResult}.
 * </ol>
 *
 * <p>Requests arriving while another one is in flight are answered with a "busy" response and
 * the requesting subtask id is remembered in a pending list.
 */
@Internal
public class SchemaRegistryRequestHandler implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);

    /** Applies schema changes to the external system. */
    private final MetadataApplier metadataApplier;
    /** Sink subtasks that have registered via {@link #registerSinkWriter(int)}. */
    private final Set<Integer> activeSinkWriters;
    /** Tracks original and evolved schemas. */
    private final SchemaManager schemaManager;
    /** Derives the downstream schema change events from an upstream event. */
    private final SchemaDerivation schemaDerivation;
    /** Current state of the schema change state machine. */
    private volatile RequestStatus schemaChangeStatus;
    /** Subtask ids that were answered "busy", in arrival order. Guarded by the request lock. */
    private final List<Integer> pendingSubTaskIds;
    /** Monitor guarding {@link #handleSchemaChangeRequest}. */
    private final Object schemaChangeRequestLock;
    /** Non-tolerated failure raised while applying the current schema change, if any. */
    private volatile Throwable currentChangeException;
    /** Derived events of the schema change request currently being processed. */
    private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
    /** Derived events that were successfully applied for the current request. */
    private volatile List<SchemaChangeEvent> currentFinishedSchemaChanges;
    /** Derived events that were skipped for the current request. */
    private volatile List<SchemaChangeEvent> currentIgnoredSchemaChanges;
    /** Sink subtasks that have reported a successful flush for the current request. */
    private final Set<Integer> flushedSinkWriters;
    /** Single-threaded executor on which schema changes are applied. */
    private final ExecutorService schemaChangeThreadPool;
    /** Configured behavior that decides how schema changes are derived and tolerated. */
    private final SchemaChangeBehavior schemaChangeBehavior;
    /** Coordinator context, used to fail the job when applying a schema change fails. */
    private final OperatorCoordinator.Context context;

    /**
     * Creates a handler in the {@code IDLE} state with empty bookkeeping collections and a
     * dedicated single-threaded executor for applying schema changes.
     *
     * @param metadataApplier applier used to push schema changes to the external system
     * @param schemaManager manager holding original and evolved schemas
     * @param schemaDerivation derivation used to compute downstream schema change events
     * @param schemaChangeBehavior configured schema change behavior
     * @param context coordinator context used to fail the job
     */
    public SchemaRegistryRequestHandler(MetadataApplier metadataApplier, SchemaManager schemaManager, SchemaDerivation schemaDerivation, SchemaChangeBehavior schemaChangeBehavior, OperatorCoordinator.Context context) {
        this.metadataApplier = metadataApplier;
        this.schemaManager = schemaManager;
        this.schemaDerivation = schemaDerivation;
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.context = context;
        this.activeSinkWriters = ConcurrentHashMap.newKeySet();
        this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
        this.currentDerivedSchemaChangeEvents = new ArrayList<>();
        this.currentFinishedSchemaChanges = new ArrayList<>();
        this.currentIgnoredSchemaChanges = new ArrayList<>();
        this.schemaChangeStatus = RequestStatus.IDLE;
        this.pendingSubTaskIds = new ArrayList<>();
        this.schemaChangeRequestLock = new Object();
    }

    /**
     * Handles a schema change request from a subtask and completes {@code response} with one of:
     *
     * <ul>
     *   <li><b>busy</b> - another schema change is in flight, or a different subtask is at the
     *       head of the pending list; the subtask id is appended to the pending list;
     *   <li><b>duplicate</b> - the schema manager reports the original event as redundant;
     *   <li><b>ignored</b> - no downstream events were derived from the event;
     *   <li><b>accepted</b> - the derived events are recorded and the handler starts waiting for
     *       sink writers to flush.
     * </ul>
     *
     * @param request the schema change request
     * @param response future completed with the wrapped {@link SchemaChangeResponse}
     */
    public void handleSchemaChangeRequest(SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {
        int requestSubTaskId = request.getSubTaskId();
        synchronized (this.schemaChangeRequestLock) {
            if (this.schemaChangeStatus != RequestStatus.IDLE) {
                LOG.info("Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).", request, requestSubTaskId, this.pendingSubTaskIds);
                this.enqueueAndReplyBusy(requestSubTaskId, response);
                return;
            }

            // Idle: serve the request only if nobody is waiting, or if it comes from the
            // subtask at the head of the pending list.
            if (this.pendingSubTaskIds.isEmpty()) {
                LOG.info("Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.", request.getSchemaChangeEvent(), request.getTableId().toString(), requestSubTaskId);
            } else if (this.pendingSubTaskIds.get(0) == requestSubTaskId) {
                LOG.info("Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.", request.getSchemaChangeEvent(), request.getTableId().toString(), requestSubTaskId);
                this.pendingSubTaskIds.remove(0);
            } else {
                LOG.info("Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).", request.getSchemaChangeEvent(), request.getTableId().toString(), requestSubTaskId, this.pendingSubTaskIds);
                this.enqueueAndReplyBusy(requestSubTaskId, response);
                return;
            }

            SchemaChangeEvent event = request.getSchemaChangeEvent();
            if (this.schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
                LOG.info("Event {} has been addressed before, ignoring it.", event);
                this.clearCurrentSchemaChangeRequest();
                // The status never left IDLE on this path.
                LOG.info("SchemaChangeStatus stays IDLE for request {} due to duplicated request.", request);
                response.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.duplicate()));
                return;
            }

            this.schemaManager.applyOriginalSchemaChange(event);
            List<SchemaChangeEvent> derivedSchemaChangeEvents = this.calculateDerivedSchemaChangeEvents(event);
            if (derivedSchemaChangeEvents.isEmpty()) {
                LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
                this.clearCurrentSchemaChangeRequest();
                // The status never left IDLE on this path.
                LOG.info("SchemaChangeStatus stays IDLE for request {} due to ignored request.", request);
                response.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.ignored()));
                return;
            }

            LOG.info("SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
            this.schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
            this.currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
            response.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
        }
    }

    /**
     * Remembers the subtask in the pending list (once) and answers the request with "busy".
     * Must be called while holding {@link #schemaChangeRequestLock}.
     */
    private void enqueueAndReplyBusy(int requestSubTaskId, CompletableFuture<CoordinationResponse> response) {
        if (!this.pendingSubTaskIds.contains(requestSubTaskId)) {
            this.pendingSubTaskIds.add(requestSubTaskId);
        }
        response.complete(CoordinationResponseUtils.wrap(SchemaChangeResponse.busy()));
    }

    /**
     * Applies the derived schema change events one by one and then moves the state machine from
     * {@code APPLYING} to {@code FINISHED}.
     *
     * <p>Events are skipped (and recorded as ignored) when the behavior is {@code IGNORE} and the
     * event is not a {@code CREATE_TABLE}, or when the metadata applier does not accept the event
     * type. A failure that {@link #shouldIgnoreException(Throwable)} does not tolerate is stored
     * in {@link #currentChangeException} and stops processing of the remaining events.
     *
     * <p>This method is submitted to {@link #schemaChangeThreadPool} and nobody inspects the
     * resulting future, so every call into the metadata applier is made inside the {@code try}
     * block: an escaping exception would otherwise be lost and the status would stay
     * {@code APPLYING} indefinitely.
     *
     * @param tableId table the schema change belongs to (used for logging)
     * @param derivedSchemaChangeEvents events to apply, in order
     */
    private void applySchemaChange(TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
        for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
            if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE && this.schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                this.currentIgnoredSchemaChanges.add(changeEvent);
                continue;
            }
            try {
                if (!this.metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
                    LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
                    this.currentIgnoredSchemaChanges.add(changeEvent);
                    continue;
                }
                this.metadataApplier.applySchemaChange(changeEvent);
                LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);
                this.schemaManager.applyEvolvedSchemaChange(changeEvent);
                this.currentFinishedSchemaChanges.add(changeEvent);
            } catch (Throwable t) {
                // The throwable is passed as the trailing argument without its own placeholder
                // so that the logging backend records it as the exception (with stack trace).
                LOG.error("Failed to apply schema change {} to table {}.", changeEvent, tableId, t);
                if (!this.shouldIgnoreException(t)) {
                    this.currentChangeException = t;
                    break;
                }
                LOG.warn("Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}", changeEvent, t);
            }
        }
        Preconditions.checkState(this.schemaChangeStatus == RequestStatus.APPLYING, "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + this.schemaChangeStatus);
        this.schemaChangeStatus = RequestStatus.FINISHED;
        LOG.info("SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", this.currentDerivedSchemaChangeEvents);
    }

    /**
     * Registers a sink subtask as active.
     *
     * @param sinkSubtask index of the sink subtask
     */
    public void registerSinkWriter(int sinkSubtask) {
        LOG.info("Register sink subtask {}.", sinkSubtask);
        this.activeSinkWriters.add(sinkSubtask);
    }

    /**
     * Records that a sink subtask has flushed. Once the set of flushed writers equals the set of
     * registered writers (and at least {@code parallelism} writers are registered), the state
     * moves from {@code WAITING_FOR_FLUSH} to {@code APPLYING} and the current derived schema
     * changes are applied asynchronously.
     *
     * @param tableId table the flush belongs to
     * @param sinkSubtask index of the sink subtask that flushed
     * @param parallelism expected number of registered sink writers
     */
    public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
        this.flushedSinkWriters.add(sinkSubtask);
        if (this.activeSinkWriters.size() < parallelism) {
            LOG.info("Not all active sink writers have been registered. Current {}, expected {}.", this.activeSinkWriters.size(), parallelism);
            return;
        }
        if (!this.flushedSinkWriters.equals(this.activeSinkWriters)) {
            return;
        }
        Preconditions.checkState(this.schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH, "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " + this.schemaChangeStatus);
        this.schemaChangeStatus = RequestStatus.APPLYING;
        LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString());
        this.schemaChangeThreadPool.submit(() -> this.applySchemaChange(tableId, this.currentDerivedSchemaChangeEvents));
    }

    /**
     * Reports the outcome of the schema change currently being processed.
     *
     * <p>If applying has finished, the state is reset to {@code IDLE}, the per-request bookkeeping
     * is cleared and {@code response} is completed with the list of applied events. Otherwise
     * {@code response} is completed with a {@link SchemaChangeProcessingResponse}.
     *
     * @param response future completed with the wrapped result
     * @throws IllegalStateException if no schema change is in progress (state is {@code IDLE})
     */
    public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
        Preconditions.checkState(this.schemaChangeStatus != RequestStatus.IDLE, "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
        if (this.schemaChangeStatus != RequestStatus.FINISHED) {
            response.complete(CoordinationResponseUtils.wrap(new SchemaChangeProcessingResponse()));
            return;
        }
        this.schemaChangeStatus = RequestStatus.IDLE;
        LOG.info("SchemaChangeStatus switched from FINISHED to IDLE for request {}", this.currentDerivedSchemaChangeEvents);
        List<SchemaChangeEvent> finishedEvents = this.clearCurrentSchemaChangeRequest();
        response.complete(CoordinationResponseUtils.wrap(new SchemaChangeResultResponse(finishedEvents)));
    }

    /** Initiates an orderly shutdown of the executor used for applying schema changes. */
    @Override
    public void close() throws IOException {
        // The executor is a final field assigned in the constructor, so it is never null here.
        this.schemaChangeThreadPool.shutdown();
    }

    /**
     * Derives the downstream schema change events for an upstream event. Under {@code LENIENT}
     * behavior each derived event is additionally rewritten by {@link
     * #lenientizeSchemaChangeEvent(SchemaChangeEvent)}.
     */
    private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
        List<SchemaChangeEvent> derived = this.schemaDerivation.applySchemaChange(event);
        if (this.schemaChangeBehavior != SchemaChangeBehavior.LENIENT) {
            return derived;
        }
        return derived.stream()
                .flatMap(evt -> this.lenientizeSchemaChangeEvent(evt).stream())
                .collect(Collectors.toList());
    }

    /**
     * Rewrites a schema change event into its lenient equivalent:
     *
     * <ul>
     *   <li>{@code CREATE_TABLE} events and event types not listed below pass through unchanged;
     *   <li>{@code ADD_COLUMN}: every added column is re-created with a nullable type and without
     *       the original position information;
     *   <li>{@code DROP_COLUMN}: nothing is dropped; dropped columns that exist in the evolved
     *       schema and are not nullable are altered to nullable (no event if there are none);
     *   <li>{@code RENAME_COLUMN}: a nullable column with the new name is added, and the old
     *       column is altered to nullable if it is not already.
     * </ul>
     *
     * @throws IllegalStateException if the table has no evolved schema yet
     * @throws IllegalArgumentException if a renamed column is missing from the evolved schema
     */
    private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent event) {
        if (event instanceof CreateTableEvent) {
            return Collections.singletonList(event);
        }
        TableId tableId = event.tableId();
        Schema evolvedSchema = this.schemaManager.getLatestEvolvedSchema(tableId).orElseThrow(() -> new IllegalStateException("Evolved schema does not exist, not ready for schema change event " + event));
        switch (event.getType()) {
            case ADD_COLUMN: {
                AddColumnEvent addColumnEvent = (AddColumnEvent) event;
                List<AddColumnEvent.ColumnWithPosition> nullableColumns = addColumnEvent.getAddedColumns().stream()
                        .map(col -> {
                            Column added = col.getAddColumn();
                            return new AddColumnEvent.ColumnWithPosition(Column.physicalColumn(added.getName(), added.getType().nullable(), added.getComment(), added.getDefaultValueExpression()));
                        })
                        .collect(Collectors.toList());
                return Collections.singletonList(new AddColumnEvent(tableId, nullableColumns));
            }
            case DROP_COLUMN: {
                DropColumnEvent dropColumnEvent = (DropColumnEvent) event;
                Map<String, DataType> convertNullableColumns = dropColumnEvent.getDroppedColumnNames().stream()
                        .map(evolvedSchema::getColumn)
                        // Skip names that are not present in the evolved schema.
                        .flatMap(e -> e.map(Stream::of).orElse(Stream.empty()))
                        .filter(col -> !col.getType().isNullable())
                        .collect(Collectors.toMap(Column::getName, column -> column.getType().nullable()));
                if (convertNullableColumns.isEmpty()) {
                    return Collections.emptyList();
                }
                return Collections.singletonList(new AlterColumnTypeEvent(tableId, convertNullableColumns));
            }
            case RENAME_COLUMN: {
                RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event;
                List<AddColumnEvent.ColumnWithPosition> appendColumns = new ArrayList<>();
                Map<String, DataType> convertNullableColumns = new HashMap<>();
                renameColumnEvent.getNameMapping().forEach((key, value) -> {
                    Column column = evolvedSchema.getColumn(key).orElseThrow(() -> new IllegalArgumentException("Non-existed column " + key + " in evolved schema."));
                    if (!column.getType().isNullable()) {
                        // The old column stays in place, so it has to become nullable.
                        convertNullableColumns.put(key, column.getType().nullable());
                    }
                    appendColumns.add(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn(value, column.getType().nullable(), column.getComment(), column.getDefaultValueExpression())));
                });
                List<SchemaChangeEvent> events = new ArrayList<>();
                events.add(new AddColumnEvent(tableId, appendColumns));
                if (!convertNullableColumns.isEmpty()) {
                    events.add(new AlterColumnTypeEvent(tableId, convertNullableColumns));
                }
                return events;
            }
            default:
                return Collections.singletonList(event);
        }
    }

    /**
     * Returns whether a failure may be tolerated: only {@link
     * UnsupportedSchemaChangeEventException} under {@code TRY_EVOLVE} behavior.
     */
    private boolean shouldIgnoreException(Throwable throwable) {
        return this.schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
                && throwable instanceof UnsupportedSchemaChangeEventException;
    }

    /**
     * Resets all per-request bookkeeping. If a failure was recorded for the current request, the
     * job is failed through the coordinator context before the state is cleared.
     *
     * @return a copy of the events that were successfully applied for the cleared request
     */
    private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
        if (this.currentChangeException != null) {
            this.context.failJob(new RuntimeException("Failed to apply schema change.", this.currentChangeException));
        }
        // Copy before clearing: the caller needs the applied events after the reset.
        List<SchemaChangeEvent> finishedSchemaChanges = new ArrayList<>(this.currentFinishedSchemaChanges);
        this.flushedSinkWriters.clear();
        this.currentDerivedSchemaChangeEvents.clear();
        this.currentFinishedSchemaChanges.clear();
        this.currentIgnoredSchemaChanges.clear();
        this.currentChangeException = null;
        return finishedSchemaChanges;
    }

    /** States of the schema change state machine. */
    private enum RequestStatus {
        /** No schema change is being processed; new requests may be accepted. */
        IDLE,
        /** A request was accepted; waiting for all sink writers to report a flush. */
        WAITING_FOR_FLUSH,
        /** All sink writers flushed; derived schema changes are being applied. */
        APPLYING,
        /** Applying has completed; waiting for the result to be collected. */
        FINISHED
    }
}

