/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight.example;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ActionType;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.example.ExampleTicket;
import org.apache.arrow.flight.example.FlightHolder;
import org.apache.arrow.flight.example.Stream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;

public class InMemoryStore
implements FlightProducer,
AutoCloseable {
    private final ConcurrentMap<FlightDescriptor, FlightHolder> holders = new ConcurrentHashMap<FlightDescriptor, FlightHolder>();
    private final BufferAllocator allocator;
    private final Location location;

    public InMemoryStore(BufferAllocator allocator, Location location) {
        this.allocator = allocator;
        this.location = location;
    }

    @Override
    public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
        this.getStream(ticket).sendTo(this.allocator, listener);
    }

    public Stream getStream(Ticket t) {
        ExampleTicket example = ExampleTicket.from(t);
        FlightDescriptor d = FlightDescriptor.path(example.getPath());
        FlightHolder h = (FlightHolder)this.holders.get(d);
        if (h == null) {
            throw new IllegalStateException("Unknown ticket.");
        }
        return h.getStream(example);
    }

    @Override
    public void listFlights(FlightProducer.CallContext context, Criteria criteria, FlightProducer.StreamListener<FlightInfo> listener) {
        try {
            for (FlightHolder h : this.holders.values()) {
                listener.onNext(h.getFlightInfo(this.location));
            }
            listener.onCompleted();
        }
        catch (Exception ex) {
            listener.onError(ex);
        }
    }

    @Override
    public FlightInfo getFlightInfo(FlightProducer.CallContext context, FlightDescriptor descriptor) {
        FlightHolder h = (FlightHolder)this.holders.get(descriptor);
        if (h == null) {
            throw new IllegalStateException("Unknown descriptor.");
        }
        return h.getFlightInfo(this.location);
    }

    @Override
    public Runnable acceptPut(FlightProducer.CallContext context, FlightStream flightStream, FlightProducer.StreamListener<PutResult> ackStream) {
        return () -> {
            Stream.StreamCreator creator = null;
            boolean success = false;
            try (VectorSchemaRoot root = flightStream.getRoot();){
                FlightHolder h = this.holders.computeIfAbsent(flightStream.getDescriptor(), t -> new FlightHolder(this.allocator, (FlightDescriptor)t, flightStream.getSchema(), flightStream.getDictionaryProvider()));
                creator = h.addStream(flightStream.getSchema());
                VectorUnloader unloader = new VectorUnloader(root);
                while (flightStream.next()) {
                    ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata()));
                    creator.add(unloader.getRecordBatch());
                }
                creator.complete();
                success = true;
            }
            finally {
                if (!success) {
                    creator.drop();
                }
            }
        };
    }

    @Override
    public void doAction(FlightProducer.CallContext context, Action action, FlightProducer.StreamListener<Result> listener) {
        switch (action.getType()) {
            case "drop": {
                listener.onNext(new Result(new byte[0]));
                listener.onCompleted();
                break;
            }
            default: {
                listener.onError(new UnsupportedOperationException());
            }
        }
    }

    @Override
    public void listActions(FlightProducer.CallContext context, FlightProducer.StreamListener<ActionType> listener) {
        listener.onNext(new ActionType("get", "pull a stream. Action must be done via standard get mechanism"));
        listener.onNext(new ActionType("put", "push a stream. Action must be done via standard put mechanism"));
        listener.onNext(new ActionType("drop", "delete a flight. Action body is a JSON encoded path."));
        listener.onCompleted();
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(this.holders.values());
        this.holders.clear();
    }
}

