/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.PagePartitionFunction;
import com.facebook.presto.execution.BufferInfo;
import com.facebook.presto.execution.BufferResult;
import com.facebook.presto.execution.PartitionBuffer;
import com.facebook.presto.execution.SharedBufferInfo;
import com.facebook.presto.execution.SharedBufferMemoryManager;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.SystemMemoryUsageListener;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.spi.Page;
import com.facebook.presto.util.ImmutableCollectors;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class SharedBuffer {
    // Completed by setOutputBuffers once a descriptor with noMoreBufferIds set has been applied.
    private final SettableFuture<OutputBuffers> finalOutputBuffers = SettableFuture.create();
    // Most recently accepted descriptor; setOutputBuffers only replaces it with a strictly newer version.
    @GuardedBy(value="this")
    private OutputBuffers outputBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
    // Partition number -> buffer holding that partition's pages; entries are created lazily by
    // createOrGetPartitionBuffer. getInfo() reads this map without holding the lock.
    @GuardedBy(value="this")
    private final Map<Integer, PartitionBuffer> partitionBuffers = new ConcurrentHashMap<Integer, PartitionBuffer>();
    // Partition number -> the named (per-consumer) buffers that read from that partition.
    @GuardedBy(value="this")
    private final Map<Integer, Set<NamedBuffer>> partitionToNamedBuffer = new ConcurrentHashMap<Integer, Set<NamedBuffer>>();
    // Buffer id -> named buffer. getInfo() iterates this map without holding the lock.
    @GuardedBy(value="this")
    private final ConcurrentMap<TaskId, NamedBuffer> namedBuffers = new ConcurrentHashMap<TaskId, NamedBuffer>();
    // Ids passed to abort(); consulted by setOutputBuffers so that a buffer aborted before it
    // was registered is created already aborted.
    @GuardedBy(value="this")
    private final Set<TaskId> abortedBuffers = new HashSet<TaskId>();
    // Lifecycle state of the whole buffer; starts as OPEN (see the constructor).
    private final StateMachine<BufferState> state;
    // Pending get() requests that have not been completed yet; despite the name these are
    // reads waiting for data, re-evaluated by processPendingReads().
    @GuardedBy(value="this")
    private final List<GetBufferResult> stateChangeListeners = new ArrayList<GetBufferResult>();
    // Shared by every PartitionBuffer created by this instance.
    private final SharedBufferMemoryManager memoryManager;

    /**
     * Creates a buffer that does not report system memory usage: delegates to the
     * four-argument constructor with a listener that ignores every notification.
     */
    public SharedBuffer(TaskId taskId, Executor executor, DataSize maxBufferSize) {
        this(taskId, executor, maxBufferSize, ignoredDelta -> {
        });
    }

    public SharedBuffer(TaskId taskId, Executor executor, DataSize maxBufferSize, SystemMemoryUsageListener systemMemoryUsageListener) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        Preconditions.checkNotNull((Object)executor, (Object)"executor is null");
        this.state = new StateMachine<BufferState>(taskId + "-buffer", executor, BufferState.OPEN, BufferState.TERMINAL_BUFFER_STATES);
        Preconditions.checkNotNull((Object)maxBufferSize, (Object)"maxBufferSize is null");
        Preconditions.checkArgument((maxBufferSize.toBytes() > 0L ? 1 : 0) != 0, (Object)"maxBufferSize must be at least 1");
        Preconditions.checkNotNull((Object)systemMemoryUsageListener, (Object)"systemMemoryUsageListener is null");
        this.memoryManager = new SharedBufferMemoryManager(maxBufferSize.toBytes(), systemMemoryUsageListener);
    }

    /**
     * Registers {@code stateChangeListener} on this buffer's state machine.
     */
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        state.addStateChangeListener(stateChangeListener);
    }

    /**
     * @return {@code true} only when the current state is {@link BufferState#FINISHED};
     *         a {@code FAILED} buffer is not reported as finished
     */
    public boolean isFinished() {
        BufferState current = state.get();
        return current == BufferState.FINISHED;
    }

    /**
     * Builds a snapshot of this buffer's state and counters.
     *
     * <p>The caller must NOT hold this object's monitor; the method only reads the state
     * machine and the concurrent maps. Each total is computed in its own pass over the
     * partition buffers, so the totals are not taken at a single instant.
     *
     * @return buffer state, per-named-buffer info and totals summed over all partition buffers
     * @throws IllegalStateException if the calling thread holds the lock on this buffer
     */
    public SharedBufferInfo getInfo() {
        Preconditions.checkState(!Thread.holdsLock(this), "Thread must NOT hold a lock on the %s", SharedBuffer.class.getSimpleName());
        BufferState state = this.state.get();

        // Typed builder: avoids the raw type and the unchecked cast on build().
        ImmutableList.Builder<BufferInfo> infos = ImmutableList.builder();
        for (NamedBuffer namedBuffer : namedBuffers.values()) {
            infos.add(namedBuffer.getInfo());
        }

        long totalBufferedBytes = partitionBuffers.values().stream().mapToLong(PartitionBuffer::getBufferedBytes).sum();
        long totalBufferedPages = partitionBuffers.values().stream().mapToLong(PartitionBuffer::getBufferedPageCount).sum();
        long totalQueuedPages = partitionBuffers.values().stream().mapToLong(PartitionBuffer::getQueuedPageCount).sum();
        long totalPagesSent = partitionBuffers.values().stream().mapToLong(PartitionBuffer::getPageCount).sum();

        return new SharedBufferInfo(state, state.canAddBuffers(), state.canAddPages(), totalBufferedBytes, totalBufferedPages, totalQueuedPages, totalPagesSent, infos.build());
    }

    /**
     * @return the future that {@code setOutputBuffers} completes once a descriptor with
     *         {@code noMoreBufferIds} set has been applied
     */
    public ListenableFuture<OutputBuffers> getFinalOutputBuffers() {
        return finalOutputBuffers;
    }

    /**
     * Applies a newer output buffer descriptor, creating a named buffer for every buffer id
     * that is not registered yet.
     *
     * <p>The call is ignored when this buffer is already in a terminal state or when
     * {@code newOutputBuffers} is not strictly newer than the current descriptor.
     *
     * @param newOutputBuffers the descriptor to apply
     * @throws NullPointerException if {@code newOutputBuffers} is null
     * @throws IllegalArgumentException if the new descriptor drops an existing buffer id, or
     *         clears {@code noMoreBufferIds} after it was set
     * @throws IllegalStateException if a new buffer id appears while the state no longer
     *         allows adding buffers
     */
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers) {
        Preconditions.checkNotNull(newOutputBuffers, "newOutputBuffers is null");

        // Nothing to do once terminal, or for a stale / duplicate version.
        if (state.get().isTerminal() || outputBuffers.getVersion() >= newOutputBuffers.getVersion()) {
            return;
        }

        // A new descriptor may only add buffers and may only turn noMoreBufferIds on.
        Sets.SetView<TaskId> missingBuffers = Sets.difference(outputBuffers.getBuffers().keySet(), newOutputBuffers.getBuffers().keySet());
        Preconditions.checkArgument(missingBuffers.isEmpty(), "newOutputBuffers does not have existing buffers %s", missingBuffers);
        Preconditions.checkArgument(!outputBuffers.isNoMoreBufferIds() || newOutputBuffers.isNoMoreBufferIds(), "Expected newOutputBuffers to have noMoreBufferIds set");
        outputBuffers = newOutputBuffers;

        for (Map.Entry<TaskId, PagePartitionFunction> entry : outputBuffers.getBuffers().entrySet()) {
            TaskId bufferId = entry.getKey();
            if (namedBuffers.containsKey(bufferId)) {
                continue;
            }
            Preconditions.checkState(state.get().canAddBuffers(), "Cannot add buffers to %s", SharedBuffer.class.getSimpleName());

            int partition = entry.getValue().getPartition();
            PartitionBuffer partitionBuffer = createOrGetPartitionBuffer(partition);
            NamedBuffer namedBuffer = new NamedBuffer(bufferId, partitionBuffer);

            // abort(bufferId) may have been called before this buffer was registered.
            if (abortedBuffers.contains(bufferId)) {
                namedBuffer.abort();
            }
            namedBuffers.put(bufferId, namedBuffer);

            Set<NamedBuffer> buffersForPartition = partitionToNamedBuffer.computeIfAbsent(partition, k -> new HashSet<>());
            buffersForPartition.add(namedBuffer);
        }

        if (outputBuffers.isNoMoreBufferIds()) {
            // Only one of these transitions can apply, depending on whether pages are still accepted.
            state.compareAndSet(BufferState.OPEN, BufferState.NO_MORE_BUFFERS);
            state.compareAndSet(BufferState.NO_MORE_PAGES, BufferState.FLUSHING);
            finalOutputBuffers.set(outputBuffers);
        }
        updateState();
    }

    /**
     * Returns the buffer for {@code partition}, creating it (backed by the shared memory
     * manager) on first use. The caller must hold the lock on this object.
     *
     * @throws IllegalStateException if the calling thread does not hold the lock
     */
    private PartitionBuffer createOrGetPartitionBuffer(int partition) {
        boolean lockHeld = Thread.holdsLock(this);
        Preconditions.checkState(lockHeld, "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());
        return partitionBuffers.computeIfAbsent(partition, ignoredKey -> new PartitionBuffer(partition, memoryManager));
    }

    /**
     * Enqueues {@code page} into partition 0.
     *
     * @return the future returned by {@link #enqueue(int, Page)}
     */
    public synchronized ListenableFuture<?> enqueue(Page page) {
        return enqueue(0, page);
    }

    /**
     * Hands {@code page} to the buffer of {@code partition} and then re-evaluates pending
     * reads and the buffer state.
     *
     * @return the future produced by the partition buffer's {@code enqueuePage}, or an
     *         already-completed future when the current state no longer accepts pages
     *         (the page is dropped in that case)
     * @throws NullPointerException if {@code page} is null
     */
    public synchronized ListenableFuture<?> enqueue(int partition, Page page) {
        Preconditions.checkNotNull(page, "page is null");

        if (state.get().canAddPages()) {
            PartitionBuffer target = createOrGetPartitionBuffer(partition);
            ListenableFuture<?> enqueued = target.enqueuePage(page);
            processPendingReads();
            updateState();
            return enqueued;
        }

        // Pages arriving after "no more pages" are silently ignored.
        return Futures.immediateFuture(true);
    }

    /**
     * Requests pages for the named buffer {@code outputId} starting at
     * {@code startingSequenceId}.
     *
     * <p>If the buffer is not failed, no more buffers can be added and {@code outputId} is
     * unknown, a closed empty result is returned immediately. Otherwise the request is
     * registered as a pending read and the returned future completes when
     * {@code processPendingReads} can satisfy it.
     *
     * @param outputId id of the named buffer to read from
     * @param startingSequenceId sequence id of the first page wanted
     * @param maxSize upper bound passed to the partition buffer when fetching pages
     * @return future holding the result of the read
     * @throws NullPointerException if {@code outputId} or {@code maxSize} is null
     * @throws IllegalArgumentException if {@code maxSize} is less than one byte
     */
    public synchronized ListenableFuture<BufferResult> get(TaskId outputId, long startingSequenceId, DataSize maxSize) {
        Preconditions.checkNotNull(outputId, "outputId is null");
        // Same exception type as the implicit NPE from maxSize.toBytes(), but with a message.
        Preconditions.checkNotNull(maxSize, "maxSize is null");
        Preconditions.checkArgument(maxSize.toBytes() > 0L, "maxSize must be at least 1 byte");

        BufferState state = this.state.get();
        if (state != BufferState.FAILED && !state.canAddBuffers() && namedBuffers.get(outputId) == null) {
            // No cast to Object here: the future must be a ListenableFuture<BufferResult>.
            return Futures.immediateFuture(BufferResult.emptyResults(0L, true));
        }

        GetBufferResult getBufferResult = new GetBufferResult(outputId, startingSequenceId, maxSize);
        stateChangeListeners.add(getBufferResult);
        updateState();
        return getBufferResult.getFuture();
    }

    /**
     * Marks the named buffer {@code outputId} as aborted. The id is remembered even when no
     * such buffer exists yet, so a later {@code setOutputBuffers} creates it already aborted.
     *
     * @throws NullPointerException if {@code outputId} is null
     */
    public synchronized void abort(TaskId outputId) {
        Preconditions.checkNotNull(outputId, "outputId is null");
        abortedBuffers.add(outputId);

        NamedBuffer existing = namedBuffers.get(outputId);
        if (existing != null) {
            existing.abort();
        }

        updateState();
    }

    /**
     * Signals that no further pages will be enqueued: moves {@code OPEN} to
     * {@code NO_MORE_PAGES}, or {@code NO_MORE_BUFFERS} to {@code FLUSHING}. The buffer
     * state is re-evaluated only when one of those transitions actually happened.
     */
    public synchronized void setNoMorePages() {
        // The second transition is attempted only if the first one did not apply.
        boolean transitioned = state.compareAndSet(BufferState.OPEN, BufferState.NO_MORE_PAGES)
                || state.compareAndSet(BufferState.NO_MORE_BUFFERS, BufferState.FLUSHING);
        if (!transitioned) {
            return;
        }
        updateState();
    }

    /**
     * Moves the buffer to {@code FINISHED}, destroys every partition buffer, aborts every
     * named buffer and then runs the pending reads (which, in the {@code FINISHED} state,
     * are completed with a closed empty result). Does nothing if the state is already terminal.
     */
    public synchronized void destroy() {
        if (state.get().isTerminal()) {
            return;
        }

        state.set(BufferState.FINISHED);

        for (PartitionBuffer partitionBuffer : partitionBuffers.values()) {
            partitionBuffer.destroy();
        }
        for (NamedBuffer namedBuffer : namedBuffers.values()) {
            namedBuffer.abort();
        }

        processPendingReads();
    }

    /**
     * Moves the buffer to {@code FAILED} and destroys every partition buffer. Unlike
     * {@link #destroy()}, named buffers are not aborted and pending reads are not run, so
     * outstanding read futures are left incomplete. Does nothing if the state is already
     * terminal.
     */
    public synchronized void fail() {
        if (state.get().isTerminal()) {
            return;
        }

        state.set(BufferState.FAILED);

        for (PartitionBuffer partitionBuffer : partitionBuffers.values()) {
            partitionBuffer.destroy();
        }
    }

    /**
     * While in {@code FLUSHING}, destroys the buffer as soon as every named buffer reports
     * completion. The caller must hold the lock on this object.
     *
     * <p>{@code NamedBuffer.checkCompletion} can call back into this method, so this
     * method is re-entrant by design.
     *
     * @throws IllegalStateException if the calling thread does not hold the lock
     */
    private void checkFlushComplete() {
        Preconditions.checkState(Thread.holdsLock(this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());

        if (state.get() != BufferState.FLUSHING) {
            return;
        }

        for (NamedBuffer namedBuffer : namedBuffers.values()) {
            if (!namedBuffer.checkCompletion()) {
                // At least one consumer still has pages to read.
                return;
            }
        }
        destroy();
    }

    /*
     * Re-evaluates the buffer after any change: serves pending reads, discards queued pages
     * once no more pages are accepted, advances each partition to the slowest reader once
     * the set of named buffers is final, and finally checks whether flushing is complete.
     * The caller must hold the lock on this object.
     *
     * Decompiler note (CFR): "WARNING - Removed try catching itself - possible behaviour
     * change." The try/finally below is the decompiler's reconstruction.
     */
    private void updateState() {
        Preconditions.checkState((boolean)Thread.holdsLock(this), (String)"Thread must hold a lock on the %s", (Object[])new Object[]{SharedBuffer.class.getSimpleName()});
        try {
            this.processPendingReads();
            BufferState state = this.state.get();
            // Nothing else to maintain once FINISHED or FAILED.
            if (state.isTerminal()) {
                return;
            }
            // No more pages will be accepted: clear whatever is still sitting in the partition queues.
            if (!state.canAddPages()) {
                this.partitionBuffers.values().forEach(PartitionBuffer::clearQueue);
            }
            // Only once no further named buffers can appear is the minimum below final.
            if (!state.canAddBuffers() && !this.namedBuffers.isEmpty()) {
                for (Map.Entry<Integer, Set<NamedBuffer>> entry : this.partitionToNamedBuffer.entrySet()) {
                    PartitionBuffer partitionBuffer = this.partitionBuffers.get(entry.getKey());
                    // Smallest sequence id among this partition's readers, i.e. the slowest reader.
                    long newMasterSequenceId = entry.getValue().stream().mapToLong(NamedBuffer::getSequenceId).min().getAsLong();
                    partitionBuffer.advanceSequenceId(newMasterSequenceId);
                    // NOTE(review): this dequeues on EVERY partition buffer for each partition visited,
                    // not just on the one advanced above. Presumably intentional because all partitions
                    // share one memory manager - confirm before narrowing or hoisting it.
                    this.partitionBuffers.values().forEach(PartitionBuffer::dequeuePages);
                }
            }
            // Let each named buffer mark itself finished if its reader has consumed everything.
            if (!state.canAddPages()) {
                this.namedBuffers.values().forEach(NamedBuffer::checkCompletion);
            }
        }
        finally {
            // Runs on every exit path, including the early return for terminal states.
            this.checkFlushComplete();
        }
    }

    /**
     * Re-runs every pending read and drops those that report they are done. Iterates over
     * a snapshot because executing a read can re-enter this method and modify the list.
     * The caller must hold the lock on this object.
     *
     * @throws IllegalStateException if the calling thread does not hold the lock
     */
    private void processPendingReads() {
        Preconditions.checkState(Thread.holdsLock(this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());

        for (GetBufferResult pendingRead : ImmutableList.copyOf(stateChangeListeners)) {
            if (pendingRead.execute()) {
                stateChangeListeners.remove(pendingRead);
            }
        }
    }

    /**
     * A pending read request: the parameters passed to {@code get} plus the future that will
     * carry the result. Instances are kept in {@code stateChangeListeners} until
     * {@link #execute()} returns {@code true}.
     */
    @Immutable
    private class GetBufferResult {
        private final SettableFuture<BufferResult> future = SettableFuture.create();
        private final TaskId outputId;
        private final long startingSequenceId;
        private final DataSize maxSize;

        public GetBufferResult(TaskId outputId, long startingSequenceId, DataSize maxSize) {
            this.outputId = outputId;
            this.startingSequenceId = startingSequenceId;
            this.maxSize = maxSize;
        }

        public SettableFuture<BufferResult> getFuture() {
            return future;
        }

        /**
         * Tries to complete this read. The caller must hold the lock on the enclosing buffer.
         *
         * @return {@code true} when the request is finished (future already done, completed
         *         now, or failed with an exception) and can be removed from the pending list;
         *         {@code false} when it must keep waiting
         * @throws IllegalStateException if the calling thread does not hold the lock
         */
        public boolean execute() {
            Preconditions.checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());

            if (future.isDone()) {
                return true;
            }
            // A failed buffer never answers its readers; the request stays pending.
            if (SharedBuffer.this.state.get() == BufferState.FAILED) {
                return false;
            }

            try {
                NamedBuffer namedBuffer = SharedBuffer.this.namedBuffers.get(outputId);

                // Finished: answer with a closed empty result, even for a buffer id never registered.
                if (SharedBuffer.this.state.get() == BufferState.FINISHED) {
                    future.set(BufferResult.emptyResults(namedBuffer == null ? 0L : namedBuffer.getSequenceId(), true));
                    return true;
                }
                // The named buffer has not been registered yet; wait for setOutputBuffers.
                if (namedBuffer == null) {
                    return false;
                }
                // Request for a position the reader has already moved past: open empty result.
                if (startingSequenceId < namedBuffer.getSequenceId()) {
                    future.set(BufferResult.emptyResults(startingSequenceId, false));
                    return true;
                }

                BufferResult bufferResult = namedBuffer.getPages(startingSequenceId, maxSize);
                // Reading also advances the reader's position, which may complete the flush.
                SharedBuffer.this.checkFlushComplete();

                // Nothing available yet and the buffer is still open: keep waiting.
                if (bufferResult.isEmpty() && !bufferResult.isBufferClosed()) {
                    return false;
                }
                future.set(bufferResult);
            }
            catch (Throwable throwable) {
                // Deliberately broad: any failure is reported to the reader through the future.
                future.setException(throwable);
            }
            return true;
        }
    }

    /**
     * One consumer's view of a partition buffer: tracks the sequence id the consumer has
     * reached and whether the consumer is finished. Every method except {@link #getInfo()}
     * and {@link #toString()} requires the caller to hold the lock on the enclosing buffer.
     */
    @ThreadSafe
    private final class NamedBuffer {
        private final TaskId bufferId;
        private final PartitionBuffer partitionBuffer;
        private final AtomicLong sequenceId = new AtomicLong();
        private final AtomicBoolean finished = new AtomicBoolean();

        private NamedBuffer(TaskId bufferId, PartitionBuffer partitionBuffer) {
            this.bufferId = Objects.requireNonNull(bufferId, "bufferId is null");
            this.partitionBuffer = Objects.requireNonNull(partitionBuffer, "partitionBuffer is null");
        }

        /**
         * Builds a snapshot for reporting. Must be called WITHOUT the enclosing buffer's lock.
         *
         * @throws IllegalStateException if the calling thread holds the lock
         */
        public BufferInfo getInfo() {
            Preconditions.checkState(!Thread.holdsLock(SharedBuffer.this), "Thread must NOT hold a lock on the %s", SharedBuffer.class.getSimpleName());

            long currentSequenceId = sequenceId.get();
            if (finished.get()) {
                return new BufferInfo(bufferId, true, 0, currentSequenceId, partitionBuffer.getInfo());
            }

            // Pages the partition has beyond this reader's position, never negative.
            long remaining = partitionBuffer.getPageCount() - currentSequenceId;
            int bufferedPages = Math.max(Ints.checkedCast(remaining), 0);
            // finished is read again on purpose: it may have flipped since the check above.
            return new BufferInfo(bufferId, finished.get(), bufferedPages, currentSequenceId, partitionBuffer.getInfo());
        }

        /** @return the sequence id this reader has reached */
        public long getSequenceId() {
            Preconditions.checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());
            return sequenceId.get();
        }

        /**
         * Moves this reader forward to {@code startingSequenceId} if that is ahead of its
         * current position, then fetches pages from the partition buffer.
         *
         * @return a closed empty result if this reader is complete, otherwise the fetched pages
         * @throws IllegalArgumentException if {@code maxSize} is less than one byte or
         *         {@code startingSequenceId} is behind the current position
         */
        public BufferResult getPages(long startingSequenceId, DataSize maxSize) {
            Preconditions.checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());
            Preconditions.checkArgument(maxSize.toBytes() > 0L, "maxSize must be at least 1 byte");

            long readPosition = sequenceId.get();
            Preconditions.checkArgument(startingSequenceId >= readPosition, "startingSequenceId is before the beginning of the buffer");

            if (startingSequenceId > readPosition) {
                sequenceId.set(startingSequenceId);
                readPosition = startingSequenceId;
            }

            if (checkCompletion()) {
                return BufferResult.emptyResults(startingSequenceId, true);
            }

            List<Page> pages = partitionBuffer.getPages(maxSize, readPosition);
            long nextSequenceId = startingSequenceId + (long) pages.size();
            return new BufferResult(startingSequenceId, nextSequenceId, false, pages);
        }

        /** Marks this reader as finished regardless of its position. */
        public void abort() {
            Preconditions.checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());
            finished.set(true);
        }

        /**
         * Marks this reader finished once no more pages can be added and it has reached the
         * partition's page count, then asks the enclosing buffer to check for flush completion.
         *
         * @return whether this reader is finished
         */
        public boolean checkCompletion() {
            Preconditions.checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s", SharedBuffer.class.getSimpleName());

            // Must short-circuit: checkFlushComplete() below calls back into this method.
            if (finished.get()) {
                return true;
            }

            long pagesAdded = partitionBuffer.getPageCount();
            boolean noMorePages = !SharedBuffer.this.state.get().canAddPages();
            if (noMorePages && sequenceId.get() >= pagesAdded) {
                // Set the flag BEFORE the callback so the re-entrant call hits the short-circuit.
                finished.set(true);
                SharedBuffer.this.checkFlushComplete();
            }
            return finished.get();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("bufferId", bufferId)
                    .add("sequenceId", sequenceId.get())
                    .add("finished", finished.get())
                    .toString();
        }
    }

    public static enum BufferState {
        OPEN(true, true, false),
        NO_MORE_BUFFERS(true, false, false),
        NO_MORE_PAGES(false, true, false),
        FLUSHING(false, false, false),
        FINISHED(false, false, true),
        FAILED(false, false, true);

        public static final Set<BufferState> TERMINAL_BUFFER_STATES;
        private final boolean newPagesAllowed;
        private final boolean newBuffersAllowed;
        private final boolean terminal;

        private BufferState(boolean newPagesAllowed, boolean newBuffersAllowed, boolean terminal) {
            this.newPagesAllowed = newPagesAllowed;
            this.newBuffersAllowed = newBuffersAllowed;
            this.terminal = terminal;
        }

        public boolean canAddPages() {
            return this.newPagesAllowed;
        }

        public boolean canAddBuffers() {
            return this.newBuffersAllowed;
        }

        public boolean isTerminal() {
            return this.terminal;
        }

        static {
            TERMINAL_BUFFER_STATES = (Set)Stream.of(BufferState.values()).filter(BufferState::isTerminal).collect(ImmutableCollectors.toImmutableSet());
        }
    }
}

