/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogSegmentManager {
    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class);
    private final BlockingQueue<Callable<CommitLogSegment>> segmentManagementTasks = new LinkedBlockingQueue<Callable<CommitLogSegment>>();
    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue();
    private volatile CommitLogSegment allocatingFrom = null;
    private final WaitQueue hasAvailableSegments = new WaitQueue();
    private static final AtomicReferenceFieldUpdater<CommitLogSegmentManager, CommitLogSegment> allocatingFromUpdater = AtomicReferenceFieldUpdater.newUpdater(CommitLogSegmentManager.class, CommitLogSegment.class, "allocatingFrom");
    private final AtomicLong size = new AtomicLong();
    private volatile boolean createReserveSegments = false;
    private final Thread managerThread;
    private volatile boolean run = true;

    public CommitLogSegmentManager() {
        WrappedRunnable runnable = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                while (CommitLogSegmentManager.this.run) {
                    CommitLogSegment recycled;
                    Callable task = (Callable)CommitLogSegmentManager.this.segmentManagementTasks.poll();
                    if (task == null) {
                        long unused;
                        if (CommitLogSegmentManager.this.availableSegments.isEmpty() && (CommitLogSegmentManager.this.activeSegments.isEmpty() || CommitLogSegmentManager.this.createReserveSegments)) {
                            logger.debug("No segments in reserve; creating a fresh one");
                            CommitLogSegmentManager.this.size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
                            CommitLogSegmentManager.this.availableSegments.add(CommitLogSegment.freshSegment());
                            CommitLogSegmentManager.this.hasAvailableSegments.signalAll();
                        }
                        if ((unused = CommitLogSegmentManager.this.unusedCapacity()) < 0L) {
                            CommitLogSegment segment;
                            ArrayList<CommitLogSegment> segmentsToRecycle = new ArrayList<CommitLogSegment>();
                            long spaceToReclaim = 0L;
                            Iterator i$ = CommitLogSegmentManager.this.activeSegments.iterator();
                            while (i$.hasNext() && (segment = (CommitLogSegment)i$.next()) != CommitLogSegmentManager.this.allocatingFrom) {
                                segmentsToRecycle.add(segment);
                                if ((spaceToReclaim += (long)DatabaseDescriptor.getCommitLogSegmentSize()) + unused < 0L) continue;
                                break;
                            }
                            CommitLogSegmentManager.this.flushDataFrom(segmentsToRecycle, false);
                        }
                        try {
                            task = (Callable)CommitLogSegmentManager.this.segmentManagementTasks.take();
                        }
                        catch (InterruptedException e) {
                            continue;
                        }
                    }
                    if ((recycled = (CommitLogSegment)task.call()) == null) continue;
                    CommitLogSegmentManager.this.availableSegments.add(recycled);
                    CommitLogSegmentManager.this.hasAvailableSegments.signalAll();
                }
            }
        };
        this.managerThread = new Thread((Runnable)runnable, "COMMIT-LOG-ALLOCATOR");
        this.managerThread.start();
    }

    public CommitLogSegment.Allocation allocate(Mutation mutation, int size, CommitLogSegment.Allocation alloc) {
        CommitLogSegment segment = this.allocatingFrom();
        while (!segment.allocate(mutation, size, alloc)) {
            this.advanceAllocatingFrom(segment);
            segment = this.allocatingFrom;
        }
        return alloc;
    }

    CommitLogSegment allocatingFrom() {
        CommitLogSegment r = this.allocatingFrom;
        if (r == null) {
            this.advanceAllocatingFrom(null);
            r = this.allocatingFrom;
        }
        return r;
    }

    private void advanceAllocatingFrom(CommitLogSegment old) {
        while (true) {
            Iterator<CommitLogSegment> iter;
            if ((iter = this.availableSegments.iterator()).hasNext()) {
                CommitLogSegment next = iter.next();
                if (!allocatingFromUpdater.compareAndSet(this, old, next)) {
                    return;
                }
                iter.remove();
                this.activeSegments.add(next);
                if (this.availableSegments.isEmpty()) {
                    this.wakeManager();
                }
                if (old != null) {
                    CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName());
                }
                if (old != null) {
                    old.discardUnusedTail();
                }
                CommitLog.instance.requestExtraSync();
                return;
            }
            WaitQueue.Signal signal = this.hasAvailableSegments.register(CommitLog.instance.metrics.waitingOnSegmentAllocation.time());
            this.wakeManager();
            if (!this.availableSegments.isEmpty() || this.allocatingFrom != old) {
                signal.cancel();
                if (this.allocatingFrom == old) continue;
                return;
            }
            signal.awaitUninterruptibly();
        }
    }

    private void wakeManager() {
        this.segmentManagementTasks.add(new Callable<CommitLogSegment>(){

            @Override
            public CommitLogSegment call() {
                return null;
            }
        });
    }

    void forceRecycleAll() {
        ArrayList<CommitLogSegment> segmentsToRecycle = new ArrayList<CommitLogSegment>(this.activeSegments);
        CommitLogSegment last = (CommitLogSegment)segmentsToRecycle.get(segmentsToRecycle.size() - 1);
        this.advanceAllocatingFrom(last);
        Future<?> future = this.flushDataFrom(segmentsToRecycle, true);
        try {
            CommitLogSegment first;
            future.get();
            for (CommitLogSegment segment : this.activeSegments) {
                if (!segment.isUnused()) continue;
                this.recycleSegment(segment);
            }
            assert ((first = this.activeSegments.peek()) == null || first.id > last.id);
        }
        catch (Throwable t) {
            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
        }
    }

    void recycleSegment(final CommitLogSegment segment) {
        this.activeSegments.remove(segment);
        if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName())) {
            this.discardSegment(segment, false);
            return;
        }
        if (this.isCapExceeded()) {
            this.discardSegment(segment, true);
            return;
        }
        logger.debug("Recycling {}", (Object)segment);
        this.segmentManagementTasks.add(new Callable<CommitLogSegment>(){

            @Override
            public CommitLogSegment call() {
                return segment.recycle();
            }
        });
    }

    void recycleSegment(final File file) {
        if (this.isCapExceeded() || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != 8) {
            logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", (Object)file);
            FileUtils.deleteWithConfirm(file);
            return;
        }
        logger.debug("Recycling {}", (Object)file);
        this.size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
        this.segmentManagementTasks.add(new Callable<CommitLogSegment>(){

            @Override
            public CommitLogSegment call() {
                return new CommitLogSegment(file.getPath());
            }
        });
    }

    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) {
        logger.debug("Segment {} is no longer active and will be deleted {}", (Object)segment, (Object)(deleteFile ? "now" : "by the archive script"));
        this.size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
        this.segmentManagementTasks.add(new Callable<CommitLogSegment>(){

            @Override
            public CommitLogSegment call() {
                segment.close();
                if (deleteFile) {
                    segment.delete();
                }
                return null;
            }
        });
    }

    public long bytesUsed() {
        return this.size.get();
    }

    public boolean manages(String name) {
        for (CommitLogSegment segment : Iterables.concat(this.activeSegments, this.availableSegments)) {
            if (!segment.getName().equals(name)) continue;
            return true;
        }
        return false;
    }

    private boolean isCapExceeded() {
        return this.unusedCapacity() < 0L;
    }

    private long unusedCapacity() {
        long currentSize = this.size.get();
        logger.debug("Total active commitlog segment space used is {}", (Object)currentSize);
        return DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024L * 1024L - currentSize;
    }

    public void enableReserveSegmentCreation() {
        this.createReserveSegments = true;
        this.wakeManager();
    }

    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) {
        if (segments.isEmpty()) {
            return Futures.immediateFuture(null);
        }
        ReplayPosition maxReplayPosition = segments.get(segments.size() - 1).getContext();
        LinkedHashMap flushes = new LinkedHashMap();
        for (CommitLogSegment segment : segments) {
            for (UUID dirtyCFId : segment.getDirtyCFIDs()) {
                Pair<String, String> pair = Schema.instance.getCF(dirtyCFId);
                if (pair == null) {
                    logger.debug("Marking clean CF {} that doesn't exist anymore", (Object)dirtyCFId);
                    segment.markClean(dirtyCFId, segment.getContext());
                    continue;
                }
                if (flushes.containsKey(dirtyCFId)) continue;
                String keyspace = (String)pair.left;
                ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
                flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxReplayPosition));
            }
        }
        return Futures.allAsList(flushes.values());
    }

    public void resetUnsafe() {
        logger.debug("Closing and clearing existing commit log segments...");
        while (!this.segmentManagementTasks.isEmpty()) {
            Thread.yield();
        }
        this.activeSegments.clear();
        this.availableSegments.clear();
        this.allocatingFrom = null;
    }

    public void shutdown() {
        this.run = false;
        this.managerThread.interrupt();
    }

    public void awaitTermination() throws InterruptedException {
        this.managerThread.join();
    }

    Collection<CommitLogSegment> getActiveSegments() {
        return Collections.unmodifiableCollection(this.activeSegments);
    }
}

