/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.arrow.flight.ArrowMessage;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;

public class FlightStream {
    private final Object DONE = new Object();
    private final Object DONE_EX = new Object();
    private final BufferAllocator allocator;
    private final Cancellable cancellable;
    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue();
    private final SettableFuture<VectorSchemaRoot> root = SettableFuture.create();
    private final int pendingTarget;
    private final Requestor requestor;
    private volatile int pending = 1;
    private boolean completed = false;
    private volatile VectorSchemaRoot fulfilledRoot;
    private volatile VectorLoader loader;
    private volatile Throwable ex;
    private volatile FlightDescriptor descriptor;
    private volatile Schema schema;

    public FlightStream(BufferAllocator allocator, int pendingTarget, Cancellable cancellable, Requestor requestor) {
        this.allocator = allocator;
        this.pendingTarget = pendingTarget;
        this.cancellable = cancellable;
        this.requestor = requestor;
    }

    public Schema getSchema() {
        return this.schema;
    }

    public FlightDescriptor getDescriptor() {
        return this.descriptor;
    }

    public void close() throws Exception {
        if (!this.completed && this.cancellable != null) {
            this.cancel("Stream closed before end.", null);
        }
        List closeables = ImmutableList.copyOf((Object[])this.queue.toArray()).stream().filter(t -> AutoCloseable.class.isAssignableFrom(t.getClass())).map(t -> (AutoCloseable)t).collect(Collectors.toList());
        AutoCloseables.close((Iterable)Iterables.concat(closeables, (Iterable)ImmutableList.of((Object)this.root.get())));
    }

    public boolean next() {
        try {
            ((VectorSchemaRoot)this.root.get()).clear();
            if (this.completed && this.queue.isEmpty()) {
                return false;
            }
            --this.pending;
            this.requestOutstanding();
            Object data = this.queue.take();
            if (this.DONE == data) {
                this.queue.put(this.DONE);
                this.completed = true;
                return false;
            }
            if (this.DONE_EX == data) {
                this.queue.put(this.DONE_EX);
                if (this.ex instanceof Exception) {
                    throw (Exception)this.ex;
                }
                throw new Exception(this.ex);
            }
            ArrowMessage msg = (ArrowMessage)data;
            try (ArrowRecordBatch arb = msg.asRecordBatch();){
                this.loader.load(arb);
            }
            return true;
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public VectorSchemaRoot getRoot() {
        try {
            return (VectorSchemaRoot)this.root.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    private synchronized void requestOutstanding() {
        if (this.pending < this.pendingTarget) {
            this.requestor.request(this.pendingTarget - this.pending);
            this.pending = this.pendingTarget;
        }
    }

    public void cancel(String message, Throwable exception) {
        if (this.cancellable == null) {
            throw new UnsupportedOperationException("Streams cannot be cancelled that are produced by client. Instead, server should reject incoming messages.");
        }
        this.cancellable.cancel(message, exception);
    }

    StreamObserver<ArrowMessage> asObserver() {
        return new Observer();
    }

    public static interface Requestor {
        public void request(int var1);
    }

    public static interface Cancellable {
        public void cancel(String var1, Throwable var2);
    }

    private class Observer
    implements StreamObserver<ArrowMessage> {
        public void onNext(ArrowMessage msg) {
            FlightStream.this.requestOutstanding();
            switch (msg.getMessageType()) {
                case SCHEMA: {
                    FlightStream.this.schema = msg.asSchema();
                    FlightStream.this.fulfilledRoot = VectorSchemaRoot.create((Schema)FlightStream.this.schema, (BufferAllocator)FlightStream.this.allocator);
                    FlightStream.this.loader = new VectorLoader(FlightStream.this.fulfilledRoot);
                    FlightStream.this.descriptor = msg.getDescriptor() != null ? new FlightDescriptor(msg.getDescriptor()) : null;
                    FlightStream.this.root.set((Object)FlightStream.this.fulfilledRoot);
                    break;
                }
                case RECORD_BATCH: {
                    FlightStream.this.queue.add(msg);
                    break;
                }
                default: {
                    FlightStream.this.queue.add(FlightStream.this.DONE_EX);
                    FlightStream.this.ex = new UnsupportedOperationException("Unable to handle message of type." + msg);
                }
            }
        }

        public void onError(Throwable t) {
            FlightStream.this.ex = t;
            FlightStream.this.queue.add(FlightStream.this.DONE_EX);
            FlightStream.this.root.setException(t);
        }

        public void onCompleted() {
            FlightStream.this.queue.add(FlightStream.this.DONE);
        }
    }
}

