/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.collect.Ordering;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Checksum;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.PureJavaCrc32;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogReplayer {
    private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
    private final Set<Table> tablesRecovered = new NonBlockingHashSet();
    private final List<Future<?>> futures = new ArrayList();
    private final Map<Integer, AtomicInteger> invalidMutations;
    private final AtomicInteger replayedCount;
    private final Map<Integer, ReplayPosition> cfPositions;
    private final ReplayPosition globalPosition;
    private final Checksum checksum;
    private byte[] buffer = new byte[4096];

    public CommitLogReplayer() {
        this.invalidMutations = new HashMap<Integer, AtomicInteger>();
        this.replayedCount = new AtomicInteger();
        this.cfPositions = new HashMap<Integer, ReplayPosition>();
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
            this.cfPositions.put(cfs.metadata.cfId, rp);
        }
        this.globalPosition = (ReplayPosition)Ordering.from(ReplayPosition.comparator).min(this.cfPositions.values());
        this.checksum = new PureJavaCrc32();
    }

    public void recover(File[] clogs) throws IOException {
        for (File file : clogs) {
            this.recover(file);
        }
    }

    public int blockForWrites() throws IOException {
        for (Map.Entry<Integer, AtomicInteger> entry : this.invalidMutations.entrySet()) {
            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
        }
        FBUtilities.waitOnFutures(this.futures);
        logger.debug("Finished waiting on mutations from recovery");
        this.futures.clear();
        for (Table table : this.tablesRecovered) {
            this.futures.addAll(table.flush());
        }
        FBUtilities.waitOnFutures(this.futures);
        return this.replayedCount.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recover(File file) throws IOException {
        logger.info("Replaying " + file.getPath());
        final long segment = CommitLogSegment.idFromFilename(file.getName());
        RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
        try {
            assert (reader.length() <= Integer.MAX_VALUE);
            int replayPosition = this.globalPosition.segment < segment ? 0 : (this.globalPosition.segment == segment ? this.globalPosition.position : (int)reader.length());
            if (replayPosition < 0 || (long)replayPosition >= reader.length()) {
                logger.debug("skipping replay of fully-flushed {}", (Object)file);
                return;
            }
            reader.seek(replayPosition);
            if (logger.isDebugEnabled()) {
                logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
            }
            while (!reader.isEOF()) {
                RowMutation rm;
                long claimedCRC32;
                int serializedSize;
                if (logger.isDebugEnabled()) {
                    logger.debug("Reading mutation at " + reader.getFilePointer());
                }
                try {
                    serializedSize = reader.readInt();
                    if (serializedSize == 0) {
                        logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
                        break;
                    }
                    if (serializedSize < 10) {
                        break;
                    }
                    long claimedSizeChecksum = reader.readLong();
                    this.checksum.reset();
                    this.checksum.update(serializedSize);
                    if (this.checksum.getValue() != claimedSizeChecksum) {
                        break;
                    }
                    if (serializedSize > this.buffer.length) {
                        this.buffer = new byte[(int)(1.2 * (double)serializedSize)];
                    }
                    reader.readFully(this.buffer, 0, serializedSize);
                    claimedCRC32 = reader.readLong();
                }
                catch (EOFException eof) {
                    break;
                }
                this.checksum.update(this.buffer, 0, serializedSize);
                if (claimedCRC32 != this.checksum.getValue()) continue;
                FastByteArrayInputStream bufIn = new FastByteArrayInputStream(this.buffer, 0, serializedSize);
                try {
                    rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), 4, IColumnSerializer.Flag.LOCAL);
                }
                catch (UnknownColumnFamilyException ex) {
                    AtomicInteger i = this.invalidMutations.get(ex.cfId);
                    if (i == null) {
                        i = new AtomicInteger(1);
                        this.invalidMutations.put(ex.cfId, i);
                        continue;
                    }
                    i.incrementAndGet();
                    continue;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), (String)", ") + "}"));
                }
                final long entryLocation = reader.getFilePointer();
                final RowMutation frm = rm;
                WrappedRunnable runnable = new WrappedRunnable(){

                    @Override
                    public void runMayThrow() throws IOException {
                        if (Schema.instance.getKSMetaData(frm.getTable()) == null) {
                            return;
                        }
                        if (CommitLogReplayer.this.pointInTimeExceeded(frm)) {
                            return;
                        }
                        Table table = Table.open(frm.getTable());
                        RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
                        for (ColumnFamily columnFamily : frm.getColumnFamilies()) {
                            if (Schema.instance.getCF(columnFamily.id()) == null) continue;
                            ReplayPosition rp = (ReplayPosition)CommitLogReplayer.this.cfPositions.get(columnFamily.id());
                            if (segment <= rp.segment && (segment != rp.segment || entryLocation <= (long)rp.position)) continue;
                            newRm.add(columnFamily);
                            CommitLogReplayer.this.replayedCount.incrementAndGet();
                        }
                        if (!newRm.isEmpty()) {
                            Table.open(newRm.getTable()).apply(newRm, false);
                            CommitLogReplayer.this.tablesRecovered.add(table);
                        }
                    }
                };
                this.futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
                if (this.futures.size() <= 1024) continue;
                FBUtilities.waitOnFutures(this.futures);
                this.futures.clear();
            }
        }
        finally {
            FileUtils.closeQuietly(reader);
            logger.info("Finished reading " + file);
        }
    }

    protected boolean pointInTimeExceeded(RowMutation frm) {
        long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
        for (ColumnFamily families : frm.getColumnFamilies()) {
            if (families.maxTimestamp() <= restoreTarget) continue;
            return true;
        }
        return false;
    }
}

