/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.api.index;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.eclipse.collections.impl.utility.ArrayIterate;
import org.neo4j.common.EntityType;
import org.neo4j.common.TokenNameLookup;
import org.neo4j.function.Predicates;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.internal.helpers.Exceptions;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.internal.helpers.collection.Visitor;
import org.neo4j.internal.kernel.api.PopulationProgress;
import org.neo4j.internal.schema.IndexDescriptor;
import org.neo4j.internal.schema.SchemaDescriptor;
import org.neo4j.internal.schema.SchemaDescriptorSupplier;
import org.neo4j.internal.schema.SchemaState;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.kernel.api.exceptions.index.FlipFailedKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexSample;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.MinimalIndexAccessor;
import org.neo4j.kernel.impl.api.index.FailedIndexProxy;
import org.neo4j.kernel.impl.api.index.FailedIndexProxyFactory;
import org.neo4j.kernel.impl.api.index.FlippableIndexProxy;
import org.neo4j.kernel.impl.api.index.IndexPopulationFailure;
import org.neo4j.kernel.impl.api.index.IndexPopulationJob;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.index.LoggingPhaseTracker;
import org.neo4j.kernel.impl.api.index.PhaseTracker;
import org.neo4j.kernel.impl.api.index.StoreScan;
import org.neo4j.kernel.impl.api.index.stats.IndexStatisticsStore;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.memory.MemoryTracker;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.EntityUpdates;
import org.neo4j.storageengine.api.IndexEntryUpdate;
import org.neo4j.storageengine.api.NodePropertyAccessor;
import org.neo4j.util.FeatureToggles;
import org.neo4j.util.VisibleForTesting;

public class MultipleIndexPopulator {
    private static final String MULTIPLE_INDEX_POPULATOR_TAG = "multipleIndexPopulator";
    private static final String POPULATION_WORK_FLUSH_TAG = "populationWorkFlush";
    public static final String QUEUE_THRESHOLD_NAME = "queue_threshold";
    public static final String BATCH_SIZE_NAME = "batch_size";
    static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
    private static final String EOL = System.lineSeparator();
    private final int QUEUE_THRESHOLD = FeatureToggles.getInteger(MultipleIndexPopulator.class, (String)"queue_threshold", (int)20000);
    private final int BATCH_SIZE_SCAN = FeatureToggles.getInteger(MultipleIndexPopulator.class, (String)"batch_size", (int)10000);
    private final boolean PRINT_DEBUG = FeatureToggles.flag(MultipleIndexPopulator.class, (String)"print_debug", (boolean)false);
    private final int AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger(MultipleIndexPopulator.class, (String)"await_timeout_minutes", (int)30);
    private final Queue<IndexEntryUpdate<?>> concurrentUpdateQueue = new LinkedBlockingQueue();
    private final List<IndexPopulation> populations = new CopyOnWriteArrayList<IndexPopulation>();
    private final AtomicLong activeTasks = new AtomicLong();
    private final IndexStoreView storeView;
    private final NodePropertyAccessor propertyAccessor;
    private final LogProvider logProvider;
    private final Log log;
    private final EntityType type;
    private final SchemaState schemaState;
    private final IndexStatisticsStore indexStatisticsStore;
    private final PhaseTracker phaseTracker;
    private final JobScheduler jobScheduler;
    private final PageCursorTracer cursorTracer;
    private final MemoryTracker memoryTracker;
    private StoreScan<IndexPopulationFailedKernelException> storeScan;
    private final TokenNameLookup tokenNameLookup;
    private final PageCacheTracer cacheTracer;

    public MultipleIndexPopulator(IndexStoreView storeView, LogProvider logProvider, EntityType type, SchemaState schemaState, IndexStatisticsStore indexStatisticsStore, JobScheduler jobScheduler, TokenNameLookup tokenNameLookup, PageCacheTracer cacheTracer, MemoryTracker memoryTracker) {
        this.storeView = storeView;
        this.cursorTracer = cacheTracer.createPageCursorTracer(MULTIPLE_INDEX_POPULATOR_TAG);
        this.memoryTracker = memoryTracker;
        this.propertyAccessor = storeView.newPropertyAccessor(this.cursorTracer, memoryTracker);
        this.logProvider = logProvider;
        this.log = logProvider.getLog(IndexPopulationJob.class);
        this.type = type;
        this.schemaState = schemaState;
        this.indexStatisticsStore = indexStatisticsStore;
        this.phaseTracker = new LoggingPhaseTracker(logProvider.getLog(IndexPopulationJob.class));
        this.jobScheduler = jobScheduler;
        this.tokenNameLookup = tokenNameLookup;
        this.cacheTracer = cacheTracer;
    }

