/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Runnables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.SimpleCachedBufferPool;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCommitLogSegmentManager {
    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
    private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<Runnable>();
    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue();
    protected volatile CommitLogSegment allocatingFrom = null;
    private final WaitQueue hasAvailableSegments = new WaitQueue();
    final String storageDirectory;
    private final AtomicLong size = new AtomicLong();
    volatile boolean createReserveSegments = false;
    private Thread managerThread;
    protected volatile boolean run = true;
    protected final CommitLog commitLog;
    private static final SimpleCachedBufferPool bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());

    AbstractCommitLogSegmentManager(CommitLog commitLog, String storageDirectory) {
        this.commitLog = commitLog;
        this.storageDirectory = storageDirectory;
    }

    void start() {
        WrappedRunnable runnable = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                while (AbstractCommitLogSegmentManager.this.run) {
                    try {
                        Runnable task = (Runnable)AbstractCommitLogSegmentManager.this.segmentManagementTasks.poll();
                        if (task == null) {
                            long unused;
                            if (!this.atSegmentLimit() && AbstractCommitLogSegmentManager.this.availableSegments.isEmpty() && (AbstractCommitLogSegmentManager.this.activeSegments.isEmpty() || AbstractCommitLogSegmentManager.this.createReserveSegments)) {
                                logger.trace("No segments in reserve; creating a fresh one");
                                AbstractCommitLogSegmentManager.this.availableSegments.add(AbstractCommitLogSegmentManager.this.createSegment());
                                AbstractCommitLogSegmentManager.this.hasAvailableSegments.signalAll();
                            }
                            if ((unused = AbstractCommitLogSegmentManager.this.unusedCapacity()) < 0L) {
                                CommitLogSegment segment;
                                ArrayList<CommitLogSegment> segmentsToRecycle = new ArrayList<CommitLogSegment>();
                                long spaceToReclaim = 0L;
                                Iterator iterator = AbstractCommitLogSegmentManager.this.activeSegments.iterator();
                                while (iterator.hasNext() && (segment = (CommitLogSegment)iterator.next()) != AbstractCommitLogSegmentManager.this.allocatingFrom) {
                                    segmentsToRecycle.add(segment);
                                    if ((spaceToReclaim += (long)DatabaseDescriptor.getCommitLogSegmentSize()) + unused < 0L) continue;
                                    break;
                                }
                                AbstractCommitLogSegmentManager.this.flushDataFrom(segmentsToRecycle, false);
                            }
                            try {
                                task = (Runnable)AbstractCommitLogSegmentManager.this.segmentManagementTasks.take();
                            }
                            catch (InterruptedException e) {
                                throw new AssertionError();
                            }
                        }
                        task.run();
                    }
                    catch (Throwable t) {
                        JVMStabilityInspector.inspectThrowable(t);
                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) {
                            return;
                        }
                        Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
                    }
                }
            }

            private boolean atSegmentLimit() {
                return CommitLogSegment.usesBufferPool(AbstractCommitLogSegmentManager.this.commitLog) && bufferPool.atLimit();
            }
        };
        this.run = true;
        this.managerThread = new Thread((Runnable)runnable, "COMMIT-LOG-ALLOCATOR");
        this.managerThread.start();
    }

    public abstract void shutdown();

    public abstract CommitLogSegment.Allocation allocate(Mutation var1, int var2);

    abstract void handleReplayedSegment(File var1);

    abstract CommitLogSegment createSegment();

    abstract void discard(CommitLogSegment var1, boolean var2);

    CommitLogSegment allocatingFrom() {
        CommitLogSegment r = this.allocatingFrom;
        if (r == null) {
            this.advanceAllocatingFrom(null);
            r = this.allocatingFrom;
        }
        return r;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void advanceAllocatingFrom(CommitLogSegment old) {
        while (true) {
            CommitLogSegment next;
            AbstractCommitLogSegmentManager abstractCommitLogSegmentManager = this;
            synchronized (abstractCommitLogSegmentManager) {
                if (this.allocatingFrom != old) {
                    return;
                }
                next = this.availableSegments.poll();
                if (next != null) {
                    this.allocatingFrom = next;
                    this.activeSegments.add(next);
                }
            }
            if (next != null) {
                if (old != null) {
                    this.commitLog.archiver.maybeArchive(old);
                    old.discardUnusedTail();
                }
                this.commitLog.requestExtraSync();
                return;
            }
            WaitQueue.Signal signal = this.hasAvailableSegments.register(this.commitLog.metrics.waitingOnSegmentAllocation.time());
            this.wakeManager();
            if (!this.availableSegments.isEmpty() || this.allocatingFrom != old) {
                signal.cancel();
                if (this.allocatingFrom == old) continue;
                return;
            }
            signal.awaitUninterruptibly();
        }
    }

    protected void wakeManager() {
        this.segmentManagementTasks.add(Runnables.doNothing());
    }

    void forceRecycleAll(Iterable<UUID> droppedCfs) {
        ArrayList<CommitLogSegment> segmentsToRecycle = new ArrayList<CommitLogSegment>(this.activeSegments);
        CommitLogSegment last = (CommitLogSegment)segmentsToRecycle.get(segmentsToRecycle.size() - 1);
        this.advanceAllocatingFrom(last);
        last.waitForModifications();
        Keyspace.writeOrder.awaitNewBarrier();
        Future<?> future = this.flushDataFrom(segmentsToRecycle, true);
        try {
            future.get();
            for (CommitLogSegment segment : this.activeSegments) {
                for (UUID cfId : droppedCfs) {
                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
                }
            }
            for (CommitLogSegment segment : this.activeSegments) {
                if (!segment.isUnused()) continue;
                this.recycleSegment(segment);
            }
            CommitLogSegment first = this.activeSegments.peek();
            if (first != null && first.id <= last.id) {
                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
            }
        }
        catch (Throwable t) {
            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
        }
    }

    void recycleSegment(CommitLogSegment segment) {
        boolean archiveSuccess = this.commitLog.archiver.maybeWaitForArchiving(segment.getName());
        if (this.activeSegments.remove(segment)) {
            this.discardSegment(segment, archiveSuccess);
        } else {
            logger.warn("segment {} not found in activeSegments queue", (Object)segment);
        }
    }

    private void discardSegment(CommitLogSegment segment, boolean deleteFile) {
        logger.trace("Segment {} is no longer active and will be deleted {}", (Object)segment, (Object)(deleteFile ? "now" : "by the archive script"));
        this.segmentManagementTasks.add(() -> this.discard(segment, deleteFile));
    }

    void addSize(long addedSize) {
        this.size.addAndGet(addedSize);
    }

    public long onDiskSize() {
        return this.size.get();
    }

    private long unusedCapacity() {
        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024L * 1024L;
        long currentSize = this.size.get();
        logger.trace("Total active commitlog segment space used is {} out of {}", (Object)currentSize, (Object)total);
        return total - currentSize;
    }

    public boolean manages(String name) {
        for (CommitLogSegment segment : Iterables.concat(this.activeSegments, this.availableSegments)) {
            if (!segment.getName().equals(name)) continue;
            return true;
        }
        return false;
    }

    void enableReserveSegmentCreation() {
        this.createReserveSegments = true;
        this.wakeManager();
    }

    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) {
        if (segments.isEmpty()) {
            return Futures.immediateFuture(null);
        }
        CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
        LinkedHashMap<UUID, ListenableFuture<CommitLogPosition>> flushes = new LinkedHashMap<UUID, ListenableFuture<CommitLogPosition>>();
        for (CommitLogSegment segment : segments) {
            for (UUID dirtyCFId : segment.getDirtyCFIDs()) {
                Pair<String, String> pair = Schema.instance.getCF(dirtyCFId);
                if (pair == null) {
                    logger.trace("Marking clean CF {} that doesn't exist anymore", (Object)dirtyCFId);
                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
                    continue;
                }
                if (flushes.containsKey(dirtyCFId)) continue;
                String keyspace = (String)pair.left;
                ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
                flushes.put(dirtyCFId, (ListenableFuture<CommitLogPosition>)(force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)));
            }
        }
        return Futures.allAsList(flushes.values());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopUnsafe(boolean deleteSegments) {
        logger.trace("CLSM closing and clearing existing commit log segments...");
        this.createReserveSegments = false;
        this.awaitManagementTasksCompletion();
        this.shutdown();
        try {
            this.awaitTermination();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        AbstractCommitLogSegmentManager abstractCommitLogSegmentManager = this;
        synchronized (abstractCommitLogSegmentManager) {
            for (CommitLogSegment segment : this.activeSegments) {
                this.closeAndDeleteSegmentUnsafe(segment, deleteSegments);
            }
            this.activeSegments.clear();
            for (CommitLogSegment segment : this.availableSegments) {
                this.closeAndDeleteSegmentUnsafe(segment, deleteSegments);
            }
            this.availableSegments.clear();
        }
        this.allocatingFrom = null;
        this.segmentManagementTasks.clear();
        this.size.set(0L);
        logger.trace("CLSM done with closing and clearing existing commit log segments.");
    }

    void awaitManagementTasksCompletion() {
        while (!this.segmentManagementTasks.isEmpty()) {
            Thread.yield();
        }
        Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) {
        try {
            this.discard(segment, delete);
        }
        catch (AssertionError assertionError) {
            // empty catch block
        }
    }

    public void awaitTermination() throws InterruptedException {
        this.managerThread.join();
        for (CommitLogSegment segment : this.activeSegments) {
            segment.close();
        }
        for (CommitLogSegment segment : this.availableSegments) {
            segment.close();
        }
        bufferPool.shutdown();
    }

    @VisibleForTesting
    public Collection<CommitLogSegment> getActiveSegments() {
        return Collections.unmodifiableCollection(this.activeSegments);
    }

    CommitLogPosition getCurrentPosition() {
        return this.allocatingFrom().getCurrentCommitLogPosition();
    }

    public void sync(boolean syncAllSegments) throws IOException {
        CommitLogSegment current = this.allocatingFrom();
        for (CommitLogSegment segment : this.getActiveSegments()) {
            if (!syncAllSegments && segment.id > current.id) {
                return;
            }
            segment.sync();
        }
    }

    SimpleCachedBufferPool getBufferPool() {
        return bufferPool;
    }
}

