/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.event.GetSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinator-side registry of table schemas.
 *
 * <p>Implements both {@link OperatorCoordinator} and {@link CoordinationRequestHandler}:
 * <ul>
 *   <li>receives operator events ({@link FlushSuccessEvent}, {@link SinkWriterRegisterEvent})
 *       and forwards them to the {@link SchemaRegistryRequestHandler};</li>
 *   <li>answers coordination requests (schema change, release upstream, get schema,
 *       schema change result, refresh pending lists);</li>
 *   <li>snapshots and restores the {@link SchemaManager} together with the
 *       {@link SchemaDerivation} mapping through coordinator checkpoints.</li>
 * </ul>
 *
 * <p>Checkpoint layout written by {@link #checkpointCoordinator}: an {@code int} serializer
 * version, an {@code int} byte length, the serialized {@link SchemaManager}, followed by whatever
 * {@code SchemaDerivation.serializeDerivationMapping} appends.
 */
public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class);

    /** Schema version value that {@link #handleGetSchemaRequest} treats as "latest schema". */
    private static final int LATEST_SCHEMA_VERSION = -1;

    private final OperatorCoordinator.Context context;
    private final String operatorName;
    /** Last failure reported per subtask index; values may be {@code null}. */
    private final Map<Integer, Throwable> failedReasons;
    private final MetadataApplier metadataApplier;
    private final List<Tuple2<Selectors, TableId>> routes;

    // The three fields below are replaced as a group when state is restored from a checkpoint.
    private SchemaRegistryRequestHandler requestHandler;
    private SchemaManager schemaManager;
    private SchemaDerivation schemaDerivation;

    /**
     * Creates a registry with an empty {@link SchemaManager} and an empty derivation mapping.
     *
     * @param operatorName name used in log messages
     * @param context coordinator context supplied by the runtime
     * @param metadataApplier applier handed to the request handler
     * @param routes routing rules handed to the {@link SchemaDerivation}
     */
    public SchemaRegistry(String operatorName, OperatorCoordinator.Context context, MetadataApplier metadataApplier, List<Tuple2<Selectors, TableId>> routes) {
        this.context = context;
        this.operatorName = operatorName;
        this.failedReasons = new HashMap<>();
        this.metadataApplier = metadataApplier;
        this.routes = routes;
        Map<TableId, Set<TableId>> emptyMapping = new HashMap<>();
        installState(new SchemaManager(), emptyMapping);
    }

    /** Clears the recorded subtask failures and logs the start of the coordinator. */
    public void start() throws Exception {
        LOG.info("Starting SchemaRegistry for {}.", this.operatorName);
        this.failedReasons.clear();
        LOG.info("Started SchemaRegistry for {}.", this.operatorName);
    }

    /**
     * Closes the current request handler.
     *
     * @throws Exception whatever the request handler's {@code close()} throws
     */
    public void close() throws Exception {
        // Close first so that the "closed" message is only logged when closing actually succeeded.
        this.requestHandler.close();
        LOG.info("SchemaRegistry for {} closed.", this.operatorName);
    }

    /**
     * Dispatches an event sent by an operator subtask to the request handler.
     *
     * @param subtask index of the sending subtask (unused; the event carries its own subtask)
     * @param attemptNumber attempt number of the sending subtask (unused)
     * @param event the event; must be a {@link FlushSuccessEvent} or {@link SinkWriterRegisterEvent}
     * @throws FlinkException if the event type is not recognized
     */
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
        if (event instanceof FlushSuccessEvent) {
            FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
            LOG.info("Sink subtask {} succeed flushing for table {}.", flushSuccessEvent.getSubtask(), flushSuccessEvent.getTableId().toString());
            this.requestHandler.flushSuccess(flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
        } else if (event instanceof SinkWriterRegisterEvent) {
            this.requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
        } else {
            throw new FlinkException("Unrecognized Operator Event: " + event);
        }
    }

    /**
     * Serializes the schema manager and the derivation mapping and completes
     * {@code resultFuture} with the resulting bytes.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @param resultFuture future completed with the serialized state
     * @throws Exception if serialization fails; the future is left uncompleted in that case
     */
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            // Version header first, so resetToCheckpoint can decide how to read the rest.
            out.writeInt(SchemaManager.SERIALIZER.getVersion());
            byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(this.schemaManager);
            out.writeInt(serializedSchemaManager.length);
            out.write(serializedSchemaManager);
            SchemaDerivation.serializeDerivationMapping(this.schemaDerivation, out);
            resultFuture.complete(baos.toByteArray());
        }
    }

    /** No-op: nothing is done when a checkpoint completes. */
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Routes a coordination request to the matching handler method.
     *
     * @param request the incoming request
     * @return a future holding the response
     * @throws IllegalArgumentException if the request type is not recognized
     */
    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
        if (request instanceof SchemaChangeRequest) {
            return this.requestHandler.handleSchemaChangeRequest((SchemaChangeRequest) request);
        }
        if (request instanceof ReleaseUpstreamRequest) {
            return this.requestHandler.handleReleaseUpstreamRequest();
        }
        if (request instanceof GetSchemaRequest) {
            GetSchemaResponse response = handleGetSchemaRequest((GetSchemaRequest) request);
            return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(response));
        }
        if (request instanceof SchemaChangeResultRequest) {
            return this.requestHandler.getSchemaChangeResult();
        }
        if (request instanceof RefreshPendingListsRequest) {
            return this.requestHandler.refreshPendingLists();
        }
        throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request);
    }

    /**
     * Restores the schema manager, derivation and request handler from checkpoint bytes.
     *
     * <p>Version 0 data carries only the schema manager, so the derivation mapping starts empty.
     * Version 1 data additionally carries the derivation mapping. The current state is replaced
     * only after everything has been read successfully, so a failed restore leaves it untouched.
     *
     * @param checkpointId id of the checkpoint being restored (unused)
     * @param checkpointData serialized state, or {@code null} in which case nothing is changed
     * @throws IOException if the serialization version is unknown or the length prefix is negative
     * @throws Exception if reading or deserializing the data fails
     */
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        if (checkpointData == null) {
            return;
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
             DataInputStream in = new DataInputStream(bais)) {
            int serializerVersion = in.readInt();
            // Reject unknown versions before consuming any more of the stream.
            if (serializerVersion != 0 && serializerVersion != 1) {
                throw new IOException("Unrecognized serialization version " + serializerVersion);
            }
            byte[] serializedSchemaManager = readLengthPrefixedBytes(in);
            SchemaManager restoredManager = SchemaManager.SERIALIZER.deserialize(serializerVersion, serializedSchemaManager);
            Map<TableId, Set<TableId>> derivationMapping;
            if (serializerVersion == 0) {
                // Use a mutable map, matching what the constructor passes to SchemaDerivation.
                derivationMapping = new HashMap<>();
            } else {
                derivationMapping = SchemaDerivation.deserializerDerivationMapping(in);
            }
            installState(restoredManager, derivationMapping);
        }
    }

    /**
     * Logs that a subtask was reset, attaching the last failure recorded for it (if any).
     *
     * @param subtask index of the subtask that was reset
     * @param checkpointId checkpoint the subtask was reset to
     */
    public void subtaskReset(int subtask, long checkpointId) {
        Throwable rootCause = this.failedReasons.get(subtask);
        // SLF4J treats a trailing Throwable argument as the exception to log.
        LOG.error("Subtask {} reset at checkpoint {}.", subtask, checkpointId, rootCause);
    }

    /**
     * Remembers the failure of a subtask so that {@link #subtaskReset} can log it.
     *
     * @param subtask index of the failed subtask
     * @param attemptNumber attempt number of the failed execution (unused)
     * @param throwable failure cause; may be {@code null}
     */
    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable throwable) {
        this.failedReasons.put(subtask, throwable);
    }

    /** No-op: the subtask gateway is not used by this coordinator. */
    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway subtaskGateway) {
    }

    /**
     * Looks up the schema asked for by a {@link GetSchemaRequest}.
     *
     * @param getSchemaRequest request naming the table and the schema version
     * @return a response holding the schema; the payload is {@code null} when the latest schema
     *     is requested but absent, or when the schema manager rejects the requested version with
     *     an {@link IllegalArgumentException}
     */
    private GetSchemaResponse handleGetSchemaRequest(GetSchemaRequest getSchemaRequest) {
        LOG.info("Handling schema request: {}", getSchemaRequest);
        int schemaVersion = getSchemaRequest.getSchemaVersion();
        TableId tableId = getSchemaRequest.getTableId();
        if (schemaVersion == LATEST_SCHEMA_VERSION) {
            return new GetSchemaResponse(this.schemaManager.getLatestSchema(tableId).orElse(null));
        }
        try {
            return new GetSchemaResponse(this.schemaManager.getSchema(tableId, schemaVersion));
        } catch (IllegalArgumentException iae) {
            LOG.warn("Some client is requesting an non-existed schema for table {} with version {}", tableId, schemaVersion);
            return new GetSchemaResponse(null);
        }
    }

    /**
     * Reads an {@code int} length followed by that many bytes.
     *
     * @throws IOException if the length is negative or the stream ends early
     */
    private static byte[] readLengthPrefixedBytes(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length < 0) {
            throw new IOException("Invalid serialized SchemaManager length " + length);
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return bytes;
    }

    /**
     * Replaces the schema manager and rebuilds the derivation and request handler around it.
     *
     * @param manager schema manager to use from now on
     * @param derivationMapping mapping handed to the new {@link SchemaDerivation}
     */
    private void installState(SchemaManager manager, Map<TableId, Set<TableId>> derivationMapping) {
        // Build both dependents before touching any field so the swap is all-or-nothing.
        SchemaDerivation derivation = new SchemaDerivation(manager, this.routes, derivationMapping);
        SchemaRegistryRequestHandler handler = new SchemaRegistryRequestHandler(this.metadataApplier, manager, derivation);
        // NOTE(review): a previously installed request handler is dropped here without being
        // closed, as before. Confirm whether it holds resources that should be released on restore.
        this.schemaManager = manager;
        this.schemaDerivation = derivation;
        this.requestHandler = handler;
    }
}