    IndexPopulation addPopulator(IndexPopulator populator, IndexDescriptor indexDescriptor, FlippableIndexProxy flipper, FailedIndexProxyFactory failedIndexProxyFactory, String indexUserDescription) {
        IndexPopulation population = this.createPopulation(populator, indexDescriptor, flipper, failedIndexProxyFactory, indexUserDescription);
        this.populations.add(population);
        return population;
    }

    private IndexPopulation createPopulation(IndexPopulator populator, IndexDescriptor indexDescriptor, FlippableIndexProxy flipper, FailedIndexProxyFactory failedIndexProxyFactory, String indexUserDescription) {
        return new IndexPopulation(populator, indexDescriptor, flipper, failedIndexProxyFactory, indexUserDescription);
    }

    boolean hasPopulators() {
        return !this.populations.isEmpty();
    }

    public void create(PageCursorTracer cursorTracer) {
        this.forEachPopulation((ThrowingConsumer<IndexPopulation, Exception>)((ThrowingConsumer)population -> {
            this.log.info("Index population started: [%s]", new Object[]{population.indexUserDescription});
            population.create();
        }), cursorTracer);
    }

    StoreScan<IndexPopulationFailedKernelException> createStoreScan(PageCursorTracer cursorTracer) {
        int[] entityTokenIds = this.entityTokenIds();
        int[] propertyKeyIds = this.propertyKeyIds();
        IntPredicate propertyKeyIdFilter = propertyKeyId -> ArrayIterate.contains((int[])propertyKeyIds, (int)propertyKeyId);
        this.storeScan = this.type == EntityType.RELATIONSHIP ? this.storeView.visitRelationships(entityTokenIds, propertyKeyIdFilter, new EntityPopulationVisitor(), null, false, cursorTracer, this.memoryTracker) : this.storeView.visitNodes(entityTokenIds, propertyKeyIdFilter, new EntityPopulationVisitor(), null, false, cursorTracer, this.memoryTracker);
        this.storeScan.setPhaseTracker(this.phaseTracker);
        return new BatchingStoreScan<IndexPopulationFailedKernelException>(this.storeScan);
    }

    void queueConcurrentUpdate(IndexEntryUpdate<?> update) {
        this.concurrentUpdateQueue.add(update);
    }

    public void cancel(Throwable failure, PageCursorTracer cursorTracer) {
        for (IndexPopulation population : this.populations) {
            this.cancel(population, failure, cursorTracer);
        }
    }

    protected void cancel(IndexPopulation population, Throwable failure, PageCursorTracer cursorTracer) {
        Throwable cause;
        if (!this.removeFromOngoingPopulations(population)) {
            return;
        }
        if (failure instanceof IndexPopulationFailedKernelException && (cause = failure.getCause()) instanceof IndexEntryConflictException) {
            failure = cause;
        }
        this.log.error(String.format("Failed to populate index: [%s]", population.indexUserDescription), failure);
        IndexPopulationFailure indexPopulationFailure = IndexPopulationFailure.failure(failure);
        population.cancel(indexPopulationFailure);
        try {
            population.populator.markAsFailed(indexPopulationFailure.asString());
            population.populator.close(false, cursorTracer);
        }
        catch (Throwable e) {
            this.log.error(String.format("Unable to close failed populator for index: [%s]", population.indexUserDescription), e);
        }
    }

    @VisibleForTesting
    MultipleIndexUpdater newPopulatingUpdater(NodePropertyAccessor accessor, PageCursorTracer cursorTracer) {
        HashMap<SchemaDescriptor, Pair<IndexPopulation, IndexUpdater>> updaters = new HashMap<SchemaDescriptor, Pair<IndexPopulation, IndexUpdater>>();
        this.forEachPopulation((ThrowingConsumer<IndexPopulation, Exception>)((ThrowingConsumer)population -> {
            IndexUpdater updater = population.populator.newPopulatingUpdater(accessor, cursorTracer);
            updaters.put(population.schema(), Pair.of((Object)population, (Object)updater));
        }), cursorTracer);
        return new MultipleIndexUpdater(this, updaters, this.logProvider, cursorTracer);
    }

