/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles schema-change related requests: buffers incoming {@link SchemaChangeRequest}s, tracks
 * which sink writers have flushed, and applies the derived schema change events through the
 * {@link MetadataApplier} on a dedicated single-thread executor.
 *
 * <p>Threading: {@link #applySchemaChange} runs on the executor thread, while the public
 * methods run on whichever thread invokes them. The two status fields shared between those
 * threads ({@code isSchemaChangeApplying} and {@code schemaChangeException}) are therefore
 * {@code volatile}.
 *
 * <p>NOTE(review): {@link #startNextSchemaChangeRequest()} may also be invoked from the executor
 * thread (via {@link #applySchemaChange}) and mutates the non-thread-safe pending list and
 * flushed-writer set; confirm that callers never touch them concurrently.
 */
@Internal
@NotThreadSafe
public class SchemaRegistryRequestHandler implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);

    /** How long {@link #flushSuccess} waits for a submitted schema change before answering. */
    private static final long SCHEMA_CHANGE_WAIT_MILLIS = 1000L;

    /** Applies schema changes to the external system. */
    private final MetadataApplier metadataApplier;
    /** Subtask ids of all registered sink writers. */
    private final Set<Integer> activeSinkWriters;
    /** Holds the current schemas; consulted and updated for every request. */
    private final SchemaManager schemaManager;
    /** Derives the schema change events that are actually applied from an incoming event. */
    private final SchemaDerivation schemaDerivation;
    /** Requests in arrival order; the head (index 0) is the one currently being processed. */
    private final List<PendingSchemaChange> pendingSchemaChanges;
    /** Subtask ids of sink writers that reported a successful flush for the head request. */
    private final Set<Integer> flushedSinkWriters;
    /** Whether a schema change is being applied; written by the executor thread. */
    private volatile boolean isSchemaChangeApplying;
    /** Failure of the most recent schema change, or {@code null}; written by the executor thread. */
    private volatile Exception schemaChangeException;
    /** Single-thread executor on which schema changes are applied. */
    private final ExecutorService schemaChangeThreadPool;

    /**
     * Creates a handler with empty writer sets, an empty pending list and its own single-thread
     * executor.
     *
     * @param metadataApplier applier used to apply derived schema change events
     * @param schemaManager manager queried for existing schemas and updated with each event
     * @param schemaDerivation derivation producing the events that are applied
     */
    public SchemaRegistryRequestHandler(MetadataApplier metadataApplier, SchemaManager schemaManager, SchemaDerivation schemaDerivation) {
        this.metadataApplier = metadataApplier;
        this.activeSinkWriters = new HashSet<>();
        this.flushedSinkWriters = new HashSet<>();
        this.pendingSchemaChanges = new LinkedList<>();
        this.schemaManager = schemaManager;
        this.schemaDerivation = schemaDerivation;
        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
        this.isSchemaChangeApplying = false;
    }

    /**
     * Applies every derived event through the {@link MetadataApplier}; submitted to the
     * executor by {@link #flushSuccess}.
     *
     * <p>Any exception is recorded in {@code schemaChangeException} instead of being thrown, and
     * the applying flag is always cleared when the method exits.
     *
     * @param tableId table the change belongs to (used for logging only)
     * @param derivedSchemaChangeEvents events to apply, in order
     */
    private void applySchemaChange(TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
        this.isSchemaChangeApplying = true;
        this.schemaChangeException = null;
        try {
            for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
                this.metadataApplier.applySchemaChange(changeEvent);
                LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId);
            }
            // If the release request already arrived while we were applying, move on right away.
            PendingSchemaChange waitFlushSuccess = this.pendingSchemaChanges.get(0);
            if (waitFlushSuccess.getStatus() == RequestStatus.RECEIVED_RELEASE_REQUEST) {
                this.startNextSchemaChangeRequest();
            }
        } catch (Exception e) {
            // Stored so that flushSuccess / getSchemaChangeResult can rethrow it to the caller.
            LOG.error("Failed to apply schema change for table {}.", tableId, e);
            this.schemaChangeException = e;
        } finally {
            this.isSchemaChangeApplying = false;
        }
    }

    /**
     * Handles a schema change request.
     *
     * <p>If nothing is pending, the request is processed immediately: a {@link CreateTableEvent}
     * for an already known table yields an empty response; otherwise the event is applied to the
     * schema manager, derived events are computed and returned, and (when there are derived
     * events) the request becomes the head of the pending list. If requests are already pending,
     * the request is queued and an incomplete future is returned.
     *
     * @param request the incoming request
     * @return a future carrying the wrapped {@link SchemaChangeResponse}
     */
    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(SchemaChangeRequest request) {
        if (!this.pendingSchemaChanges.isEmpty()) {
            LOG.info("There are already processing requests. Wait for processing.");
            CompletableFuture<CoordinationResponse> queuedResponse = new CompletableFuture<>();
            this.pendingSchemaChanges.add(new PendingSchemaChange(request, queuedResponse));
            return queuedResponse;
        }

        LOG.info("Received schema change event request from table {}. Start to buffer requests for others.", request.getTableId().toString());
        if (request.getSchemaChangeEvent() instanceof CreateTableEvent && this.schemaManager.schemaExists(request.getTableId())) {
            return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeResponse(Collections.emptyList())));
        }
        this.schemaManager.applySchemaChange(request.getSchemaChangeEvent());
        List<SchemaChangeEvent> derivedSchemaChangeEvents = this.schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
        CompletableFuture<CoordinationResponse> response = CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
        if (!derivedSchemaChangeEvents.isEmpty()) {
            PendingSchemaChange pendingSchemaChange = new PendingSchemaChange(request, response);
            pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents;
            this.pendingSchemaChanges.add(pendingSchemaChange);
            // The list was empty, so the element just added is the head.
            pendingSchemaChange.startToWaitForReleaseRequest();
        }
        return response;
    }

    /**
     * Handles a release-upstream request for the head pending change.
     *
     * <p>If the head's response is already complete and no schema change is being applied, the
     * next pending request is started; otherwise the head is marked as having received the
     * release request.
     *
     * @return the response future of the head pending change as it was on entry
     * @throws IndexOutOfBoundsException if there is no pending schema change
     */
    public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() {
        PendingSchemaChange head = this.pendingSchemaChanges.get(0);
        CompletableFuture<CoordinationResponse> response = head.getResponseFuture();
        if (response.isDone() && !this.isSchemaChangeApplying) {
            this.startNextSchemaChangeRequest();
        } else {
            head.receiveReleaseRequest();
        }
        return response;
    }

    /**
     * Registers a sink writer subtask as active.
     *
     * @param sinkSubtask the subtask id of the sink writer
     */
    public void registerSinkWriter(int sinkSubtask) {
        LOG.info("Register sink subtask {}.", sinkSubtask);
        this.activeSinkWriters.add(sinkSubtask);
    }

    /**
     * Records that a sink writer has flushed. Once the flushed set equals the active set, the
     * head pending change is submitted to the executor, the caller waits
     * {@value #SCHEMA_CHANGE_WAIT_MILLIS} ms, and the head's response future is completed with
     * either a {@link SchemaChangeProcessingResponse} (still applying) or a
     * {@link ReleaseUpstreamResponse} (finished).
     *
     * @param tableId the table the flush belongs to
     * @param sinkSubtask the subtask id of the sink writer that flushed
     * @throws InterruptedException if interrupted while waiting for the schema change
     * @throws RuntimeException if applying the schema change failed within the wait
     */
    public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedException {
        this.flushedSinkWriters.add(sinkSubtask);
        if (!this.flushedSinkWriters.equals(this.activeSinkWriters)) {
            return;
        }

        LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString());
        PendingSchemaChange waitFlushSuccess = this.pendingSchemaChanges.get(0);
        List<SchemaChangeEvent> eventsToApply = waitFlushSuccess.derivedSchemaChangeEvents;

        // Mark the change as in progress BEFORE handing it to the executor. Otherwise, if the
        // worker has not started by the time the wait below ends, the flag would still read
        // false and the change would be reported as finished prematurely.
        this.schemaChangeException = null;
        this.isSchemaChangeApplying = true;
        try {
            this.schemaChangeThreadPool.submit(() -> this.applySchemaChange(tableId, eventsToApply));
        } catch (RuntimeException e) {
            // Submission failed (e.g. executor already shut down): nothing is being applied.
            this.isSchemaChangeApplying = false;
            throw e;
        }

        Thread.sleep(SCHEMA_CHANGE_WAIT_MILLIS);

        Exception failure = this.schemaChangeException;
        if (failure != null) {
            throw new RuntimeException("failed to apply schema change.", failure);
        }
        if (this.isSchemaChangeApplying) {
            waitFlushSuccess.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeProcessingResponse()));
        } else {
            waitFlushSuccess.getResponseFuture().complete(CoordinationResponseUtils.wrap(new ReleaseUpstreamResponse()));
        }
    }

    /**
     * Removes the head pending change, clears the flushed-writer set and processes queued
     * requests in order until one yields derived events (which then becomes the new head waiting
     * for a release request) or the list is empty.
     */
    private void startNextSchemaChangeRequest() {
        this.pendingSchemaChanges.remove(0);
        this.flushedSinkWriters.clear();
        while (!this.pendingSchemaChanges.isEmpty()) {
            PendingSchemaChange pendingSchemaChange = this.pendingSchemaChanges.get(0);
            SchemaChangeRequest request = pendingSchemaChange.getChangeRequest();
            if (request.getSchemaChangeEvent() instanceof CreateTableEvent && this.schemaManager.schemaExists(request.getTableId())) {
                // Table already known: answer with no derived events and drop the request.
                pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(Collections.emptyList())));
                this.pendingSchemaChanges.remove(0);
                continue;
            }
            this.schemaManager.applySchemaChange(request.getSchemaChangeEvent());
            List<SchemaChangeEvent> derivedSchemaChangeEvents = this.schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
            pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
            if (derivedSchemaChangeEvents.isEmpty()) {
                // Nothing derived: the entry stays at the head and is examined again by the loop.
                continue;
            }
            pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents;
            pendingSchemaChange.startToWaitForReleaseRequest();
            break;
        }
    }

    /**
     * Discards all pending schema changes and flushed-writer records.
     *
     * @return a completed future carrying a wrapped {@link RefreshPendingListsResponse}
     */
    public CompletableFuture<CoordinationResponse> refreshPendingLists() {
        this.pendingSchemaChanges.clear();
        this.flushedSinkWriters.clear();
        return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new RefreshPendingListsResponse()));
    }

    /**
     * Reports the state of the schema change that is currently (or was last) applied.
     *
     * @return a completed future carrying a wrapped {@link SchemaChangeProcessingResponse} while
     *     a change is being applied, otherwise a wrapped {@link ReleaseUpstreamResponse}
     * @throws RuntimeException if the last schema change failed; the cause is the recorded
     *     exception
     */
    public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
        // Read the volatile field once so the null check and the throw see the same value.
        Exception failure = this.schemaChangeException;
        if (failure != null) {
            throw new RuntimeException("failed to apply schema change.", failure);
        }
        if (this.isSchemaChangeApplying) {
            return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeProcessingResponse()));
        }
        return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new ReleaseUpstreamResponse()));
    }

    /** Shuts down the schema change executor; already submitted work is allowed to finish. */
    @Override
    public void close() throws IOException {
        this.schemaChangeThreadPool.shutdown();
    }

    /** Processing state of a {@link PendingSchemaChange}. */
    enum RequestStatus {
        /** Initial state assigned when the pending change is created. */
        PENDING,
        /** Set by {@link PendingSchemaChange#startToWaitForReleaseRequest()}. */
        WAIT_RELEASE_REQUEST,
        /** Set by {@link PendingSchemaChange#receiveReleaseRequest()}. */
        RECEIVED_RELEASE_REQUEST;
    }

    /** A buffered schema change request together with its response future and status. */
    private static class PendingSchemaChange {
        private final SchemaChangeRequest changeRequest;
        /** Events derived for this request; assigned by the enclosing handler once computed. */
        private List<SchemaChangeEvent> derivedSchemaChangeEvents;
        /** Replaced with a fresh future by {@link #startToWaitForReleaseRequest()}. */
        private CompletableFuture<CoordinationResponse> responseFuture;
        private RequestStatus status;

        /**
         * @param changeRequest the original request
         * @param responseFuture future through which the request is answered
         */
        public PendingSchemaChange(SchemaChangeRequest changeRequest, CompletableFuture<CoordinationResponse> responseFuture) {
            this.changeRequest = changeRequest;
            this.responseFuture = responseFuture;
            this.status = RequestStatus.PENDING;
        }

        public SchemaChangeRequest getChangeRequest() {
            return this.changeRequest;
        }

        public CompletableFuture<CoordinationResponse> getResponseFuture() {
            return this.responseFuture;
        }

        public RequestStatus getStatus() {
            return this.status;
        }

        /**
         * Replaces the (already completed) response future with a fresh one and moves to
         * {@link RequestStatus#WAIT_RELEASE_REQUEST}.
         *
         * @throws IllegalStateException if the current response future is not yet done
         */
        public void startToWaitForReleaseRequest() {
            if (!this.responseFuture.isDone()) {
                throw new IllegalStateException("Cannot start to wait for flush success before the SchemaChangeRequest is done.");
            }
            this.responseFuture = new CompletableFuture<>();
            this.status = RequestStatus.WAIT_RELEASE_REQUEST;
        }

        /** Moves to {@link RequestStatus#RECEIVED_RELEASE_REQUEST}. */
        public void receiveReleaseRequest() {
            this.status = RequestStatus.RECEIVED_RELEASE_REQUEST;
        }
    }
}

