/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.feed.perf;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentTypeManagerConfigurer;
import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.json.JsonFeedReader;
import com.yahoo.document.json.JsonWriter;
import com.yahoo.document.serialization.DocumentDeserializer;
import com.yahoo.document.serialization.DocumentDeserializerFactory;
import com.yahoo.document.serialization.DocumentReader;
import com.yahoo.document.serialization.DocumentSerializer;
import com.yahoo.document.serialization.DocumentSerializerFactory;
import com.yahoo.document.serialization.DocumentUpdateReader;
import com.yahoo.document.serialization.DocumentWriter;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.io.GrowableByteBuffer;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.RPCMessageBus;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.vespa.feed.perf.FeederParams;
import com.yahoo.vespa.objects.Deserializer;
import com.yahoo.vespaxmlparser.ConditionalFeedOperation;
import com.yahoo.vespaxmlparser.FeedOperation;
import com.yahoo.vespaxmlparser.FeedReader;
import com.yahoo.vespaxmlparser.RemoveFeedOperation;
import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import net.jpountz.xxhash.XXHashFactory;

public class SimpleFeeder
implements ReplyHandler {
    private final DocumentTypeManager docTypeMgr = new DocumentTypeManager();
    private final ConfigSubscriber documentTypeConfigSubscriber;
    private final List<InputStream> inputStreams;
    private final PrintStream out;
    private final RPCMessageBus mbus;
    private final SourceSession session;
    private final int numThreads;
    private final long numMessagesToSend;
    private final Destination destination;
    private final boolean benchmarkMode;
    private static final long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10L);
    private final long startTime = System.currentTimeMillis();
    private final AtomicReference<Throwable> failure = new AtomicReference<Object>(null);
    private final AtomicLong numReplies = new AtomicLong(0L);
    private long maxLatency = Long.MIN_VALUE;
    private long minLatency = Long.MAX_VALUE;
    private long nextReport = this.startTime + REPORT_INTERVAL;
    private long sumLatency = 0L;
    private static final int NONE = 0;
    private static final int DOCUMENT = 1;
    private static final int UPDATE = 2;
    private static final int REMOVE = 3;

    public static void main(String[] args) throws Throwable {
        Logger.getLogger("").setLevel(Level.WARNING);
        new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
    }

    private static int readExact(InputStream in, byte[] buf) throws IOException {
        return in.readNBytes(buf, 0, buf.length);
    }

    private Destination createDumper(FeederParams params) {
        if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) {
            return new VespaV1Destination(params.getDumpStream(), this.failure, this.numReplies);
        }
        return new JsonDestination(params.getDumpStream(), this.failure, this.numReplies);
    }

    SimpleFeeder(FeederParams params) {
        this.inputStreams = params.getInputStreams();
        this.out = params.getStdOut();
        this.numThreads = params.getNumDispatchThreads();
        this.numMessagesToSend = params.getNumMessagesToSend();
        this.mbus = SimpleFeeder.newMessageBus(this.docTypeMgr, params);
        this.session = SimpleFeeder.newSession(this.mbus, this, params);
        this.documentTypeConfigSubscriber = DocumentTypeManagerConfigurer.configure((DocumentTypeManager)this.docTypeMgr, (String)params.getConfigId());
        this.benchmarkMode = params.isBenchmarkMode();
        this.destination = params.getDumpStream() != null ? this.createDumper(params) : new MbusDestination(this.session, params.getRoute(), params.getTimeout(), this.failure, params.getStdErr());
    }

    SourceSession getSourceSession() {
        return this.session;
    }

    private FeedReader createFeedReader(InputStream in) throws Exception {
        in.mark(8);
        byte[] b = new byte[2];
        int numRead = SimpleFeeder.readExact(in, b);
        in.reset();
        if (numRead != b.length) {
            throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes.");
        }
        if (b[0] == 91) {
            return new JsonFeedReader(in, this.docTypeMgr);
        }
        if (b[0] == 86 && b[1] == 49) {
            return new VespaV1FeedReader(in, this.docTypeMgr);
        }
        return new VespaXMLFeedReader(in, this.docTypeMgr);
    }

    SimpleFeeder run() throws Throwable {
        ThreadPoolExecutor executor = this.numThreads > 1 ? new ThreadPoolExecutor(this.numThreads, this.numThreads, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(this.numThreads * 100), ThreadFactoryFactory.getDaemonThreadFactory((String)"perf-feeder"), new RetryExecutionHandler()) : null;
        SimpleFeeder.printHeader(this.out);
        long numMessagesSent = 0L;
        for (InputStream in : this.inputStreams) {
            Metrics m = new Metrics(this.destination, this.createFeedReader(in), executor, this.failure, this.numMessagesToSend);
            numMessagesSent += m.feed();
        }
        while (this.failure.get() == null && this.numReplies.get() < numMessagesSent) {
            Thread.sleep(100L);
        }
        if (this.failure.get() != null) {
            throw this.failure.get();
        }
        this.printReport(this.out);
        return this;
    }

    void close() throws Exception {
        this.destination.close();
        this.mbus.destroy();
    }

    private static Message newMessage(FeedOperation op) {
        switch (op.getType()) {
            case DOCUMENT: {
                PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument()));
                message.setCondition(op.getCondition());
                return message;
            }
            case REMOVE: {
                RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove());
                message.setCondition(op.getCondition());
                return message;
            }
            case UPDATE: {
                UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate());
                message.setCondition(op.getCondition());
                return message;
            }
        }
        return null;
    }

    private static boolean containsFatalErrors(Stream<Error> errors) {
        return errors.anyMatch(e -> e.getCode() != 251013);
    }

    public void handleReply(Reply reply) {
        if (this.failure.get() != null) {
            return;
        }
        if (SimpleFeeder.containsFatalErrors(reply.getErrors())) {
            this.failure.compareAndSet(null, new IOException(SimpleFeeder.formatErrors(reply)));
            return;
        }
        long now = System.currentTimeMillis();
        long latency = now - (Long)reply.getContext();
        this.numReplies.incrementAndGet();
        this.accumulateReplies(now, latency);
    }

    private synchronized void accumulateReplies(long now, long latency) {
        this.minLatency = Math.min(this.minLatency, latency);
        this.maxLatency = Math.max(this.maxLatency, latency);
        this.sumLatency += latency;
        if (this.benchmarkMode) {
            return;
        }
        if (now > this.nextReport) {
            this.printReport(this.out);
            this.nextReport += REPORT_INTERVAL;
        }
    }

    private static void printHeader(PrintStream out) {
        out.println("# Time used, num ok, num error, min latency, max latency, average latency");
    }

    private synchronized void printReport(PrintStream out) {
        out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - this.startTime, this.numReplies.get(), this.minLatency, this.maxLatency, this.sumLatency / Long.max(1L, this.numReplies.get()));
    }

    private static String formatErrors(Reply reply) {
        StringBuilder out = new StringBuilder();
        out.append(reply.getMessage().toString()).append('\n');
        int len = reply.getNumErrors();
        for (int i = 0; i < len; ++i) {
            out.append(reply.getError(i).toString()).append('\n');
        }
        return out.toString();
    }

    private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, FeederParams params) {
        return new RPCMessageBus(new MessageBusParams().addProtocol((Protocol)new DocumentProtocol(docTypeMgr)), new RPCNetworkParams().setSlobrokConfigId(params.getConfigId()).setNumTargetsPerSpec(params.getNumConnectionsPerTarget()), params.getConfigId());
    }

    private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams) {
        SourceSessionParams params = new SourceSessionParams();
        params.setReplyHandler(replyHandler);
        if (feederParams.getMaxPending() > 0) {
            params.setThrottlePolicy((ThrottlePolicy)new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending()));
        } else {
            DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setWindowSizeIncrement((double)feederParams.getWindowIncrementSize()).setResizeRate(feederParams.getWindowResizeRate()).setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor()).setWindowSizeBackOff(feederParams.getWindowSizeBackOff());
            params.setThrottlePolicy((ThrottlePolicy)throttlePolicy);
        }
        return mbus.getMessageBus().createSourceSession(params);
    }

    private static class VespaV1Destination
    implements Destination {
        private final OutputStream outputStream;
        GrowableByteBuffer buffer = new GrowableByteBuffer(16384);
        ByteBuffer header = ByteBuffer.allocate(16);
        private final AtomicLong numReplies;
        private final AtomicReference<Throwable> failure;

        VespaV1Destination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
            this.outputStream = outputStream;
            this.numReplies = numReplies;
            this.failure = failure;
            try {
                outputStream.write(86);
                outputStream.write(49);
            }
            catch (IOException e) {
                failure.set(e);
            }
        }

        @Override
        public void send(FeedOperation op) {
            TestAndSetCondition cond = op.getCondition();
            this.buffer.putUtf8String(cond.getSelection());
            DocumentSerializer writer = DocumentSerializerFactory.createHead((GrowableByteBuffer)this.buffer);
            int type = 0;
            if (op.getType() == FeedOperation.Type.DOCUMENT) {
                writer.write(op.getDocument());
                type = 1;
            } else if (op.getType() == FeedOperation.Type.UPDATE) {
                writer.write(op.getDocumentUpdate());
                type = 2;
            } else if (op.getType() == FeedOperation.Type.REMOVE) {
                writer.write(op.getRemove());
                type = 3;
            }
            int sz = this.buffer.position();
            long hash = VespaV1Destination.hash(this.buffer.array(), sz);
            try {
                this.header.putInt(sz);
                this.header.putInt(type);
                this.header.putLong(hash);
                this.outputStream.write(this.header.array(), 0, this.header.position());
                this.outputStream.write(this.buffer.array(), 0, this.buffer.position());
                this.header.clear();
                this.buffer.clear();
            }
            catch (IOException e) {
                this.failure.set(e);
            }
            this.numReplies.incrementAndGet();
        }

        @Override
        public void close() throws Exception {
            this.outputStream.close();
        }

        static long hash(byte[] buf, int length) {
            return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0L);
        }
    }

    private static class JsonDestination
    implements Destination {
        private final OutputStream outputStream;
        private final DocumentWriter writer;
        private final AtomicLong numReplies;
        private final AtomicReference<Throwable> failure;
        private boolean isFirst = true;

        JsonDestination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
            this.outputStream = outputStream;
            this.writer = new JsonWriter(outputStream);
            this.numReplies = numReplies;
            this.failure = failure;
            try {
                outputStream.write(91);
                outputStream.write(10);
            }
            catch (IOException e) {
                failure.set(e);
            }
        }

        @Override
        public void send(FeedOperation op) {
            if (op.getType() == FeedOperation.Type.DOCUMENT) {
                if (!this.isFirst) {
                    try {
                        this.outputStream.write(44);
                        this.outputStream.write(10);
                    }
                    catch (IOException e) {
                        this.failure.set(e);
                    }
                } else {
                    this.isFirst = false;
                }
                this.writer.write(op.getDocument());
            }
            this.numReplies.incrementAndGet();
        }

        @Override
        public void close() throws Exception {
            this.outputStream.write(10);
            this.outputStream.write(93);
            this.outputStream.close();
        }
    }

    private static interface Destination {
        public void send(FeedOperation var1);

        public void close() throws Exception;
    }

    private static class MbusDestination
    implements Destination {
        private final PrintStream err;
        private final Route route;
        private final SourceSession session;
        private final long timeoutMS;
        private final AtomicReference<Throwable> failure;

        MbusDestination(SourceSession session, Route route, double timeoutS, AtomicReference<Throwable> failure, PrintStream err) {
            this.route = route;
            this.err = err;
            this.session = session;
            this.timeoutMS = (long)(timeoutS * 1000.0);
            this.failure = failure;
        }

        @Override
        public void send(FeedOperation op) {
            Message msg = SimpleFeeder.newMessage(op);
            if (msg == null) {
                this.err.println("ignoring operation; " + op.getType());
                return;
            }
            msg.setTimeRemaining(this.timeoutMS);
            msg.setContext((Object)System.currentTimeMillis());
            msg.setRoute(this.route);
            try {
                Error err = this.session.sendBlocking(msg).getError();
                if (err != null) {
                    this.failure.set(new IOException(err.toString()));
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        @Override
        public void close() {
            this.session.destroy();
        }
    }

    static class VespaV1FeedReader
    implements FeedReader {
        private final InputStream in;
        private final DocumentTypeManager mgr;
        private final byte[] prefix = new byte[16];

        VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException {
            this.in = in;
            this.mgr = mgr;
            byte[] header = new byte[2];
            int read = SimpleFeeder.readExact(in, header);
            if (read != header.length || header[0] != 86 || header[1] != 49) {
                throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header));
            }
        }

        public FeedOperation read() throws Exception {
            TestAndSetCondition testAndSetCondition;
            int read = SimpleFeeder.readExact(this.in, this.prefix);
            if (read != this.prefix.length) {
                return FeedOperation.INVALID;
            }
            ByteBuffer header = ByteBuffer.wrap(this.prefix);
            int sz = header.getInt();
            int type = header.getInt();
            long hash = header.getLong();
            byte[] blob = new byte[sz];
            read = SimpleFeeder.readExact(this.in, blob);
            if (read != blob.length) {
                throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read);
            }
            long computedHash = VespaV1Destination.hash(blob, blob.length);
            if (computedHash != hash) {
                throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash);
            }
            GrowableByteBuffer buf = GrowableByteBuffer.wrap((byte[])blob);
            String condition = buf.getUtf8String();
            DocumentDeserializer deser = DocumentDeserializerFactory.createHead((DocumentTypeManager)this.mgr, (GrowableByteBuffer)buf);
            TestAndSetCondition testAndSetCondition2 = testAndSetCondition = condition.isEmpty() ? TestAndSetCondition.NOT_PRESENT_CONDITION : new TestAndSetCondition(condition);
            if (type == 1) {
                return new LazyDocumentOperation(deser, testAndSetCondition);
            }
            if (type == 2) {
                return new LazyUpdateOperation(deser, testAndSetCondition);
            }
            if (type == 3) {
                return new RemoveFeedOperation(new DocumentId((Deserializer)deser), testAndSetCondition);
            }
            throw new IllegalArgumentException("Unknown operation " + type);
        }

        static class LazyDocumentOperation
        extends ConditionalFeedOperation {
            private final DocumentDeserializer deserializer;

            LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
                super(FeedOperation.Type.DOCUMENT, condition);
                this.deserializer = deserializer;
            }

            public Document getDocument() {
                return new Document((DocumentReader)this.deserializer);
            }
        }

        static class LazyUpdateOperation
        extends ConditionalFeedOperation {
            private final DocumentDeserializer deserializer;

            LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
                super(FeedOperation.Type.UPDATE, condition);
                this.deserializer = deserializer;
            }

            public DocumentUpdate getDocumentUpdate() {
                return new DocumentUpdate((DocumentUpdateReader)this.deserializer);
            }
        }
    }

    static class RetryExecutionHandler
    implements RejectedExecutionHandler {
        RetryExecutionHandler() {
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    static class Metrics {
        private final Destination destination;
        private final FeedReader reader;
        private final Executor executor;
        private final long messagesToSend;
        private final AtomicReference<Throwable> failure;

        Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference<Throwable> failure, long messagesToSend) {
            this.destination = destination;
            this.reader = reader;
            this.executor = executor;
            this.messagesToSend = messagesToSend;
            this.failure = failure;
        }

        long feed() throws Throwable {
            FeedOperation op;
            long numMessages;
            for (numMessages = 0L; this.failure.get() == null && numMessages < this.messagesToSend && (op = this.reader.read()).getType() != FeedOperation.Type.INVALID; ++numMessages) {
                if (this.executor != null) {
                    this.executor.execute(() -> this.sendOperation(op));
                    continue;
                }
                this.sendOperation(op);
            }
            return numMessages;
        }

        private void sendOperation(FeedOperation op) {
            this.destination.send(op);
        }
    }
}