    public void close() {
        this.phaseTracker.stop();
        this.propertyAccessor.close();
        this.cursorTracer.close();
    }

    void resetIndexCounts(PageCursorTracer cursorTracer) {
        this.forEachPopulation((ThrowingConsumer<IndexPopulation, Exception>)((ThrowingConsumer)this::resetIndexCountsForPopulation), cursorTracer);
    }

    private void resetIndexCountsForPopulation(IndexPopulation indexPopulation) {
        this.indexStatisticsStore.replaceStats(indexPopulation.indexId, new IndexSample(0L, 0L, 0L));
    }

    void flipAfterStoreScan(boolean verifyBeforeFlipping, PageCursorTracer cursorTracer) {
        for (IndexPopulation population : this.populations) {
            try {
                population.scanCompleted(cursorTracer);
                population.flip(verifyBeforeFlipping, cursorTracer);
            }
            catch (Throwable t) {
                this.cancel(population, t, cursorTracer);
            }
        }
    }

    private int[] propertyKeyIds() {
        return this.populations.stream().flatMapToInt(this::propertyKeyIds).distinct().toArray();
    }

    private IntStream propertyKeyIds(IndexPopulation population) {
        return IntStream.of(population.schema().getPropertyIds());
    }

    private int[] entityTokenIds() {
        return this.populations.stream().flatMapToInt(population -> Arrays.stream(population.schema().getEntityTokenIds())).toArray();
    }

    public void stop(PageCursorTracer cursorTracer) {
        this.forEachPopulation((ThrowingConsumer<IndexPopulation, Exception>)((ThrowingConsumer)population -> this.stop((IndexPopulation)population, cursorTracer)), cursorTracer);
    }

    void stop(IndexPopulation indexPopulation, PageCursorTracer cursorTracer) {
        indexPopulation.disconnectAndStop(cursorTracer);
    }

    void dropIndexPopulation(IndexPopulation indexPopulation) {
        indexPopulation.disconnectAndDrop();
    }

    private boolean removeFromOngoingPopulations(IndexPopulation indexPopulation) {
        return this.populations.remove(indexPopulation);
    }

    boolean applyConcurrentUpdateQueueBatched(long currentlyIndexedNodeId) {
        return this.applyConcurrentUpdateQueue(this.QUEUE_THRESHOLD, currentlyIndexedNodeId);
    }

    private void flushAll() {
        this.populations.forEach(this::flush);
        this.awaitCompletion();
    }

    private void flush(IndexPopulation population) {
        this.phaseTracker.enterPhase(PhaseTracker.Phase.WRITE);
        this.activeTasks.incrementAndGet();
        List<IndexEntryUpdate<?>> batch = population.takeCurrentBatchFromScan();
        this.jobScheduler.schedule(Group.INDEX_POPULATION_WORK, () -> {
            try (PageCursorTracer cursorTracer = this.cacheTracer.createPageCursorTracer(POPULATION_WORK_FLUSH_TAG);){
                String batchDescription = "EMPTY";
                if (this.PRINT_DEBUG) {
                    if (!batch.isEmpty()) {
                        batchDescription = String.format("[%d, %d - %d]", batch.size(), ((IndexEntryUpdate)batch.get(0)).getEntityId(), ((IndexEntryUpdate)batch.get(batch.size() - 1)).getEntityId());
                    }
                    this.log.info("Applying scan batch %s", new Object[]{batchDescription});
                }
                population.populator.add((Collection)batch, cursorTracer);
                if (this.PRINT_DEBUG) {
                    this.log.info("Applied scan batch %s", new Object[]{batchDescription});
                }
            }
            catch (Throwable failure) {
                this.cancel(population, failure, this.cursorTracer);
            }
            finally {
                this.activeTasks.decrementAndGet();
            }
        });
    }

