/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogArchiver;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileSegmentInputStream;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang3.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogReplayer {
    static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
    private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
    private final Set<Keyspace> keyspacesRecovered = new NonBlockingHashSet();
    private final List<Future<?>> futures = new ArrayList();
    private final Map<UUID, AtomicInteger> invalidMutations;
    private final AtomicInteger replayedCount;
    private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted;
    private final ReplayPosition globalPosition;
    private final CRC32 checksum;
    private byte[] buffer = new byte[4096];
    private byte[] uncompressedBuffer = new byte[4096];
    private final ReplayFilter replayFilter;
    private final CommitLogArchiver archiver;

    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter) {
        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
        this.replayedCount = new AtomicInteger();
        this.checksum = new CRC32();
        this.cfPersisted = cfPersisted;
        this.globalPosition = globalPosition;
        this.replayFilter = replayFilter;
        this.archiver = commitLog.archiver;
    }

    public static CommitLogReplayer construct(CommitLog commitLog) {
        HashMap<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<UUID, IntervalSet<ReplayPosition>>();
        ReplayFilter replayFilter = ReplayFilter.create();
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
            if (truncatedAt != null) {
                long restoreTime = commitLog.archiver.restorePointInTime;
                long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
                if (truncatedTime > restoreTime && replayFilter.includes(cfs.metadata)) {
                    logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.", (Object)cfs.metadata.ksName, (Object)cfs.metadata.cfName);
                    SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
                    truncatedAt = null;
                }
            }
            IntervalSet<ReplayPosition> filter = CommitLogReplayer.persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
            cfPersisted.put(cfs.metadata.cfId, filter);
        }
        ReplayPosition globalPosition = CommitLogReplayer.firstNotCovered(cfPersisted.values());
        logger.debug("Global replay position is {} from columnfamilies {}", (Object)globalPosition, (Object)FBUtilities.toString(cfPersisted));
        return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
    }

    public void recover(File[] clogs) throws IOException {
        for (int i = 0; i < clogs.length; ++i) {
            this.recover(clogs[i], i + 1 == clogs.length);
        }
    }

    public static IntervalSet<ReplayPosition> persistedIntervals(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt) {
        IntervalSet.Builder<ReplayPosition> builder = new IntervalSet.Builder<ReplayPosition>();
        for (SSTableReader reader : onDisk) {
            builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
        }
        if (truncatedAt != null) {
            builder.add(ReplayPosition.NONE, truncatedAt);
        }
        return builder.build();
    }

    public static ReplayPosition firstNotCovered(Collection<IntervalSet<ReplayPosition>> ranges) {
        return ranges.stream().map(intervals -> (ReplayPosition)Iterables.getFirst(intervals.ends(), (Object)ReplayPosition.NONE)).min((Comparator<ReplayPosition>)Ordering.natural()).get();
    }

    public int blockForWrites() {
        for (Map.Entry<UUID, AtomicInteger> entry : this.invalidMutations.entrySet()) {
            logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
        }
        FBUtilities.waitOnFutures(this.futures);
        logger.trace("Finished waiting on mutations from recovery");
        this.futures.clear();
        boolean flushingSystem = false;
        for (Keyspace keyspace : this.keyspacesRecovered) {
            if (keyspace.getName().equals("system")) {
                flushingSystem = true;
            }
            this.futures.addAll(keyspace.flush());
        }
        if (!flushingSystem) {
            this.futures.add((Future<?>)Keyspace.open("system").getColumnFamilyStore("batches").forceFlush());
        }
        FBUtilities.waitOnFutures(this.futures);
        return this.replayedCount.get();
    }

    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException {
        if ((long)offset > reader.length() - 8L) {
            return -1;
        }
        reader.seek(offset);
        CRC32 crc = new CRC32();
        FBUtilities.updateChecksumInt(crc, (int)(descriptor.id & 0xFFFFFFFFL));
        FBUtilities.updateChecksumInt(crc, (int)(descriptor.id >>> 32));
        FBUtilities.updateChecksumInt(crc, (int)reader.getPosition());
        int end = reader.readInt();
        long filecrc = (long)reader.readInt() & 0xFFFFFFFFL;
        if (crc.getValue() != filecrc) {
            if (end != 0 || filecrc != 0L) {
                CommitLogReplayer.handleReplayError(false, "Encountered bad header at position %d of commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
            }
            return -1;
        }
        if (end < offset || (long)end > reader.length()) {
            CommitLogReplayer.handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
            return -1;
        }
        return end;
    }

    private boolean shouldReplay(UUID cfId, ReplayPosition position) {
        return !this.cfPersisted.get(cfId).contains(position);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void recover(File file, boolean tolerateTruncation) throws IOException {
        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
        try (ChannelProxy channel = new ChannelProxy(file);
             RandomAccessReader reader = RandomAccessReader.open(channel);){
            int end;
            if (desc.version < 4) {
                if (this.logAndCheckIfShouldSkip(file, desc)) {
                    return;
                }
                if (this.globalPosition.segment == desc.id) {
                    reader.seek(this.globalPosition.position);
                }
                this.replaySyncSection(reader, (int)reader.length(), desc, desc.fileName(), tolerateTruncation);
                return;
            }
            long segmentId = desc.id;
            try {
                desc = CommitLogDescriptor.readHeader(reader);
            }
            catch (IOException e) {
                desc = null;
            }
            if (desc == null) {
                CommitLogReplayer.handleReplayError(false, "Could not read commit log descriptor in file %s", file);
                return;
            }
            if (segmentId != desc.id) {
                CommitLogReplayer.handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
            }
            if (this.logAndCheckIfShouldSkip(file, desc)) {
                return;
            }
            ICompressor compressor = null;
            if (desc.compression != null) {
                try {
                    compressor = CompressionParams.createCompressor(desc.compression);
                }
                catch (ConfigurationException e) {
                    CommitLogReplayer.handleReplayError(false, "Unknown compression: %s", e.getMessage());
                    if (reader != null) {
                        if (var7_9 != null) {
                            try {
                                reader.close();
                            }
                            catch (Throwable throwable) {
                                var7_9.addSuppressed(throwable);
                            }
                        } else {
                            reader.close();
                        }
                    }
                    if (channel == null) return;
                    if (var5_5 == null) {
                        channel.close();
                        return;
                    }
                    try {
                        channel.close();
                        return;
                    }
                    catch (Throwable throwable) {
                        var5_5.addSuppressed(throwable);
                        return;
                    }
                }
            }
            assert (reader.length() <= Integer.MAX_VALUE);
            int replayEnd = end = (int)reader.getFilePointer();
            while ((end = this.readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0) {
                int replayPos = replayEnd + 8;
                if (logger.isTraceEnabled()) {
                    logger.trace("Replaying {} between {} and {}", new Object[]{file, reader.getFilePointer(), end});
                }
                if (compressor != null) {
                    int uncompressedLength = reader.readInt();
                    replayEnd = replayPos + uncompressedLength;
                } else {
                    replayEnd = end;
                }
                if (segmentId == this.globalPosition.segment && replayEnd < this.globalPosition.position) continue;
                RebufferingInputStream sectionReader = reader;
                String errorContext = desc.fileName();
                boolean tolerateErrorsInSection = tolerateTruncation;
                if (compressor != null) {
                    tolerateErrorsInSection &= (long)end == reader.length() || end < 0;
                    int start = (int)reader.getFilePointer();
                    try {
                        int compressedLength = end - start;
                        if (logger.isTraceEnabled()) {
                            logger.trace("Decompressing {} between replay positions {} and {}", new Object[]{file, replayPos, replayEnd});
                        }
                        if (compressedLength > this.buffer.length) {
                            this.buffer = new byte[(int)(1.2 * (double)compressedLength)];
                        }
                        reader.readFully(this.buffer, 0, compressedLength);
                        int uncompressedLength = replayEnd - replayPos;
                        if (uncompressedLength > this.uncompressedBuffer.length) {
                            this.uncompressedBuffer = new byte[(int)(1.2 * (double)uncompressedLength)];
                        }
                        compressedLength = compressor.uncompress(this.buffer, 0, compressedLength, this.uncompressedBuffer, 0);
                        sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(this.uncompressedBuffer), reader.getPath(), (long)replayPos);
                        errorContext = "compressed section at " + start + " in " + errorContext;
                    }
                    catch (IOException | ArrayIndexOutOfBoundsException e) {
                        CommitLogReplayer.handleReplayError(tolerateErrorsInSection, "Unexpected exception decompressing section at %d: %s", start, e);
                        continue;
                    }
                }
                if (this.replaySyncSection((FileDataInput)((Object)sectionReader), replayEnd, desc, errorContext, tolerateErrorsInSection)) continue;
            }
            logger.debug("Finished reading {}", (Object)file);
            return;
        }
    }

    public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc) {
        logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})", new Object[]{file.getPath(), desc.version, desc.getMessagingVersion(), desc.compression});
        if (this.globalPosition.segment > desc.id) {
            logger.trace("skipping replay of fully-flushed {}", (Object)file);
            return true;
        }
        return false;
    }

    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException {
        while (reader.getFilePointer() < (long)end && !reader.isEOF()) {
            long claimedCRC32;
            int serializedSize;
            long mutationStart = reader.getFilePointer();
            if (logger.isTraceEnabled()) {
                logger.trace("Reading mutation at {}", (Object)mutationStart);
            }
            try {
                serializedSize = reader.readInt();
                if (serializedSize == 0) {
                    logger.trace("Encountered end of segment marker at {}", (Object)reader.getFilePointer());
                    return false;
                }
                if (serializedSize < 10) {
                    CommitLogReplayer.handleReplayError(tolerateErrors, "Invalid mutation size %d at %d in %s", serializedSize, mutationStart, errorContext);
                    return false;
                }
                long claimedSizeChecksum = desc.version < 4 ? reader.readLong() : (long)reader.readInt() & 0xFFFFFFFFL;
                this.checksum.reset();
                if (desc.version < 3) {
                    this.checksum.update(serializedSize);
                } else {
                    FBUtilities.updateChecksumInt(this.checksum, serializedSize);
                }
                if (this.checksum.getValue() != claimedSizeChecksum) {
                    CommitLogReplayer.handleReplayError(tolerateErrors, "Mutation size checksum failure at %d in %s", mutationStart, errorContext);
                    return false;
                }
                if (serializedSize > this.buffer.length) {
                    this.buffer = new byte[(int)(1.2 * (double)serializedSize)];
                }
                reader.readFully(this.buffer, 0, serializedSize);
                claimedCRC32 = desc.version < 4 ? reader.readLong() : (long)reader.readInt() & 0xFFFFFFFFL;
            }
            catch (EOFException eof) {
                CommitLogReplayer.handleReplayError(tolerateErrors, "Unexpected end of segment", mutationStart, errorContext);
                return false;
            }
            this.checksum.update(this.buffer, 0, serializedSize);
            if (claimedCRC32 != this.checksum.getValue()) {
                CommitLogReplayer.handleReplayError(tolerateErrors, "Mutation checksum failure at %d in %s", mutationStart, errorContext);
                continue;
            }
            this.replayMutation(this.buffer, serializedSize, (int)reader.getFilePointer(), desc);
        }
        return true;
    }

    void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException {
        Mutation mutation;
        try (DataInputBuffer bufIn = new DataInputBuffer(inputBuffer, 0, size);){
            mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
            for (PartitionUpdate upd : mutation.getPartitionUpdates()) {
                upd.validate();
            }
        }
        catch (UnknownColumnFamilyException ex) {
            if (ex.cfId == null) {
                return;
            }
            AtomicInteger i = this.invalidMutations.get(ex.cfId);
            if (i == null) {
                i = new AtomicInteger(1);
                this.invalidMutations.put(ex.cfId, i);
            } else {
                i.incrementAndGet();
            }
            return;
        }
        catch (Throwable t) {
            JVMStabilityInspector.inspectThrowable(t);
            File f = File.createTempFile("mutation", "dat");
            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f));){
                out.write(inputBuffer, 0, size);
            }
            CommitLogReplayer.handleReplayError(false, "Unexpected error deserializing mutation; saved to %s.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: %s", f.getAbsolutePath(), t);
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("replaying mutation for {}.{}: {}", new Object[]{mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), (String)", ") + "}"});
        }
        WrappedRunnable runnable = new WrappedRunnable(){

            @Override
            public void runMayThrow() {
                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) {
                    return;
                }
                if (CommitLogReplayer.this.pointInTimeExceeded(mutation)) {
                    return;
                }
                Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
                Mutation newMutation = null;
                for (PartitionUpdate update : CommitLogReplayer.this.replayFilter.filter(mutation)) {
                    if (Schema.instance.getCF(update.metadata().cfId) == null || !CommitLogReplayer.this.shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation))) continue;
                    if (newMutation == null) {
                        newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
                    }
                    newMutation.add(update);
                    CommitLogReplayer.this.replayedCount.incrementAndGet();
                }
                if (newMutation != null) {
                    assert (!newMutation.isEmpty());
                    try {
                        Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
                    }
                    catch (ExecutionException e) {
                        throw Throwables.propagate((Throwable)e.getCause());
                    }
                    CommitLogReplayer.this.keyspacesRecovered.add(keyspace);
                }
            }
        };
        this.futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
        if (this.futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) {
            FBUtilities.waitOnFutures(this.futures);
            this.futures.clear();
        }
    }

    protected boolean pointInTimeExceeded(Mutation fm) {
        long restoreTarget = this.archiver.restorePointInTime;
        for (PartitionUpdate upd : fm.getPartitionUpdates()) {
            if (this.archiver.precision.toMillis(upd.maxTimestamp()) <= restoreTarget) continue;
            return true;
        }
        return false;
    }

    static void handleReplayError(boolean permissible, String message, Object ... messageArgs) throws IOException {
        String msg = String.format(message, messageArgs);
        CommitLogReplayException e = new CommitLogReplayException(msg);
        if (permissible) {
            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", (Throwable)e);
        } else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY)) {
            logger.error("Ignoring commit log replay error", (Throwable)e);
        } else if (!CommitLog.handleCommitError("Failed commit log replay", e)) {
            logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring commit log replay problems, specify -Dcassandra.commitlog.ignorereplayerrors=true on the command line");
            throw e;
        }
    }

    public static class CommitLogReplayException
    extends IOException {
        public CommitLogReplayException(String message, Throwable cause) {
            super(message, cause);
        }

        public CommitLogReplayException(String message) {
            super(message);
        }
    }

    private static class CustomReplayFilter
    extends ReplayFilter {
        private Multimap<String, String> toReplay;

        public CustomReplayFilter(Multimap<String, String> toReplay) {
            this.toReplay = toReplay;
        }

        @Override
        public Iterable<PartitionUpdate> filter(Mutation mutation) {
            final Collection cfNames = this.toReplay.get((Object)mutation.getKeyspaceName());
            if (cfNames == null) {
                return Collections.emptySet();
            }
            return Iterables.filter(mutation.getPartitionUpdates(), (Predicate)new Predicate<PartitionUpdate>(){

                public boolean apply(PartitionUpdate upd) {
                    return cfNames.contains(upd.metadata().cfName);
                }
            });
        }

        @Override
        public boolean includes(CFMetaData metadata) {
            return this.toReplay.containsEntry((Object)metadata.ksName, (Object)metadata.cfName);
        }
    }

    private static class AlwaysReplayFilter
    extends ReplayFilter {
        private AlwaysReplayFilter() {
        }

        @Override
        public Iterable<PartitionUpdate> filter(Mutation mutation) {
            return mutation.getPartitionUpdates();
        }

        @Override
        public boolean includes(CFMetaData metadata) {
            return true;
        }
    }

    static abstract class ReplayFilter {
        ReplayFilter() {
        }

        public abstract Iterable<PartitionUpdate> filter(Mutation var1);

        public abstract boolean includes(CFMetaData var1);

        public static ReplayFilter create() {
            if (System.getProperty("cassandra.replayList") == null) {
                return new AlwaysReplayFilter();
            }
            HashMultimap toReplay = HashMultimap.create();
            for (String rawPair : System.getProperty("cassandra.replayList").split(",")) {
                String[] pair = rawPair.trim().split("\\.");
                if (pair.length != 2) {
                    throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");
                }
                Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
                if (ks == null) {
                    throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
                }
                ColumnFamilyStore cfs = ks.getColumnFamilyStore(pair[1]);
                if (cfs == null) {
                    throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));
                }
                toReplay.put((Object)pair[0], (Object)pair[1]);
            }
            return new CustomReplayFilter((Multimap<String, String>)toReplay);
        }
    }
}

