/*
 * Decompiled with CFR 0.152.
 */
package io.trino.exchange;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.slice.Slice;
import io.opentelemetry.api.trace.Span;
import io.trino.exchange.DirectExchangeDataSource;
import io.trino.exchange.DirectExchangeInput;
import io.trino.exchange.ExchangeDataSource;
import io.trino.exchange.ExchangeInput;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.exchange.SpoolingExchangeDataSource;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.TaskFailureListener;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.DirectExchangeClient;
import io.trino.operator.DirectExchangeClientSupplier;
import io.trino.operator.OperatorInfo;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class LazyExchangeDataSource
implements ExchangeDataSource {
    private final QueryId queryId;
    private final Span querySpan;
    private final ExchangeId exchangeId;
    private final DirectExchangeClientSupplier directExchangeClientSupplier;
    private final LocalMemoryContext systemMemoryContext;
    private final TaskFailureListener taskFailureListener;
    private final RetryPolicy retryPolicy;
    private final ExchangeManagerRegistry exchangeManagerRegistry;
    private final SettableFuture<Void> initializationFuture = SettableFuture.create();
    private final AtomicReference<ExchangeDataSource> delegate = new AtomicReference();
    private final AtomicBoolean closed = new AtomicBoolean();

    public LazyExchangeDataSource(QueryId queryId, ExchangeId exchangeId, Span querySpan, DirectExchangeClientSupplier directExchangeClientSupplier, LocalMemoryContext systemMemoryContext, TaskFailureListener taskFailureListener, RetryPolicy retryPolicy, ExchangeManagerRegistry exchangeManagerRegistry) {
        this.queryId = Objects.requireNonNull(queryId, "queryId is null");
        this.exchangeId = Objects.requireNonNull(exchangeId, "exchangeId is null");
        this.querySpan = Objects.requireNonNull(querySpan, "querySpan is null");
        this.directExchangeClientSupplier = Objects.requireNonNull(directExchangeClientSupplier, "directExchangeClientSupplier is null");
        this.systemMemoryContext = Objects.requireNonNull(systemMemoryContext, "systemMemoryContext is null");
        this.taskFailureListener = Objects.requireNonNull(taskFailureListener, "taskFailureListener is null");
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy is null");
        this.exchangeManagerRegistry = Objects.requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
    }

    @Override
    public Slice pollPage() {
        ExchangeDataSource dataSource = this.delegate.get();
        if (dataSource == null) {
            return null;
        }
        return dataSource.pollPage();
    }

    @Override
    public boolean isFinished() {
        if (this.closed.get()) {
            return true;
        }
        ExchangeDataSource dataSource = this.delegate.get();
        if (dataSource == null) {
            return false;
        }
        return dataSource.isFinished();
    }

    @Override
    public ListenableFuture<Void> isBlocked() {
        if (this.closed.get()) {
            return Futures.immediateVoidFuture();
        }
        if (!this.initializationFuture.isDone()) {
            return Futures.nonCancellationPropagating(this.initializationFuture);
        }
        ExchangeDataSource dataSource = this.delegate.get();
        if (dataSource == null) {
            return Futures.immediateVoidFuture();
        }
        return dataSource.isBlocked();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addInput(ExchangeInput input) {
        boolean initialized = false;
        LazyExchangeDataSource lazyExchangeDataSource = this;
        synchronized (lazyExchangeDataSource) {
            if (this.closed.get()) {
                return;
            }
            ExchangeDataSource dataSource = this.delegate.get();
            if (dataSource == null) {
                if (input instanceof DirectExchangeInput) {
                    DirectExchangeClient client = this.directExchangeClientSupplier.get(this.queryId, this.exchangeId, this.querySpan, this.systemMemoryContext, this.taskFailureListener, this.retryPolicy);
                    dataSource = new DirectExchangeDataSource(client);
                } else if (input instanceof SpoolingExchangeInput) {
                    ExchangeManager exchangeManager = this.exchangeManagerRegistry.getExchangeManager();
                    dataSource = new SpoolingExchangeDataSource(exchangeManager.createSource(), this.systemMemoryContext);
                } else {
                    throw new IllegalArgumentException("Unexpected input: " + String.valueOf(input));
                }
                this.delegate.set(dataSource);
                initialized = true;
            }
            dataSource.addInput(input);
        }
        if (initialized) {
            this.initializationFuture.set(null);
        }
    }

    @Override
    public synchronized void noMoreInputs() {
        if (this.closed.get()) {
            return;
        }
        ExchangeDataSource dataSource = this.delegate.get();
        if (dataSource != null) {
            dataSource.noMoreInputs();
        } else {
            this.close();
        }
    }

    @Override
    public OperatorInfo getInfo() {
        ExchangeDataSource dataSource = this.delegate.get();
        if (dataSource == null) {
            return null;
        }
        return dataSource.getInfo();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        LazyExchangeDataSource lazyExchangeDataSource = this;
        synchronized (lazyExchangeDataSource) {
            if (!this.closed.compareAndSet(false, true)) {
                return;
            }
            ExchangeDataSource dataSource = this.delegate.get();
            if (dataSource != null) {
                dataSource.close();
            }
        }
        this.initializationFuture.set(null);
    }
}

