/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.memory.QueryContextVisitor;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.operator.BlockedReason;
import io.trino.operator.DriverContext;
import io.trino.operator.OperationTimer;
import io.trino.operator.Operator;
import io.trino.operator.OperatorInfo;
import io.trino.operator.OperatorStats;
import io.trino.operator.SpillContext;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;
import jakarta.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class OperatorContext {
    // Identity of this operator and the driver it belongs to.
    private final int operatorId;
    private final PlanNodeId planNodeId;
    private final String operatorType;
    private final DriverContext driverContext;
    // Executor on which blocked-future listeners (BlockedMonitor) are run.
    private final Executor executor;
    // Physical input counters, updated by recordPhysicalInputWithTiming.
    private final CounterStat physicalInputDataSize = new CounterStat();
    private final CounterStat physicalInputPositions = new CounterStat();
    private final AtomicLong physicalInputReadTimeNanos = new AtomicLong();
    // Network input counters, updated by recordNetworkInput.
    private final CounterStat internalNetworkInputDataSize = new CounterStat();
    private final CounterStat internalNetworkPositions = new CounterStat();
    // Timing and counters for addInput / processed input.
    private final OperationTimer.OperationTiming addInputTiming = new OperationTimer.OperationTiming();
    private final CounterStat inputDataSize = new CounterStat();
    private final CounterStat inputPositions = new CounterStat();
    // Timing and counters for getOutput / produced output.
    private final OperationTimer.OperationTiming getOutputTiming = new OperationTimer.OperationTiming();
    private final CounterStat outputDataSize = new CounterStat();
    private final CounterStat outputPositions = new CounterStat();
    private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
    // Latest metrics snapshots; each setter replaces the previous value.
    private final AtomicReference<Metrics> metrics = new AtomicReference<Metrics>(Metrics.EMPTY);
    private final AtomicReference<Metrics> connectorMetrics = new AtomicReference<Metrics>(Metrics.EMPTY);
    private final AtomicLong writerInputDataSize = new AtomicLong();
    private final AtomicLong physicalWrittenDataSize = new AtomicLong();
    // Futures returned by isWaitingForMemory / isWaitingForRevocableMemory; swapped for a fresh
    // future by updateMemoryFuture when the current one is already done and the pool blocks.
    private final AtomicReference<SettableFuture<Void>> memoryFuture;
    private final AtomicReference<SettableFuture<Void>> revocableMemoryFuture;
    // Most recent blocked monitor, if it has not finished yet (see recordBlocked).
    private final AtomicReference<BlockedMonitor> blockedMonitor = new AtomicReference();
    // Set at most once through setFinishedFuture.
    private final AtomicReference<ListenableFuture<Void>> finishedFuture = new AtomicReference();
    // Total nanoseconds accumulated by finished BlockedMonitor instances.
    private final AtomicLong blockedWallNanos = new AtomicLong();
    private final OperationTimer.OperationTiming finishTiming = new OperationTimer.OperationTiming();
    private final OperatorSpillContext spillContext;
    // Supplier of operator-specific info; replaced by a constant supplier in destroy().
    private final AtomicReference<Supplier<? extends OperatorInfo>> infoSupplier = new AtomicReference();
    // High-water marks maintained by updatePeakMemoryReservations.
    private final AtomicLong peakUserMemoryReservation = new AtomicLong();
    private final AtomicLong peakRevocableMemoryReservation = new AtomicLong();
    private final AtomicLong peakTotalMemoryReservation = new AtomicLong();
    // Memory revocation state; both fields are accessed only while holding this object's monitor.
    @GuardedBy(value="this")
    private boolean memoryRevokingRequested;
    @Nullable
    @GuardedBy(value="this")
    private Runnable memoryRevocationRequestListener;
    // Source of user/revocable memory contexts and of the current reservation values.
    private final MemoryTrackingContext operatorMemoryContext;

    /**
     * Creates the context for one operator.
     *
     * @param operatorId non-negative id of the operator
     * @param planNodeId plan node this operator belongs to; must not be null
     * @param operatorType operator type name, also used to initialize the local memory contexts; must not be null
     * @param driverContext owning driver context; must not be null
     * @param executor executor used for blocked-future listeners; must not be null
     * @param operatorMemoryContext memory tracking context of this operator; must not be null
     * @throws IllegalArgumentException if {@code operatorId} is negative
     * @throws NullPointerException if any reference argument is null
     */
    public OperatorContext(int operatorId, PlanNodeId planNodeId, String operatorType, DriverContext driverContext, Executor executor, MemoryTrackingContext operatorMemoryContext) {
        Preconditions.checkArgument(operatorId >= 0, "operatorId is negative");
        this.operatorId = operatorId;
        this.planNodeId = Objects.requireNonNull(planNodeId, "planNodeId is null");
        this.operatorType = Objects.requireNonNull(operatorType, "operatorType is null");
        this.driverContext = Objects.requireNonNull(driverContext, "driverContext is null");
        this.spillContext = new OperatorSpillContext(this.driverContext);
        this.executor = Objects.requireNonNull(executor, "executor is null");
        // Both memory futures start out already completed, i.e. the operator is not waiting for memory.
        // The diamond keeps the element type SettableFuture<Void>; a raw SettableFuture type argument
        // is not assignable to the AtomicReference<SettableFuture<Void>> fields.
        this.memoryFuture = new AtomicReference<>(SettableFuture.create());
        this.memoryFuture.get().set(null);
        this.revocableMemoryFuture = new AtomicReference<>(SettableFuture.create());
        this.revocableMemoryFuture.get().set(null);
        this.operatorMemoryContext = Objects.requireNonNull(operatorMemoryContext, "operatorMemoryContext is null");
        operatorMemoryContext.initializeLocalMemoryContexts(operatorType);
    }

    /** Returns the id this context was created with. */
    public int getOperatorId() {
        return operatorId;
    }

    /** Returns the operator type name this context was created with. */
    public String getOperatorType() {
        return operatorType;
    }

    /** Returns the driver context this operator context belongs to. */
    public DriverContext getDriverContext() {
        return driverContext;
    }

    /** Returns the session as reported by the owning driver context. */
    public Session getSession() {
        return driverContext.getSession();
    }

    /**
     * Completes the given timer against the add-input timing and, when a page is supplied,
     * adds its size and position count to the input counters.
     *
     * @param operationTimer timer covering the add-input call
     * @param page the page that was added, or null to record timing only
     */
    void recordAddInput(OperationTimer operationTimer, Page page) {
        operationTimer.recordOperationComplete(addInputTiming);
        if (page == null) {
            return;
        }
        inputDataSize.update(page.getSizeInBytes());
        inputPositions.update(page.getPositionCount());
    }

    /**
     * Adds the given amounts to the physical input size, position and read-time totals.
     *
     * @param sizeInBytes bytes read; must not be negative
     * @param positions positions read; must not be negative
     * @param readNanos time spent reading, in nanoseconds; must not be negative
     * @throws IllegalArgumentException if any argument is negative
     */
    public void recordPhysicalInputWithTiming(long sizeInBytes, long positions, long readNanos) {
        Preconditions.checkArgument(sizeInBytes >= 0, "sizeInBytes is negative (%s)", sizeInBytes);
        Preconditions.checkArgument(positions >= 0, "positions is negative (%s)", positions);
        Preconditions.checkArgument(readNanos >= 0, "readNanos is negative (%s)", readNanos);
        physicalInputDataSize.update(sizeInBytes);
        physicalInputPositions.update(positions);
        physicalInputReadTimeNanos.getAndAdd(readNanos);
    }

    /**
     * Adds the given amounts to the internal network input size and position totals.
     *
     * @param sizeInBytes bytes received; must not be negative
     * @param positions positions received; must not be negative
     * @throws IllegalArgumentException if any argument is negative
     */
    public void recordNetworkInput(long sizeInBytes, long positions) {
        Preconditions.checkArgument(sizeInBytes >= 0, "sizeInBytes is negative (%s)", sizeInBytes);
        Preconditions.checkArgument(positions >= 0, "positions is negative (%s)", positions);
        internalNetworkInputDataSize.update(sizeInBytes);
        internalNetworkPositions.update(positions);
    }

    /**
     * Adds the given amounts to the processed input size and position totals.
     *
     * @param sizeInBytes bytes processed; must not be negative
     * @param positions positions processed; must not be negative
     * @throws IllegalArgumentException if any argument is negative
     */
    public void recordProcessedInput(long sizeInBytes, long positions) {
        Preconditions.checkArgument(sizeInBytes >= 0, "sizeInBytes is negative (%s)", sizeInBytes);
        Preconditions.checkArgument(positions >= 0, "positions is negative (%s)", positions);
        inputDataSize.update(sizeInBytes);
        inputPositions.update(positions);
    }

    /**
     * Completes the given timer against the get-output timing and, when a page is supplied,
     * adds its size and position count to the output counters.
     *
     * @param operationTimer timer covering the get-output call
     * @param page the page that was produced, or null to record timing only
     */
    void recordGetOutput(OperationTimer operationTimer, Page page) {
        operationTimer.recordOperationComplete(getOutputTiming);
        if (page == null) {
            return;
        }
        outputDataSize.update(page.getSizeInBytes());
        outputPositions.update(page.getPositionCount());
    }

    /**
     * Adds the given amounts to the output size and position totals.
     *
     * @param sizeInBytes bytes produced; must not be negative
     * @param positions positions produced; must not be negative
     * @throws IllegalArgumentException if any argument is negative
     */
    public void recordOutput(long sizeInBytes, long positions) {
        Preconditions.checkArgument(sizeInBytes >= 0, "sizeInBytes is negative (%s)", sizeInBytes);
        Preconditions.checkArgument(positions >= 0, "positions is negative (%s)", positions);
        outputDataSize.update(sizeInBytes);
        outputPositions.update(positions);
    }

    /** Adds {@code dynamicFilterSplits} to the running count of dynamic-filter splits processed. */
    public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits) {
        dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits);
    }

    /** Replaces the stored operator metrics with {@code metrics}. */
    public void setLatestMetrics(Metrics metrics) {
        AtomicReference<Metrics> target = this.metrics;
        target.set(metrics);
    }

    /** Replaces the stored connector metrics with {@code metrics}. */
    public void setLatestConnectorMetrics(Metrics metrics) {
        connectorMetrics.set(metrics);
    }

    /** Returns the finished future if one has been set, otherwise an empty optional. */
    Optional<ListenableFuture<Void>> getFinishedFuture() {
        ListenableFuture<Void> current = finishedFuture.get();
        return Optional.ofNullable(current);
    }

    /**
     * Stores the finished future. The new value is swapped in before the check, so a second call
     * still replaces the stored future and then fails.
     *
     * @param finishedFuture the future to store; must not be null
     * @throws NullPointerException if {@code finishedFuture} is null
     * @throws IllegalStateException if a finished future had already been set
     */
    public void setFinishedFuture(ListenableFuture<Void> finishedFuture) {
        ListenableFuture<Void> replacement = Objects.requireNonNull(finishedFuture, "finishedFuture is null");
        ListenableFuture<Void> previous = this.finishedFuture.getAndSet(replacement);
        Preconditions.checkState(previous == null, "finishedFuture already set");
    }

    /** Adds {@code sizeInBytes} to the writer input data size total. */
    public void recordWriterInputDataSize(long sizeInBytes) {
        writerInputDataSize.getAndAdd(sizeInBytes);
    }

    /** Adds {@code sizeInBytes} to the physical written data size total. */
    public void recordPhysicalWrittenData(long sizeInBytes) {
        physicalWrittenDataSize.getAndAdd(sizeInBytes);
    }

    /**
     * Starts measuring blocked time for the given future. Any previously installed monitor is
     * finished immediately; the new monitor runs on this context's executor once {@code blocked}
     * completes.
     *
     * @param blocked the future the operator is blocked on; must not be null
     * @throws NullPointerException if {@code blocked} is null
     */
    public void recordBlocked(ListenableFuture<Void> blocked) {
        Objects.requireNonNull(blocked, "blocked is null");
        BlockedMonitor current = new BlockedMonitor();
        BlockedMonitor previous = blockedMonitor.getAndSet(current);
        // Close out the earlier measurement before the new one is attached.
        if (previous != null) {
            previous.run();
        }
        blocked.addListener(current, executor);
    }

    /** Completes the given timer against the finish timing. */
    void recordFinish(OperationTimer operationTimer) {
        operationTimer.recordOperationComplete(finishTiming);
    }

    /** Returns the current user-memory future held by this context. */
    public ListenableFuture<Void> isWaitingForMemory() {
        return memoryFuture.get();
    }

    /** Returns the current revocable-memory future held by this context. */
    public ListenableFuture<Void> isWaitingForRevocableMemory() {
        return revocableMemoryFuture.get();
    }

    /**
     * Creates a new user memory context for {@code allocationTag}, wrapped so that allocations
     * update the memory future and peak reservations. The wrapper is created as closeable.
     */
    public LocalMemoryContext newLocalUserMemoryContext(String allocationTag) {
        LocalMemoryContext delegate = operatorMemoryContext.newUserMemoryContext(allocationTag);
        return new InternalLocalMemoryContext(delegate, memoryFuture, this::updatePeakMemoryReservations, true);
    }

    /**
     * Wraps the operator's own local user memory context. The wrapper is created as
     * non-closeable, so calling {@code close()} on it throws.
     */
    public LocalMemoryContext localUserMemoryContext() {
        LocalMemoryContext delegate = operatorMemoryContext.localUserMemoryContext();
        return new InternalLocalMemoryContext(delegate, memoryFuture, this::updatePeakMemoryReservations, false);
    }

    /**
     * Wraps the operator's own local revocable memory context, tied to the revocable memory
     * future. The wrapper is created as non-closeable, so calling {@code close()} on it throws.
     */
    public LocalMemoryContext localRevocableMemoryContext() {
        LocalMemoryContext delegate = operatorMemoryContext.localRevocableMemoryContext();
        return new InternalLocalMemoryContext(delegate, revocableMemoryFuture, this::updatePeakMemoryReservations, false);
    }

    /**
     * Wraps the operator's aggregate user memory context. The wrapper is created as
     * non-closeable, so calling {@code close()} on it throws.
     */
    public AggregatedMemoryContext aggregateUserMemoryContext() {
        AggregatedMemoryContext delegate = operatorMemoryContext.aggregateUserMemoryContext();
        return new InternalAggregatedMemoryContext(delegate, memoryFuture, this::updatePeakMemoryReservations, false);
    }

    /**
     * Wraps the operator's aggregate revocable memory context, tied to the revocable memory
     * future. The wrapper is created as non-closeable, so calling {@code close()} on it throws.
     */
    public AggregatedMemoryContext aggregateRevocableMemoryContext() {
        AggregatedMemoryContext delegate = operatorMemoryContext.aggregateRevocableMemoryContext();
        return new InternalAggregatedMemoryContext(delegate, revocableMemoryFuture, this::updatePeakMemoryReservations, false);
    }

    /**
     * Creates a new aggregate user memory context, wrapped so that allocations update the memory
     * future and peak reservations. The wrapper is created as closeable.
     */
    public AggregatedMemoryContext newAggregateUserMemoryContext() {
        AggregatedMemoryContext delegate = operatorMemoryContext.newAggregateUserMemoryContext();
        return new InternalAggregatedMemoryContext(delegate, memoryFuture, this::updatePeakMemoryReservations, true);
    }

    /**
     * Creates a new aggregate revocable memory context, tied to the revocable memory future.
     * The wrapper is created as closeable.
     */
    public AggregatedMemoryContext newAggregateRevocableMemoryContext() {
        AggregatedMemoryContext delegate = operatorMemoryContext.newAggregateRevocableMemoryContext();
        return new InternalAggregatedMemoryContext(delegate, revocableMemoryFuture, this::updatePeakMemoryReservations, true);
    }

    /**
     * Raises the peak user, revocable and total memory reservations to the current values
     * reported by the operator memory context, if those exceed the recorded peaks.
     * Passed as the allocation listener to the memory context wrappers created by this class.
     */
    private void updatePeakMemoryReservations() {
        long userMemory = this.operatorMemoryContext.getUserMemory();
        long revocableMemory = this.operatorMemoryContext.getRevocableMemory();
        // Total reservation covers both kinds of memory; using user memory alone would make the
        // peak total indistinguishable from the peak user reservation.
        long totalMemory = userMemory + revocableMemory;
        this.peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
        this.peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max);
        this.peakTotalMemoryReservation.accumulateAndGet(totalMemory, Math::max);
    }

    /** Returns the revocable memory currently reported by the operator memory context. */
    public long getReservedRevocableBytes() {
        return operatorMemoryContext.getRevocableMemory();
    }

    /**
     * Links the operator's memory future to a memory pool future that is not yet done.
     * If the future currently in {@code targetFutureReference} is already completed it is swapped
     * for a fresh one (retrying on concurrent updates); a future that is still pending is reused.
     * The chosen future is completed when {@code memoryPoolFuture} completes.
     * Does nothing when {@code memoryPoolFuture} is already done.
     *
     * @param memoryPoolFuture future returned by the memory reservation
     * @param targetFutureReference holder of the operator-level future to update
     */
    private static void updateMemoryFuture(ListenableFuture<Void> memoryPoolFuture, AtomicReference<SettableFuture<Void>> targetFutureReference) {
        if (memoryPoolFuture.isDone()) {
            return;
        }
        SettableFuture<Void> candidate = targetFutureReference.get();
        while (candidate.isDone()) {
            SettableFuture<Void> replacement = SettableFuture.create();
            // On a lost race, re-read whatever the other thread installed and re-check it.
            candidate = targetFutureReference.compareAndSet(candidate, replacement)
                    ? replacement
                    : targetFutureReference.get();
        }
        SettableFuture<Void> pending = candidate;
        memoryPoolFuture.addListener(() -> pending.set(null), MoreExecutors.directExecutor());
    }

    /**
     * Releases what this context holds: clears the revocation listener, replaces the info
     * supplier with a constant supplier of its last value (or null if it produced null), and
     * closes the operator memory context.
     *
     * @throws TrinoException if user or revocable memory is still non-zero after closing
     */
    public void destroy() {
        synchronized (this) {
            memoryRevocationRequestListener = null;
        }
        Supplier<? extends OperatorInfo> currentSupplier = this.infoSupplier.get();
        if (currentSupplier != null) {
            // Evaluate once and keep only the result, dropping the original supplier.
            OperatorInfo snapshot = currentSupplier.get();
            if (snapshot == null) {
                this.infoSupplier.set(null);
            }
            else {
                this.infoSupplier.set(Suppliers.ofInstance(snapshot));
            }
        }
        operatorMemoryContext.close();
        if (operatorMemoryContext.getUserMemory() != 0) {
            String message = String.format("Operator %s has non-zero user memory (%d bytes) after destroy()", this, operatorMemoryContext.getUserMemory());
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, message);
        }
        if (operatorMemoryContext.getRevocableMemory() != 0) {
            String message = String.format("Operator %s has non-zero revocable memory (%d bytes) after destroy()", this, operatorMemoryContext.getRevocableMemory());
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, message);
        }
    }

    /** Returns the spill context owned by this operator context. */
    public SpillContext getSpillContext() {
        return spillContext;
    }

    /** Returns whether memory revoking is currently flagged as requested. */
    public synchronized boolean isMemoryRevokingRequested() {
        return memoryRevokingRequested;
    }

    /**
     * Flags memory revoking as requested if it is not already flagged and the operator holds
     * revocable memory, then notifies the registered listener (outside the lock).
     *
     * @return the revocable memory at the time of the request, or 0 if no request was made
     */
    public long requestMemoryRevoking() {
        Runnable toNotify = null;
        long bytesToRevoke = 0L;
        synchronized (this) {
            boolean alreadyRequested = isMemoryRevokingRequested();
            if (!alreadyRequested && operatorMemoryContext.getRevocableMemory() > 0L) {
                memoryRevokingRequested = true;
                bytesToRevoke = operatorMemoryContext.getRevocableMemory();
                toNotify = memoryRevocationRequestListener;
            }
        }
        if (toNotify != null) {
            OperatorContext.runListener(toNotify);
        }
        return bytesToRevoke;
    }

    /** Clears the memory-revoking-requested flag. */
    public synchronized void resetMemoryRevokingRequested() {
        memoryRevokingRequested = false;
    }

    /**
     * Registers the listener to run when memory revoking is requested. If revoking is already
     * flagged at registration time, the listener is run right away (outside the lock).
     *
     * @param listener callback to register; must not be null
     * @throws NullPointerException if {@code listener} is null
     * @throws IllegalStateException if a listener is already registered
     */
    public void setMemoryRevocationRequestListener(Runnable listener) {
        Objects.requireNonNull(listener, "listener is null");
        boolean revokingAlreadyRequested;
        synchronized (this) {
            Preconditions.checkState(memoryRevocationRequestListener == null, "listener already set");
            memoryRevocationRequestListener = listener;
            revokingAlreadyRequested = memoryRevokingRequested;
        }
        if (revokingAlreadyRequested) {
            OperatorContext.runListener(listener);
        }
    }

    /**
     * Runs the listener, converting any {@link RuntimeException} it throws into a
     * {@link TrinoException} with the original exception as the cause.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    private static void runListener(Runnable listener) {
        Objects.requireNonNull(listener, "listener is null");
        try {
            listener.run();
        }
        catch (RuntimeException failure) {
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Exception while running the listener", failure);
        }
    }

    /**
     * Replaces the supplier used to obtain operator info.
     *
     * @throws NullPointerException if {@code infoSupplier} is null
     */
    public void setInfoSupplier(Supplier<? extends OperatorInfo> infoSupplier) {
        this.infoSupplier.set(Objects.requireNonNull(infoSupplier, "infoSupplier is null"));
    }

    /** Returns the counter tracking input data size. */
    public CounterStat getInputDataSize() {
        return inputDataSize;
    }

    /** Returns the counter tracking input positions. */
    public CounterStat getInputPositions() {
        return inputPositions;
    }

    /** Returns the counter tracking output data size. */
    public CounterStat getOutputDataSize() {
        return outputDataSize;
    }

    /** Returns the counter tracking output positions. */
    public CounterStat getOutputPositions() {
        return outputPositions;
    }

    /** Returns the total recorded through {@code recordWriterInputDataSize}. */
    public long getWriterInputDataSize() {
        return writerInputDataSize.get();
    }

    /** Returns the total recorded through {@code recordPhysicalWrittenData}. */
    public long getPhysicalWrittenDataSize() {
        return physicalWrittenDataSize.get();
    }

    /** Returns {@code "<operatorType>-<planNodeId>"}. */
    @Override
    public String toString() {
        String pattern = "%s-%s";
        return String.format(pattern, operatorType, planNodeId);
    }

    /**
     * Merges the given operator metrics with four single-value histograms built from the
     * remaining arguments: input rows, CPU time, scheduled (wall) time and blocked time.
     *
     * @param operatorMetrics metrics to merge into
     * @param inputPositions number of input rows
     * @param cpuTimeSeconds CPU time in seconds
     * @param wallTimeSeconds scheduled time in seconds
     * @param blockedWallSeconds blocked time in seconds
     * @return the merged metrics
     */
    public static Metrics getOperatorMetrics(Metrics operatorMetrics, long inputPositions, double cpuTimeSeconds, double wallTimeSeconds, double blockedWallSeconds) {
        Metrics derived = new Metrics(ImmutableMap.of(
                "Input rows distribution", TDigestHistogram.fromValue((double) inputPositions),
                "CPU time distribution (s)", TDigestHistogram.fromValue(cpuTimeSeconds),
                "Scheduled time distribution (s)", TDigestHistogram.fromValue(wallTimeSeconds),
                "Blocked time distribution (s)", TDigestHistogram.fromValue(blockedWallSeconds)));
        return operatorMetrics.mergeWith(derived);
    }

    /** Dispatches to {@code visitor.visitOperatorContext} with this context and returns its result. */
    public <C, R> R accept(QueryContextVisitor<C, R> visitor, C context) {
        R result = visitor.visitOperatorContext(this, context);
        return result;
    }

    /**
     * Builds an {@link OperatorStats} snapshot from the current counters, timings, metrics and
     * memory figures. The operator info comes from the registered info supplier, or is null
     * when no supplier is set. The blocked reason is WAITING_FOR_MEMORY while the user memory
     * future is not done, otherwise empty.
     */
    public OperatorStats getOperatorStats() {
        Supplier<? extends OperatorInfo> currentSupplier = this.infoSupplier.get();
        OperatorInfo info = currentSupplier == null ? null : currentSupplier.get();
        long inputPositionsCount = inputPositions.getTotalCount();
        return new OperatorStats(
                driverContext.getTaskId().getStageId().getId(),
                driverContext.getPipelineContext().getPipelineId(),
                operatorId,
                planNodeId,
                operatorType,
                1L,
                // add-input timing and input counters
                addInputTiming.getCalls(),
                new Duration(addInputTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                new Duration(addInputTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                DataSize.ofBytes(physicalInputDataSize.getTotalCount()),
                physicalInputPositions.getTotalCount(),
                new Duration(physicalInputReadTimeNanos.get(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                DataSize.ofBytes(internalNetworkInputDataSize.getTotalCount()),
                internalNetworkPositions.getTotalCount(),
                DataSize.ofBytes(physicalInputDataSize.getTotalCount() + internalNetworkInputDataSize.getTotalCount()),
                DataSize.ofBytes(inputDataSize.getTotalCount()),
                inputPositionsCount,
                (double) inputPositionsCount * (double) inputPositionsCount,
                // get-output timing and output counters
                getOutputTiming.getCalls(),
                new Duration(getOutputTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                new Duration(getOutputTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                DataSize.ofBytes(outputDataSize.getTotalCount()),
                outputPositions.getTotalCount(),
                dynamicFilterSplitsProcessed.get(),
                // operator metrics, enriched with row / CPU / wall / blocked distributions
                OperatorContext.getOperatorMetrics(
                        metrics.get(),
                        inputPositionsCount,
                        new Duration(addInputTiming.getCpuNanos() + getOutputTiming.getCpuNanos() + finishTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue(),
                        new Duration(addInputTiming.getWallNanos() + getOutputTiming.getWallNanos() + finishTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue(),
                        new Duration(blockedWallNanos.get(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue()),
                connectorMetrics.get(),
                DataSize.ofBytes(physicalWrittenDataSize.get()),
                new Duration(blockedWallNanos.get(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                // finish timing
                finishTiming.getCalls(),
                new Duration(finishTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                new Duration(finishTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(),
                // current and peak memory, spill
                DataSize.ofBytes(operatorMemoryContext.getUserMemory()),
                DataSize.ofBytes(getReservedRevocableBytes()),
                DataSize.ofBytes(peakUserMemoryReservation.get()),
                DataSize.ofBytes(peakRevocableMemoryReservation.get()),
                DataSize.ofBytes(peakTotalMemoryReservation.get()),
                DataSize.ofBytes(spillContext.getSpilledBytes()),
                memoryFuture.get().isDone() ? Optional.empty() : Optional.of(BlockedReason.WAITING_FOR_MEMORY),
                info);
    }

    /** Returns {@code end - start}, or 0 when that difference is negative. */
    private static long nanosBetween(long start, long end) {
        long elapsed = end - start;
        return Math.max(0L, elapsed);
    }

    /** Returns the underlying memory tracking context; exposed for tests. */
    @VisibleForTesting
    public MemoryTrackingContext getOperatorMemoryContext() {
        return operatorMemoryContext;
    }

    /**
     * Spill context that keeps a per-operator running reservation and a cumulative spilled-bytes
     * total, and forwards every reservation change to the driver context.
     */
    @ThreadSafe
    private static class OperatorSpillContext
            implements SpillContext {
        private final DriverContext driverContext;
        private final AtomicLong reservedBytes = new AtomicLong();
        private final AtomicLong spilledBytes = new AtomicLong();

        public OperatorSpillContext(DriverContext driverContext) {
            this.driverContext = driverContext;
        }

        /**
         * Applies a reservation delta. Non-negative values reserve spill space and are also added
         * to the spilled-bytes total; negative values free that many bytes.
         */
        @Override
        public void updateBytes(long bytes) {
            if (bytes < 0L) {
                reservedBytes.accumulateAndGet(-bytes, this::decrementSpilledReservation);
                driverContext.freeSpill(-bytes);
                return;
            }
            reservedBytes.addAndGet(bytes);
            driverContext.reserveSpill(bytes);
            spilledBytes.addAndGet(bytes);
        }

        /** Returns the cumulative number of bytes reserved through {@link #updateBytes}. */
        public long getSpilledBytes() {
            return spilledBytes.longValue();
        }

        /** Subtracts the freed amount from the reservation, rejecting negative or excessive frees. */
        private long decrementSpilledReservation(long reservedBytes, long bytesBeingFreed) {
            Preconditions.checkArgument(bytesBeingFreed >= 0);
            Preconditions.checkArgument(bytesBeingFreed <= reservedBytes, "tried to free %s spilled bytes from %s bytes reserved", bytesBeingFreed, reservedBytes);
            return reservedBytes - bytesBeingFreed;
        }

        /** Always throws: this context must not be closed directly. */
        @Override
        public void close() {
            String message = String.format("%s should not be closed directly", getClass());
            throw new UnsupportedOperationException(message);
        }

        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("usedBytes", reservedBytes.get())
                    .toString();
        }
    }

    /**
     * Measures how long the operator stayed blocked. The clock starts at construction; the first
     * call to {@link #run()} adds the elapsed time to the enclosing context's blocked total and
     * unregisters this monitor if it is still the current one. Later calls do nothing.
     */
    private class BlockedMonitor
            implements Runnable {
        private final long start = System.nanoTime();
        private boolean finished;

        private BlockedMonitor() {
        }

        @Override
        public synchronized void run() {
            if (!finished) {
                finished = true;
                blockedMonitor.compareAndSet(this, null);
                blockedWallNanos.getAndAdd(getBlockedTime());
            }
        }

        /** Returns the nanoseconds elapsed since this monitor was created (never negative). */
        public long getBlockedTime() {
            long now = System.nanoTime();
            return OperatorContext.nanosBetween(start, now);
        }
    }

    /**
     * Local memory context wrapper that forwards to a delegate, keeps the operator's memory future
     * in sync with blocking reservations, and runs an allocation listener after each change.
     */
    private static class InternalLocalMemoryContext
            implements LocalMemoryContext {
        private final LocalMemoryContext delegate;
        private final AtomicReference<SettableFuture<Void>> memoryFuture;
        private final Runnable allocationListener;
        private final boolean closeable;

        InternalLocalMemoryContext(LocalMemoryContext delegate, AtomicReference<SettableFuture<Void>> memoryFuture, Runnable allocationListener, boolean closeable) {
            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
            this.memoryFuture = Objects.requireNonNull(memoryFuture, "memoryFuture is null");
            this.allocationListener = Objects.requireNonNull(allocationListener, "allocationListener is null");
            this.closeable = closeable;
        }

        public long getBytes() {
            return delegate.getBytes();
        }

        /**
         * Sets the reservation on the delegate unless it already equals {@code bytes}, in which
         * case {@code Operator.NOT_BLOCKED} is returned without touching the delegate.
         */
        public ListenableFuture<Void> setBytes(long bytes) {
            if (bytes != delegate.getBytes()) {
                ListenableFuture<Void> blocked = delegate.setBytes(bytes);
                OperatorContext.updateMemoryFuture(blocked, memoryFuture);
                allocationListener.run();
                return blocked;
            }
            return Operator.NOT_BLOCKED;
        }

        /** Tries to set the reservation; the listener runs only when the delegate accepts it. */
        public boolean trySetBytes(long bytes) {
            boolean accepted = delegate.trySetBytes(bytes);
            if (accepted) {
                allocationListener.run();
            }
            return accepted;
        }

        /** Closes the delegate and runs the listener; throws if this wrapper is not closeable. */
        public void close() {
            if (closeable) {
                delegate.close();
                allocationListener.run();
                return;
            }
            throw new UnsupportedOperationException("Called close on unclosable local memory context");
        }
    }

    /**
     * Aggregated memory context wrapper. Child contexts created through it are wrapped as well,
     * sharing the same memory future and allocation listener, and are always closeable.
     */
    private static class InternalAggregatedMemoryContext
            implements AggregatedMemoryContext {
        private final AggregatedMemoryContext delegate;
        private final AtomicReference<SettableFuture<Void>> memoryFuture;
        private final Runnable allocationListener;
        private final boolean closeable;

        InternalAggregatedMemoryContext(AggregatedMemoryContext delegate, AtomicReference<SettableFuture<Void>> memoryFuture, Runnable allocationListener, boolean closeable) {
            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
            this.memoryFuture = Objects.requireNonNull(memoryFuture, "memoryFuture is null");
            this.allocationListener = Objects.requireNonNull(allocationListener, "allocationListener is null");
            this.closeable = closeable;
        }

        public AggregatedMemoryContext newAggregatedMemoryContext() {
            AggregatedMemoryContext child = delegate.newAggregatedMemoryContext();
            return new InternalAggregatedMemoryContext(child, memoryFuture, allocationListener, true);
        }

        public LocalMemoryContext newLocalMemoryContext(String allocationTag) {
            LocalMemoryContext child = delegate.newLocalMemoryContext(allocationTag);
            return new InternalLocalMemoryContext(child, memoryFuture, allocationListener, true);
        }

        public long getBytes() {
            return delegate.getBytes();
        }

        /** Closes the delegate; throws if this wrapper is not closeable. */
        public void close() {
            if (closeable) {
                delegate.close();
                return;
            }
            throw new UnsupportedOperationException("Called close on unclosable aggregated memory context");
        }
    }
}