    private boolean applyConcurrentUpdateQueue(int queueThreshold, long currentlyIndexedNodeId) {
        int queueSize = this.concurrentUpdateQueue.size();
        if (queueSize > 0 && queueSize >= queueThreshold) {
            if (this.PRINT_DEBUG) {
                this.log.info("Populating from queue at %d", new Object[]{currentlyIndexedNodeId});
            }
            this.flushAll();
            try (MultipleIndexUpdater updater = this.newPopulatingUpdater(this.propertyAccessor, this.cursorTracer);){
                do {
                    IndexEntryUpdate<?> update = this.concurrentUpdateQueue.poll();
                    this.storeScan.acceptUpdate(updater, update, currentlyIndexedNodeId);
                    if (!this.PRINT_DEBUG) continue;
                    this.log.info("Applied %s from queue" + (update == null ? null : update.describe(this.tokenNameLookup)));
                } while (!this.concurrentUpdateQueue.isEmpty());
            }
            if (this.PRINT_DEBUG) {
                this.log.info("Done applying updates from queue");
            }
            return true;
        }
        return false;
    }

    private void forEachPopulation(ThrowingConsumer<IndexPopulation, Exception> action, PageCursorTracer cursorTracer) {
        for (IndexPopulation population : this.populations) {
            try {
                action.accept((Object)population);
            }
            catch (Throwable failure) {
                this.cancel(population, failure, cursorTracer);
            }
        }
    }

