/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.hints;

import com.google.common.util.concurrent.RateLimiter;
import java.io.File;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.hints.HintsDescriptor;
import org.apache.cassandra.hints.HintsDispatcher;
import org.apache.cassandra.hints.HintsStore;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs hint dispatch tasks on a dedicated thread pool, keeping at most one
 * registered dispatch per hints store at any time.
 *
 * <p>Registered dispatches are tracked in {@code scheduledDispatches}, keyed by
 * {@code store.hostId}. The same key is used for registration, lookup
 * ({@link #isScheduled}, {@link #completeDispatchBlockingly}) and removal when the
 * task finishes.
 */
final class HintsDispatchExecutor {
    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
    private final File hintsDirectory;
    private final ExecutorService executor;
    private final AtomicBoolean isPaused;
    private final Map<UUID, Future> scheduledDispatches;

    /**
     * @param hintsDirectory directory that hints file names are resolved against
     * @param maxThreads     maximum pool size passed to the underlying executor
     * @param isPaused       shared flag; while it reads {@code true} no further hints files are polled
     */
    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused) {
        this.hintsDirectory = hintsDirectory;
        this.isPaused = isPaused;
        this.scheduledDispatches = new ConcurrentHashMap<UUID, Future>();
        this.executor = new JMXEnabledThreadPoolExecutor(1, maxThreads, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("HintsDispatcher", 1), "internal");
    }

    /**
     * Forgets all registered dispatches and calls {@code shutdownNow()} on the executor.
     *
     * <p>NOTE(review): despite the name, this method does not wait for running tasks to
     * terminate - confirm whether callers rely on termination having completed.
     */
    void shutdownBlocking() {
        this.scheduledDispatches.clear();
        this.executor.shutdownNow();
    }

    /**
     * @return {@code true} if a dispatch is currently registered for {@code store.hostId}
     */
    boolean isScheduled(HintsStore store) {
        return this.scheduledDispatches.containsKey(store.hostId);
    }

    /**
     * Schedules a dispatch of the store's hints to the store's own host id.
     *
     * @return the future of the newly submitted task, or of the one already registered for this store
     */
    Future dispatch(HintsStore store) {
        return this.dispatch(store, store.hostId);
    }

    /**
     * Schedules a dispatch of the store's hints to {@code hostId}, unless a dispatch is
     * already registered for this store.
     *
     * @param store  the hints store to drain
     * @param hostId the host id handed to the dispatcher as the destination
     * @return the future of the newly submitted task, or of the one already registered for this store
     */
    Future dispatch(HintsStore store, UUID hostId) {
        // Registered under store.hostId; DispatchHintsTask.run() removes the same key.
        return this.scheduledDispatches.computeIfAbsent(store.hostId, uuid -> this.executor.submit(new DispatchHintsTask(store, hostId)));
    }

    /**
     * Waits for the dispatch currently registered for the store, if any, to finish.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *                          {@link ExecutionException} raised while waiting
     */
    void completeDispatchBlockingly(HintsStore store) {
        Future future = this.scheduledDispatches.get(store.hostId);
        if (future == null) {
            return;
        }
        try {
            future.get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Drains one hints store, dispatching its files one at a time to a single destination.
     */
    private final class DispatchHintsTask
    implements Runnable {
        private final HintsStore store;
        private final UUID hostId;
        private final RateLimiter rateLimiter;

        DispatchHintsTask(HintsStore store, UUID hostId) {
            this.store = store;
            this.hostId = hostId;
            // The configured throttle is split between all endpoints other than this one (at least one).
            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
            int configuredThrottleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
            int throttleInKB = configuredThrottleInKB / nodesCount;
            if (throttleInKB == 0 && configuredThrottleInKB > 0) {
                // Integer division must not turn a small non-zero throttle into 0, which is
                // treated as "no limit" below.
                throttleInKB = 1;
            }
            // Multiply as double so large throttle values cannot overflow int.
            double bytesPerSecond = throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024.0;
            this.rateLimiter = RateLimiter.create(bytesPerSecond);
        }

        @Override
        public void run() {
            try {
                this.dispatch();
            }
            finally {
                // Must be the key used at registration (store.hostId), which can differ from the
                // destination hostId; otherwise the entry would stay in the map indefinitely.
                HintsDispatchExecutor.this.scheduledDispatches.remove(this.store.hostId);
            }
        }

        /**
         * Polls descriptors from the store until it is empty or dispatch is paused.
         * On {@link FSReadError} the file is cleaned up and blacklisted in the store,
         * and the error is rethrown, which ends this task.
         */
        private void dispatch() {
            HintsDescriptor descriptor;
            while (!HintsDispatchExecutor.this.isPaused.get() && (descriptor = this.store.poll()) != null) {
                try {
                    this.dispatch(descriptor);
                }
                catch (FSReadError e) {
                    logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
                    this.store.cleanUp(descriptor);
                    this.store.blacklist(descriptor);
                    throw e;
                }
            }
        }

        /**
         * Dispatches a single hints file, resuming from the offset recorded in the store if there is one.
         * If the dispatcher reports success the file is deleted and its state cleaned up;
         * otherwise the reached offset is recorded and the descriptor is put back at the
         * front of the store.
         */
        private void dispatch(HintsDescriptor descriptor) {
            logger.debug("Dispatching hints file {}", descriptor.fileName());
            File file = new File(HintsDispatchExecutor.this.hintsDirectory, descriptor.fileName());
            Long offset = this.store.getDispatchOffset(descriptor).orElse(null);
            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, this.rateLimiter, this.hostId, HintsDispatchExecutor.this.isPaused)) {
                if (offset != null) {
                    dispatcher.seek(offset);
                }
                if (dispatcher.dispatch()) {
                    if (!file.delete()) {
                        logger.error("Failed to delete hints file {}", descriptor.fileName());
                    }
                    this.store.cleanUp(descriptor);
                    logger.info("Finished hinted handoff of file {} to endpoint {}", descriptor.fileName(), this.hostId);
                } else {
                    this.store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
                    this.store.offerFirst(descriptor);
                    logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), this.hostId);
                }
            }
        }
    }
}

