/*
 * Decompiled with CFR 0.152.
 */
package io.trino.exchange;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.exchange.ExchangeDataSource;
import io.trino.exchange.ExchangeInput;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.OperatorInfo;
import io.trino.spi.exchange.ExchangeSource;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class SpoolingExchangeDataSource
implements ExchangeDataSource {
    private static final Logger log = Logger.get(SpoolingExchangeDataSource.class);
    private ExchangeSource exchangeSource;
    private final LocalMemoryContext systemMemoryContext;
    private volatile boolean closed;

    public SpoolingExchangeDataSource(ExchangeSource exchangeSource, LocalMemoryContext systemMemoryContext) {
        this.exchangeSource = Objects.requireNonNull(exchangeSource, "exchangeSource is null");
        this.systemMemoryContext = Objects.requireNonNull(systemMemoryContext, "systemMemoryContext is null");
    }

    @Override
    public Slice pollPage() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return null;
        }
        Slice data = exchangeSource.read();
        this.systemMemoryContext.setBytes(exchangeSource.getMemoryUsage());
        if (this.closed) {
            this.systemMemoryContext.setBytes(0L);
        }
        return data;
    }

    @Override
    public boolean isFinished() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return true;
        }
        return exchangeSource.isFinished();
    }

    @Override
    public ListenableFuture<Void> isBlocked() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return Futures.immediateVoidFuture();
        }
        return MoreFutures.toListenableFuture((CompletableFuture)exchangeSource.isBlocked());
    }

    @Override
    public void addInput(ExchangeInput input) {
        SpoolingExchangeInput spoolingExchangeInput = (SpoolingExchangeInput)input;
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return;
        }
        spoolingExchangeInput.getOutputSelector().ifPresent(arg_0 -> ((ExchangeSource)exchangeSource).setOutputSelector(arg_0));
        exchangeSource.addSourceHandles(spoolingExchangeInput.getExchangeSourceHandles());
    }

    @Override
    public void noMoreInputs() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return;
        }
        exchangeSource.noMoreSourceHandles();
    }

    @Override
    public OperatorInfo getInfo() {
        return null;
    }

    @Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.exchangeSource.close();
        }
        catch (RuntimeException e) {
            log.warn((Throwable)e, "error closing exchange source");
        }
        finally {
            this.exchangeSource = null;
            this.systemMemoryContext.setBytes(0L);
        }
    }
}

