/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.cassandra.utils.progress.ProgressListener;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs one repair command against a keyspace and reports its progress to the
 * registered {@link ProgressListener}s.
 *
 * <p>The run goes through three preparation steps (resolving the neighbors of
 * every requested range, materializing the column families, preparing the
 * parent repair session), then submits one {@link RepairSession} per group of
 * ranges and finally asks {@link ActiveRepairService} to finish the parent
 * session. Each preparation step and each finished session advances the
 * progress counter by one, which is why the total is {@code 3 + ranges}.
 *
 * <p>NOTE(review): {@code listeners} is a plain {@link ArrayList} that is
 * iterated from future callbacks and from the trace polling thread; this looks
 * safe only if listeners are registered before the runnable starts - confirm
 * against callers.
 */
public class RepairRunnable
extends WrappedRunnable
implements ProgressEventNotifier {
    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
    private final StorageService storageService;
    private final int cmd;
    private final RepairOption options;
    private final String keyspace;
    private final List<ProgressListener> listeners = new ArrayList<ProgressListener>();

    /**
     * @param storageService service used to resolve the column families to repair
     * @param cmd            repair command number, used in the progress tag and in log messages
     * @param options        repair options (ranges, column families, tracing, parallelism, ...)
     * @param keyspace       name of the keyspace to repair
     */
    public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) {
        this.storageService = storageService;
        this.cmd = cmd;
        this.options = options;
        this.keyspace = keyspace;
    }

    /** Registers a listener that will receive every progress event fired by this repair. */
    @Override
    public void addProgressListener(ProgressListener listener) {
        this.listeners.add(listener);
    }

    /** Unregisters a previously added listener. */
    @Override
    public void removeProgressListener(ProgressListener listener) {
        this.listeners.remove(listener);
    }

    /**
     * Delivers {@code event} to every registered listener.
     *
     * @param tag   identifies the repair command the event belongs to
     * @param event the event to deliver
     */
    protected void fireProgressEvent(String tag, ProgressEvent event) {
        for (ProgressListener listener : this.listeners) {
            listener.progress(tag, event);
        }
    }

    /**
     * Fires an ERROR event carrying {@code message}, immediately followed by a
     * COMPLETE event with the same progress counters.
     */
    protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message) {
        this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
        this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
    }

    /**
     * Executes the repair command.
     *
     * <p>Failures during the preparation steps are logged, reported to the
     * listeners as ERROR followed by COMPLETE, and end the run; any trace
     * session that was started is stopped first. Once the sessions are
     * submitted, completion is reported asynchronously from future callbacks.
     *
     * @throws Exception propagated from the calls made before the guarded
     *                   preparation steps (column family lookup, parent repair
     *                   bookkeeping)
     */
    @Override
    protected void runMayThrow() throws Exception {
        final String tag = "repair:" + this.cmd;
        final AtomicInteger progress = new AtomicInteger();
        final int totalProgress = 3 + this.options.getRanges().size();
        String[] columnFamilies = this.options.getColumnFamilies().toArray(new String[this.options.getColumnFamilies().size()]);
        Iterable<ColumnFamilyStore> validColumnFamilies = this.storageService.getValidColumnFamilies(false, false, this.keyspace, columnFamilies);
        final long startTime = System.currentTimeMillis();
        String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", this.cmd, this.keyspace, this.options);
        logger.info(message);
        // NOTE(review): the START event uses a fixed total of 100 rather than
        // totalProgress; kept as-is because listeners may depend on it - confirm.
        this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));

        final TraceState traceState = this.options.isTraced()
                ? this.beginTracing(tag, message, validColumnFamilies)
                : null;

        // Step 1: resolve the neighbors of every range and group ranges by neighbor set.
        final Set<InetAddress> allNeighbors = new HashSet<InetAddress>();
        List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>>();
        try {
            for (Range<Token> range : this.options.getRanges()) {
                Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(this.keyspace, range, this.options.getDataCenters(), this.options.getHosts());
                this.addRangeToNeighbors(commonRanges, range, neighbors);
                allNeighbors.addAll(neighbors);
            }
            progress.incrementAndGet();
        }
        catch (IllegalArgumentException e) {
            this.failEarly(tag, progress.get(), totalProgress, e, traceState);
            return;
        }

        // Step 2: materialize the column families to repair.
        final List<ColumnFamilyStore> columnFamilyStores = new ArrayList<ColumnFamilyStore>();
        try {
            Iterables.addAll(columnFamilyStores, validColumnFamilies);
            progress.incrementAndGet();
        }
        catch (IllegalArgumentException e) {
            this.failEarly(tag, progress.get(), totalProgress, e, traceState);
            return;
        }
        String[] cfnames = new String[columnFamilyStores.size()];
        for (int i = 0; i < columnFamilyStores.size(); ++i) {
            cfnames[i] = columnFamilyStores.get(i).name;
        }

        // Step 3: record and prepare the parent repair session.
        final UUID parentSession = UUIDGen.getTimeUUID();
        SystemDistributedKeyspace.startParentRepair(parentSession, this.keyspace, cfnames, this.options.getRanges());
        final long repairedAt;
        try {
            ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, this.options, columnFamilyStores);
            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
            progress.incrementAndGet();
        }
        catch (Throwable t) {
            SystemDistributedKeyspace.failParentRepair(parentSession, t);
            this.failEarly(tag, progress.get(), totalProgress, t, traceState);
            return;
        }

        // Submit one repair session per group of ranges sharing a neighbor set.
        final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(this.options.getJobThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("Repair#" + this.cmd), "internal"));
        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<ListenableFuture<RepairSessionResult>>(this.options.getRanges().size());
        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> group : commonRanges) {
            final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, group.right, this.keyspace, this.options.getParallelism(), group.left, repairedAt, executor, cfnames);
            if (session == null) {
                continue;
            }
            Futures.addCallback(session, new FutureCallback<RepairSessionResult>(){

                @Override
                public void onSuccess(RepairSessionResult result) {
                    String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRanges().toString());
                    logger.info(message);
                    RepairRunnable.this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), totalProgress, message));
                }

                @Override
                public void onFailure(Throwable t) {
                    String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRanges().toString(), t.getMessage());
                    logger.error(message, t);
                    // A failed session still counts as one unit of progress.
                    RepairRunnable.this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), totalProgress, message));
                }
            });
            futures.add(session);
        }

        // Once every session is done (successfully or not), collect the ranges
        // that succeeded and finish the parent session with them.
        final Collection<Range<Token>> successfulRanges = new ArrayList<Range<Token>>();
        final AtomicBoolean hasFailure = new AtomicBoolean();
        ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
        ListenableFuture<Object> anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>(){

            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public ListenableFuture apply(List<RepairSessionResult> results) throws Exception {
                for (RepairSessionResult sessionResult : results) {
                    // successfulAsList yields null for every session that failed.
                    if (sessionResult != null) {
                        successfulRanges.addAll(sessionResult.ranges);
                    } else {
                        hasFailure.compareAndSet(false, true);
                    }
                }
                return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
            }
        });
        Futures.addCallback(anticompactionResult, new FutureCallback<Object>(){

            @Override
            public void onSuccess(Object result) {
                SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
                if (hasFailure.get()) {
                    RepairRunnable.this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, "Some repair failed"));
                } else {
                    RepairRunnable.this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, "Repair completed successfully"));
                }
                this.repairComplete();
            }

            @Override
            public void onFailure(Throwable t) {
                RepairRunnable.this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
                SystemDistributedKeyspace.failParentRepair(parentSession, t);
                this.repairComplete();
            }

            /** Fires COMPLETE, logs the duration, stops tracing and shuts the session executor down. */
            private void repairComplete() {
                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true);
                String message = String.format("Repair command #%d finished in %s", RepairRunnable.this.cmd, duration);
                RepairRunnable.this.fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
                logger.info(message);
                RepairRunnable.this.stopTracing(traceState, message);
                executor.shutdownNow();
            }
        });
    }

    /**
     * Starts a repair trace session, attaches the registered listeners to it
     * and launches the thread that polls trace events.
     *
     * @param tag                 progress tag of this repair command
     * @param message             start message recorded as the first trace entry
     * @param validColumnFamilies column families being repaired, recorded in the session parameters
     * @return the trace state of the new session
     */
    private TraceState beginTracing(String tag, String message, Iterable<ColumnFamilyStore> validColumnFamilies) {
        // Separator is added only between entries so that an empty iterable
        // yields "" instead of failing on substring(2).
        StringBuilder cfsb = new StringBuilder();
        for (ColumnFamilyStore cfs : validColumnFamilies) {
            if (cfsb.length() > 0) {
                cfsb.append(", ");
            }
            cfsb.append(cfs.keyspace.getName()).append(".").append(cfs.name);
        }
        UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
        TraceState traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", this.keyspace, "columnFamilies", cfsb.toString()));
        Tracing.traceRepair(message);
        traceState.enableActivityNotification(tag);
        for (ProgressListener listener : this.listeners) {
            traceState.addProgressListener(listener);
        }
        Thread queryThread = this.createQueryThread(this.cmd, sessionId);
        queryThread.setName("RepairTracePolling");
        queryThread.start();
        return traceState;
    }

    /**
     * Detaches the listeners from the trace session, records {@code message}
     * as the final trace entry and stops the session. Does nothing when
     * tracing was not enabled.
     *
     * @param traceState trace state returned by {@link #beginTracing}, or {@code null} when not tracing
     * @param message    final message to record in the trace
     */
    private void stopTracing(TraceState traceState, String message) {
        if (traceState == null) {
            return;
        }
        for (ProgressListener listener : this.listeners) {
            traceState.removeProgressListener(listener);
        }
        // This may run on a callback thread, so bind the state to the current
        // thread before tracing and stopping.
        Tracing.instance.set(traceState);
        Tracing.traceRepair(message);
        Tracing.instance.stopSession();
    }

    /**
     * Handles a failure that happens before any repair session is submitted:
     * logs it, reports ERROR + COMPLETE to the listeners and stops the trace
     * session so that the trace polling thread is not left waiting.
     */
    private void failEarly(String tag, int progressCount, int totalProgress, Throwable cause, TraceState traceState) {
        logger.error("Repair failed:", cause);
        this.fireErrorAndComplete(tag, progressCount, totalProgress, cause.getMessage());
        this.stopTracing(traceState, String.format("Repair command #%d failed: %s", this.cmd, cause.getMessage()));
    }

    /**
     * Adds {@code range} to the first existing group whose endpoint set
     * contains all of {@code neighbors}; when no group qualifies, a new group
     * is appended for {@code neighbors}.
     *
     * <p>NOTE(review): the match is a superset test, not equality, so a range
     * can join a group with more endpoints than its own neighbors - confirm
     * this is intended.
     */
    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) {
        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> group : neighborRangeList) {
            if (group.left.containsAll(neighbors)) {
                group.right.add(range);
                return;
            }
        }
        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
        ranges.add(range);
        neighborRangeList.add(Pair.create(neighbors, ranges));
    }

    /**
     * Builds (without starting) the thread that polls the trace events table
     * for events of {@code sessionId} reported by other nodes and forwards
     * them to the listeners as NOTIFICATION events.
     *
     * @param cmd       repair command number, used to build the progress tag
     * @param sessionId trace session to poll
     */
    private Thread createQueryThread(final int cmd, final UUID sessionId) {
        return new Thread(new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                TraceState state = Tracing.instance.get(sessionId);
                if (state == null) {
                    throw new Exception("no tracestate");
                }
                String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
                String query = String.format(format, "system_traces", "events");
                SelectStatement statement = (SelectStatement)QueryProcessor.parseStatement(query).prepare().statement;
                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
                InetAddress source = FBUtilities.getBroadcastAddress();

                // Two alternating sets: one collects the event ids seen in the
                // current poll, the other holds those of the previous poll so
                // that the overlapping query windows do not report duplicates.
                @SuppressWarnings("unchecked")
                Set<UUID>[] seen = new HashSet[]{new HashSet<UUID>(), new HashSet<UUID>()};
                int si = 0;

                long tlast = System.currentTimeMillis();
                long minWaitMillis = 125L;
                long maxWaitMillis = 1024000L;
                long timeout = minWaitMillis;
                boolean shouldDouble = false;
                TraceState.Status status;
                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) {
                    if (status == TraceState.Status.IDLE) {
                        // Back off: double the wait on every second idle poll, up to the cap.
                        timeout = shouldDouble ? Math.min(timeout * 2L, maxWaitMillis) : timeout;
                        shouldDouble = !shouldDouble;
                    } else {
                        timeout = minWaitMillis;
                        shouldDouble = false;
                    }
                    // Query window starts one second before the previous poll time.
                    ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000L));
                    long tcur = System.currentTimeMillis();
                    ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur));
                    QueryOptions queryOptions = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes));
                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), queryOptions);
                    UntypedResultSet result = UntypedResultSet.create(rows.result);
                    for (UntypedResultSet.Row r : result) {
                        // Events whose source is this node are skipped.
                        if (source.equals(r.getInetAddress("source"))) {
                            continue;
                        }
                        UUID uuid = r.getUUID("event_id");
                        // NOTE(review): UUID.timestamp() counts 100ns units from the
                        // UUID epoch (1582) while the right-hand side is derived from
                        // Unix millis, so this test presumably always passes - confirm.
                        if (uuid.timestamp() > (tcur - 1000L) * 10000L) {
                            seen[si].add(uuid);
                        }
                        if (seen[si == 0 ? 1 : 0].contains(uuid)) {
                            continue;
                        }
                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
                        RepairRunnable.this.fireProgressEvent("repair:" + cmd, new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
                    }
                    tlast = tcur;
                    si = si == 0 ? 1 : 0;
                    seen[si].clear();
                }
            }
        });
    }
}

