/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spiller;

import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.operator.PartitionFunction;
import com.facebook.presto.operator.SpillContext;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spiller.PartitioningSpiller;
import com.facebook.presto.spiller.SingleStreamSpiller;
import com.facebook.presto.spiller.SingleStreamSpillerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class GenericPartitioningSpiller
implements PartitioningSpiller {
    private final List<Type> types;
    private final PartitionFunction partitionFunction;
    private final Closer closer = Closer.create();
    private final SingleStreamSpillerFactory spillerFactory;
    private final SpillContext spillContext;
    private final AggregatedMemoryContext memoryContext;
    private final PageBuilder[] pageBuilders;
    private final Optional<SingleStreamSpiller>[] spillers;
    private boolean readingStarted;
    private Set<Integer> spilledPartitions = new HashSet<Integer>();

    public GenericPartitioningSpiller(List<Type> types, PartitionFunction partitionFunction, SpillContext spillContext, AggregatedMemoryContext memoryContext, SingleStreamSpillerFactory spillerFactory) {
        Objects.requireNonNull(spillContext, "spillContext is null");
        this.types = ImmutableList.copyOf((Collection)Objects.requireNonNull(types, "types is null"));
        this.partitionFunction = Objects.requireNonNull(partitionFunction, "partitionFunction is null");
        this.spillerFactory = Objects.requireNonNull(spillerFactory, "spillerFactory is null");
        this.spillContext = (SpillContext)this.closer.register((Closeable)Objects.requireNonNull(spillContext, "spillContext is null"));
        Objects.requireNonNull(memoryContext, "memoryContext is null");
        this.closer.register(() -> ((AggregatedMemoryContext)memoryContext).close());
        this.memoryContext = memoryContext;
        int partitionCount = partitionFunction.getPartitionCount();
        this.pageBuilders = new PageBuilder[partitionCount];
        this.spillers = new Optional[partitionCount];
        for (int partition = 0; partition < partitionCount; ++partition) {
            this.pageBuilders[partition] = new PageBuilder(types);
            this.spillers[partition] = Optional.empty();
        }
    }

    @Override
    public synchronized Iterator<Page> getSpilledPages(int partition) {
        this.readingStarted = true;
        MoreFutures.getFutureValue(this.flush(partition));
        this.spilledPartitions.remove(partition);
        return this.getSpiller(partition).getSpilledPages();
    }

    @Override
    public synchronized void verifyAllPartitionsRead() {
        Verify.verify((boolean)this.spilledPartitions.isEmpty(), (String)"Some partitions were spilled but not read: %s", this.spilledPartitions);
    }

    @Override
    public synchronized PartitioningSpiller.PartitioningSpillResult partitionAndSpill(Page page, IntPredicate spillPartitionMask) {
        Objects.requireNonNull(page, "page is null");
        Objects.requireNonNull(spillPartitionMask, "spillPartitionMask is null");
        Preconditions.checkArgument((page.getChannelCount() == this.types.size() ? 1 : 0) != 0, (String)"Wrong page channel count, expected %s but got %s", (int)this.types.size(), (int)page.getChannelCount());
        Preconditions.checkState((!this.readingStarted ? 1 : 0) != 0, (Object)"reading already started");
        IntArrayList unspilledPositions = this.partitionPage(page, spillPartitionMask);
        ListenableFuture<?> future = this.flushFullBuilders();
        return new PartitioningSpiller.PartitioningSpillResult(future, page.getPositions(unspilledPositions.elements(), 0, unspilledPositions.size()));
    }

    private synchronized IntArrayList partitionPage(Page page, IntPredicate spillPartitionMask) {
        IntArrayList unspilledPositions = new IntArrayList();
        for (int position = 0; position < page.getPositionCount(); ++position) {
            int partition = this.partitionFunction.getPartition(page, position);
            if (!spillPartitionMask.test(partition)) {
                unspilledPositions.add(position);
                continue;
            }
            this.spilledPartitions.add(partition);
            PageBuilder pageBuilder = this.pageBuilders[partition];
            pageBuilder.declarePosition();
            for (int channel = 0; channel < this.types.size(); ++channel) {
                Type type = this.types.get(channel);
                type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
            }
        }
        return unspilledPositions;
    }

    private ListenableFuture<?> flushFullBuilders() {
        return this.flush(PageBuilder::isFull);
    }

    @VisibleForTesting
    ListenableFuture<?> flush() {
        return this.flush(pageBuilder -> true);
    }

    private synchronized ListenableFuture<?> flush(Predicate<PageBuilder> flushCondition) {
        Objects.requireNonNull(flushCondition, "flushCondition is null");
        ImmutableList.Builder futures = ImmutableList.builder();
        for (int partition = 0; partition < this.spillers.length; ++partition) {
            PageBuilder pageBuilder = this.pageBuilders[partition];
            if (!flushCondition.test(pageBuilder)) continue;
            futures.add(this.flush(partition));
        }
        return Futures.allAsList((Iterable)futures.build());
    }

    private synchronized ListenableFuture<?> flush(int partition) {
        PageBuilder pageBuilder = this.pageBuilders[partition];
        if (pageBuilder.isEmpty()) {
            return Futures.immediateFuture(null);
        }
        Page page = pageBuilder.build();
        pageBuilder.reset();
        return this.getSpiller(partition).spill(page);
    }

    private synchronized SingleStreamSpiller getSpiller(int partition) {
        Optional<Closeable> spiller = this.spillers[partition];
        if (!spiller.isPresent()) {
            this.spillers[partition] = spiller = Optional.of(this.closer.register((Closeable)this.spillerFactory.create(this.types, this.spillContext, this.memoryContext.newLocalMemoryContext())));
        }
        return spiller.get();
    }

    @Override
    public synchronized void close() throws IOException {
        this.closer.close();
    }
}

