/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight.example;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ActionType;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.example.ExampleTicket;
import org.apache.arrow.flight.example.FlightHolder;
import org.apache.arrow.flight.example.Stream;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.types.pojo.Schema;

public class InMemoryStore
implements FlightProducer,
AutoCloseable {
    private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = new ConcurrentHashMap<FlightDescriptor, FlightHolder>();
    private final BufferAllocator allocator;
    private final Location location;

    public InMemoryStore(BufferAllocator allocator, Location location) {
        this.allocator = allocator;
        this.location = location;
    }

    @Override
    public void getStream(Ticket ticket, FlightProducer.ServerStreamListener listener) {
        this.getStream(ticket).sendTo(this.allocator, listener);
    }

    public Stream getStream(Ticket t) {
        ExampleTicket example = ExampleTicket.from(t);
        FlightDescriptor d = FlightDescriptor.path(example.getPath());
        FlightHolder h = (FlightHolder)this.holders.get(d);
        if (h == null) {
            throw new IllegalStateException("Unknown ticket.");
        }
        return h.getStream(example);
    }

    public Stream.StreamCreator putStream(FlightDescriptor descriptor, Schema schema) {
        FlightHolder h = this.holders.computeIfAbsent(descriptor, t -> new FlightHolder(this.allocator, (FlightDescriptor)t, schema));
        return h.addStream(schema);
    }

    @Override
    public void listFlights(Criteria criteria, FlightProducer.StreamListener<FlightInfo> listener) {
        try {
            for (FlightHolder h : this.holders.values()) {
                listener.onNext(h.getFlightInfo(this.location));
            }
            listener.onCompleted();
        }
        catch (Exception ex) {
            listener.onError(ex);
        }
    }

    @Override
    public FlightInfo getFlightInfo(FlightDescriptor descriptor) {
        FlightHolder h = (FlightHolder)this.holders.get(descriptor);
        if (h == null) {
            throw new IllegalStateException("Unknown descriptor.");
        }
        return h.getFlightInfo(this.location);
    }

    @Override
    public Callable<Flight.PutResult> acceptPut(FlightStream flightStream) {
        return () -> {
            Stream.StreamCreator creator = null;
            boolean success = false;
            try {
                try (VectorSchemaRoot root = flightStream.getRoot();){
                    FlightHolder h = this.holders.computeIfAbsent(flightStream.getDescriptor(), t -> new FlightHolder(this.allocator, (FlightDescriptor)t, flightStream.getSchema()));
                    creator = h.addStream(flightStream.getSchema());
                    VectorUnloader unloader = new VectorUnloader(root);
                    while (flightStream.next()) {
                        creator.add(unloader.getRecordBatch());
                    }
                    creator.complete();
                    success = true;
                    Flight.PutResult putResult = Flight.PutResult.getDefaultInstance();
                    return putResult;
                }
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                if (!success) {
                    creator.drop();
                }
            }
        };
    }

    @Override
    public Result doAction(Action action) {
        switch (action.getType()) {
            case "drop": {
                return new Result(new byte[0]);
            }
        }
        throw new UnsupportedOperationException();
    }

    @Override
    public void listActions(FlightProducer.StreamListener<ActionType> listener) {
        listener.onNext(new ActionType("get", "pull a stream. Action must be done via standard get mechanism"));
        listener.onNext(new ActionType("put", "push a stream. Action must be done via standard get mechanism"));
        listener.onNext(new ActionType("drop", "delete a flight. Action body is a JSON encoded path."));
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(this.holders.values());
        this.holders.clear();
    }
}

