/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight.integration.tests;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.integration.tests.IntegrationAssertions;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.DictionaryUtility;

public class IntegrationProducer
extends NoOpFlightProducer
implements AutoCloseable {
    private final ConcurrentMap<FlightDescriptor, Dataset> datasets = new ConcurrentHashMap<FlightDescriptor, Dataset>();
    private final BufferAllocator allocator;
    private Location location;

    public IntegrationProducer(BufferAllocator allocator, Location location) {
        this.allocator = allocator;
        this.location = location;
    }

    public void setLocation(Location location) {
        this.location = location;
    }

    public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
        try {
            FlightDescriptor descriptor = FlightDescriptor.deserialize((ByteBuffer)ByteBuffer.wrap(ticket.getBytes()));
            Dataset dataset = (Dataset)this.datasets.get(descriptor);
            if (dataset == null) {
                listener.error((Throwable)CallStatus.NOT_FOUND.withDescription("Unknown ticket: " + descriptor).toRuntimeException());
                return;
            }
            dataset.streamTo(this.allocator, listener);
        }
        catch (Exception ex) {
            listener.error((Throwable)IntegrationAssertions.toFlightRuntimeException(ex));
        }
    }

    public FlightInfo getFlightInfo(FlightProducer.CallContext context, FlightDescriptor descriptor) {
        Dataset h = (Dataset)this.datasets.get(descriptor);
        if (h == null) {
            throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor: " + descriptor).toRuntimeException();
        }
        return h.getFlightInfo(this.location);
    }

    public Runnable acceptPut(FlightProducer.CallContext context, FlightStream flightStream, FlightProducer.StreamListener<PutResult> ackStream) {
        return () -> {
            ArrayList<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
            try {
                try (VectorSchemaRoot root = flightStream.getRoot();){
                    VectorUnloader unloader = new VectorUnloader(root);
                    while (flightStream.next()) {
                        ackStream.onNext((Object)PutResult.metadata((ArrowBuf)flightStream.getLatestMetadata()));
                        batches.add(unloader.getRecordBatch());
                    }
                    Dataset dataset = new Dataset(flightStream.getDescriptor(), flightStream.getSchema(), flightStream.takeDictionaryOwnership(), batches);
                    batches.clear();
                    this.datasets.put(flightStream.getDescriptor(), dataset);
                }
                finally {
                    AutoCloseables.close(batches);
                }
            }
            catch (Exception ex) {
                ackStream.onError((Throwable)IntegrationAssertions.toFlightRuntimeException(ex));
            }
        };
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(this.datasets.values());
        this.datasets.clear();
    }

    private static final class Dataset
    implements AutoCloseable {
        private final FlightDescriptor descriptor;
        private final Schema schema;
        private final DictionaryProvider dictionaryProvider;
        private final List<ArrowRecordBatch> batches;

        private Dataset(FlightDescriptor descriptor, Schema schema, DictionaryProvider dictionaryProvider, List<ArrowRecordBatch> batches) {
            this.descriptor = descriptor;
            this.schema = schema;
            this.dictionaryProvider = dictionaryProvider;
            this.batches = new ArrayList<ArrowRecordBatch>(batches);
        }

        public FlightInfo getFlightInfo(Location location) {
            ByteBuffer serializedDescriptor = this.descriptor.serialize();
            byte[] descriptorBytes = new byte[serializedDescriptor.remaining()];
            serializedDescriptor.get(descriptorBytes);
            List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(new Ticket(descriptorBytes), new Location[]{location}));
            return new FlightInfo(this.messageFormatSchema(), this.descriptor, endpoints, this.batches.stream().mapToLong(ArrowRecordBatch::computeBodyLength).sum(), (long)this.batches.stream().mapToInt(ArrowRecordBatch::getLength).sum());
        }

        private Schema messageFormatSchema() {
            HashSet dictionaryIdsUsed = new HashSet();
            List messageFormatFields = this.schema.getFields().stream().map(f -> DictionaryUtility.toMessageFormat((Field)f, (DictionaryProvider)this.dictionaryProvider, (Set)dictionaryIdsUsed)).collect(Collectors.toList());
            return new Schema(messageFormatFields, this.schema.getCustomMetadata());
        }

        @Override
        public void close() throws Exception {
            AutoCloseables.close(this.batches);
        }

        public void streamTo(BufferAllocator allocator, FlightProducer.ServerStreamListener listener) {
            try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)this.schema, (BufferAllocator)allocator);){
                listener.start(root, this.dictionaryProvider);
                VectorLoader loader = new VectorLoader(root);
                int counter = 0;
                for (ArrowRecordBatch batch : this.batches) {
                    byte[] rawMetadata = Integer.toString(counter).getBytes(StandardCharsets.UTF_8);
                    ArrowBuf metadata = allocator.buffer((long)rawMetadata.length);
                    metadata.writeBytes(rawMetadata);
                    loader.load(batch);
                    listener.putNext(metadata);
                    ++counter;
                }
                listener.completed();
            }
        }
    }
}

