/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.index;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.PartitionColumns;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowDiffListener;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.index.SecondaryIndexBuilder;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.CleanupTransaction;
import org.apache.cassandra.index.transactions.CompactionTransaction;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SecondaryIndexManager
implements IndexRegistry {
    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
    private Map<String, Index> indexes = Maps.newConcurrentMap();
    private static final ExecutorService asyncExecutor = new JMXEnabledThreadPoolExecutor(1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("SecondaryIndexManagement"), "internal");
    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
    public final ColumnFamilyStore baseCfs;

    public SecondaryIndexManager(ColumnFamilyStore baseCfs) {
        this.baseCfs = baseCfs;
    }

    public void reload() {
        Indexes tableIndexes = this.baseCfs.metadata.getIndexes();
        this.indexes.keySet().stream().filter(indexName -> !tableIndexes.has((String)indexName)).forEach(this::removeIndex);
        for (IndexMetadata tableIndex : tableIndexes) {
            this.addIndex(tableIndex);
        }
    }

    private Future<?> reloadIndex(IndexMetadata indexDef) {
        IndexMetadata registered = this.indexes.get(indexDef.name).getIndexMetadata();
        if (!registered.equals(indexDef)) {
            Index index = this.indexes.remove(registered.name);
            index.register(this);
            return blockingExecutor.submit(index.getMetadataReloadTask(indexDef));
        }
        return Futures.immediateFuture(null);
    }

    private Future<?> createIndex(IndexMetadata indexDef) {
        Index index = this.createInstance(indexDef);
        index.register(this);
        Callable<?> initialBuildTask = index.getInitializationTask();
        return initialBuildTask == null ? Futures.immediateFuture(null) : asyncExecutor.submit(initialBuildTask);
    }

    public synchronized Future<?> addIndex(IndexMetadata indexDef) {
        if (this.indexes.containsKey(indexDef.name)) {
            return this.reloadIndex(indexDef);
        }
        return this.createIndex(indexDef);
    }

    public synchronized void removeIndex(String indexName) {
        Index index = this.indexes.remove(indexName);
        if (null != index) {
            SecondaryIndexManager.executeBlocking(index.getInvalidateTask());
            this.unregisterIndex(index);
        }
    }

    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column) {
        if (this.indexes.isEmpty()) {
            return Collections.emptySet();
        }
        HashSet<IndexMetadata> dependentIndexes = new HashSet<IndexMetadata>();
        for (Index index : this.indexes.values()) {
            if (!index.dependsOn(column)) continue;
            dependentIndexes.add(index.getIndexMetadata());
        }
        return dependentIndexes;
    }

    public void markAllIndexesRemoved() {
        this.getBuiltIndexNames().forEach(this::markIndexRemoved);
    }

    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames) {
        Set<Index> toRebuild = this.indexes.values().stream().filter(index -> indexNames.contains(index.getIndexName())).filter(Index::shouldBuildBlocking).collect(Collectors.toSet());
        if (toRebuild.isEmpty()) {
            logger.info("No defined indexes with the supplied names: {}", (Object)Joiner.on((char)',').join(indexNames));
            return;
        }
        toRebuild.forEach(indexer -> this.markIndexRemoved(indexer.getIndexName()));
        this.buildIndexesBlocking(sstables, toRebuild);
        toRebuild.forEach(indexer -> this.markIndexBuilt(indexer.getIndexName()));
    }

    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables) {
        this.buildIndexesBlocking(sstables, this.indexes.values().stream().filter(Index::shouldBuildBlocking).collect(Collectors.toSet()));
    }

    public void buildIndexBlocking(Index index) {
        if (index.shouldBuildBlocking()) {
            try (ColumnFamilyStore.RefViewFragment viewFragment = this.baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
                 Refs<SSTableReader> sstables = viewFragment.refs;){
                this.buildIndexesBlocking(sstables, Collections.singleton(index));
                this.markIndexBuilt(index.getIndexName());
            }
        }
    }

    public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs) {
        return SecondaryIndexManager.isIndexColumnFamily(cfs.name);
    }

    public static boolean isIndexColumnFamily(String cfName) {
        return cfName.contains(".");
    }

    public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs) {
        String parentCfs = SecondaryIndexManager.getParentCfsName(cfs.name);
        return cfs.keyspace.getColumnFamilyStore(parentCfs);
    }

    public static String getParentCfsName(String cfName) {
        assert (SecondaryIndexManager.isIndexColumnFamily(cfName));
        return StringUtils.substringBefore((String)cfName, (String)".");
    }

    public static String getIndexName(ColumnFamilyStore cfs) {
        return SecondaryIndexManager.getIndexName(cfs.name);
    }

    public static String getIndexName(String cfName) {
        assert (SecondaryIndexManager.isIndexColumnFamily(cfName));
        return StringUtils.substringAfter((String)cfName, (String)".");
    }

    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes) {
        if (indexes.isEmpty()) {
            return;
        }
        logger.info("Submitting index build of {} for data in {}", (Object)indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")), (Object)sstables.stream().map(SSTable::toString).collect(Collectors.joining(",")));
        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(this.baseCfs, indexes, new ReducingKeyIterator(sstables));
        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
        FBUtilities.waitOnFuture(future);
        this.flushIndexesBlocking(indexes);
        logger.info("Index build of {} complete", (Object)indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")));
    }

    private void markIndexBuilt(String indexName) {
        SystemKeyspace.setIndexBuilt(this.baseCfs.name, indexName);
    }

    private void markIndexRemoved(String indexName) {
        SystemKeyspace.setIndexRemoved(this.baseCfs.name, indexName);
    }

    public Index getIndexByName(String indexName) {
        return this.indexes.get(indexName);
    }

    private Index createInstance(IndexMetadata indexDef) {
        Index newIndex;
        if (indexDef.isCustom()) {
            assert (indexDef.options != null);
            String className = indexDef.options.get("class_name");
            assert (!Strings.isNullOrEmpty((String)className));
            try {
                Class indexClass = FBUtilities.classForName(className, "Index");
                Constructor ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
                newIndex = (Index)ctor.newInstance(this.baseCfs, indexDef);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            newIndex = CassandraIndex.newIndex(this.baseCfs, indexDef);
        }
        return newIndex;
    }

    public void truncateAllIndexesBlocking(long truncatedAt) {
        SecondaryIndexManager.executeAllBlocking(this.indexes.values().stream(), index -> index.getTruncateTask(truncatedAt));
    }

    public void invalidateAllIndexesBlocking() {
        SecondaryIndexManager.executeAllBlocking(this.indexes.values().stream(), Index::getInvalidateTask);
    }

    public void flushAllIndexesBlocking() {
        this.flushIndexesBlocking((Set<Index>)ImmutableSet.copyOf(this.indexes.values()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flushIndexesBlocking(Set<Index> indexes) {
        if (indexes.isEmpty()) {
            return;
        }
        ArrayList wait = new ArrayList();
        ArrayList nonCfsIndexes = new ArrayList();
        Tracker tracker = this.baseCfs.getTracker();
        synchronized (tracker) {
            indexes.forEach(index -> index.getBackingTable().map(cfs -> wait.add(cfs.forceFlush())).orElseGet(() -> nonCfsIndexes.add(index)));
        }
        SecondaryIndexManager.executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
        FBUtilities.waitOnFutures(wait);
    }

    public void flushAllNonCFSBackedIndexesBlocking() {
        Set<Index> customIndexers = this.indexes.values().stream().filter(index -> !index.getBackingTable().isPresent()).collect(Collectors.toSet());
        this.flushIndexesBlocking(customIndexers);
    }

    public List<String> getBuiltIndexNames() {
        HashSet<String> allIndexNames = new HashSet<String>();
        this.indexes.values().stream().map(Index::getIndexName).forEach(allIndexNames::add);
        return SystemKeyspace.getBuiltIndexes(this.baseCfs.keyspace.getName(), allIndexNames);
    }

    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores() {
        HashSet<ColumnFamilyStore> backingTables = new HashSet<ColumnFamilyStore>();
        this.indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
        return backingTables;
    }

    public boolean hasIndexes() {
        return !this.indexes.isEmpty();
    }

    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec) {
        if (!indexes.isEmpty()) {
            DecoratedKey key = partition.partitionKey();
            Set<Index.Indexer> indexers = indexes.stream().map(index -> index.indexerFor(key, nowInSec, opGroup, IndexTransaction.Type.UPDATE)).collect(Collectors.toSet());
            indexers.forEach(Index.Indexer::begin);
            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec);){
                if (!filtered.staticRow().isEmpty()) {
                    indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
                }
                while (filtered.hasNext()) {
                    Row row = (Row)filtered.next();
                    indexers.forEach(indexer -> indexer.insertRow(row));
                }
            }
            indexers.forEach(Index.Indexer::finish);
        }
    }

    public void deletePartition(UnfilteredRowIterator partition, int nowInSec) {
        CleanupTransaction indexTransaction = this.newCleanupTransaction(partition.partitionKey(), partition.columns(), nowInSec);
        indexTransaction.start();
        indexTransaction.onPartitionDeletion(partition.partitionLevelDeletion());
        indexTransaction.commit();
        while (partition.hasNext()) {
            Unfiltered unfiltered = (Unfiltered)partition.next();
            if (unfiltered.kind() != Unfiltered.Kind.ROW) continue;
            indexTransaction = this.newCleanupTransaction(partition.partitionKey(), partition.columns(), nowInSec);
            indexTransaction.start();
            indexTransaction.onRowDelete((Row)unfiltered);
            indexTransaction.commit();
        }
    }

    public Index getBestIndexFor(ReadCommand command) {
        Index selected;
        if (this.indexes.isEmpty() || command.rowFilter().isEmpty()) {
            return null;
        }
        ArrayList<Index> searchableIndexes = new ArrayList<Index>();
        for (RowFilter.Expression expression : command.rowFilter()) {
            if (expression.isCustom()) {
                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
                searchableIndexes.add(this.indexes.get(customExpression.getTargetIndex().name));
                continue;
            }
            this.indexes.values().stream().filter(index -> index.supportsExpression(expression.column(), expression.operator())).forEach(searchableIndexes::add);
        }
        if (searchableIndexes.isEmpty()) {
            logger.debug("No applicable indexes found");
            Tracing.trace("No applicable indexes found");
            return null;
        }
        Index index2 = selected = searchableIndexes.size() == 1 ? (Index)searchableIndexes.get(0) : (Index)searchableIndexes.stream().max((a, b) -> Longs.compare((long)a.getEstimatedResultRows(), (long)b.getEstimatedResultRows())).orElseThrow(() -> new AssertionError((Object)"Could not select most selective index"));
        if (Tracing.isTracing()) {
            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.", (Object)searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows()).collect(Collectors.joining(",")), (Object)selected.getIndexName());
        }
        return selected;
    }

    public void validate(PartitionUpdate update) throws InvalidRequestException {
        this.indexes.values().stream().filter(i -> i.indexes(update.columns())).forEach(i -> i.validate(update));
    }

    @Override
    public void registerIndex(Index index) {
        this.indexes.put(index.getIndexMetadata().name, index);
        logger.debug("Registered index {}", (Object)index.getIndexMetadata().name);
    }

    @Override
    public void unregisterIndex(Index index) {
        Index removed = this.indexes.remove(index.getIndexMetadata().name);
        logger.debug(removed == null ? "Index {} was not registered" : "Removed index {} from registry", (Object)index.getIndexMetadata().name);
    }

    @Override
    public Index getIndex(IndexMetadata metadata) {
        return this.indexes.get(metadata.name);
    }

    @Override
    public Collection<Index> listIndexes() {
        return ImmutableSet.copyOf(this.indexes.values());
    }

    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec) {
        if (!this.hasIndexes()) {
            return UpdateTransaction.NO_OP;
        }
        Index.Indexer[] indexers = (Index.Indexer[])this.indexes.values().stream().filter(i -> i.indexes(update.columns())).map(i -> i.indexerFor(update.partitionKey(), nowInSec, opGroup, IndexTransaction.Type.UPDATE)).toArray(Index.Indexer[]::new);
        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
    }

    public CompactionTransaction newCompactionTransaction(DecoratedKey key, PartitionColumns partitionColumns, int versions, int nowInSec) {
        Index[] interestedIndexes = (Index[])this.indexes.values().stream().filter(i -> i.indexes(partitionColumns)).toArray(Index[]::new);
        return interestedIndexes.length == 0 ? CompactionTransaction.NO_OP : new IndexGCTransaction(key, versions, nowInSec, interestedIndexes);
    }

    public CleanupTransaction newCleanupTransaction(DecoratedKey key, PartitionColumns partitionColumns, int nowInSec) {
        if (!this.hasIndexes()) {
            return CleanupTransaction.NO_OP;
        }
        Index[] interestedIndexes = (Index[])this.indexes.values().stream().filter(i -> i.indexes(partitionColumns)).toArray(Index[]::new);
        return interestedIndexes.length == 0 ? CleanupTransaction.NO_OP : new CleanupGCTransaction(key, nowInSec, interestedIndexes);
    }

    private static void executeBlocking(Callable<?> task) {
        if (null != task) {
            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
        }
    }

    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function) {
        ArrayList waitFor = new ArrayList();
        indexers.forEach(indexer -> {
            Callable task = (Callable)function.apply((Index)indexer);
            if (null != task) {
                waitFor.add(blockingExecutor.submit(task));
            }
        });
        FBUtilities.waitOnFutures(waitFor);
    }

    private static final class CleanupGCTransaction
    implements CleanupTransaction {
        private final DecoratedKey key;
        private final int nowInSec;
        private final Index[] indexes;
        private Row row;
        private DeletionTime partitionDelete;

        private CleanupGCTransaction(DecoratedKey key, int nowInSec, Index ... indexes) {
            for (Index index : indexes) {
                assert (index != null);
            }
            this.key = key;
            this.indexes = indexes;
            this.nowInSec = nowInSec;
        }

        @Override
        public void start() {
        }

        @Override
        public void onPartitionDeletion(DeletionTime deletionTime) {
            this.partitionDelete = deletionTime;
        }

        @Override
        public void onRowDelete(Row row) {
            this.row = row;
        }

        @Override
        public void commit() {
            if (this.row == null && this.partitionDelete == null) {
                return;
            }
            try (OpOrder.Group opGroup = Keyspace.writeOrder.start();){
                for (Index index : this.indexes) {
                    Index.Indexer indexer = index.indexerFor(this.key, this.nowInSec, opGroup, IndexTransaction.Type.CLEANUP);
                    indexer.begin();
                    if (this.row != null) {
                        indexer.removeRow(this.row);
                    }
                    indexer.finish();
                }
            }
        }
    }

    private static final class IndexGCTransaction
    implements CompactionTransaction {
        private final DecoratedKey key;
        private final int versions;
        private final int nowInSec;
        private final Index[] indexes;
        private Row[] rows;

        private IndexGCTransaction(DecoratedKey key, int versions, int nowInSec, Index ... indexes) {
            for (Index index : indexes) {
                assert (index != null);
            }
            this.key = key;
            this.versions = versions;
            this.indexes = indexes;
            this.nowInSec = nowInSec;
        }

        @Override
        public void start() {
            if (this.versions > 0) {
                this.rows = new Row[this.versions];
            }
        }

        @Override
        public void onRowMerge(Row merged, Row ... versions) {
            final Row.Builder[] builders = new Row.Builder[versions.length];
            RowDiffListener diffListener = new RowDiffListener(){

                @Override
                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) {
                }

                @Override
                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) {
                }

                @Override
                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) {
                }

                @Override
                public void onCell(int i, Clustering clustering, Cell merged, Cell original) {
                    if (original != null && merged == null) {
                        if (builders[i] == null) {
                            builders[i] = BTreeRow.sortedBuilder();
                            builders[i].newRow(clustering);
                        }
                        builders[i].addCell(original);
                    }
                }
            };
            Rows.diff(diffListener, merged, versions);
            for (int i = 0; i < builders.length; ++i) {
                if (builders[i] == null) continue;
                this.rows[i] = builders[i].build();
            }
        }

        @Override
        public void commit() {
            if (this.rows == null) {
                return;
            }
            try (OpOrder.Group opGroup = Keyspace.writeOrder.start();){
                for (Index index : this.indexes) {
                    Index.Indexer indexer = index.indexerFor(this.key, this.nowInSec, opGroup, IndexTransaction.Type.COMPACTION);
                    indexer.begin();
                    for (Row row : this.rows) {
                        if (row == null) continue;
                        indexer.removeRow(row);
                    }
                    indexer.finish();
                }
            }
        }
    }

    private static final class WriteTimeTransaction
    implements UpdateTransaction {
        private final Index.Indexer[] indexers;

        private WriteTimeTransaction(Index.Indexer ... indexers) {
            for (Index.Indexer indexer : indexers) {
                assert (indexer != null);
            }
            this.indexers = indexers;
        }

        @Override
        public void start() {
            for (Index.Indexer indexer : this.indexers) {
                indexer.begin();
            }
        }

        @Override
        public void onPartitionDeletion(DeletionTime deletionTime) {
            for (Index.Indexer indexer : this.indexers) {
                indexer.partitionDelete(deletionTime);
            }
        }

        @Override
        public void onRangeTombstone(RangeTombstone tombstone) {
            for (Index.Indexer indexer : this.indexers) {
                indexer.rangeTombstone(tombstone);
            }
        }

        @Override
        public void onInserted(Row row) {
            Arrays.stream(this.indexers).forEach(h -> h.insertRow(row));
        }

        @Override
        public void onUpdated(Row existing, Row updated) {
            final Row.Builder toRemove = BTreeRow.sortedBuilder();
            toRemove.newRow(existing.clustering());
            final Row.Builder toInsert = BTreeRow.sortedBuilder();
            toInsert.newRow(updated.clustering());
            RowDiffListener diffListener = new RowDiffListener(){

                @Override
                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) {
                    if (merged != null && merged != original) {
                        toInsert.addPrimaryKeyLivenessInfo(merged);
                    }
                }

                @Override
                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) {
                }

                @Override
                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) {
                }

                @Override
                public void onCell(int i, Clustering clustering, Cell merged, Cell original) {
                    if (merged != null && merged != original) {
                        toInsert.addCell(merged);
                    }
                    if (merged == null || original != null && this.shouldCleanupOldValue(original, merged)) {
                        toRemove.addCell(original);
                    }
                }
            };
            Rows.diff(diffListener, updated, existing);
            Row oldRow = toRemove.build();
            Row newRow = toInsert.build();
            for (Index.Indexer indexer : this.indexers) {
                indexer.updateRow(oldRow, newRow);
            }
        }

        @Override
        public void commit() {
            for (Index.Indexer indexer : this.indexers) {
                indexer.finish();
            }
        }

        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell) {
            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
        }
    }
}

