/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.distributed;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashMultimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.Multimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaCoordinator
extends SchemaRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class);
    private transient AtomicReference<RequestStatus> evolvingStatus;
    private transient Map<Integer, Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> pendingRequests;
    protected transient Set<Integer> flushedSinkWriters;
    private transient Table<TableId, Integer, Schema> upstreamSchemaTable;
    private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer> alreadyHandledSchemaChangeEvents;
    private final ExecutorService schemaChangeThreadPool = Executors.newSingleThreadExecutor();

    public SchemaCoordinator(String operatorName, OperatorCoordinator.Context context, ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List<RouteRule> routingRules, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) {
        super(context, operatorName, coordinatorExecutor, metadataApplier, routingRules, schemaChangeBehavior, rpcTimeout);
    }

    @Override
    public void start() throws Exception {
        super.start();
        this.evolvingStatus = new AtomicReference<RequestStatus>(RequestStatus.IDLE);
        this.pendingRequests = new ConcurrentHashMap<Integer, Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>>();
        this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
        this.upstreamSchemaTable = HashBasedTable.create();
        this.alreadyHandledSchemaChangeEvents = HashMultimap.create();
        LOG.info("Started SchemaRegistry for {}. Parallelism: {}", (Object)this.operatorName, (Object)this.currentParallelism);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.schemaChangeThreadPool != null && !this.schemaChangeThreadPool.isShutdown()) {
            this.schemaChangeThreadPool.shutdownNow();
        }
    }

    @Override
    protected void snapshot(CompletableFuture<byte[]> resultFuture) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos);){
            int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
            out.writeInt(schemaManagerSerializerVersion);
            byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(this.schemaManager);
            out.writeInt(serializedSchemaManager.length);
            out.write(serializedSchemaManager);
            resultFuture.complete(baos.toByteArray());
        }
    }

    @Override
    protected void restore(byte[] checkpointData) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
             DataInputStream in = new DataInputStream(bais);){
            int schemaManagerSerializerVersion = in.readInt();
            int length = in.readInt();
            byte[] serializedSchemaManager = new byte[length];
            in.readFully(serializedSchemaManager);
            this.schemaManager = SchemaManager.SERIALIZER.deserialize(schemaManagerSerializerVersion, serializedSchemaManager);
        }
    }

    @Override
    protected void handleGetOriginalSchemaRequest(GetOriginalSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
        throw new UnsupportedOperationException("In distributed topology, there's no centralized upstream schema table since they may evolve independently in various partitions.");
    }

    @Override
    protected void handleCustomCoordinationRequest(CoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture) throws Exception {
        if (!(request instanceof SchemaChangeRequest)) {
            throw new UnsupportedOperationException("Unknown coordination request type: " + request);
        }
        this.handleSchemaEvolveRequest((SchemaChangeRequest)request, responseFuture);
    }

    @Override
    protected void handleFlushSuccessEvent(FlushSuccessEvent event) throws Exception {
        LOG.info("Sink subtask {} succeed flushing.", (Object)event.getSinkSubTaskId());
        this.flushedSinkWriters.add(event.getSinkSubTaskId());
    }

    @Override
    protected void handleUnrecoverableError(String taskDescription, Throwable t) {
        super.handleUnrecoverableError(taskDescription, t);
        LOG.info("Current upstream table state: {}", this.upstreamSchemaTable);
        this.pendingRequests.forEach((index, tuple) -> ((CompletableFuture)tuple.f1).completeExceptionally(t));
    }

    private void handleSchemaEvolveRequest(SchemaChangeRequest request, CompletableFuture<CoordinationResponse> responseFuture) throws Exception {
        LOG.info("Coordinator received schema change request {}.", (Object)request);
        if (!request.isNoOpRequest()) {
            LOG.info("It's not an align request, will try to deduplicate.");
            int eventSourcePartitionId = request.getSourceSubTaskId();
            int handlingSinkSubTaskId = request.getSinkSubTaskId();
            SchemaChangeEvent schemaChangeEvent = request.getSchemaChangeEvent();
            Tuple2 uniqueKey = Tuple2.of((Object)eventSourcePartitionId, (Object)schemaChangeEvent);
            this.alreadyHandledSchemaChangeEvents.put((Object)uniqueKey, (Object)handlingSinkSubTaskId);
            Collection reportedSinkSubTasks = this.alreadyHandledSchemaChangeEvents.get((Object)uniqueKey);
            if (reportedSinkSubTasks.size() == 1) {
                LOG.info("It's a new request for {}, will handle it", (Object)uniqueKey);
                this.updateUpstreamSchemaTable(schemaChangeEvent.tableId(), request.getSourceSubTaskId(), schemaChangeEvent);
            } else {
                LOG.info("It's an already handled event {}. It has been handled by {}", (Object)uniqueKey, (Object)reportedSinkSubTasks);
                request = SchemaChangeRequest.createNoOpRequest(handlingSinkSubTaskId);
            }
            if (reportedSinkSubTasks.size() == this.currentParallelism) {
                LOG.info("All sink subTasks ({}) have already reported request {}. Remove it out of tracking.", (Object)reportedSinkSubTasks, (Object)uniqueKey);
                this.alreadyHandledSchemaChangeEvents.removeAll((Object)uniqueKey);
            }
        }
        this.pendingRequests.put(request.getSinkSubTaskId(), (Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>)Tuple2.of((Object)request, responseFuture));
        if (this.pendingRequests.size() == 1) {
            Preconditions.checkState((boolean)this.evolvingStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH), (Object)("Unexpected evolving status: " + (Object)((Object)this.evolvingStatus.get())));
            LOG.info("Received the very-first schema change request {}. Switching from IDLE to WAITING_FOR_FLUSH.", (Object)request);
        }
        if (this.pendingRequests.size() == this.currentParallelism) {
            Preconditions.checkState((boolean)this.evolvingStatus.compareAndSet(RequestStatus.WAITING_FOR_FLUSH, RequestStatus.EVOLVING), (Object)("Unexpected evolving status: " + (Object)((Object)this.evolvingStatus.get())));
            LOG.info("Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING.", (Object)request);
            this.schemaChangeThreadPool.submit(() -> {
                try {
                    this.startSchemaChange();
                }
                catch (Throwable t) {
                    this.failJob("Schema change applying task", new FlinkRuntimeException("Failed to apply schema change event.", t));
                    throw new FlinkRuntimeException("Failed to apply schema change event.", t);
                }
            });
        }
    }

    private void updateUpstreamSchemaTable(TableId tableId, int sourcePartition, SchemaChangeEvent schemaChangeEvent) {
        Schema oldSchema = (Schema)this.upstreamSchemaTable.get((Object)tableId, (Object)sourcePartition);
        this.upstreamSchemaTable.put((Object)tableId, (Object)sourcePartition, (Object)SchemaUtils.applySchemaChangeEvent((Schema)oldSchema, (SchemaChangeEvent)schemaChangeEvent));
    }

    private void startSchemaChange() throws TimeoutException {
        LOG.info("Starting to evolve schema.");
        this.loopWhen(() -> this.flushedSinkWriters.size() < this.currentParallelism, () -> LOG.info("Not all sink writers have successfully flushed. Expected {}, actual {}", (Object)this.currentParallelism, this.flushedSinkWriters), this.rpcTimeout, Duration.ofMillis(100L));
        LOG.info("All flushed. Going to evolve schema for pending requests: {}", this.pendingRequests);
        this.flushedSinkWriters.clear();
        Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceSummary = this.deduceEvolvedSchemaChanges();
        ArrayList<SchemaChangeEvent> successfullyAppliedSchemaChangeEvents = new ArrayList<SchemaChangeEvent>();
        for (SchemaChangeEvent appliedSchemaChangeEvent : (List)deduceSummary.f1) {
            if (!this.applyAndUpdateEvolvedSchemaChange(appliedSchemaChangeEvent)) continue;
            successfullyAppliedSchemaChangeEvents.add(appliedSchemaChangeEvent);
        }
        Set affectedTableIds = (Set)deduceSummary.f0;
        HashMap evolvedSchemaView = new HashMap();
        for (TableId tableId : affectedTableIds) {
            this.schemaManager.getLatestEvolvedSchema(tableId).ifPresent(schema -> evolvedSchemaView.put(tableId, schema));
        }
        ArrayList<Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> futures = new ArrayList<Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>>(this.pendingRequests.values());
        this.pendingRequests.clear();
        LOG.info("Finished schema evolving. Switching from EVOLVING to IDLE.");
        Preconditions.checkState((boolean)this.evolvingStatus.compareAndSet(RequestStatus.EVOLVING, RequestStatus.IDLE), (Object)"RequestStatus should be EVOLVING when schema evolving finishes.");
        futures.forEach(tuple -> {
            LOG.info("Coordinator finishes pending future from {}", (Object)((SchemaChangeRequest)tuple.f0).getSinkSubTaskId());
            ((CompletableFuture)tuple.f1).complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(evolvedSchemaView, successfullyAppliedSchemaChangeEvents)));
        });
    }

    private Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges() {
        List validSchemaChangeRequests = this.pendingRequests.values().stream().map(e -> (SchemaChangeRequest)e.f0).filter(request -> !request.isNoOpRequest()).collect(Collectors.toList());
        LOG.info("Step 1 - Start deducing evolved schema change events for {}", validSchemaChangeRequests);
        Set<TableId> affectedSinkTableIds = SchemaDerivator.getAffectedEvolvedTables(this.router, validSchemaChangeRequests.stream().map(rsr -> rsr.getSchemaChangeEvent().tableId()).collect(Collectors.toSet()));
        LOG.info("Step 2 - Affected sink tables are: {}", affectedSinkTableIds);
        ArrayList<SchemaChangeEvent> evolvedSchemaChanges = new ArrayList<SchemaChangeEvent>();
        for (TableId affectedSinkTableId : affectedSinkTableIds) {
            Schema currentSinkSchema = this.schemaManager.getLatestEvolvedSchema(affectedSinkTableId).orElse(null);
            LOG.info("Step 3.1 - For affected sink table {} with schema {}...", (Object)affectedSinkTableId, (Object)currentSinkSchema);
            Set<TableId> upstreamDependencies = SchemaDerivator.reverseLookupDependingUpstreamTables(this.router, affectedSinkTableId, this.upstreamSchemaTable);
            Preconditions.checkState((!upstreamDependencies.isEmpty() ? 1 : 0) != 0, (Object)"An affected sink table's upstream dependency cannot be empty.");
            LOG.info("Step 3.2 - upstream dependency tables are: {}", upstreamDependencies);
            Set<Schema> toBeMergedSchemas = SchemaDerivator.reverseLookupDependingUpstreamSchemas(this.router, affectedSinkTableId, this.upstreamSchemaTable);
            LOG.info("Step 3.3 - Upstream dependency schemas are: {}.", toBeMergedSchemas);
            Schema mergedSchema = currentSinkSchema;
            for (Schema toBeMergedSchema : toBeMergedSchemas) {
                mergedSchema = SchemaMergingUtils.getLeastCommonSchema((Schema)mergedSchema, (Schema)toBeMergedSchema);
            }
            LOG.info("Step 3.4 - Deduced widest schema is: {}.", (Object)mergedSchema);
            List localEvolvedSchemaChanges = SchemaMergingUtils.getSchemaDifference((TableId)affectedSinkTableId, (Schema)currentSinkSchema, (Schema)mergedSchema);
            LOG.info("Step 3.5 - Corresponding schema changes are: {}.", (Object)localEvolvedSchemaChanges);
            List<SchemaChangeEvent> normalizedEvents = SchemaDerivator.normalizeSchemaChangeEvents(currentSinkSchema, localEvolvedSchemaChanges, this.behavior, this.metadataApplier);
            LOG.info("Step 3.6 - After being normalized with {} behavior, final schema change events are: {}", (Object)this.behavior, normalizedEvents);
            evolvedSchemaChanges.addAll(normalizedEvents);
        }
        return Tuple2.of(affectedSinkTableIds, evolvedSchemaChanges);
    }

    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
        try {
            this.metadataApplier.applySchemaChange(schemaChangeEvent);
            this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
            LOG.info("Successfully applied schema change event {} to external system.", (Object)schemaChangeEvent);
            return true;
        }
        catch (Throwable t) {
            this.handleUnrecoverableError("Apply schema change event - " + schemaChangeEvent, (Throwable)new FlinkRuntimeException("Failed to apply schema change event " + schemaChangeEvent + ".", t));
            this.context.failJob(t);
            throw t;
        }
    }

    @VisibleForTesting
    public void emplaceOriginalSchema(TableId tableId, Integer subTaskId, Schema schema) {
        this.upstreamSchemaTable.put((Object)tableId, (Object)subTaskId, (Object)schema);
    }

    private static enum RequestStatus {
        IDLE,
        WAITING_FOR_FLUSH,
        EVOLVING;

    }
}

