/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair;

import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.repair.AbstractRepairTask;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.CoordinatedRepairResult;
import org.apache.cassandra.repair.IncrementalRepairTask;
import org.apache.cassandra.repair.NormalRepairTask;
import org.apache.cassandra.repair.PreviewRepairTask;
import org.apache.cassandra.repair.RepairNotifier;
import org.apache.cassandra.repair.SomeRepairFailedException;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.state.CoordinatorState;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.cassandra.utils.progress.ProgressListener;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepairRunnable
implements Runnable,
ProgressEventNotifier,
RepairNotifier {
    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1);
    public final CoordinatorState state;
    private final StorageService storageService;
    private final String tag;
    private final List<ProgressListener> listeners = new ArrayList<ProgressListener>();
    private final AtomicReference<Throwable> firstError = new AtomicReference<Object>(null);
    private TraceState traceState;

    public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) {
        this.state = new CoordinatorState(cmd, keyspace, options);
        this.storageService = storageService;
        this.tag = "repair:" + cmd;
        ActiveRepairService.instance.register(this.state);
    }

    @Override
    public void addProgressListener(ProgressListener listener) {
        this.listeners.add(listener);
    }

    @Override
    public void removeProgressListener(ProgressListener listener) {
        this.listeners.remove(listener);
    }

    protected void fireProgressEvent(ProgressEvent event) {
        for (ProgressListener listener : this.listeners) {
            listener.progress(this.tag, event);
        }
    }

    @Override
    public void notification(String msg) {
        logger.info(msg);
        this.fireProgressEvent(this.jmxEvent(ProgressEventType.NOTIFICATION, msg));
    }

    @Override
    public void notifyError(Throwable error) {
        if (error instanceof SomeRepairFailedException) {
            return;
        }
        if (Throwables.anyCauseMatches(error, RepairException::shouldWarn)) {
            logger.warn("Repair {} aborted: {}", this.state.id, (Object)error.getMessage());
            if (logger.isDebugEnabled()) {
                logger.debug("Repair {} aborted: ", this.state.id, (Object)error);
            }
        } else {
            logger.error("Repair {} failed:", this.state.id, (Object)error);
        }
        StorageMetrics.repairExceptions.inc();
        String errorMessage = String.format("Repair command #%d failed with error %s", this.state.cmd, error.getMessage());
        this.fireProgressEvent(this.jmxEvent(ProgressEventType.ERROR, errorMessage));
        this.firstError.compareAndSet(null, error);
        this.maybeStoreParentRepairFailure(error);
    }

    @Override
    public void notifyProgress(String message) {
        logger.info(message);
        this.fireProgressEvent(this.jmxEvent(ProgressEventType.PROGRESS, message));
    }

    private void skip(String msg) {
        this.state.phase.skip(msg);
        this.notification("Repair " + this.state.id + " skipped: " + msg);
        this.success(msg);
    }

    private void success(String msg) {
        this.state.phase.success(msg);
        this.fireProgressEvent(this.jmxEvent(ProgressEventType.SUCCESS, msg));
        ActiveRepairService.instance.recordRepairStatus(this.state.cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, (List<String>)ImmutableList.of((Object)msg));
        this.complete(null);
    }

    private void fail(String reason) {
        if (reason == null) {
            Throwable error = this.firstError.get();
            reason = error != null ? error.getMessage() : "Some repair failed";
        }
        this.state.phase.fail(reason);
        String completionMessage = String.format("Repair command #%d finished with error", this.state.cmd);
        ActiveRepairService.instance.recordRepairStatus(this.state.cmd, ActiveRepairService.ParentRepairStatus.FAILED, (List<String>)ImmutableList.of((Object)reason, (Object)completionMessage));
        this.complete(completionMessage);
    }

    private void complete(String msg) {
        long durationMillis = this.state.getDurationMillis();
        if (msg == null) {
            String duration = DurationFormatUtils.formatDurationWords((long)durationMillis, (boolean)true, (boolean)true);
            msg = String.format("Repair command #%d finished in %s", this.state.cmd, duration);
        }
        this.fireProgressEvent(this.jmxEvent(ProgressEventType.COMPLETE, msg));
        logger.info(this.state.options.getPreviewKind().logPrefix((TimeUUID)this.state.id) + msg);
        ActiveRepairService.instance.removeParentRepairSession((TimeUUID)this.state.id);
        TraceState localState = this.traceState;
        if (this.state.options.isTraced() && localState != null) {
            for (ProgressListener listener : this.listeners) {
                localState.removeProgressListener(listener);
            }
            Tracing.instance.set(localState);
            Tracing.traceRepair(msg, new Object[0]);
            Tracing.instance.stopSession();
        }
        Keyspace.open((String)this.state.keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            this.runMayThrow();
        }
        catch (SkipRepairException e) {
            this.skip(e.getMessage());
        }
        catch (Error | Exception e) {
            this.notifyError(e);
            this.fail(e.getMessage());
        }
    }

    private void runMayThrow() throws Exception {
        this.state.phase.setup();
        ActiveRepairService.instance.recordRepairStatus(this.state.cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, (List<String>)ImmutableList.of());
        List<ColumnFamilyStore> columnFamilies = this.getColumnFamilies();
        String[] cfnames = (String[])columnFamilies.stream().map(cfs -> cfs.name).toArray(String[]::new);
        this.traceState = this.maybeCreateTraceState(columnFamilies);
        this.notifyStarting();
        NeighborsAndRanges neighborsAndRanges = this.getNeighborsAndRanges();
        this.state.phase.start(columnFamilies, neighborsAndRanges);
        this.maybeStoreParentRepairStart(cfnames);
        this.prepare(columnFamilies, neighborsAndRanges.participants, neighborsAndRanges.shouldExcludeDeadParticipants);
        this.repair(cfnames, neighborsAndRanges);
    }

    private List<ColumnFamilyStore> getColumnFamilies() throws IOException {
        String[] columnFamilies = this.state.options.getColumnFamilies().toArray(new String[this.state.options.getColumnFamilies().size()]);
        Iterable<ColumnFamilyStore> validColumnFamilies = this.storageService.getValidColumnFamilies(false, false, this.state.keyspace, columnFamilies);
        if (Iterables.isEmpty(validColumnFamilies)) {
            throw new SkipRepairException(String.format("%s Empty keyspace, skipping repair: %s", this.state.id, this.state.keyspace));
        }
        return Lists.newArrayList(validColumnFamilies);
    }

    private TraceState maybeCreateTraceState(Iterable<ColumnFamilyStore> columnFamilyStores) {
        if (!this.state.options.isTraced()) {
            return null;
        }
        StringBuilder cfsb = new StringBuilder();
        for (ColumnFamilyStore cfs : columnFamilyStores) {
            cfsb.append(", ").append(cfs.getKeyspaceName()).append(".").append(cfs.name);
        }
        TimeUUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
        TraceState traceState = Tracing.instance.begin("repair", (Map<String, String>)ImmutableMap.of((Object)"keyspace", (Object)this.state.keyspace, (Object)"columnFamilies", (Object)cfsb.substring(2)));
        traceState.enableActivityNotification(this.tag);
        for (ProgressListener listener : this.listeners) {
            traceState.addProgressListener(listener);
        }
        Thread queryThread = this.createQueryThread(sessionId);
        queryThread.setName("RepairTracePolling");
        return traceState;
    }

    private void notifyStarting() {
        String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", this.state.cmd, this.state.id, this.state.keyspace, this.state.options);
        logger.info(message);
        Tracing.traceRepair(message, new Object[0]);
        this.fireProgressEvent(this.jmxEvent(ProgressEventType.START, message));
    }

    private NeighborsAndRanges getNeighborsAndRanges() throws RepairException {
        HashSet allNeighbors = new HashSet();
        ArrayList<CommonRange> commonRanges = new ArrayList<CommonRange>();
        Set<Range<Token>> keyspaceLocalRanges = this.storageService.getLocalReplicas(this.state.keyspace).ranges();
        for (Range<Token> range : this.state.options.getRanges()) {
            EndpointsForRange neighbors = ActiveRepairService.getNeighbors(this.state.keyspace, keyspaceLocalRanges, range, this.state.options.getDataCenters(), this.state.options.getHosts());
            if (neighbors.isEmpty()) {
                if (this.state.options.ignoreUnreplicatedKeyspaces()) {
                    logger.info("{} Found no neighbors for range {} for {} - ignoring since repairing with --ignore-unreplicated-keyspaces", new Object[]{this.state.id, range, this.state.keyspace});
                    continue;
                }
                throw RepairException.warn(String.format("Nothing to repair for %s in %s - aborting", range, this.state.keyspace));
            }
            RepairRunnable.addRangeToNeighbors(commonRanges, range, neighbors);
            allNeighbors.addAll(neighbors.endpoints());
        }
        if (this.state.options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty()) {
            throw new SkipRepairException(String.format("Nothing to repair for %s in %s - unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces", this.state.options.getRanges(), this.state.keyspace));
        }
        boolean shouldExcludeDeadParticipants = this.state.options.isForcedRepair();
        if (shouldExcludeDeadParticipants) {
            HashSet actualNeighbors = Sets.newHashSet((Iterable)Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
            shouldExcludeDeadParticipants = !allNeighbors.equals(actualNeighbors);
            allNeighbors = actualNeighbors;
        }
        return new NeighborsAndRanges(shouldExcludeDeadParticipants, allNeighbors, commonRanges);
    }

    private void maybeStoreParentRepairStart(String[] cfnames) {
        if (!this.state.options.isPreview()) {
            SystemDistributedKeyspace.startParentRepair((TimeUUID)this.state.id, this.state.keyspace, cfnames, this.state.options);
        }
    }

    private void maybeStoreParentRepairSuccess(Collection<Range<Token>> successfulRanges) {
        if (!this.state.options.isPreview()) {
            SystemDistributedKeyspace.successfulParentRepair((TimeUUID)this.state.id, successfulRanges);
        }
    }

    private void maybeStoreParentRepairFailure(Throwable error) {
        if (!this.state.options.isPreview()) {
            SystemDistributedKeyspace.failParentRepair((TimeUUID)this.state.id, error);
        }
    }

    private void prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddressAndPort> allNeighbors, boolean force) {
        this.state.phase.prepareStart();
        try (Timer.Context ignore = Keyspace.open((String)this.state.keyspace).metric.repairPrepareTime.time();){
            ActiveRepairService.instance.prepareForRepair((TimeUUID)this.state.id, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, this.state.options, force, columnFamilies);
        }
        this.state.phase.prepareComplete();
    }

    private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges) {
        AbstractRepairTask task = this.state.options.isPreview() ? new PreviewRepairTask(this.state.options, this.state.keyspace, this, (TimeUUID)this.state.id, neighborsAndRanges.filterCommonRanges(this.state.keyspace, cfnames), cfnames) : (this.state.options.isIncremental() ? new IncrementalRepairTask(this.state.options, this.state.keyspace, this, (TimeUUID)this.state.id, neighborsAndRanges, cfnames) : new NormalRepairTask(this.state.options, this.state.keyspace, this, (TimeUUID)this.state.id, neighborsAndRanges.filterCommonRanges(this.state.keyspace, cfnames), cfnames));
        ExecutorPlus executor = this.createExecutor();
        this.state.phase.repairSubmitted();
        Future<CoordinatedRepairResult> f = task.perform(executor);
        f.addCallback((result, failure) -> {
            this.state.phase.repairCompleted();
            try {
                if (failure != null) {
                    this.notifyError((Throwable)failure);
                    this.fail(failure.getMessage());
                } else {
                    this.maybeStoreParentRepairSuccess(result.successfulRanges);
                    if (result.hasFailed()) {
                        this.fail(null);
                    } else {
                        this.success(task.successMessage());
                        ActiveRepairService.instance.cleanUp((TimeUUID)this.state.id, neighborsAndRanges.participants);
                    }
                }
            }
            finally {
                executor.shutdown();
            }
        });
    }

    private ExecutorPlus createExecutor() {
        return ExecutorFactory.Global.executorFactory().localAware().withJmxInternal().pooled("Repair#" + this.state.cmd, this.state.options.getJobThreads());
    }

    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) {
        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
        Set<InetAddressAndPort> transEndpoints = ((EndpointsForRange)neighbors.filter(Replica::isTransient)).endpoints();
        for (CommonRange commonRange : neighborRangeList) {
            if (!commonRange.matchesEndpoints(endpoints, transEndpoints)) continue;
            commonRange.ranges.add(range);
            return;
        }
        ArrayList<Range<Token>> ranges = new ArrayList<Range<Token>>();
        ranges.add(range);
        neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges));
    }

    private Thread createQueryThread(final TimeUUID sessionId) {
        return ExecutorFactory.Global.executorFactory().startThread("Repair-Runnable-" + THREAD_COUNTER.incrementAndGet(), new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                TraceState.Status status;
                TraceState state = Tracing.instance.get(sessionId);
                if (state == null) {
                    throw new Exception("no tracestate");
                }
                String format = "select event_id, source, source_port, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
                String query = String.format(format, "system_traces", "events");
                SelectStatement statement = (SelectStatement)QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls());
                ByteBuffer sessionIdBytes = sessionId.toBytes();
                InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort();
                HashSet[] seen = new HashSet[]{new HashSet(), new HashSet()};
                int si = 0;
                long tlast = Clock.Global.currentTimeMillis();
                long minWaitMillis = 125L;
                long maxWaitMillis = 1024000L;
                long timeout = minWaitMillis;
                boolean shouldDouble = false;
                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) {
                    if (status == TraceState.Status.IDLE) {
                        timeout = shouldDouble ? Math.min(timeout * 2L, maxWaitMillis) : timeout;
                        shouldDouble = !shouldDouble;
                    } else {
                        timeout = minWaitMillis;
                        shouldDouble = false;
                    }
                    ByteBuffer tminBytes = TimeUUID.minAtUnixMillis(tlast - 1000L).toBytes();
                    long tcur = Clock.Global.currentTimeMillis();
                    ByteBuffer tmaxBytes = TimeUUID.maxAtUnixMillis(tcur).toBytes();
                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList((Object[])new ByteBuffer[]{sessionIdBytes, tminBytes, tmaxBytes}));
                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, Clock.Global.nanoTime());
                    UntypedResultSet result = UntypedResultSet.create(rows.result);
                    for (UntypedResultSet.Row r : result) {
                        InetAddressAndPort eventNode;
                        int port = DatabaseDescriptor.getStoragePort();
                        if (r.has("source_port")) {
                            port = r.getInt("source_port");
                        }
                        if (source.equals(eventNode = InetAddressAndPort.getByAddressOverrideDefaults(r.getInetAddress("source"), port))) continue;
                        UUID uuid = r.getUUID("event_id");
                        if (uuid.timestamp() > (tcur - 1000L) * 10000L) {
                            seen[si].add(uuid);
                        }
                        if (seen[si == 0 ? 1 : 0].contains(uuid)) continue;
                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
                        RepairRunnable.this.notification(message);
                    }
                    tlast = tcur;
                    si = si == 0 ? 1 : 0;
                    seen[si].clear();
                }
            }
        });
    }

    private ProgressEvent jmxEvent(ProgressEventType type, String msg) {
        int length = CoordinatorState.State.values().length + 1;
        int currentState = this.state.getCurrentState();
        return new ProgressEvent(type, currentState == -1 ? 0 : (currentState == -2 ? length : currentState), length, msg);
    }

    public static final class NeighborsAndRanges {
        final boolean shouldExcludeDeadParticipants;
        public final Set<InetAddressAndPort> participants;
        public final List<CommonRange> commonRanges;

        public NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges) {
            this.shouldExcludeDeadParticipants = shouldExcludeDeadParticipants;
            this.participants = participants;
            this.commonRanges = commonRanges;
        }

        public List<CommonRange> filterCommonRanges(String keyspace, String[] tableNames) {
            if (!this.shouldExcludeDeadParticipants) {
                return this.commonRanges;
            }
            logger.debug("force flag set, removing dead endpoints if possible");
            ArrayList<CommonRange> filtered = new ArrayList<CommonRange>(this.commonRanges.size());
            for (CommonRange commonRange : this.commonRanges) {
                ImmutableSet endpoints = ImmutableSet.copyOf((Iterable)Iterables.filter(commonRange.endpoints, this.participants::contains));
                ImmutableSet transEndpoints = ImmutableSet.copyOf((Iterable)Iterables.filter(commonRange.transEndpoints, this.participants::contains));
                Preconditions.checkState((boolean)endpoints.containsAll((Collection<?>)transEndpoints), (Object)"transEndpoints must be a subset of endpoints");
                if (!endpoints.isEmpty()) {
                    Sets.SetView skippedReplicas = Sets.difference(commonRange.endpoints, (Set)endpoints);
                    skippedReplicas.forEach(endpoint -> logger.info("Removing a dead node {} from repair for ranges {} due to -force", endpoint, commonRange.ranges));
                    filtered.add(new CommonRange((Set<InetAddressAndPort>)endpoints, (Set<InetAddressAndPort>)transEndpoints, commonRange.ranges, !skippedReplicas.isEmpty()));
                    continue;
                }
                logger.warn("Skipping forced repair for ranges {} of tables {} in keyspace {}, as no neighbor nodes are live.", new Object[]{commonRange.ranges, Arrays.asList(tableNames), keyspace});
            }
            Preconditions.checkState((!filtered.isEmpty() ? 1 : 0) != 0, (Object)"Not enough live endpoints for a repair");
            return filtered;
        }
    }

    private static final class SkipRepairException
    extends RuntimeException {
        SkipRepairException(String message) {
            super(message);
        }
    }
}

