/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.streaming.StreamLockfile;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamReceiveTask
extends StreamTask {
    private static final ThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask", FBUtilities.getAvailableProcessors(), 60, TimeUnit.SECONDS);
    private static final Logger logger = LoggerFactory.getLogger(StreamReceiveTask.class);
    private final int totalFiles;
    private final long totalSize;
    private boolean done = false;
    protected Collection<SSTableWriter> sstables;

    public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) {
        super(session, cfId);
        this.totalFiles = totalFiles;
        this.totalSize = totalSize;
        this.sstables = new ArrayList<SSTableWriter>(totalFiles);
    }

    public synchronized void received(SSTableWriter sstable) {
        if (this.done) {
            return;
        }
        assert (this.cfId.equals(sstable.metadata.cfId));
        this.sstables.add(sstable);
        if (this.sstables.size() == this.totalFiles) {
            this.done = true;
            executor.submit(new OnCompletionRunnable(this));
        }
    }

    @Override
    public int getTotalNumberOfFiles() {
        return this.totalFiles;
    }

    @Override
    public long getTotalSize() {
        return this.totalSize;
    }

    @Override
    public synchronized void abort() {
        if (this.done) {
            return;
        }
        this.done = true;
        for (SSTableWriter writer : this.sstables) {
            writer.abort();
        }
        this.sstables.clear();
    }

    private static class OnCompletionRunnable
    implements Runnable {
        private final StreamReceiveTask task;

        public OnCompletionRunnable(StreamReceiveTask task) {
            this.task = task;
        }

        @Override
        public void run() {
            try {
                Pair<String, String> kscf = Schema.instance.getCF(this.task.cfId);
                if (kscf == null) {
                    for (SSTableWriter writer : this.task.sstables) {
                        writer.abort();
                    }
                    this.task.sstables.clear();
                    this.task.session.taskCompleted(this.task);
                    return;
                }
                ColumnFamilyStore cfs = Keyspace.open((String)kscf.left).getColumnFamilyStore((String)kscf.right);
                File lockfiledir = cfs.directories.getWriteableLocationAsFile((long)this.task.sstables.size() * 256L);
                if (lockfiledir == null) {
                    throw new IOError(new IOException("All disks full"));
                }
                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
                lockfile.create(this.task.sstables);
                ArrayList<SSTableReader> readers = new ArrayList<SSTableReader>();
                for (SSTableWriter writer : this.task.sstables) {
                    readers.add(writer.closeAndOpenReader());
                }
                lockfile.delete();
                this.task.sstables.clear();
                try (Refs refs = Refs.ref(readers);){
                    cfs.addSSTables(readers);
                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter()) {
                        int invalidatedKeys;
                        ArrayList boundsToInvalidate = new ArrayList(readers.size());
                        for (SSTableReader sstable : readers) {
                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
                        }
                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
                        if (cfs.isRowCacheEnabled() && (invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds)) > 0) {
                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream receive task completed.", new Object[]{this.task.session.planId(), invalidatedKeys, cfs.keyspace.getName(), cfs.getColumnFamilyName()});
                        }
                        if (cfs.metadata.isCounter() && (invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds)) > 0) {
                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream receive task completed.", new Object[]{this.task.session.planId(), invalidatedKeys, cfs.keyspace.getName(), cfs.getColumnFamilyName()});
                        }
                    }
                }
                this.task.session.taskCompleted(this.task);
            }
            catch (Throwable t) {
                logger.error("Error applying streamed data: ", t);
                JVMStabilityInspector.inspectThrowable(t);
                this.task.session.onError(t);
            }
        }
    }
}