    private void awaitCompletion() {
        try {
            this.log.debug("Waiting " + this.AWAIT_TIMEOUT_MINUTES + " minutes for all submitted and active flush tasks to complete." + EOL + this);
            BooleanSupplier allSubmittedTasksCompleted = () -> this.activeTasks.get() == 0L;
            Predicates.await((BooleanSupplier)allSubmittedTasksCompleted, (long)this.AWAIT_TIMEOUT_MINUTES, (TimeUnit)TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            this.handleTimeout();
        }
    }

    private void handleTimeout() {
        throw new IllegalStateException("Index population tasks were not able to complete in " + this.AWAIT_TIMEOUT_MINUTES + " minutes." + EOL + this + EOL + MultipleIndexPopulator.allStackTraces());
    }

    private static String allStackTraces() {
        return Thread.getAllStackTraces().entrySet().stream().map(entry -> Exceptions.stringify((Thread)((Thread)entry.getKey()), (StackTraceElement[])((StackTraceElement[])entry.getValue()))).collect(Collectors.joining());
    }

    public String toString() {
        String updatesString = this.populations.stream().map(population -> population.batchedUpdatesFromScan.size() + " updates").collect(Collectors.joining(", ", "[", "]"));
        return "MultipleIndexPopulator{activeTasks=" + this.activeTasks + ", batchedUpdatesFromScan = " + updatesString + ", concurrentUpdateQueue = " + this.concurrentUpdateQueue.size() + "}";
    }

    private class BatchingStoreScan<E extends Exception>
    extends DelegatingStoreScan<E> {
        BatchingStoreScan(StoreScan<E> delegate) {
            super(delegate);
        }

        @Override
        public void run() throws E {
            super.run();
            MultipleIndexPopulator.this.log.info("Completed node store scan. Flushing all pending updates." + EOL + MultipleIndexPopulator.this);
            MultipleIndexPopulator.this.flushAll();
        }
    }

    protected static class DelegatingStoreScan<E extends Exception>
    implements StoreScan<E> {
        private final StoreScan<E> delegate;

        DelegatingStoreScan(StoreScan<E> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() throws E {
            this.delegate.run();
        }

        @Override
        public void stop() {
            this.delegate.stop();
        }

        @Override
        public void acceptUpdate(MultipleIndexUpdater updater, IndexEntryUpdate<?> update, long currentlyIndexedNodeId) {
            this.delegate.acceptUpdate(updater, update, currentlyIndexedNodeId);
        }

        @Override
        public PopulationProgress getProgress() {
            return this.delegate.getProgress();
        }

        @Override
        public void setPhaseTracker(PhaseTracker phaseTracker) {
            this.delegate.setPhaseTracker(phaseTracker);
        }
    }

    private class EntityPopulationVisitor
    implements Visitor<EntityUpdates, IndexPopulationFailedKernelException> {
        private EntityPopulationVisitor() {
        }

        public boolean visit(EntityUpdates updates) {
            this.addFromScan(updates);
            if (MultipleIndexPopulator.this.PRINT_DEBUG) {
                MultipleIndexPopulator.this.log.info("Added scan updates for entity %d", new Object[]{updates.getEntityId()});
            }
            return MultipleIndexPopulator.this.applyConcurrentUpdateQueueBatched(updates.getEntityId());
        }

        private void addFromScan(EntityUpdates updates) {
            for (IndexEntryUpdate indexUpdate : updates.forIndexKeys(MultipleIndexPopulator.this.populations)) {
                ((IndexPopulation)indexUpdate.indexKey()).onUpdateFromScan(indexUpdate);
            }
        }
    }

    public class IndexPopulation
    implements SchemaDescriptorSupplier {
        public final IndexPopulator populator;
        final FlippableIndexProxy flipper;
        private final long indexId;
        private final IndexDescriptor indexDescriptor;
        private final FailedIndexProxyFactory failedIndexProxyFactory;
        private final String indexUserDescription;
        private boolean populationOngoing = true;
        private final ReentrantLock populatorLock = new ReentrantLock();
        List<IndexEntryUpdate<?>> batchedUpdatesFromScan;

        IndexPopulation(IndexPopulator populator, IndexDescriptor indexDescriptor, FlippableIndexProxy flipper, FailedIndexProxyFactory failedIndexProxyFactory, String indexUserDescription) {
            this.populator = populator;
            this.indexDescriptor = indexDescriptor;
            this.indexId = indexDescriptor.getId();
            this.flipper = flipper;
            this.failedIndexProxyFactory = failedIndexProxyFactory;
            this.indexUserDescription = indexUserDescription;
            this.batchedUpdatesFromScan = new ArrayList(MultipleIndexPopulator.this.BATCH_SIZE_SCAN);
        }

        private void cancel(IndexPopulationFailure failure) {
            this.flipper.flipTo(new FailedIndexProxy(this.indexDescriptor, this.indexUserDescription, (MinimalIndexAccessor)this.populator, failure, MultipleIndexPopulator.this.indexStatisticsStore, MultipleIndexPopulator.this.logProvider));
        }

        void create() {
            this.populatorLock.lock();
            try {
                if (this.populationOngoing) {
                    this.populator.create();
                }
            }
            finally {
                this.populatorLock.unlock();
            }
        }

        void disconnectAndStop(PageCursorTracer cursorTracer) {
            this.disconnect(() -> this.populator.close(false, cursorTracer));
        }

        void disconnectAndDrop() {
            this.disconnect(() -> ((IndexPopulator)this.populator).drop());
        }

        private void disconnect(Runnable specificPopulatorOperation) {
            this.populatorLock.lock();
            try {
                if (this.populationOngoing) {
                    MultipleIndexPopulator.this.removeFromOngoingPopulations(this);
                    specificPopulatorOperation.run();
                    MultipleIndexPopulator.this.resetIndexCountsForPopulation(this);
                    this.populationOngoing = false;
                }
            }
            finally {
                this.populatorLock.unlock();
            }
        }

        private void onUpdateFromScan(IndexEntryUpdate<?> update) {
            this.populator.includeSample(update);
            if (this.addToBatchFromScan(update)) {
                MultipleIndexPopulator.this.flush(this);
            }
        }

        void flip(boolean verifyBeforeFlipping, PageCursorTracer cursorTracer) throws FlipFailedKernelException {
            MultipleIndexPopulator.this.phaseTracker.enterPhase(PhaseTracker.Phase.FLIP);
            this.flipper.flip(() -> {
                this.populatorLock.lock();
                try {
                    if (this.populationOngoing) {
                        this.populator.add(this.takeCurrentBatchFromScan(), cursorTracer);
                        MultipleIndexPopulator.this.applyConcurrentUpdateQueue(0, Long.MAX_VALUE);
                        if (MultipleIndexPopulator.this.populations.contains(this)) {
                            if (verifyBeforeFlipping) {
                                this.populator.verifyDeferredConstraints(MultipleIndexPopulator.this.propertyAccessor);
                            }
                            IndexSample sample = this.populator.sample(cursorTracer);
                            MultipleIndexPopulator.this.indexStatisticsStore.replaceStats(this.indexId, sample);
                            this.populator.close(true, cursorTracer);
                            MultipleIndexPopulator.this.schemaState.clear();
                            Boolean bl = true;
                            return bl;
                        }
                    }
                    Boolean bl = false;
                    return bl;
                }
                finally {
                    this.logCompletionMessage();
                    this.populationOngoing = false;
                    this.populatorLock.unlock();
                }
            }, this.failedIndexProxyFactory);
            MultipleIndexPopulator.this.removeFromOngoingPopulations(this);
        }

        private void logCompletionMessage() {
            MultipleIndexPopulator.this.log.info("Index creation finished for index [%s].", new Object[]{this.indexUserDescription});
        }

        public SchemaDescriptor schema() {
            return this.indexDescriptor.schema();
        }

        public String userDescription(TokenNameLookup tokenNameLookup) {
            return this.indexUserDescription;
        }

        boolean addToBatchFromScan(IndexEntryUpdate<?> update) {
            this.batchedUpdatesFromScan.add(update);
            return this.batchedUpdatesFromScan.size() >= MultipleIndexPopulator.this.BATCH_SIZE_SCAN;
        }

        List<IndexEntryUpdate<?>> takeCurrentBatchFromScan() {
            if (this.batchedUpdatesFromScan.isEmpty()) {
                return Collections.emptyList();
            }
            List<IndexEntryUpdate<?>> batch = this.batchedUpdatesFromScan;
            this.batchedUpdatesFromScan = new ArrayList(MultipleIndexPopulator.this.BATCH_SIZE_SCAN);
            return batch;
        }

        void scanCompleted(PageCursorTracer cursorTracer) throws IndexEntryConflictException {
            this.populator.scanCompleted(MultipleIndexPopulator.this.phaseTracker, MultipleIndexPopulator.this.jobScheduler, cursorTracer);
        }

        PopulationProgress progress(PopulationProgress storeScanProgress) {
            return this.populator.progress(storeScanProgress);
        }
    }

    public static class MultipleIndexUpdater
    implements IndexUpdater {
        private final Map<SchemaDescriptor, Pair<IndexPopulation, IndexUpdater>> populationsWithUpdaters;
        private final MultipleIndexPopulator multipleIndexPopulator;
        private final Log log;
        private final PageCursorTracer cursorTracer;

        MultipleIndexUpdater(MultipleIndexPopulator multipleIndexPopulator, Map<SchemaDescriptor, Pair<IndexPopulation, IndexUpdater>> populationsWithUpdaters, LogProvider logProvider, PageCursorTracer cursorTracer) {
            this.multipleIndexPopulator = multipleIndexPopulator;
            this.populationsWithUpdaters = populationsWithUpdaters;
            this.log = logProvider.getLog(this.getClass());
            this.cursorTracer = cursorTracer;
        }

        public void process(IndexEntryUpdate<?> update) {
            Pair<IndexPopulation, IndexUpdater> pair = this.populationsWithUpdaters.get(update.indexKey().schema());
            if (pair != null) {
                IndexPopulation population = (IndexPopulation)pair.first();
                IndexUpdater updater = (IndexUpdater)pair.other();
                try {
                    population.populator.includeSample(update);
                    updater.process(update);
                }
                catch (Throwable t) {
                    try {
                        updater.close();
                    }
                    catch (Throwable ce) {
                        this.log.error(String.format("Failed to close index updater: [%s]", updater), ce);
                    }
                    this.populationsWithUpdaters.remove(update.indexKey().schema());
                    this.multipleIndexPopulator.cancel(population, t, this.cursorTracer);
                }
            }
        }

        public void close() {
            for (Pair<IndexPopulation, IndexUpdater> pair : this.populationsWithUpdaters.values()) {
                IndexPopulation population = (IndexPopulation)pair.first();
                IndexUpdater updater = (IndexUpdater)pair.other();
                try {
                    updater.close();
                }
                catch (Throwable t) {
                    this.multipleIndexPopulator.cancel(population, t, this.cursorTracer);
                }
            }
            this.populationsWithUpdaters.clear();
        }
    }
}

