/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.CompletableRemoteSyncTask;
import org.apache.cassandra.repair.RepairJob;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SyncNodePair;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.repair.ValidationTask;
import org.apache.cassandra.repair.consistent.ConsistentSession;
import org.apache.cassandra.repair.consistent.LocalSession;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepairSession
extends AbstractFuture<RepairSessionResult>
implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
LocalSessions.Listener {
    private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
    public final UUID parentRepairSession;
    private final UUID id;
    public final String keyspace;
    private final String[] cfnames;
    public final RepairParallelism parallelismDegree;
    public final boolean pullRepair;
    public final boolean skippedReplicas;
    public final CommonRange commonRange;
    public final boolean isIncremental;
    public final PreviewKind previewKind;
    private final AtomicBoolean isFailed = new AtomicBoolean(false);
    private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask>();
    private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask>();
    public final ListeningExecutorService taskExecutor;
    public final boolean optimiseStreams;
    private volatile boolean terminated = false;

    public RepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, boolean force, PreviewKind previewKind, boolean optimiseStreams, String ... cfnames) {
        assert (cfnames.length > 0) : "Repairing no column families seems pointless, doesn't it";
        this.parentRepairSession = parentRepairSession;
        this.id = id;
        this.parallelismDegree = parallelismDegree;
        this.keyspace = keyspace;
        this.cfnames = cfnames;
        boolean forceSkippedReplicas = false;
        if (force) {
            logger.debug("force flag set, removing dead endpoints");
            HashSet<InetAddressAndPort> removeCandidates = new HashSet<InetAddressAndPort>();
            for (InetAddressAndPort endpoint : commonRange.endpoints) {
                if (FailureDetector.instance.isAlive(endpoint)) continue;
                logger.info("Removing a dead node from Repair due to -force {}", (Object)endpoint);
                removeCandidates.add(endpoint);
            }
            if (!removeCandidates.isEmpty()) {
                forceSkippedReplicas = true;
                HashSet<InetAddressAndPort> filteredEndpoints = new HashSet<InetAddressAndPort>((Collection<InetAddressAndPort>)commonRange.endpoints);
                filteredEndpoints.removeAll(removeCandidates);
                commonRange = new CommonRange((Set<InetAddressAndPort>)filteredEndpoints, (Set<InetAddressAndPort>)commonRange.transEndpoints, commonRange.ranges);
            }
        }
        this.commonRange = commonRange;
        this.isIncremental = isIncremental;
        this.previewKind = previewKind;
        this.pullRepair = pullRepair;
        this.skippedReplicas = forceSkippedReplicas;
        this.optimiseStreams = optimiseStreams;
        this.taskExecutor = MoreExecutors.listeningDecorator((ExecutorService)this.createExecutor());
    }

    protected DebuggableThreadPoolExecutor createExecutor() {
        return DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask");
    }

    public UUID getId() {
        return this.id;
    }

    public Collection<Range<Token>> ranges() {
        return this.commonRange.ranges;
    }

    public Collection<InetAddressAndPort> endpoints() {
        return this.commonRange.endpoints;
    }

    public void trackValidationCompletion(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task) {
        this.validating.put(key, task);
    }

    public void trackSyncCompletion(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task) {
        this.syncingTasks.put(key, task);
    }

    public void validationComplete(RepairJobDesc desc, InetAddressAndPort endpoint, MerkleTrees trees) {
        ValidationTask task = (ValidationTask)this.validating.remove(Pair.create(desc, endpoint));
        if (task == null) {
            assert (this.terminated);
            return;
        }
        String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
        logger.info("{} {}", (Object)this.previewKind.logPrefix(this.getId()), (Object)message);
        Tracing.traceRepair(message, new Object[0]);
        task.treesReceived(trees);
    }

    public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries) {
        CompletableRemoteSyncTask task = (CompletableRemoteSyncTask)this.syncingTasks.remove(Pair.create(desc, nodes));
        if (task == null) {
            assert (this.terminated);
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{} Repair completed between {} and {} on {}", new Object[]{this.previewKind.logPrefix(this.getId()), nodes.coordinator, nodes.peer, desc.columnFamily});
        }
        task.syncComplete(success, summaries);
    }

    @VisibleForTesting
    Map<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> getSyncingTasks() {
        return Collections.unmodifiableMap(this.syncingTasks);
    }

    private String repairedNodes() {
        StringBuilder sb = new StringBuilder();
        sb.append(FBUtilities.getBroadcastAddressAndPort());
        for (InetAddressAndPort ep : this.commonRange.endpoints) {
            sb.append(", ").append(ep);
        }
        return sb.toString();
    }

    public void start(ListeningExecutorService executor) {
        if (this.terminated) {
            return;
        }
        logger.info("{} new session: will sync {} on range {} for {}.{}", new Object[]{this.previewKind.logPrefix(this.getId()), this.repairedNodes(), this.commonRange, this.keyspace, Arrays.toString(this.cfnames)});
        Tracing.traceRepair("Syncing range {}", this.commonRange);
        if (!this.previewKind.isPreview()) {
            SystemDistributedKeyspace.startRepairs(this.getId(), this.parentRepairSession, this.keyspace, this.cfnames, this.commonRange);
        }
        if (this.commonRange.endpoints.isEmpty()) {
            String message = String.format("No neighbors to repair with on range %s: session completed", this.commonRange);
            logger.info("{} {}", (Object)this.previewKind.logPrefix(this.getId()), (Object)message);
            Tracing.traceRepair(message, new Object[0]);
            this.set(new RepairSessionResult(this.id, this.keyspace, this.commonRange.ranges, Lists.newArrayList(), this.skippedReplicas));
            if (!this.previewKind.isPreview()) {
                SystemDistributedKeyspace.failRepairs(this.getId(), this.keyspace, this.cfnames, new RuntimeException(message));
            }
            return;
        }
        for (InetAddressAndPort endpoint : this.commonRange.endpoints) {
            if (FailureDetector.instance.isAlive(endpoint) || this.skippedReplicas) continue;
            String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
            logger.error("{} {}", (Object)this.previewKind.logPrefix(this.getId()), (Object)message);
            IOException e = new IOException(message);
            this.setException(e);
            if (!this.previewKind.isPreview()) {
                SystemDistributedKeyspace.failRepairs(this.getId(), this.keyspace, this.cfnames, e);
            }
            return;
        }
        ArrayList<RepairJob> jobs = new ArrayList<RepairJob>(this.cfnames.length);
        for (String cfname : this.cfnames) {
            RepairJob job = new RepairJob(this, cfname);
            executor.execute((Runnable)job);
            jobs.add(job);
        }
        Futures.addCallback((ListenableFuture)Futures.allAsList(jobs), (FutureCallback)new FutureCallback<List<RepairResult>>(){

            public void onSuccess(List<RepairResult> results) {
                logger.info("{} {}", (Object)RepairSession.this.previewKind.logPrefix(RepairSession.this.getId()), (Object)"Session completed successfully");
                Tracing.traceRepair("Completed sync of range {}", RepairSession.this.commonRange);
                RepairSession.this.set(new RepairSessionResult(RepairSession.this.id, RepairSession.this.keyspace, RepairSession.this.commonRange.ranges, results, RepairSession.this.skippedReplicas));
                RepairSession.this.taskExecutor.shutdown();
                RepairSession.this.terminate();
            }

            public void onFailure(Throwable t) {
                logger.error("{} Session completed with the following error", (Object)RepairSession.this.previewKind.logPrefix(RepairSession.this.getId()), (Object)t);
                Tracing.traceRepair("Session completed with the following error: {}", t);
                RepairSession.this.forceShutdown(t);
            }
        }, (Executor)MoreExecutors.directExecutor());
    }

    public void terminate() {
        this.terminated = true;
        this.validating.clear();
        this.syncingTasks.clear();
    }

    public void forceShutdown(Throwable reason) {
        this.setException(reason);
        this.taskExecutor.shutdownNow();
        this.terminate();
    }

    @Override
    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {
    }

    @Override
    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    @Override
    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {
    }

    @Override
    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {
    }

    @Override
    public void onDead(InetAddressAndPort endpoint, EndpointState state) {
    }

    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState epState) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    @Override
    public void convict(InetAddressAndPort endpoint, double phi) {
        if (!this.commonRange.endpoints.contains((Object)endpoint)) {
            return;
        }
        if (phi < 2.0 * DatabaseDescriptor.getPhiConvictThreshold()) {
            return;
        }
        if (!this.isFailed.compareAndSet(false, true)) {
            return;
        }
        IOException exception = new IOException(String.format("Endpoint %s died", endpoint));
        logger.error("{} session completed with the following error", (Object)this.previewKind.logPrefix(this.getId()), (Object)exception);
        this.forceShutdown(exception);
    }

    @Override
    public void onIRStateChange(LocalSession session) {
        if (this.previewKind == PreviewKind.REPAIRED && session.getState() == ConsistentSession.State.FINALIZED && this.includesTables((Set<TableId>)session.tableIds)) {
            for (Range range : session.ranges) {
                if (!((AbstractBounds)range).intersects(this.ranges())) continue;
                logger.error("{} An intersecting incremental repair with session id = {} finished, preview repair might not be accurate", (Object)this.previewKind.logPrefix(this.getId()), (Object)session.sessionID);
                this.forceShutdown(new Exception("An incremental repair with session id " + session.sessionID + " finished during this preview repair runtime"));
                return;
            }
        }
    }

    private boolean includesTables(Set<TableId> tableIds) {
        Keyspace ks = Keyspace.open(this.keyspace);
        if (ks != null) {
            for (String table : this.cfnames) {
                ColumnFamilyStore cfs = ks.getColumnFamilyStore(table);
                if (!tableIds.contains(cfs.metadata.id)) continue;
                return true;
            }
        }
        return false;
    }
}

