/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.vector.ipc;

import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.MessageSerializerTest;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Test;

public class TestArrowStreamPipe {
    Schema schema = MessageSerializerTest.testSchema();
    BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);

    @Test
    public void pipeTest() throws IOException, InterruptedException {
        int NUM_BATCHES = 10;
        Pipe pipe = Pipe.open();
        WriterThread writer = new WriterThread(10, pipe.sink());
        ReaderThread reader = new ReaderThread(pipe.source());
        writer.start();
        reader.start();
        reader.join();
        writer.join();
        Assert.assertEquals((long)10L, (long)reader.getBatchesRead());
        Assert.assertEquals((long)writer.bytesWritten(), (long)reader.bytesRead());
    }

    private final class ReaderThread
    extends Thread {
        private int batchesRead = 0;
        private final ArrowStreamReader reader;
        private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
        private boolean done = false;

        public ReaderThread(ReadableByteChannel sourceChannel) throws IOException {
            this.reader = new ArrowStreamReader(sourceChannel, this.alloc){

                public boolean loadNextBatch() throws IOException {
                    if (!super.loadNextBatch()) {
                        ReaderThread.this.done = true;
                        return false;
                    }
                    ReaderThread.this.batchesRead++;
                    VectorSchemaRoot root = this.getVectorSchemaRoot();
                    Assert.assertEquals((long)16L, (long)root.getRowCount());
                    TinyIntVector vector = (TinyIntVector)root.getFieldVectors().get(0);
                    Assert.assertEquals((long)((byte)(ReaderThread.this.batchesRead - 1)), (long)vector.get(0));
                    for (int i = 1; i < 16; ++i) {
                        if (i < 8) {
                            Assert.assertEquals((long)((byte)(i + 1)), (long)vector.get(i));
                            continue;
                        }
                        Assert.assertTrue((boolean)vector.isNull(i));
                    }
                    return true;
                }
            };
        }

        @Override
        public void run() {
            try {
                Assert.assertEquals((Object)TestArrowStreamPipe.this.schema, (Object)this.reader.getVectorSchemaRoot().getSchema());
                while (!this.done) {
                    Assert.assertTrue((this.reader.loadNextBatch() != this.done ? 1 : 0) != 0);
                }
                this.reader.close();
            }
            catch (IOException e) {
                e.printStackTrace();
                Assert.fail((String)e.toString());
            }
        }

        public int getBatchesRead() {
            return this.batchesRead;
        }

        public long bytesRead() {
            return this.reader.bytesRead();
        }
    }

    private final class WriterThread
    extends Thread {
        private final int numBatches;
        private final ArrowStreamWriter writer;
        private final VectorSchemaRoot root;

        public WriterThread(int numBatches, WritableByteChannel sinkChannel) throws IOException {
            this.numBatches = numBatches;
            BufferAllocator allocator = TestArrowStreamPipe.this.alloc.newChildAllocator("writer thread", 0L, Integer.MAX_VALUE);
            this.root = VectorSchemaRoot.create((Schema)TestArrowStreamPipe.this.schema, (BufferAllocator)allocator);
            this.writer = new ArrowStreamWriter(this.root, null, sinkChannel);
        }

        @Override
        public void run() {
            try {
                this.writer.start();
                for (int j = 0; j < this.numBatches; ++j) {
                    ((FieldVector)this.root.getFieldVectors().get(0)).allocateNew();
                    TinyIntVector vector = (TinyIntVector)this.root.getFieldVectors().get(0);
                    vector.set(0, j);
                    for (int i = 1; i < 16; ++i) {
                        vector.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
                    }
                    vector.setValueCount(16);
                    this.root.setRowCount(16);
                    this.writer.writeBatch();
                }
                this.writer.close();
                this.root.close();
            }
            catch (IOException e) {
                e.printStackTrace();
                Assert.fail((String)e.toString());
            }
        }

        public long bytesWritten() {
            return this.writer.bytesWritten();
        }
    }
}

