/*
 * Decompiled with CFR 0.152.
 */
package io.trino.testing;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Base test for {@link ExchangeManager} implementations.
 *
 * <p>Subclasses supply the manager under test through {@link #createExchangeManager()};
 * the manager is created once before the tests of the class run and the reference is
 * dropped afterwards.
 */
public abstract class AbstractTestExchangeManager {
    private ExchangeManager exchangeManager;

    /**
     * Creates the exchange manager under test.
     */
    @BeforeClass
    public void init() throws Exception {
        exchangeManager = createExchangeManager();
    }

    /**
     * Drops the reference to the exchange manager. Runs even when {@link #init()} or a test failed.
     */
    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        exchangeManager = null;
    }

    /**
     * Supplies the {@link ExchangeManager} implementation to be tested.
     *
     * @return the manager that every test in this class runs against
     */
    protected abstract ExchangeManager createExchangeManager();

    /**
     * Writes data through three sinks into an exchange with two output partitions and checks what can be
     * read back from each partition.
     *
     * <p>Sinks 0 and 1 each get two attempts that finish with identical data plus one attempt that aborts;
     * sink 2 gets a single finished attempt. The assertions expect every finished sink's values exactly once
     * per partition and none of the values written by aborted attempts.
     */
    @Test
    public void testHappyPath() throws Exception {
        Exchange exchange = exchangeManager.createExchange(
                new ExchangeContext(new QueryId("query"), ExchangeId.createRandomExchangeId()),
                2);
        // Close the exchange even when an assertion below fails, so a failing test does not leak it.
        try {
            ExchangeSinkHandle sinkHandle0 = exchange.addSink(0);
            ExchangeSinkHandle sinkHandle1 = exchange.addSink(1);
            ExchangeSinkHandle sinkHandle2 = exchange.addSink(2);
            exchange.noMoreSinks();

            // Sink 0: attempts 0 and 1 finish with the same data, attempt 2 aborts.
            Multimap<Integer, String> sink0Data = ImmutableListMultimap.of(0, "0-0-0", 1, "0-1-0", 0, "0-0-1", 1, "0-1-1");
            writeSinkAttempt(exchange, sinkHandle0, 0, sink0Data, true);
            writeSinkAttempt(exchange, sinkHandle0, 1, sink0Data, true);
            writeSinkAttempt(exchange, sinkHandle0, 2, ImmutableListMultimap.of(0, "failed", 1, "another failed"), false);

            // Sink 1: same pattern as sink 0.
            Multimap<Integer, String> sink1Data = ImmutableListMultimap.of(0, "1-0-0", 1, "1-1-0", 0, "1-0-1", 1, "1-1-1");
            writeSinkAttempt(exchange, sinkHandle1, 0, sink1Data, true);
            writeSinkAttempt(exchange, sinkHandle1, 1, sink1Data, true);
            writeSinkAttempt(exchange, sinkHandle1, 2, ImmutableListMultimap.of(0, "more failed", 1, "another failed"), false);

            // Sink 2: a single finished attempt (with attempt id 2).
            writeSinkAttempt(exchange, sinkHandle2, 2, ImmutableListMultimap.of(0, "2-0-0", 1, "2-1-0"), true);

            CompletableFuture<List<ExchangeSourceHandle>> inputPartitionHandlesFuture = exchange.getSourceHandles();
            Assert.assertTrue(inputPartitionHandlesFuture.isDone());

            List<ExchangeSourceHandle> partitionHandles = inputPartitionHandlesFuture.get();
            Assertions.assertThat(partitionHandles).hasSize(2);

            Map<Integer, ExchangeSourceHandle> partitions = partitionHandles.stream()
                    .collect(ImmutableMap.toImmutableMap(ExchangeSourceHandle::getPartitionId, Function.identity()));

            Assertions.assertThat(readData(partitions.get(0)))
                    .containsExactlyInAnyOrder("0-0-0", "0-0-1", "1-0-0", "1-0-1", "2-0-0");
            Assertions.assertThat(readData(partitions.get(1)))
                    .containsExactlyInAnyOrder("0-1-0", "0-1-1", "1-1-0", "1-1-1", "2-1-0");
        } finally {
            exchange.close();
        }
    }

    /**
     * Runs one attempt of a sink: instantiates it, writes {@code data}, then reports the instance to the
     * exchange via {@code sinkFinished}.
     *
     * @param exchange exchange the sink belongs to
     * @param sinkHandle handle returned by {@code exchange.addSink}
     * @param attemptId attempt id passed to {@code exchange.instantiateSink}
     * @param data partition id to value entries to write
     * @param finish {@code true} to finish the sink after writing, {@code false} to abort it
     */
    private void writeSinkAttempt(
            Exchange exchange,
            ExchangeSinkHandle sinkHandle,
            int attemptId,
            Multimap<Integer, String> data,
            boolean finish) {
        ExchangeSinkInstanceHandle sinkInstanceHandle = exchange.instantiateSink(sinkHandle, attemptId);
        writeData(sinkInstanceHandle, data, finish);
        exchange.sinkFinished(sinkInstanceHandle);
    }

    /**
     * Creates a sink for {@code handle}, adds every entry of {@code data} as a UTF-8 slice and then waits
     * for the sink to either finish or abort.
     *
     * @param handle sink instance to write through
     * @param data partition id to value entries, added in the multimap's iteration order
     * @param finish {@code true} to call {@code finish()}, {@code false} to call {@code abort()}
     */
    private void writeData(ExchangeSinkInstanceHandle handle, Multimap<Integer, String> data, boolean finish) {
        ExchangeSink sink = exchangeManager.createSink(handle, false);
        data.forEach((partitionId, value) -> sink.add(partitionId, Slices.utf8Slice(value)));
        if (finish) {
            MoreFutures.getFutureValue(sink.finish());
        } else {
            MoreFutures.getFutureValue(sink.abort());
        }
    }

    /**
     * Reads everything available from a single source handle.
     *
     * @param handle source handle to read
     * @return the values read, decoded as UTF-8 strings
     */
    private List<String> readData(ExchangeSourceHandle handle) {
        return readData(ImmutableList.of(handle));
    }

    /**
     * Reads from a source created over {@code handles} until it reports finished.
     *
     * @param handles source handles to read from
     * @return the values read, decoded as UTF-8 strings, in the order they were returned
     */
    private List<String> readData(List<ExchangeSourceHandle> handles) {
        ImmutableList.Builder<String> result = ImmutableList.builder();
        try (ExchangeSource source = exchangeManager.createSource(handles)) {
            while (!source.isFinished()) {
                Slice data = source.read();
                // read() can return null before the source is finished; skip and poll again.
                if (data != null) {
                    result.add(data.toStringUtf8());
                }
            }
        }
        return result.build();
    }
}

