/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream task that collects the {@link SSTableMultiWriter}s received for a single table
 * (identified by {@code cfId}) and, once {@code totalFiles} writers have been received,
 * finalizes them asynchronously on a shared cached thread pool.
 *
 * <p>{@link #received(SSTableMultiWriter)} and {@link #abort()} are {@code synchronized} on this
 * instance and both become no-ops once the task has been marked done.
 */
public class StreamReceiveTask extends StreamTask {
    private static final Logger logger = LoggerFactory.getLogger(StreamReceiveTask.class);

    /** Pool on which the completion work of every receive task is executed. */
    private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));

    /** Number of files this task expects to receive. */
    private final int totalFiles;

    /** Total size reported for this task, as supplied to the constructor. */
    private final long totalSize;

    /** Offline {@link OperationType#STREAM} transaction that tracks the readers produced by this task. */
    public final LifecycleTransaction txn;

    /** Set once all files were received or the task was aborted; guards against further changes. */
    private boolean done = false;

    /** Writers received so far; cleared once they have been finished or aborted. */
    protected Collection<SSTableMultiWriter> sstables;

    /**
     * Creates a receive task.
     *
     * @param session    the stream session this task belongs to
     * @param cfId       id of the table being received
     * @param totalFiles number of files expected; completion is triggered when this many were received
     * @param totalSize  total size to report from {@link #getTotalSize()}
     */
    public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) {
        super(session, cfId);
        this.totalFiles = totalFiles;
        this.totalSize = totalSize;
        this.txn = LifecycleTransaction.offline(OperationType.STREAM);
        this.sstables = new ArrayList<>(totalFiles);
    }

    /**
     * Records a received writer. When the number of recorded writers reaches {@code totalFiles}
     * the task is marked done and the completion work is submitted to the executor.
     * Calls made after the task is done are ignored.
     *
     * @param sstable the writer for a file that has been received; must belong to this task's table
     */
    public synchronized void received(SSTableMultiWriter sstable) {
        if (this.done) {
            return;
        }
        assert (this.cfId.equals(sstable.getCfId()));
        this.sstables.add(sstable);
        if (this.sstables.size() == this.totalFiles) {
            this.done = true;
            executor.submit(new OnCompletionRunnable(this));
        }
    }

    /** @return the number of files this task expects to receive */
    @Override
    public int getTotalNumberOfFiles() {
        return this.totalFiles;
    }

    /** @return the total size supplied when this task was created */
    @Override
    public long getTotalSize() {
        return this.totalSize;
    }

    /**
     * Aborts the task: every writer received so far is aborted, the transaction is aborted and
     * the writer collection is cleared. Does nothing if the task is already done, which includes
     * the case where all files were received and completion has been scheduled.
     */
    @Override
    public synchronized void abort() {
        if (this.done) {
            return;
        }
        this.done = true;
        this.sstables.forEach(SSTableMultiWriter::abortOrDie);
        this.txn.abort();
        this.sstables.clear();
    }

    /**
     * Completion work for a {@link StreamReceiveTask}: finishes the received writers and either
     * adds the resulting readers to the table or, when the table has views, re-applies their
     * content as mutations.
     */
    private static class OnCompletionRunnable implements Runnable {
        private final StreamReceiveTask task;

        public OnCompletionRunnable(StreamReceiveTask task) {
            this.task = task;
        }

        /**
         * Finishes all received writers and makes their data available.
         *
         * <p>Any {@link Throwable} raised while doing so is logged, passed to
         * {@link JVMStabilityInspector#inspectThrowable(Throwable)} and reported to the session
         * through {@code onError}; otherwise the session is notified through {@code taskCompleted}.
         */
        @Override
        public void run() {
            boolean hasViews = false;
            ColumnFamilyStore cfs = null;
            try {
                Pair<String, String> kscf = Schema.instance.getCF(this.task.cfId);
                if (kscf == null) {
                    // No keyspace/table is known for this cfId (presumably the table was dropped
                    // while streaming - TODO confirm): discard everything and report completion.
                    this.task.sstables.forEach(SSTableMultiWriter::abortOrDie);
                    this.task.sstables.clear();
                    this.task.txn.abort();
                    this.task.session.taskCompleted(this.task);
                    return;
                }
                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));

                Collection<SSTableReader> readers = new ArrayList<>();
                for (SSTableMultiWriter writer : this.task.sstables) {
                    Collection<SSTableReader> newReaders = writer.finish(true);
                    readers.addAll(newReaders);
                    this.task.txn.update(newReaders, false);
                }
                this.task.sstables.clear();

                // Hold references on the readers for as long as they are being processed.
                try (Refs<SSTableReader> refs = Refs.ref(readers)) {
                    if (hasViews) {
                        applyAsMutations(readers);
                    } else {
                        this.task.txn.finish();
                        cfs.addSSTables(readers);
                        cfs.indexManager.buildAllIndexesBlocking(readers);
                        invalidateCaches(cfs, readers);
                    }
                }
                this.task.session.taskCompleted(this.task);
            } catch (Throwable t) {
                logger.error("Error applying streamed data: ", t);
                JVMStabilityInspector.inspectThrowable(t);
                this.task.session.onError(t);
            } finally {
                // On the view path the transaction is never finished, so it is aborted here
                // after flushing (presumably so the applied mutations are on disk before the
                // streamed files are discarded - TODO confirm).
                if (hasViews) {
                    if (cfs != null) {
                        cfs.forceBlockingFlush();
                    }
                    this.task.txn.abort();
                }
            }
        }

        /**
         * Reads every partition of every reader and applies it as a {@link Mutation} via
         * {@code applyUnsafe()}.
         *
         * <p>Scanners and row iterators are closed with try-with-resources, so a failure while
         * scanning or applying propagates to the caller instead of being discarded.
         */
        private static void applyAsMutations(Collection<SSTableReader> readers) {
            for (SSTableReader reader : readers) {
                try (ISSTableScanner scanner = reader.getScanner()) {
                    while (scanner.hasNext()) {
                        try (UnfilteredRowIterator rowIterator = scanner.next()) {
                            new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
                        }
                    }
                }
            }
        }

        /**
         * Invalidates row cache and/or counter cache entries covered by the token bounds
         * (first token to last token) of the given readers. Does nothing when the row cache is
         * disabled and the table is not a counter table.
         */
        private void invalidateCaches(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
            if (!cfs.isRowCacheEnabled() && !cfs.metadata.isCounter()) {
                return;
            }
            Collection<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
            for (SSTableReader sstable : readers) {
                boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
            }
            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);

            if (cfs.isRowCacheEnabled()) {
                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
                if (invalidatedKeys > 0) {
                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream receive task completed.", this.task.session.planId(), invalidatedKeys, cfs.keyspace.getName(), cfs.getTableName());
                }
            }
            if (cfs.metadata.isCounter()) {
                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
                if (invalidatedKeys > 0) {
                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream receive task completed.", this.task.session.planId(), invalidatedKeys, cfs.keyspace.getName(), cfs.getTableName());
                }
            }
        }
    }
}

