/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair.consistent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.KeyspaceRepairManager;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.repair.consistent.ConsistentSession;
import org.apache.cassandra.repair.consistent.LocalSession;
import org.apache.cassandra.repair.consistent.LocalSessionInfo;
import org.apache.cassandra.repair.consistent.RepairedState;
import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.repair.consistent.admin.PendingStats;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.StatusRequest;
import org.apache.cassandra.repair.messages.StatusResponse;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalSessions {
    private static final Logger logger = LoggerFactory.getLogger(LocalSessions.class);
    private static final Set<Listener> listeners = new CopyOnWriteArraySet<Listener>();
    static final int CHECK_STATUS_TIMEOUT = CassandraRelevantProperties.REPAIR_STATUS_CHECK_TIMEOUT_SECONDS.getInt();
    static final int AUTO_FAIL_TIMEOUT = CassandraRelevantProperties.REPAIR_FAIL_TIMEOUT_SECONDS.getInt();
    static final int AUTO_DELETE_TIMEOUT = CassandraRelevantProperties.REPAIR_DELETE_TIMEOUT_SECONDS.getInt();
    public static final int CLEANUP_INTERVAL = CassandraRelevantProperties.REPAIR_CLEANUP_INTERVAL_SECONDS.getInt();
    private final String keyspace = "system";
    private final String table = "repairs";
    private boolean started = false;
    private volatile ImmutableMap<TimeUUID, LocalSession> sessions = ImmutableMap.of();
    private volatile ImmutableMap<TableId, RepairedState> repairedStates = ImmutableMap.of();

    private static Set<TableId> uuidToTableId(Set<UUID> src) {
        return ImmutableSet.copyOf((Iterable)Iterables.transform(src, TableId::fromUUID));
    }

    private static Set<UUID> tableIdToUuid(Set<TableId> src) {
        return ImmutableSet.copyOf((Iterable)Iterables.transform(src, TableId::asUUID));
    }

    @VisibleForTesting
    int getNumSessions() {
        return this.sessions.size();
    }

    @VisibleForTesting
    protected InetAddressAndPort getBroadcastAddressAndPort() {
        return FBUtilities.getBroadcastAddressAndPort();
    }

    @VisibleForTesting
    protected boolean isAlive(InetAddressAndPort address) {
        return FailureDetector.instance.isAlive(address);
    }

    @VisibleForTesting
    protected boolean isNodeInitialized() {
        return StorageService.instance.isInitialized();
    }

    public List<Map<String, String>> sessionInfo(boolean all, Set<Range<Token>> ranges) {
        Object currentSessions = this.sessions.values();
        if (!all) {
            currentSessions = Iterables.filter((Iterable)currentSessions, s -> !s.isCompleted());
        }
        if (!ranges.isEmpty()) {
            currentSessions = Iterables.filter((Iterable)currentSessions, s -> s.intersects(ranges));
        }
        return Lists.newArrayList((Iterable)Iterables.transform((Iterable)currentSessions, LocalSessionInfo::sessionToMap));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private RepairedState getRepairedState(TableId tid) {
        if (!this.repairedStates.containsKey((Object)tid)) {
            LocalSessions localSessions = this;
            synchronized (localSessions) {
                if (!this.repairedStates.containsKey((Object)tid)) {
                    this.repairedStates = ImmutableMap.builder().putAll(this.repairedStates).put((Object)tid, (Object)new RepairedState()).build();
                }
            }
        }
        return (RepairedState)Verify.verifyNotNull((Object)((RepairedState)this.repairedStates.get((Object)tid)));
    }

    private void maybeUpdateRepairedState(LocalSession session) {
        if (!this.shouldStoreSession(session)) {
            return;
        }
        for (TableId tid : session.tableIds) {
            RepairedState state = this.getRepairedState(tid);
            state.add((Collection<Range<Token>>)session.ranges, session.repairedAt);
        }
    }

    private boolean shouldStoreSession(LocalSession session) {
        if (session.getState() != ConsistentSession.State.FINALIZED) {
            return false;
        }
        return session.repairedAt != 0L;
    }

    private boolean isSuperseded(LocalSession session) {
        for (TableId tid : session.tableIds) {
            RepairedState state = (RepairedState)this.repairedStates.get((Object)tid);
            if (state == null) {
                return false;
            }
            long minRepaired = state.minRepairedAt((Collection<Range<Token>>)session.ranges);
            if (minRepaired > session.repairedAt) continue;
            return false;
        }
        return true;
    }

    public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges) {
        RepairedState state = (RepairedState)this.repairedStates.get((Object)tid);
        if (state == null) {
            return RepairedState.Stats.EMPTY;
        }
        return state.getRepairedStats(ranges);
    }

    public PendingStats getPendingStats(TableId tid, Collection<Range<Token>> ranges) {
        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
        Preconditions.checkArgument((cfs != null ? 1 : 0) != 0);
        PendingStat.Builder pending = new PendingStat.Builder();
        PendingStat.Builder finalized = new PendingStat.Builder();
        PendingStat.Builder failed = new PendingStat.Builder();
        Map<TimeUUID, PendingStat> stats = cfs.getPendingRepairStats();
        block4: for (Map.Entry<TimeUUID, PendingStat> entry : stats.entrySet()) {
            TimeUUID sessionID = entry.getKey();
            PendingStat stat = entry.getValue();
            Verify.verify((boolean)sessionID.equals((TimeUUID)Iterables.getOnlyElement(stat.sessions)));
            LocalSession session = (LocalSession)this.sessions.get((Object)sessionID);
            Verify.verifyNotNull((Object)session);
            if (!Iterables.any(ranges, r -> ((AbstractBounds)r).intersects(session.ranges))) continue;
            switch (session.getState()) {
                case FINALIZED: {
                    finalized.addStat(stat);
                    continue block4;
                }
                case FAILED: {
                    failed.addStat(stat);
                    continue block4;
                }
            }
            pending.addStat(stat);
        }
        return new PendingStats(cfs.getKeyspaceName(), cfs.name, pending.build(), finalized.build(), failed.build());
    }

    public CleanupSummary cleanup(TableId tid, Collection<Range<Token>> ranges, boolean force) {
        Iterable candidates = Iterables.filter((Iterable)this.sessions.values(), ls -> ls.isCompleted() && ls.tableIds.contains((Object)tid) && Range.intersects((Iterable<Range<Token>>)ls.ranges, ranges));
        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
        HashSet sessionIds = Sets.newHashSet((Iterable)Iterables.transform((Iterable)candidates, s -> s.sessionID));
        return cfs.releaseRepairData(sessionIds, force);
    }

    public void cancelSession(TimeUUID sessionID, boolean force) {
        logger.info("Cancelling local repair session {}", (Object)sessionID);
        LocalSession session = this.getSession(sessionID);
        Preconditions.checkArgument((session != null ? 1 : 0) != 0, (String)"Session {} does not exist", (Object)sessionID);
        Preconditions.checkArgument((force || session.coordinator.equals(this.getBroadcastAddressAndPort()) ? 1 : 0) != 0, (String)"Cancel session %s from it's coordinator (%s) or use --force", (Object)sessionID, (Object)session.coordinator);
        this.setStateAndSave(session, ConsistentSession.State.FAILED);
        Message<FailSession> message = Message.out(Verb.FAILED_SESSION_MSG, new FailSession(sessionID));
        for (InetAddressAndPort participant : session.participants) {
            if (participant.equals(this.getBroadcastAddressAndPort())) continue;
            this.sendMessage(participant, message);
        }
    }

    public synchronized void start() {
        Preconditions.checkArgument((!this.started ? 1 : 0) != 0, (Object)"LocalSessions.start can only be called once");
        Preconditions.checkArgument((boolean)this.sessions.isEmpty(), (Object)"No sessions should be added before start");
        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(String.format("SELECT * FROM %s.%s", "system", "repairs"), 1000, new Object[0]);
        HashMap<TimeUUID, LocalSession> loadedSessions = new HashMap<TimeUUID, LocalSession>();
        HashMap<TableId, List> initialLevels = new HashMap<TableId, List>();
        for (UntypedResultSet.Row row : rows) {
            try {
                LocalSession session = this.load(row);
                loadedSessions.put(session.sessionID, session);
                if (!this.shouldStoreSession(session)) continue;
                for (TableId tid : session.tableIds) {
                    initialLevels.computeIfAbsent(tid, t -> new ArrayList()).add(new RepairedState.Level((Collection<Range<Token>>)session.ranges, session.repairedAt));
                }
            }
            catch (IllegalArgumentException | NullPointerException e) {
                logger.warn("Unable to load malformed repair session {}, removing", row.has("parent_id") ? row.getTimeUUID("parent_id") : null);
                if (!row.has("parent_id")) continue;
                this.deleteRow(row.getTimeUUID("parent_id"));
            }
        }
        for (Map.Entry entry : initialLevels.entrySet()) {
            this.getRepairedState((TableId)entry.getKey()).addAll((List)entry.getValue());
        }
        this.sessions = ImmutableMap.copyOf(loadedSessions);
        this.failOngoingRepairs();
        this.started = true;
    }

    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        this.failOngoingRepairs();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void failOngoingRepairs() {
        UnmodifiableIterator unmodifiableIterator = this.sessions.values().iterator();
        block6: while (unmodifiableIterator.hasNext()) {
            LocalSession session;
            LocalSession localSession = session = (LocalSession)unmodifiableIterator.next();
            synchronized (localSession) {
                switch (session.getState()) {
                    case FINALIZED: 
                    case FAILED: 
                    case FINALIZE_PROMISED: {
                        continue block6;
                    }
                }
                logger.info("Found repair session {} with state = {} - failing the repair", (Object)session.sessionID, (Object)session.getState());
                this.failSession(session, true);
            }
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    private static boolean shouldCheckStatus(LocalSession session, long now) {
        return !session.isCompleted() && now > session.getLastUpdate() + (long)CHECK_STATUS_TIMEOUT;
    }

    private static boolean shouldFail(LocalSession session, long now) {
        return !session.isCompleted() && now > session.getLastUpdate() + (long)AUTO_FAIL_TIMEOUT;
    }

    private static boolean shouldDelete(LocalSession session, long now) {
        return session.isCompleted() && now > session.getLastUpdate() + (long)AUTO_DELETE_TIMEOUT;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanup() {
        logger.trace("Running LocalSessions.cleanup");
        if (!this.isNodeInitialized()) {
            logger.trace("node not initialized, aborting local session cleanup");
            return;
        }
        HashSet currentSessions = new HashSet(this.sessions.values());
        Iterator iterator = currentSessions.iterator();
        while (iterator.hasNext()) {
            LocalSession session;
            LocalSession localSession = session = (LocalSession)iterator.next();
            synchronized (localSession) {
                long now = FBUtilities.nowInSeconds();
                if (LocalSessions.shouldFail(session, now)) {
                    logger.warn("Auto failing timed out repair session {}", (Object)session);
                    this.failSession(session.sessionID, false);
                } else if (LocalSessions.shouldDelete(session, now)) {
                    if (session.getState() == ConsistentSession.State.FINALIZED && !this.isSuperseded(session)) {
                        logger.info("Skipping delete of FINALIZED LocalSession {} because it has not been superseded by a more recent session", (Object)session.sessionID);
                    } else if (!this.sessionHasData(session)) {
                        logger.info("Auto deleting repair session {}", (Object)session);
                        this.deleteSession(session.sessionID);
                    } else {
                        logger.warn("Skipping delete of LocalSession {} because it still contains sstables", (Object)session.sessionID);
                    }
                } else if (LocalSessions.shouldCheckStatus(session, now)) {
                    this.sendStatusRequest(session);
                }
            }
        }
    }

    private static ByteBuffer serializeRange(Range<Token> range) {
        int size = (int)Token.serializer.serializedSize((Token)range.left, 0);
        size += (int)Token.serializer.serializedSize((Token)range.right, 0);
        DataOutputBuffer buffer = new DataOutputBuffer(size);
        try {
            Token.serializer.serialize((Token)range.left, (DataOutputPlus)buffer, 0);
            Token.serializer.serialize((Token)range.right, (DataOutputPlus)buffer, 0);
            ByteBuffer byteBuffer = buffer.buffer();
            buffer.close();
            return byteBuffer;
        }
        catch (Throwable throwable) {
            try {
                try {
                    buffer.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static Set<ByteBuffer> serializeRanges(Set<Range<Token>> ranges) {
        HashSet<ByteBuffer> buffers = new HashSet<ByteBuffer>(ranges.size());
        ranges.forEach(r -> buffers.add(LocalSessions.serializeRange(r)));
        return buffers;
    }

    private static Range<Token> deserializeRange(ByteBuffer bb) {
        DataInputBuffer in = new DataInputBuffer(bb, false);
        try {
            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
            Token left = Token.serializer.deserialize(in, partitioner, 0);
            Token right = Token.serializer.deserialize(in, partitioner, 0);
            Range<Token> range = new Range<Token>(left, right);
            in.close();
            return range;
        }
        catch (Throwable throwable) {
            try {
                try {
                    in.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers) {
        HashSet<Range<Token>> ranges = new HashSet<Range<Token>>(buffers.size());
        buffers.forEach(bb -> ranges.add(LocalSessions.deserializeRange(bb)));
        return ranges;
    }

    @VisibleForTesting
    void save(LocalSession session) {
        String query = "INSERT INTO %s.%s (parent_id, started_at, last_update, repaired_at, state, coordinator, coordinator_port, participants, participants_wp,ranges, cfids) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        QueryProcessor.executeInternal(String.format(query, "system", "repairs"), session.sessionID, Date.from(Instant.ofEpochSecond(session.startedAt)), Date.from(Instant.ofEpochSecond(session.getLastUpdate())), Date.from(Instant.ofEpochMilli(session.repairedAt)), session.getState().ordinal(), session.coordinator.getAddress(), session.coordinator.getPort(), session.participants.stream().map(participant -> participant.getAddress()).collect(Collectors.toSet()), session.participants.stream().map(participant -> participant.getHostAddressAndPort()).collect(Collectors.toSet()), LocalSessions.serializeRanges((Set<Range<Token>>)session.ranges), LocalSessions.tableIdToUuid((Set<TableId>)session.tableIds));
        this.maybeUpdateRepairedState(session);
    }

    private static int dateToSeconds(Date d) {
        return Ints.checkedCast((long)TimeUnit.MILLISECONDS.toSeconds(d.getTime()));
    }

    private LocalSession load(UntypedResultSet.Row row) {
        LocalSession.Builder builder = LocalSession.builder();
        builder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
        builder.withSessionID(row.getTimeUUID("parent_id"));
        InetAddressAndPort coordinator = InetAddressAndPort.getByAddressOverrideDefaults(row.getInetAddress("coordinator"), row.getInt("coordinator_port"));
        builder.withCoordinator(coordinator);
        builder.withTableIds(LocalSessions.uuidToTableId(row.getSet("cfids", UUIDType.instance)));
        builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
        builder.withRanges(LocalSessions.deserializeRanges(row.getSet("ranges", BytesType.instance)));
        Set<String> participants = row.getSet("participants_wp", UTF8Type.instance);
        builder.withParticipants(participants.stream().map(participant -> {
            try {
                return InetAddressAndPort.getByName(participant);
            }
            catch (UnknownHostException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toSet()));
        builder.withStartedAt(LocalSessions.dateToSeconds(row.getTimestamp("started_at")));
        builder.withLastUpdate(LocalSessions.dateToSeconds(row.getTimestamp("last_update")));
        return this.buildSession(builder);
    }

    private void deleteRow(TimeUUID sessionID) {
        String query = "DELETE FROM %s.%s WHERE parent_id=?";
        QueryProcessor.executeInternal(String.format(query, "system", "repairs"), sessionID);
    }

    private void syncTable() {
        TableId tid = Schema.instance.getTableMetadata((String)"system", (String)"repairs").id;
        ColumnFamilyStore cfm = Schema.instance.getColumnFamilyStoreInstance(tid);
        cfm.forceBlockingFlush(ColumnFamilyStore.FlushReason.INTERNALLY_FORCED);
    }

    @VisibleForTesting
    LocalSession loadUnsafe(TimeUUID sessionId) {
        String query = "SELECT * FROM %s.%s WHERE parent_id=?";
        UntypedResultSet result = QueryProcessor.executeInternal(String.format(query, "system", "repairs"), sessionId);
        if (result.isEmpty()) {
            return null;
        }
        UntypedResultSet.Row row = result.one();
        return this.load(row);
    }

    @VisibleForTesting
    protected LocalSession buildSession(LocalSession.Builder builder) {
        return new LocalSession(builder);
    }

    public LocalSession getSession(TimeUUID sessionID) {
        return (LocalSession)this.sessions.get((Object)sessionID);
    }

    @VisibleForTesting
    synchronized void putSessionUnsafe(LocalSession session) {
        this.putSession(session);
        this.save(session);
    }

    private synchronized void putSession(LocalSession session) {
        Preconditions.checkArgument((!this.sessions.containsKey((Object)session.sessionID) ? 1 : 0) != 0, (String)"LocalSession %s already exists", (Object)session.sessionID);
        Preconditions.checkArgument((boolean)this.started, (Object)"sessions cannot be added before LocalSessions is started");
        this.sessions = ImmutableMap.builder().putAll(this.sessions).put((Object)session.sessionID, (Object)session).build();
    }

    private synchronized void removeSession(TimeUUID sessionID) {
        Preconditions.checkArgument((sessionID != null ? 1 : 0) != 0);
        HashMap<TimeUUID, LocalSession> temp = new HashMap<TimeUUID, LocalSession>((Map<TimeUUID, LocalSession>)this.sessions);
        temp.remove(sessionID);
        this.sessions = ImmutableMap.copyOf(temp);
    }

    @VisibleForTesting
    LocalSession createSessionUnsafe(TimeUUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddressAndPort> peers) {
        LocalSession.Builder builder = LocalSession.builder();
        builder.withState(ConsistentSession.State.PREPARING);
        builder.withSessionID(sessionId);
        builder.withCoordinator(prs.coordinator);
        builder.withTableIds(prs.getTableIds());
        builder.withRepairedAt(prs.repairedAt);
        builder.withRanges(prs.getRanges());
        builder.withParticipants(peers);
        long now = FBUtilities.nowInSeconds();
        builder.withStartedAt(now);
        builder.withLastUpdate(now);
        return this.buildSession(builder);
    }

    protected ActiveRepairService.ParentRepairSession getParentRepairSession(TimeUUID sessionID) throws NoSuchRepairSessionException {
        return ActiveRepairService.instance.getParentRepairSession(sessionID);
    }

    protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message) {
        logger.trace("sending {} to {}", message.payload, (Object)destination);
        MessagingService.instance().send(message, destination);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void setStateAndSave(LocalSession session, ConsistentSession.State state) {
        LocalSession localSession = session;
        synchronized (localSession) {
            Preconditions.checkArgument((boolean)session.getState().canTransitionTo(state), (String)"Invalid state transition %s -> %s", (Object)((Object)session.getState()), (Object)((Object)state));
            logger.trace("Changing LocalSession state from {} -> {} for {}", new Object[]{session.getState(), state, session.sessionID});
            boolean wasCompleted = session.isCompleted();
            session.setState(state);
            session.setLastUpdate();
            this.save(session);
            if (session.isCompleted() && !wasCompleted) {
                this.sessionCompleted(session);
            }
            for (Listener listener : listeners) {
                listener.onIRStateChange(session);
            }
        }
    }

    public void failSession(TimeUUID sessionID) {
        this.failSession(sessionID, true);
    }

    public void failSession(TimeUUID sessionID, boolean sendMessage) {
        this.failSession(this.getSession(sessionID), sendMessage);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void failSession(LocalSession session, boolean sendMessage) {
        if (session != null) {
            LocalSession localSession = session;
            synchronized (localSession) {
                if (session.getState() == ConsistentSession.State.FINALIZED) {
                    logger.error("Can't change the state of session {} from FINALIZED to FAILED", (Object)session.sessionID, (Object)new RuntimeException());
                    return;
                }
                if (session.getState() != ConsistentSession.State.FAILED) {
                    logger.info("Failing local repair session {}", (Object)session.sessionID);
                    this.setStateAndSave(session, ConsistentSession.State.FAILED);
                }
            }
            if (sendMessage) {
                this.sendMessage(session.coordinator, Message.out(Verb.FAILED_SESSION_MSG, new FailSession(session.sessionID)));
            }
        }
    }

    public synchronized void deleteSession(TimeUUID sessionID) {
        logger.info("Deleting local repair session {}", (Object)sessionID);
        LocalSession session = this.getSession(sessionID);
        Preconditions.checkArgument((boolean)session.isCompleted(), (Object)"Cannot delete incomplete sessions");
        this.deleteRow(sessionID);
        this.removeSession(sessionID);
    }

    @VisibleForTesting
    Future<List<Void>> prepareSession(KeyspaceRepairManager repairManager, TimeUUID sessionID, Collection<ColumnFamilyStore> tables, RangesAtEndpoint tokenRanges, ExecutorService executor, BooleanSupplier isCancelled) {
        return repairManager.prepareIncrementalRepair(sessionID, tables, tokenRanges, executor, isCancelled);
    }

    RangesAtEndpoint filterLocalRanges(String keyspace, Set<Range<Token>> ranges) {
        RangesAtEndpoint localRanges = StorageService.instance.getLocalReplicas(keyspace);
        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(localRanges.endpoint());
        for (Range<Token> range : ranges) {
            for (Replica replica : localRanges) {
                if (replica.range().equals(range)) {
                    builder.add(replica);
                    continue;
                }
                if (!replica.contains(range)) continue;
                builder.add(replica.decorateSubrange(range));
            }
        }
        return builder.build();
    }

    public void handlePrepareMessage(InetAddressAndPort from, PrepareConsistentRequest request) {
        ActiveRepairService.ParentRepairSession parentSession;
        logger.trace("received {} from {}", (Object)request, (Object)from);
        final TimeUUID sessionID = request.parentSession;
        final InetAddressAndPort coordinator = request.coordinator;
        Set<InetAddressAndPort> peers = request.participants;
        try {
            parentSession = this.getParentRepairSession(sessionID);
        }
        catch (Throwable e) {
            logger.error("Error retrieving ParentRepairSession for session {}, responding with failure", (Object)sessionID);
            this.sendMessage(coordinator, Message.out(Verb.PREPARE_CONSISTENT_RSP, new PrepareConsistentResponse(sessionID, this.getBroadcastAddressAndPort(), false)));
            return;
        }
        final LocalSession session = this.createSessionUnsafe(sessionID, parentSession, peers);
        this.putSessionUnsafe(session);
        logger.info("Beginning local incremental repair session {}", (Object)session);
        final Object executor = ExecutorFactory.Global.executorFactory().pooled("Repair-" + sessionID, parentSession.getColumnFamilyStores().size());
        KeyspaceRepairManager repairManager = parentSession.getKeyspace().getRepairManager();
        RangesAtEndpoint tokenRanges = this.filterLocalRanges(parentSession.getKeyspace().getName(), parentSession.getRanges());
        Future<List<Void>> repairPreparation = this.prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(), tokenRanges, (ExecutorService)executor, () -> session.getState() != ConsistentSession.State.PREPARING);
        repairPreparation.addCallback(new FutureCallback<List<Void>>(){

            public void onSuccess(@Nullable List<Void> result) {
                try {
                    logger.info("Prepare phase for incremental repair session {} completed", (Object)sessionID);
                    if (!LocalSessions.this.prepareSessionExceptFailed(session)) {
                        logger.info("Session {} failed before anticompaction completed", (Object)sessionID);
                    }
                    Message<PrepareConsistentResponse> message = Message.out(Verb.PREPARE_CONSISTENT_RSP, new PrepareConsistentResponse(sessionID, LocalSessions.this.getBroadcastAddressAndPort(), session.getState() != ConsistentSession.State.FAILED));
                    LocalSessions.this.sendMessage(coordinator, message);
                }
                finally {
                    executor.shutdown();
                }
            }

            public void onFailure(Throwable t) {
                try {
                    if (Throwables.anyCauseMatches(t, throwable -> throwable instanceof CompactionInterruptedException)) {
                        logger.info("Anticompaction interrupted for session {}: {}", (Object)sessionID, (Object)t.getMessage());
                    } else if (Throwables.anyCauseMatches(t, throwable -> throwable instanceof NoSuchRepairSessionException)) {
                        logger.warn("No such repair session: {}", (Object)sessionID);
                    } else {
                        logger.error("Prepare phase for incremental repair session {} failed", (Object)sessionID, (Object)t);
                    }
                    LocalSessions.this.sendMessage(coordinator, Message.out(Verb.PREPARE_CONSISTENT_RSP, new PrepareConsistentResponse(sessionID, LocalSessions.this.getBroadcastAddressAndPort(), false)));
                    LocalSessions.this.failSession(sessionID, false);
                }
                finally {
                    executor.shutdown();
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean prepareSessionExceptFailed(LocalSession session) {
        LocalSession localSession = session;
        synchronized (localSession) {
            if (session.getState() == ConsistentSession.State.FAILED) {
                return false;
            }
            this.setStateAndSave(session, ConsistentSession.State.PREPARED);
            return true;
        }
    }

    public void maybeSetRepairing(TimeUUID sessionID) {
        LocalSession session = this.getSession(sessionID);
        if (session != null && session.getState() != ConsistentSession.State.REPAIRING) {
            logger.info("Setting local incremental repair session {} to REPAIRING", (Object)session);
            this.setStateAndSave(session, ConsistentSession.State.REPAIRING);
        }
    }

    public void handleFinalizeProposeMessage(InetAddressAndPort from, FinalizePropose propose) {
        logger.trace("received {} from {}", (Object)propose, (Object)from);
        TimeUUID sessionID = propose.sessionID;
        LocalSession session = this.getSession(sessionID);
        if (session == null) {
            logger.info("Received FinalizePropose message for unknown repair session {}, responding with failure", (Object)sessionID);
            this.sendMessage(from, Message.out(Verb.FAILED_SESSION_MSG, new FailSession(sessionID)));
            return;
        }
        try {
            this.setStateAndSave(session, ConsistentSession.State.FINALIZE_PROMISED);
            this.syncTable();
            this.sendMessage(from, Message.out(Verb.FINALIZE_PROMISE_MSG, new FinalizePromise(sessionID, this.getBroadcastAddressAndPort(), true)));
            logger.info("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", (Object)sessionID);
        }
        catch (IllegalArgumentException e) {
            logger.error("Error handling FinalizePropose message for {}", (Object)session, (Object)e);
            this.failSession(sessionID);
        }
    }

    @VisibleForTesting
    protected void sessionCompleted(LocalSession session) {
        for (TableId tid : session.tableIds) {
            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
            if (cfs == null) continue;
            cfs.getRepairManager().incrementalSessionCompleted(session.sessionID);
        }
    }

    public void handleFinalizeCommitMessage(InetAddressAndPort from, FinalizeCommit commit) {
        logger.trace("received {} from {}", (Object)commit, (Object)from);
        TimeUUID sessionID = commit.sessionID;
        LocalSession session = this.getSession(sessionID);
        if (session == null) {
            logger.warn("Ignoring FinalizeCommit message for unknown repair session {}", (Object)sessionID);
            return;
        }
        this.setStateAndSave(session, ConsistentSession.State.FINALIZED);
        logger.info("Finalized local repair session {}", (Object)sessionID);
    }

    public void handleFailSessionMessage(InetAddressAndPort from, FailSession msg) {
        logger.trace("received {} from {}", (Object)msg, (Object)from);
        this.failSession(msg.sessionID, false);
    }

    public void sendStatusRequest(LocalSession session) {
        logger.info("Attempting to learn the outcome of unfinished local incremental repair session {}", (Object)session.sessionID);
        Message<StatusRequest> request = Message.out(Verb.STATUS_REQ, new StatusRequest(session.sessionID));
        for (InetAddressAndPort participant : session.participants) {
            if (this.getBroadcastAddressAndPort().equals(participant) || !this.isAlive(participant)) continue;
            this.sendMessage(participant, request);
        }
    }

    public void handleStatusRequest(InetAddressAndPort from, StatusRequest request) {
        logger.trace("received {} from {}", (Object)request, (Object)from);
        TimeUUID sessionID = request.sessionID;
        LocalSession session = this.getSession(sessionID);
        if (session == null) {
            logger.warn("Received status request message for unknown session {}", (Object)sessionID);
            this.sendMessage(from, Message.out(Verb.STATUS_RSP, new StatusResponse(sessionID, ConsistentSession.State.FAILED)));
        } else {
            this.sendMessage(from, Message.out(Verb.STATUS_RSP, new StatusResponse(sessionID, session.getState())));
            logger.info("Responding to status response message for incremental repair session {} with local state {}", (Object)sessionID, (Object)session.getState());
        }
    }

    public void handleStatusResponse(InetAddressAndPort from, StatusResponse response) {
        logger.trace("received {} from {}", (Object)response, (Object)from);
        TimeUUID sessionID = response.sessionID;
        LocalSession session = this.getSession(sessionID);
        if (session == null) {
            logger.warn("Received StatusResponse message for unknown repair session {}", (Object)sessionID);
            return;
        }
        if (response.state == ConsistentSession.State.FINALIZED || response.state == ConsistentSession.State.FAILED) {
            this.setStateAndSave(session, response.state);
            logger.info("Unfinished local incremental repair session {} set to state {}", (Object)sessionID, (Object)response.state);
        } else {
            logger.info("Received StatusResponse for repair session {} with state {}, which is not actionable. Doing nothing.", (Object)sessionID, (Object)response.state);
        }
    }

    public boolean isSessionInProgress(TimeUUID sessionID) {
        LocalSession session = this.getSession(sessionID);
        return session != null && session.getState() != ConsistentSession.State.FINALIZED && session.getState() != ConsistentSession.State.FAILED;
    }

    public boolean isSessionFinalized(TimeUUID sessionID) {
        LocalSession session = this.getSession(sessionID);
        return session != null && session.getState() == ConsistentSession.State.FINALIZED;
    }

    public boolean sessionExists(TimeUUID sessionID) {
        return this.getSession(sessionID) != null;
    }

    @VisibleForTesting
    protected boolean sessionHasData(LocalSession session) {
        Predicate<TableId> predicate = tid -> {
            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance((TableId)tid);
            return cfs != null && cfs.getCompactionStrategyManager().hasDataForPendingRepair(session.sessionID);
        };
        return Iterables.any((Iterable)session.tableIds, predicate::test);
    }

    public long getFinalSessionRepairedAt(TimeUUID sessionID) {
        LocalSession session = this.getSession(sessionID);
        if (session == null || session.getState() == ConsistentSession.State.FAILED) {
            return 0L;
        }
        if (session.getState() == ConsistentSession.State.FINALIZED) {
            return session.repairedAt;
        }
        throw new IllegalStateException("Cannot get final repaired at value for in progress session: " + session);
    }

    public static void registerListener(Listener listener) {
        listeners.add(listener);
    }

    public static void unregisterListener(Listener listener) {
        listeners.remove(listener);
    }

    public static interface Listener {
        public void onIRStateChange(LocalSession var1);
    }
}

