/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.buffer;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.SystemMemoryUsageListener;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.BufferState;
import com.facebook.presto.execution.buffer.OutputBuffer;
import com.facebook.presto.execution.buffer.OutputBufferInfo;
import com.facebook.presto.execution.buffer.OutputBufferMemoryManager;
import com.facebook.presto.execution.buffer.SharedOutputBufferPartition;
import com.facebook.presto.spi.Page;
import com.facebook.presto.util.ImmutableCollectors;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;

/**
 * {@link OutputBuffer} implementation in which pages are stored in per-partition
 * {@link SharedOutputBufferPartition}s, and every {@link OutputBuffers.OutputBufferId}
 * registered through {@link #setOutputBuffers(OutputBuffers)} is represented by a
 * {@link NamedBuffer} bound to one partition and tracking its own sequence id.
 *
 * <p>All state-mutating entry points are {@code synchronized} on this instance. Private helpers
 * assert lock ownership with {@link #checkHoldsLock()}; {@link #getInfo()} asserts the opposite
 * with {@link #checkDoesNotHoldLock()}.
 */
@ThreadSafe
public class SharedOutputBuffer
implements OutputBuffer {
    /** Completed with the final buffer set once {@code isNoMoreBufferIds()} is reported. */
    private final SettableFuture<OutputBuffers> finalOutputBuffers = SettableFuture.create();
    /** Most recently accepted buffer configuration; {@code null} until the first update. */
    @GuardedBy("this")
    private OutputBuffers outputBuffers;
    /** Page storage, keyed by partition number. */
    @GuardedBy("this")
    private final Map<Integer, SharedOutputBufferPartition> partitionBuffers = new ConcurrentHashMap<>();
    /** Named buffers reading from each partition, keyed by partition number. */
    @GuardedBy("this")
    private final Map<Integer, Set<NamedBuffer>> partitionToNamedBuffer = new ConcurrentHashMap<>();
    /** Registered named buffers, keyed by buffer id. */
    @GuardedBy("this")
    private final ConcurrentMap<OutputBuffers.OutputBufferId, NamedBuffer> namedBuffers = new ConcurrentHashMap<>();
    /** Ids passed to {@link #abort}; remembered so a buffer registered later is aborted on creation. */
    @GuardedBy("this")
    private final Set<OutputBuffers.OutputBufferId> abortedBuffers = new HashSet<>();
    private final StateMachine<BufferState> state;
    private final String taskInstanceId;
    /** Pending {@link #get} requests that have not been completed yet (despite the field name). */
    @GuardedBy("this")
    private final List<GetBufferResult> stateChangeListeners = new ArrayList<>();
    private final OutputBufferMemoryManager memoryManager;

    /**
     * Creates a buffer with a no-op system memory usage listener.
     *
     * @param taskId task id used to name the internal state machine
     * @param taskInstanceId instance id reported in every {@link BufferResult}
     * @param executor executor used for the state machine and memory notifications
     * @param maxBufferSize maximum buffer size; must be at least one byte
     */
    public SharedOutputBuffer(TaskId taskId, String taskInstanceId, Executor executor, DataSize maxBufferSize) {
        this(taskId, taskInstanceId, executor, maxBufferSize, deltaMemory -> {});
    }

    /**
     * Creates a buffer with a new state machine named {@code <taskId>-buffer} that starts in
     * {@link BufferState#OPEN}.
     *
     * @param taskId task id used to name the internal state machine
     * @param taskInstanceId instance id reported in every {@link BufferResult}
     * @param executor executor used for the state machine and memory notifications
     * @param maxBufferSize maximum buffer size; must be at least one byte
     * @param systemMemoryUsageListener listener handed to the memory manager
     */
    public SharedOutputBuffer(TaskId taskId, String taskInstanceId, Executor executor, DataSize maxBufferSize, SystemMemoryUsageListener systemMemoryUsageListener) {
        this(taskInstanceId, new StateMachine<BufferState>(Objects.requireNonNull(taskId, "taskId is null") + "-buffer", Objects.requireNonNull(executor, "executor is null"), BufferState.OPEN, BufferState.TERMINAL_BUFFER_STATES), maxBufferSize, systemMemoryUsageListener, executor);
    }

    /**
     * Creates a buffer on top of an externally supplied state machine.
     *
     * @param taskInstanceId instance id reported in every {@link BufferResult}
     * @param state state machine driving this buffer
     * @param maxBufferSize maximum buffer size; must be at least one byte
     * @param systemMemoryUsageListener listener handed to the memory manager
     * @param notificationExecutor executor handed to the memory manager
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if {@code maxBufferSize} is not positive
     */
    public SharedOutputBuffer(String taskInstanceId, StateMachine<BufferState> state, DataSize maxBufferSize, SystemMemoryUsageListener systemMemoryUsageListener, Executor notificationExecutor) {
        this.taskInstanceId = Objects.requireNonNull(taskInstanceId, "taskInstanceId is null");
        this.state = Objects.requireNonNull(state, "state is null");
        Objects.requireNonNull(maxBufferSize, "maxBufferSize is null");
        Preconditions.checkArgument(maxBufferSize.toBytes() > 0L, "maxBufferSize must be at least 1");
        Objects.requireNonNull(systemMemoryUsageListener, "systemMemoryUsageListener is null");
        Objects.requireNonNull(notificationExecutor, "notificationExecutor is null");
        this.memoryManager = new OutputBufferMemoryManager(maxBufferSize.toBytes(), systemMemoryUsageListener, notificationExecutor);
    }

    /** Registers a listener on the underlying buffer state machine. */
    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        this.state.addStateChangeListener(stateChangeListener);
    }

    /** Returns the utilization reported by the memory manager. */
    @Override
    public double getUtilization() {
        return this.memoryManager.getUtilization();
    }

    /** Returns {@code true} only when the buffer state is {@link BufferState#FINISHED}. */
    @Override
    public boolean isFinished() {
        return this.state.get() == BufferState.FINISHED;
    }

    /**
     * Builds an info snapshot of this buffer and all of its named buffers.
     *
     * <p>The values are read without holding the lock, so the individual totals are not
     * captured atomically with respect to each other.
     *
     * @throws IllegalStateException if the calling thread holds the lock on this buffer
     */
    @Override
    public OutputBufferInfo getInfo() {
        this.checkDoesNotHoldLock();
        BufferState currentState = this.state.get();
        ImmutableList.Builder<BufferInfo> infos = ImmutableList.builder();
        for (NamedBuffer namedBuffer : this.namedBuffers.values()) {
            infos.add(namedBuffer.getInfo());
        }
        long totalBufferedBytes = this.partitionBuffers.values().stream().mapToLong(SharedOutputBufferPartition::getBufferedBytes).sum();
        long totalBufferedPages = this.partitionBuffers.values().stream().mapToLong(SharedOutputBufferPartition::getBufferedPageCount).sum();
        long totalRowsSent = this.partitionBuffers.values().stream().mapToLong(SharedOutputBufferPartition::getRowCount).sum();
        long totalPagesSent = this.partitionBuffers.values().stream().mapToLong(SharedOutputBufferPartition::getPageCount).sum();
        return new OutputBufferInfo("SHARED", currentState, currentState.canAddBuffers(), currentState.canAddPages(), totalBufferedBytes, totalBufferedPages, totalRowsSent, totalPagesSent, infos.build());
    }

    /**
     * Applies a new buffer configuration.
     *
     * <p>The update is ignored when the buffer is in a terminal state or when the new
     * configuration's version is not greater than the current one. Otherwise a
     * {@link NamedBuffer} is created for every buffer id that is not registered yet. When the
     * configuration reports no more buffer ids, the state moves from {@code OPEN} to
     * {@code NO_MORE_BUFFERS} or from {@code NO_MORE_PAGES} to {@code FLUSHING}.
     *
     * @param newOutputBuffers the new configuration
     * @throws NullPointerException if {@code newOutputBuffers} is null
     * @throws IllegalStateException if a new buffer id is present but the state no longer
     *         allows adding buffers
     */
    @Override
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers) {
        Objects.requireNonNull(newOutputBuffers, "newOutputBuffers is null");
        // the first update is validated against an empty configuration of the same type
        if (this.outputBuffers == null) {
            this.outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(newOutputBuffers.getType());
        }
        if (this.state.get().isTerminal() || this.outputBuffers.getVersion() >= newOutputBuffers.getVersion()) {
            return;
        }
        this.outputBuffers.checkValidTransition(newOutputBuffers);
        this.outputBuffers = newOutputBuffers;
        for (Map.Entry<OutputBuffers.OutputBufferId, Integer> entry : this.outputBuffers.getBuffers().entrySet()) {
            OutputBuffers.OutputBufferId bufferId = entry.getKey();
            if (this.namedBuffers.containsKey(bufferId)) {
                continue;
            }
            Preconditions.checkState(this.state.get().canAddBuffers(), "Cannot add buffers to %s", this.getClass().getSimpleName());
            int partition = entry.getValue();
            SharedOutputBufferPartition partitionBuffer = this.createOrGetPartitionBuffer(partition);
            NamedBuffer namedBuffer = new NamedBuffer(bufferId, partitionBuffer);
            // the buffer may have been aborted before it was ever registered
            if (this.abortedBuffers.contains(bufferId)) {
                namedBuffer.abort();
            }
            this.namedBuffers.put(bufferId, namedBuffer);
            Set<NamedBuffer> buffersForPartition = this.partitionToNamedBuffer.computeIfAbsent(partition, k -> new HashSet<>());
            buffersForPartition.add(namedBuffer);
        }
        if (this.outputBuffers.isNoMoreBufferIds()) {
            this.state.compareAndSet(BufferState.OPEN, BufferState.NO_MORE_BUFFERS);
            this.state.compareAndSet(BufferState.NO_MORE_PAGES, BufferState.FLUSHING);
            this.finalOutputBuffers.set(this.outputBuffers);
        }
        this.updateState();
    }

    /** Returns the partition buffer for {@code partition}, creating it on first use. Requires the lock. */
    private SharedOutputBufferPartition createOrGetPartitionBuffer(int partition) {
        this.checkHoldsLock();
        return this.partitionBuffers.computeIfAbsent(partition, k -> new SharedOutputBufferPartition(partition, this.memoryManager));
    }

    /** Enqueues {@code page} into partition 0; see {@link #enqueue(int, Page)}. */
    @Override
    public synchronized ListenableFuture<?> enqueue(Page page) {
        return this.enqueue(0, page);
    }

    /**
     * Adds a page to the given partition and serves any pending reads.
     *
     * <p>If the current state no longer accepts pages, the page is dropped and an already
     * completed future is returned.
     *
     * @param partition target partition number
     * @param page page to add
     * @return the memory manager's "not full" future, or a completed future if the page was dropped
     * @throws NullPointerException if {@code page} is null
     */
    @Override
    public synchronized ListenableFuture<?> enqueue(int partition, Page page) {
        Objects.requireNonNull(page, "page is null");
        if (!this.state.get().canAddPages()) {
            return Futures.immediateFuture(true);
        }
        SharedOutputBufferPartition partitionBuffer = this.createOrGetPartitionBuffer(partition);
        partitionBuffer.enqueuePage(page);
        this.processPendingReads();
        this.updateState();
        return this.memoryManager.getNotFullFuture();
    }

    /**
     * Requests pages for a named buffer starting at {@code startingSequenceId}.
     *
     * <p>If the buffer id is unknown, no more buffers can be added and the state is not
     * {@code FAILED}, a completed empty result marked "buffer complete" is returned. Otherwise
     * the request is queued and completed as soon as {@link GetBufferResult#execute()} can
     * satisfy it.
     *
     * @param bufferId id of the buffer to read
     * @param startingSequenceId sequence id of the first page wanted
     * @param maxSize maximum amount of data to return; must be at least one byte
     * @return future completed with the read result
     * @throws NullPointerException if {@code bufferId} or {@code maxSize} is null
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    @Override
    public synchronized CompletableFuture<BufferResult> get(OutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) {
        Objects.requireNonNull(bufferId, "outputId is null");
        Objects.requireNonNull(maxSize, "maxSize is null");
        Preconditions.checkArgument(maxSize.toBytes() > 0L, "maxSize must be at least 1 byte");
        BufferState currentState = this.state.get();
        if (currentState != BufferState.FAILED && !currentState.canAddBuffers() && this.namedBuffers.get(bufferId) == null) {
            return CompletableFuture.completedFuture(BufferResult.emptyResults(this.taskInstanceId, 0L, true));
        }
        GetBufferResult getBufferResult = new GetBufferResult(bufferId, startingSequenceId, maxSize);
        this.stateChangeListeners.add(getBufferResult);
        this.updateState();
        return getBufferResult.getFuture();
    }

    /**
     * Aborts the named buffer. The id is remembered, so a buffer registered with this id
     * later is aborted as soon as it is created.
     *
     * @throws NullPointerException if {@code bufferId} is null
     */
    @Override
    public synchronized void abort(OutputBuffers.OutputBufferId bufferId) {
        Objects.requireNonNull(bufferId, "outputId is null");
        this.abortedBuffers.add(bufferId);
        NamedBuffer namedBuffer = this.namedBuffers.get(bufferId);
        if (namedBuffer != null) {
            namedBuffer.abort();
        }
        this.updateState();
    }

    /**
     * Signals that no further pages will be enqueued: moves {@code OPEN} to
     * {@code NO_MORE_PAGES}, or {@code NO_MORE_BUFFERS} to {@code FLUSHING}. Does nothing in
     * any other state.
     */
    @Override
    public synchronized void setNoMorePages() {
        if (this.state.compareAndSet(BufferState.OPEN, BufferState.NO_MORE_PAGES) || this.state.compareAndSet(BufferState.NO_MORE_BUFFERS, BufferState.FLUSHING)) {
            this.updateState();
        }
    }

    /**
     * Moves the buffer to {@code FINISHED}, destroys all partition buffers, marks every named
     * buffer finished and completes the pending reads. Does nothing if the state is already
     * terminal.
     */
    @Override
    public synchronized void destroy() {
        if (this.state.get().isTerminal()) {
            return;
        }
        this.state.set(BufferState.FINISHED);
        this.partitionBuffers.values().forEach(SharedOutputBufferPartition::destroy);
        this.namedBuffers.values().forEach(NamedBuffer::abort);
        this.processPendingReads();
    }

    /**
     * Moves the buffer to {@code FAILED} and destroys all partition buffers. Does nothing if
     * the state is already terminal.
     *
     * <p>Pending reads are not processed here, and {@link GetBufferResult#execute()} leaves
     * requests uncompleted while the state is {@code FAILED}.
     */
    @Override
    public synchronized void fail() {
        if (this.state.get().isTerminal()) {
            return;
        }
        this.state.set(BufferState.FAILED);
        this.partitionBuffers.values().forEach(SharedOutputBufferPartition::destroy);
    }

    /**
     * While in {@code FLUSHING}, destroys this buffer once every named buffer is finished
     * (which is trivially the case when none is registered). Requires the lock.
     */
    private void checkFlushComplete() {
        this.checkHoldsLock();
        if (this.state.get() == BufferState.FLUSHING) {
            for (NamedBuffer namedBuffer : this.namedBuffers.values()) {
                if (!namedBuffer.isFinished()) {
                    return;
                }
            }
            this.destroy();
        }
    }

    /**
     * Serves pending reads, advances each partition's master sequence id to the minimum
     * sequence id of its named buffers once no more buffers can be added, and lifts the
     * "block when full" behaviour once no more pages can be added. Requires the lock.
     *
     * <p>{@link #checkFlushComplete()} runs in the {@code finally} block, i.e. also on the
     * early return for terminal states and when an exception propagates.
     */
    private void updateState() {
        this.checkHoldsLock();
        try {
            this.processPendingReads();
            BufferState currentState = this.state.get();
            if (currentState.isTerminal()) {
                return;
            }
            if (!currentState.canAddBuffers() && !this.namedBuffers.isEmpty()) {
                for (Map.Entry<Integer, Set<NamedBuffer>> entry : this.partitionToNamedBuffer.entrySet()) {
                    SharedOutputBufferPartition partition = this.partitionBuffers.get(entry.getKey());
                    // sets in partitionToNamedBuffer always receive an element right after creation,
                    // so min() has a value here
                    long newMasterSequenceId = entry.getValue().stream().mapToLong(NamedBuffer::getSequenceId).min().getAsLong();
                    partition.advanceSequenceId(newMasterSequenceId);
                }
            }
            if (!currentState.canAddPages()) {
                this.memoryManager.setNoBlockOnFull();
            }
        }
        finally {
            this.checkFlushComplete();
        }
    }

    /**
     * Runs every pending read and drops those that report being done. Requires the lock.
     *
     * <p>The list is copied before iteration because executing a read can re-enter this
     * method (through {@link #checkFlushComplete()} and {@link #destroy()}).
     */
    private void processPendingReads() {
        this.checkHoldsLock();
        Set<GetBufferResult> finishedListeners = ImmutableList.copyOf(this.stateChangeListeners).stream().filter(GetBufferResult::execute).collect(ImmutableCollectors.toImmutableSet());
        this.stateChangeListeners.removeAll(finishedListeners);
    }

    /** @throws IllegalStateException if the calling thread does not hold the lock on this buffer */
    private void checkHoldsLock() {
        if (!Thread.holdsLock(this)) {
            throw new IllegalStateException(String.format("Thread must hold a lock on the %s", this.getClass().getSimpleName()));
        }
    }

    /** @throws IllegalStateException if the calling thread holds the lock on this buffer */
    private void checkDoesNotHoldLock() {
        if (Thread.holdsLock(this)) {
            throw new IllegalStateException(String.format("Thread must NOT hold a lock on the %s", this.getClass().getSimpleName()));
        }
    }

    /** A pending {@link SharedOutputBuffer#get} request together with the future handed to the caller. */
    @Immutable
    private class GetBufferResult {
        private final CompletableFuture<BufferResult> future = new CompletableFuture<>();
        private final OutputBuffers.OutputBufferId outputId;
        private final long startingSequenceId;
        private final DataSize maxSize;

        public GetBufferResult(OutputBuffers.OutputBufferId outputId, long startingSequenceId, DataSize maxSize) {
            this.outputId = outputId;
            this.startingSequenceId = startingSequenceId;
            this.maxSize = maxSize;
        }

        public CompletableFuture<BufferResult> getFuture() {
            return this.future;
        }

        /**
         * Tries to complete the request. Requires the lock on the enclosing buffer.
         *
         * @return {@code true} if the request is done (completed now, completed exceptionally,
         *         or already done before the call) and can be dropped from the pending list;
         *         {@code false} if it must stay pending
         */
        public boolean execute() {
            SharedOutputBuffer.this.checkHoldsLock();
            if (this.future.isDone()) {
                return true;
            }
            // while FAILED the request is neither completed nor removed
            if (SharedOutputBuffer.this.state.get() == BufferState.FAILED) {
                return false;
            }
            try {
                NamedBuffer namedBuffer = SharedOutputBuffer.this.namedBuffers.get(this.outputId);
                if (SharedOutputBuffer.this.state.get() == BufferState.FINISHED) {
                    this.future.complete(BufferResult.emptyResults(SharedOutputBuffer.this.taskInstanceId, namedBuffer == null ? 0L : namedBuffer.getSequenceId(), true));
                    return true;
                }
                // buffer not registered yet: wait for a later setOutputBuffers call
                if (namedBuffer == null) {
                    return false;
                }
                // request starts before the buffer's current position: answer with an empty result
                if (this.startingSequenceId < namedBuffer.getSequenceId()) {
                    this.future.complete(BufferResult.emptyResults(SharedOutputBuffer.this.taskInstanceId, this.startingSequenceId, false));
                    return true;
                }
                BufferResult bufferResult = namedBuffer.getPages(this.startingSequenceId, this.maxSize);
                SharedOutputBuffer.this.checkFlushComplete();
                if (bufferResult.isEmpty() && !bufferResult.isBufferComplete()) {
                    return false;
                }
                this.future.complete(bufferResult);
            }
            catch (Throwable throwable) {
                // any failure is reported to the caller through the future
                this.future.completeExceptionally(throwable);
            }
            return true;
        }
    }

    /** Reader-side view of one output buffer id: its partition, current sequence id and finished flag. */
    @ThreadSafe
    private final class NamedBuffer {
        private final OutputBuffers.OutputBufferId bufferId;
        private final SharedOutputBufferPartition partition;
        private final AtomicLong sequenceId = new AtomicLong();
        private final AtomicBoolean finished = new AtomicBoolean();

        private NamedBuffer(OutputBuffers.OutputBufferId bufferId, SharedOutputBufferPartition partition) {
            this.bufferId = Objects.requireNonNull(bufferId, "bufferId is null");
            this.partition = Objects.requireNonNull(partition, "partitionBuffer is null");
        }

        /**
         * Builds an info snapshot for this named buffer. Must be called without the lock on the
         * enclosing buffer. A finished buffer reports zero buffered pages.
         */
        public BufferInfo getInfo() {
            SharedOutputBuffer.this.checkDoesNotHoldLock();
            long currentSequenceId = this.sequenceId.get();
            if (this.finished.get()) {
                return new BufferInfo(this.bufferId, true, 0, currentSequenceId, this.partition.getInfo());
            }
            // clamp at zero: the difference can be negative when the sequence id is ahead of the page count
            int bufferedPages = Math.max(Ints.checkedCast(this.partition.getPageCount() - currentSequenceId), 0);
            return new BufferInfo(this.bufferId, this.finished.get(), bufferedPages, currentSequenceId, this.partition.getInfo());
        }

        /** Returns the current sequence id. Requires the lock on the enclosing buffer. */
        public long getSequenceId() {
            SharedOutputBuffer.this.checkHoldsLock();
            return this.sequenceId.get();
        }

        /**
         * Moves this buffer's sequence id forward to {@code startingSequenceId} and reads pages
         * from the partition at that position. Requires the lock on the enclosing buffer.
         *
         * @return an empty "buffer complete" result when this buffer is finished, or when no
         *         pages were read, no more pages can be added and the partition has none left;
         *         otherwise the pages that were read
         * @throws IllegalArgumentException if {@code maxSize} is not positive or
         *         {@code startingSequenceId} is before the current sequence id
         */
        public BufferResult getPages(long startingSequenceId, DataSize maxSize) {
            SharedOutputBuffer.this.checkHoldsLock();
            Preconditions.checkArgument(maxSize.toBytes() > 0L, "maxSize must be at least 1 byte");
            long currentSequenceId = this.sequenceId.get();
            Preconditions.checkArgument(startingSequenceId >= currentSequenceId, "startingSequenceId is before the beginning of the buffer");
            if (startingSequenceId > currentSequenceId) {
                this.sequenceId.set(startingSequenceId);
                currentSequenceId = startingSequenceId;
            }
            if (this.isFinished()) {
                return BufferResult.emptyResults(SharedOutputBuffer.this.taskInstanceId, startingSequenceId, true);
            }
            List<Page> pages = this.partition.getPages(maxSize, currentSequenceId);
            if (pages.isEmpty() && !SharedOutputBuffer.this.state.get().canAddPages() && !this.partition.hasMorePages(currentSequenceId)) {
                return BufferResult.emptyResults(SharedOutputBuffer.this.taskInstanceId, startingSequenceId, true);
            }
            return new BufferResult(SharedOutputBuffer.this.taskInstanceId, startingSequenceId, startingSequenceId + (long)pages.size(), false, pages);
        }

        /** Marks this buffer finished and re-checks flush completion. Requires the lock on the enclosing buffer. */
        public void abort() {
            SharedOutputBuffer.this.checkHoldsLock();
            this.finished.set(true);
            SharedOutputBuffer.this.checkFlushComplete();
        }

        /** Returns whether this buffer has been marked finished. Requires the lock on the enclosing buffer. */
        public boolean isFinished() {
            SharedOutputBuffer.this.checkHoldsLock();
            return this.finished.get();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("bufferId", this.bufferId).add("sequenceId", this.sequenceId.get()).add("finished", this.finished.get()).toString();
        }
    }
}

