/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.CompletableRemoteSyncTask;
import org.apache.cassandra.repair.RepairJob;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.repair.SyncNodePair;
import org.apache.cassandra.repair.ValidationTask;
import org.apache.cassandra.repair.consistent.ConsistentSession;
import org.apache.cassandra.repair.consistent.LocalSession;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.messages.SyncResponse;
import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.repair.state.SessionState;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepairSession
extends AsyncFuture<RepairSessionResult>
implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
LocalSessions.Listener {
    private static final Logger logger = LoggerFactory.getLogger(RepairSession.class);
    public final SessionState state;
    public final RepairParallelism parallelismDegree;
    public final boolean pullRepair;
    public final boolean isIncremental;
    public final PreviewKind previewKind;
    public final boolean repairPaxos;
    public final boolean paxosOnly;
    private final AtomicBoolean isFailed = new AtomicBoolean(false);
    private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask>();
    private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask>();
    public final SafeExecutor taskExecutor;
    public final boolean optimiseStreams;
    public final SharedContext ctx;
    private volatile List<RepairJob> jobs = Collections.emptyList();
    private volatile boolean terminated = false;

    public RepairSession(SharedContext ctx, TimeUUID parentRepairSession, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, PreviewKind previewKind, boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, String ... cfnames) {
        this.ctx = ctx;
        this.repairPaxos = repairPaxos;
        this.paxosOnly = paxosOnly;
        assert (cfnames.length > 0) : "Repairing no column families seems pointless, doesn't it";
        this.state = new SessionState(ctx.clock(), parentRepairSession, keyspace, cfnames, commonRange);
        this.parallelismDegree = parallelismDegree;
        this.isIncremental = isIncremental;
        this.previewKind = previewKind;
        this.pullRepair = pullRepair;
        this.optimiseStreams = optimiseStreams;
        this.taskExecutor = new SafeExecutor(this.createExecutor(ctx));
    }

    @VisibleForTesting
    protected ExecutorPlus createExecutor(SharedContext ctx) {
        return ctx.executorFactory().pooled("RepairJobTask", Integer.MAX_VALUE);
    }

    public TimeUUID getId() {
        return (TimeUUID)this.state.id;
    }

    public Collection<Range<Token>> ranges() {
        return this.state.commonRange.ranges;
    }

    public Collection<InetAddressAndPort> endpoints() {
        return this.state.commonRange.endpoints;
    }

    public synchronized void trackValidationCompletion(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task) {
        if (this.terminated) {
            task.abort(new RuntimeException("Session terminated"));
            return;
        }
        this.validating.put(key, task);
    }

    public synchronized void trackSyncCompletion(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task) {
        if (this.terminated) {
            return;
        }
        this.syncingTasks.put(key, task);
    }

    public void validationComplete(RepairJobDesc desc, Message<ValidationResponse> message) {
        InetAddressAndPort endpoint = message.from();
        MerkleTrees trees = ((ValidationResponse)message.payload).trees;
        ValidationTask task = (ValidationTask)this.validating.remove(Pair.create(desc, endpoint));
        this.ctx.messaging().send(message.emptyResponse(), message.from());
        if (task == null) {
            if (trees != null) {
                trees.release();
            }
            return;
        }
        String msg = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
        logger.info("{} {}", (Object)this.previewKind.logPrefix(this.getId()), (Object)msg);
        Tracing.traceRepair(msg, new Object[0]);
        task.treesReceived(trees);
    }

    public void syncComplete(RepairJobDesc desc, Message<SyncResponse> message) {
        SyncNodePair nodes = ((SyncResponse)message.payload).nodes;
        CompletableRemoteSyncTask task = (CompletableRemoteSyncTask)this.syncingTasks.remove(Pair.create(desc, nodes));
        this.ctx.messaging().send(message.emptyResponse(), message.from());
        if (task == null) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{} Repair completed between {} and {} on {}", new Object[]{this.previewKind.logPrefix(this.getId()), nodes.coordinator, nodes.peer, desc.columnFamily});
        }
        task.syncComplete(((SyncResponse)message.payload).success, ((SyncResponse)message.payload).summaries);
    }

    @VisibleForTesting
    Map<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> getSyncingTasks() {
        return Collections.unmodifiableMap(this.syncingTasks);
    }

    private String repairedNodes() {
        StringBuilder sb = new StringBuilder();
        sb.append(FBUtilities.getBroadcastAddressAndPort());
        for (InetAddressAndPort ep : this.state.commonRange.endpoints) {
            sb.append(", ").append(ep);
        }
        return sb.toString();
    }

    public void start(ExecutorPlus executor) {
        this.state.phase.start();
        if (this.terminated) {
            return;
        }
        logger.info("{} parentSessionId = {}: new session: will sync {} on range {} for {}.{}", new Object[]{this.previewKind.logPrefix(this.getId()), this.state.parentRepairSession, this.repairedNodes(), this.state.commonRange, this.state.keyspace, Arrays.toString(this.state.cfnames)});
        Tracing.traceRepair("Syncing range {}", this.state.commonRange);
        if (!this.previewKind.isPreview() && !this.paxosOnly) {
            SystemDistributedKeyspace.startRepairs(this.getId(), this.state.parentRepairSession, this.state.keyspace, this.state.cfnames, this.state.commonRange);
        }
        if (this.state.commonRange.endpoints.isEmpty()) {
            String message = String.format("No neighbors to repair with on range %s: session completed", this.state.commonRange);
            logger.info("{} {}", (Object)this.previewKind.logPrefix(this.getId()), (Object)message);
            this.state.phase.skip(message);
            Tracing.traceRepair(message, new Object[0]);
            this.trySuccess(new RepairSessionResult((TimeUUID)this.state.id, this.state.keyspace, this.state.commonRange.ranges, Lists.newArrayList(), this.state.commonRange.hasSkippedReplicas));
            if (!this.previewKind.isPreview()) {
                SystemDistributedKeyspace.failRepairs(this.getId(), this.state.keyspace, this.state.cfnames, new RuntimeException(message));
            }
            return;
        }
        for (InetAddressAndPort endpoint : this.state.commonRange.endpoints) {
            if (this.ctx.failureDetector().isAlive(endpoint) || this.state.commonRange.hasSkippedReplicas) continue;
            String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
            this.state.phase.fail(message);
            logger.error("{} {}", (Object)this.previewKind.logPrefix(this.getId()), (Object)message);
            IOException e = new IOException(message);
            this.tryFailure(e);
            if (!this.previewKind.isPreview()) {
                SystemDistributedKeyspace.failRepairs(this.getId(), this.state.keyspace, this.state.cfnames, e);
            }
            return;
        }
        this.state.phase.jobsSubmitted();
        ArrayList<RepairJob> jobs = new ArrayList<RepairJob>(this.state.cfnames.length);
        for (String cfname : this.state.cfnames) {
            RepairJob job = new RepairJob(this, cfname);
            this.state.register(job.state);
            executor.execute(job);
            jobs.add(job);
        }
        this.jobs = jobs;
        FBUtilities.allOf(jobs).addCallback(new FutureCallback<List<RepairResult>>(){

            public void onSuccess(List<RepairResult> results) {
                RepairSession.this.state.phase.success();
                logger.info("{} {}", (Object)RepairSession.this.previewKind.logPrefix(RepairSession.this.getId()), (Object)"Session completed successfully");
                Tracing.traceRepair("Completed sync of range {}", RepairSession.this.state.commonRange);
                RepairSession.this.trySuccess(new RepairSessionResult((TimeUUID)RepairSession.this.state.id, RepairSession.this.state.keyspace, RepairSession.this.state.commonRange.ranges, results, RepairSession.this.state.commonRange.hasSkippedReplicas));
                RepairSession.this.terminate(null);
                RepairSession.this.taskExecutor.shutdown();
            }

            public void onFailure(Throwable t) {
                RepairSession.this.state.phase.fail(t);
                String msg = "{} Session completed with the following error";
                if (Throwables.anyCauseMatches(t, RepairException::shouldWarn)) {
                    logger.warn(msg + ": {}", (Object)RepairSession.this.previewKind.logPrefix(RepairSession.this.getId()), (Object)t.getMessage());
                } else {
                    logger.error(msg, (Object)RepairSession.this.previewKind.logPrefix(RepairSession.this.getId()), (Object)t);
                }
                Tracing.traceRepair("Session completed with the following error: {}", t);
                RepairSession.this.forceShutdown(t);
            }
        }, (Executor)this.taskExecutor);
    }

    public synchronized void terminate(@Nullable Throwable reason) {
        this.terminated = true;
        List<RepairJob> jobs = this.jobs;
        if (jobs != null) {
            for (RepairJob job : jobs) {
                job.abort(reason);
            }
        }
        this.jobs = null;
        this.validating.clear();
        this.syncingTasks.clear();
    }

    public void forceShutdown(Throwable reason) {
        this.tryFailure(reason);
        this.terminate(reason);
        this.taskExecutor.shutdown();
    }

    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState epState) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    @Override
    public void convict(InetAddressAndPort endpoint, double phi) {
        if (!this.state.commonRange.endpoints.contains((Object)endpoint)) {
            return;
        }
        if (phi < 2.0 * DatabaseDescriptor.getPhiConvictThreshold()) {
            return;
        }
        if (!this.isFailed.compareAndSet(false, true)) {
            return;
        }
        IOException exception = new IOException(String.format("Endpoint %s died", endpoint));
        logger.error("{} session completed with the following error", (Object)this.previewKind.logPrefix(this.getId()), (Object)exception);
        this.forceShutdown(exception);
    }

    @Override
    public void onIRStateChange(LocalSession session) {
        if (this.previewKind == PreviewKind.REPAIRED && session.getState() == ConsistentSession.State.FINALIZED && this.includesTables((Set<TableId>)session.tableIds)) {
            for (Range range : session.ranges) {
                if (!((AbstractBounds)range).intersects(this.ranges())) continue;
                logger.warn("{} An intersecting incremental repair with session id = {} finished, preview repair might not be accurate", (Object)this.previewKind.logPrefix(this.getId()), (Object)session.sessionID);
                this.forceShutdown(RepairException.warn("An incremental repair with session id " + session.sessionID + " finished during this preview repair runtime"));
                return;
            }
        }
    }

    private boolean includesTables(Set<TableId> tableIds) {
        Keyspace ks = Keyspace.open(this.state.keyspace);
        if (ks != null) {
            for (String table : this.state.cfnames) {
                ColumnFamilyStore cfs = ks.getColumnFamilyStore(table);
                if (!tableIds.contains(cfs.metadata.id)) continue;
                return true;
            }
        }
        return false;
    }

    private static class SafeExecutor
    implements Executor {
        private final ExecutorPlus delegate;

        private SafeExecutor(ExecutorPlus delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            try {
                this.delegate.execute(command);
            }
            catch (RejectedExecutionException e) {
                Stage.INTERNAL_RESPONSE.execute(command);
            }
        }

        public void shutdown() {
            this.delegate.shutdown();
        }
    }
}

