/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.schema;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.DistributedSchema;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.MigrationCoordinator;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaAnnouncementDiagnostics;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.schema.SchemaPullVerbHandler;
import org.apache.cassandra.schema.SchemaPushVerbHandler;
import org.apache.cassandra.schema.SchemaTransformation;
import org.apache.cassandra.schema.SchemaUpdateHandler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Awaitable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SchemaUpdateHandler} that keeps an in-memory {@link DistributedSchema} snapshot in step with the
 * schema tables accessed through {@link SchemaKeyspace}, and exchanges schema with other nodes through a
 * {@link MigrationCoordinator}.
 * <p>
 * On construction the handler registers itself with {@link Gossiper} (it forwards {@link ApplicationState#SCHEMA}
 * changes to the coordinator) and registers callbacks on {@link SchemaPushVerbHandler} and
 * {@link SchemaPullVerbHandler}.
 * <p>
 * Methods that replace the schema snapshot are {@code synchronized} on this instance. The {@code schema} and
 * {@code requestedReset} fields are {@code volatile} because {@link #getSchemaVersionForCoordinator()} reads them
 * without taking that lock.
 */
public class DefaultSchemaUpdateHandler
implements SchemaUpdateHandler,
IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(DefaultSchemaUpdateHandler.class);

    /** Coordinator used to pull, push and announce schema versions; package-visible for tests. */
    @VisibleForTesting
    final MigrationCoordinator migrationCoordinator;

    /** When {@code true}, {@link #waitUntilReady(Duration)} fails if schemas were not received in time. */
    private final boolean requireSchemas;

    /** Invoked for every non-empty schema update; this class always passes {@code true} as the flag. */
    private final BiConsumer<SchemaTransformation.SchemaTransformationResult, Boolean> updateCallback;

    /** Current schema snapshot; replaced (never mutated) by {@link #updateSchema} and on reset. */
    private volatile DistributedSchema schema = DistributedSchema.EMPTY;

    /**
     * Non-null from the moment {@link #clear()} is called until the first non-empty batch of mutations
     * arrives through {@link #applyMutationsFromCoordinator}; {@code null} otherwise.
     */
    private volatile AsyncPromise<Void> requestedReset;

    /**
     * Builds the default coordinator, wired to this handler's version supplier and mutation consumer.
     *
     * @param messagingService messaging service handed to the coordinator
     * @return a new coordinator running on the {@link Stage#MIGRATION} executor
     */
    private MigrationCoordinator createMigrationCoordinator(MessagingService messagingService) {
        // NOTE(review): the literal 3 is an inlined constant; presumably the maximum number of outstanding
        // version requests - confirm against the MigrationCoordinator constructor.
        return new MigrationCoordinator(messagingService,
                                        Stage.MIGRATION.executor(),
                                        ScheduledExecutors.scheduledTasks,
                                        3,
                                        Gossiper.instance,
                                        this::getSchemaVersionForCoordinator,
                                        this::applyMutationsFromCoordinator);
    }

    /**
     * Creates a handler with a default coordinator and the global messaging service. Schemas are required
     * unless {@link CassandraRelevantProperties#BOOTSTRAP_SKIP_SCHEMA_CHECK} is set.
     *
     * @param updateCallback callback invoked for every non-empty schema update
     */
    public DefaultSchemaUpdateHandler(BiConsumer<SchemaTransformation.SchemaTransformationResult, Boolean> updateCallback) {
        this(null, MessagingService.instance(), !CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean(), updateCallback);
    }

    /**
     * Creates a handler and registers it with gossip and with the schema push/pull verb handlers.
     *
     * @param migrationCoordinator coordinator to use, or {@code null} to create the default one
     * @param messagingService     used to answer schema pull requests and to build the default coordinator
     * @param requireSchemas       whether {@link #waitUntilReady(Duration)} must fail on missing schemas
     * @param updateCallback       callback invoked for every non-empty schema update
     */
    public DefaultSchemaUpdateHandler(MigrationCoordinator migrationCoordinator, MessagingService messagingService, boolean requireSchemas, BiConsumer<SchemaTransformation.SchemaTransformationResult, Boolean> updateCallback) {
        this.requireSchemas = requireSchemas;
        this.updateCallback = updateCallback;
        this.migrationCoordinator = migrationCoordinator == null ? this.createMigrationCoordinator(messagingService) : migrationCoordinator;
        Gossiper.instance.register(this);

        // Pushed mutations are dropped while a reset is pending; the check and the apply share one lock
        // acquisition so that clear() cannot slip in between them.
        SchemaPushVerbHandler.instance.register(msg -> {
            synchronized (this) {
                if (this.requestedReset == null) {
                    this.applyMutations(msg.payload);
                }
            }
        });

        // Answer pull requests with the current schema; a failure to respond is logged, not propagated.
        SchemaPullVerbHandler.instance.register(msg -> {
            try {
                messagingService.send(msg.responseWith(this.getSchemaMutations()), msg.from());
            }
            catch (RuntimeException ex) {
                logger.error("Failed to send schema mutations to " + msg.from(), ex);
            }
        });
    }

    /**
     * Starts the handler: ignores the address being replaced (if this node is replacing one), saves the
     * system keyspaces schema and starts the migration coordinator.
     */
    @Override
    public synchronized void start() {
        if (StorageService.instance.isReplacing()) {
            this.onRemove(DatabaseDescriptor.getReplaceAddress());
        }
        SchemaKeyspace.saveSystemKeyspacesSchema();
        this.migrationCoordinator.start();
    }

    /**
     * Waits for the coordinator's outstanding schema requests to complete.
     *
     * @param timeout maximum time to wait
     * @return {@code true} if the schemas were received in time, or if they were not but schemas are not
     *         required; {@code false} if they were not received and schemas are required
     */
    @Override
    public boolean waitUntilReady(Duration timeout) {
        logger.debug("Waiting for schema to be ready (max {})", timeout);
        if (this.migrationCoordinator.awaitSchemaRequests(timeout.toMillis())) {
            return true;
        }

        logger.warn("There are nodes in the cluster with a different schema version than us, from which we did not merge schemas: "
                    + "our version: ({}), outstanding versions -> endpoints: {}. Use -D{}=true to ignore this, "
                    + "-D{}=<ep1[,epN]> to skip specific endpoints, or -D{}=<ver1[,verN]> to skip specific schema versions",
                    Schema.instance.getVersion(),
                    this.migrationCoordinator.outstandingVersions(),
                    CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey(),
                    CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS.getKey(),
                    CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS.getKey());

        if (this.requireSchemas) {
            logger.error("Didn't receive schemas for all known versions within the {}. Use -D{}=true to skip this check.", timeout, CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey());
            return false;
        }
        return true;
    }

    /**
     * Tells the coordinator to remove and ignore the given endpoint.
     *
     * @param endpoint endpoint that was removed
     */
    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        this.migrationCoordinator.removeAndIgnoreEndpoint(endpoint);
    }

    /**
     * Reports an endpoint's schema version to the coordinator when its {@link ApplicationState#SCHEMA}
     * state changes. The change is ignored unless gossip knows the endpoint, its state is not a dead
     * state, and it is a member according to the token metadata.
     *
     * @param endpoint endpoint whose state changed
     * @param state    application state key that changed
     * @param value    new value; for {@code SCHEMA} its {@code value} is parsed as a {@link UUID}
     */
    @Override
    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {
        if (state != ApplicationState.SCHEMA) {
            return;
        }
        EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
        if (epState == null || Gossiper.instance.isDeadState(epState)) {
            return;
        }
        if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) {
            return;
        }
        this.migrationCoordinator.reportEndpointVersion(endpoint, UUID.fromString(value.value));
    }

    /** No-op: this handler does not react to join notifications. */
    @Override
    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {
    }

    /** No-op: this handler does not react before a state change is applied. */
    @Override
    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    /** No-op: this handler does not react to alive notifications. */
    @Override
    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {
    }

    /** No-op: this handler does not react to dead notifications. */
    @Override
    public void onDead(InetAddressAndPort endpoint, EndpointState state) {
    }

    /** No-op: this handler does not react to restart notifications. */
    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState state) {
    }

    /**
     * Applies schema mutations received from another node and refreshes the in-memory snapshot for the
     * keyspaces those mutations touch.
     *
     * @param schemaMutations mutations against the schema tables
     * @return the transformation result describing the schema before and after, and their diff
     */
    private synchronized SchemaTransformation.SchemaTransformationResult applyMutations(Collection<Mutation> schemaMutations) {
        DistributedSchema before = this.schema;

        // Persist first, then read back only the keyspaces the mutations touched.
        SchemaKeyspace.applyChanges(schemaMutations);
        Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(schemaMutations);
        Keyspaces updatedKeyspaces = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);

        // An affected keyspace that can no longer be fetched is treated as removed.
        Set<String> removedKeyspaces = affectedKeyspaces.stream()
                                                        .filter(ks -> !updatedKeyspaces.containsKeyspace(ks))
                                                        .collect(Collectors.toSet());
        Keyspaces afterKeyspaces = before.getKeyspaces().withAddedOrReplaced(updatedKeyspaces).without(removedKeyspaces);

        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), afterKeyspaces);
        UUID version = SchemaKeyspace.calculateSchemaDigest();
        DistributedSchema after = new DistributedSchema(afterKeyspaces, version);
        SchemaTransformation.SchemaTransformationResult update = new SchemaTransformation.SchemaTransformationResult(before, after, diff);

        logger.info("Applying schema change due to received mutations: {}", update);
        this.updateSchema(update, false);
        return update;
    }

    /**
     * Applies a schema transformation to the current snapshot. If the transformation changes nothing, no
     * mutation is written and a result with identical before/after schema is returned. Otherwise the diff
     * is converted to mutations, written through {@link SchemaKeyspace}, and the snapshot is replaced.
     *
     * @param transformation transformation to apply to the current keyspaces
     * @param local          when {@code false}, the new version is announced and the mutations are pushed
     *                       to other nodes on the coordinator's executor; when {@code true}, neither happens
     * @return the transformation result describing the schema before and after, and their diff
     */
    @Override
    public synchronized SchemaTransformation.SchemaTransformationResult apply(SchemaTransformation transformation, boolean local) {
        DistributedSchema before = this.schema;
        Keyspaces afterKeyspaces = transformation.apply(before.getKeyspaces());
        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), afterKeyspaces);
        if (diff.isEmpty()) {
            return new SchemaTransformation.SchemaTransformationResult(before, before, diff);
        }

        // Use the transformation's fixed timestamp when it supplies one, otherwise the current time.
        Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, transformation.fixedTimestampMicros().orElse(FBUtilities.timestampMicros()));
        SchemaKeyspace.applyChanges(mutations);

        DistributedSchema after = new DistributedSchema(afterKeyspaces, SchemaKeyspace.calculateSchemaDigest());
        SchemaTransformation.SchemaTransformationResult update = new SchemaTransformation.SchemaTransformationResult(before, after, diff);
        this.updateSchema(update, local);

        if (!local) {
            // The push is submitted to the coordinator's executor, so this method returns without waiting for it.
            this.migrationCoordinator.executor.submit(() -> {
                Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> endpoints = this.migrationCoordinator.pushSchemaMutations(mutations);
                SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(endpoints.left(), endpoints.right(), transformation);
            });
        }
        return update;
    }

    /**
     * Installs the result of a schema update as the current snapshot and notifies the update callback.
     * Does nothing (beyond a debug log) when the update's diff is empty.
     *
     * @param update the computed update
     * @param local  when {@code false}, the new schema version is also announced through the coordinator
     */
    private void updateSchema(SchemaTransformation.SchemaTransformationResult update, boolean local) {
        if (update.diff.isEmpty()) {
            logger.debug("Schema update is empty - skipping");
            return;
        }
        this.schema = update.after;
        logger.debug("Schema updated: {}", update);
        this.updateCallback.accept(update, true);
        if (!local) {
            this.migrationCoordinator.announce(update.after.getVersion());
        }
    }

    /**
     * Rebuilds the snapshot from the non-system keyspaces currently stored in the schema tables and
     * installs it (announcing the version) if it differs from the current snapshot.
     */
    private synchronized void reload() {
        DistributedSchema before = this.schema;
        DistributedSchema after = new DistributedSchema(SchemaKeyspace.fetchNonSystemKeyspaces(), SchemaKeyspace.calculateSchemaDigest());
        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), after.getKeyspaces());
        SchemaTransformation.SchemaTransformationResult update = new SchemaTransformation.SchemaTransformationResult(before, after, diff);
        this.updateSchema(update, false);
    }

    /**
     * Resets the schema.
     *
     * @param local when {@code true}, reloads the snapshot from the local schema tables; when {@code false},
     *              resets the coordinator and waits for schema requests for up to
     *              {@link CassandraRelevantProperties#MIGRATION_DELAY}, logging an error on timeout
     */
    @Override
    public void reset(boolean local) {
        if (local) {
            this.reload();
            return;
        }
        this.migrationCoordinator.reset();
        if (!this.migrationCoordinator.awaitSchemaRequests(CassandraRelevantProperties.MIGRATION_DELAY.getLong())) {
            logger.error("Timeout exceeded when waiting for schema from other nodes");
        }
    }

    /**
     * Requests a schema reset. The first call creates the pending-reset promise and resets the coordinator;
     * calls made while a reset is still pending return the same promise.
     *
     * @return an awaitable that is completed once a non-empty batch of mutations has been received through
     *         the coordinator and the reset has been carried out
     */
    @Override
    public synchronized Awaitable clear() {
        if (this.requestedReset == null) {
            this.requestedReset = new AsyncPromise<>();
            this.migrationCoordinator.reset();
        }
        return this.requestedReset;
    }

    /**
     * Supplies the coordinator with this node's schema version.
     *
     * @return {@link SchemaConstants#emptyVersion} while a reset is pending, otherwise the current
     *         snapshot's version
     */
    private UUID getSchemaVersionForCoordinator() {
        if (this.requestedReset != null) {
            return SchemaConstants.emptyVersion;
        }
        return this.schema.getVersion();
    }

    /**
     * Applies mutations delivered by the coordinator. If a reset is pending and the batch is non-empty,
     * the snapshot is emptied and the schema tables truncated first, and the pending reset is completed.
     *
     * @param from      endpoint the mutations came from (not used here)
     * @param mutations schema mutations to apply
     */
    private synchronized void applyMutationsFromCoordinator(InetAddressAndPort from, Collection<Mutation> mutations) {
        if (this.requestedReset != null && !mutations.isEmpty()) {
            this.schema = DistributedSchema.EMPTY;
            SchemaKeyspace.truncate();
            this.requestedReset.setSuccess(null);
            this.requestedReset = null;
        }
        this.applyMutations(mutations);
    }

    /**
     * Returns the schema as mutations for answering a pull request.
     *
     * @return an empty list while a reset is pending, otherwise the full schema converted to mutations
     */
    private synchronized Collection<Mutation> getSchemaMutations() {
        if (this.requestedReset != null) {
            return Collections.emptyList();
        }
        return SchemaKeyspace.convertSchemaToMutations();
    }

    /**
     * @return the coordinator's view of outstanding schema versions, mapped to the endpoints reporting them
     */
    public Map<UUID, Set<InetAddressAndPort>> getOutstandingSchemaVersions() {
        return this.migrationCoordinator.outstandingVersions();
    }
}

