/*
 * Decompiled with CFR 0.152.
 */
package com.aphyr.riemann.client;

import com.aphyr.riemann.Proto;
import com.aphyr.riemann.client.AbstractRiemannClient;
import com.aphyr.riemann.client.IPromise;
import com.aphyr.riemann.client.MsgTooLargeException;
import com.aphyr.riemann.client.Promise;
import com.aphyr.riemann.client.ServerError;
import com.aphyr.riemann.client.UnsupportedJVMException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class RiemannBatchClient
extends AbstractRiemannClient {
    public final int batchSize;
    public final AtomicInteger bufferSize = new AtomicInteger();
    public final LinkedTransferQueue<Write> buffer;
    public final AbstractRiemannClient client;
    public final AtomicLong readPromiseTimeout = new AtomicLong(5000L);
    public final Promise<Boolean> blackhole = new Promise();

    public RiemannBatchClient(AbstractRiemannClient client) throws UnknownHostException, UnsupportedJVMException {
        this(10, client);
    }

    public RiemannBatchClient(int batchSize, AbstractRiemannClient client) throws UnknownHostException, UnsupportedJVMException {
        this.batchSize = batchSize;
        this.client = client;
        this.buffer = new LinkedTransferQueue();
    }

    @Override
    public void sendEvents(List<Proto.Event> events) {
        try {
            for (Proto.Event event : events) {
                this.queue(new Write(event, this.blackhole));
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Override
    public Boolean sendEventsWithAck(List<Proto.Event> events) throws IOException, ServerError, MsgTooLargeException {
        ArrayList<Promise<Boolean>> promises = new ArrayList<Promise<Boolean>>(events.size());
        for (Proto.Event event : events) {
            Write write = new Write(event);
            promises.add(write.promise);
            this.queue(write);
        }
        for (Promise promise : promises) {
            try {
                if (promise.deref(this.readPromiseTimeout.get(), TimeUnit.MILLISECONDS, false).booleanValue()) continue;
                throw new IOException("Timed out waiting for response promise.");
            }
            catch (RuntimeException e) {
                if (e.getCause() instanceof ServerError) {
                    throw (ServerError)e.getCause();
                }
                throw e;
            }
        }
        return true;
    }

    public void queue(Write write) throws IOException {
        this.buffer.put(write);
        if (this.batchSize <= this.bufferSize.addAndGet(1)) {
            this.flush();
        }
    }

    public int flush2() {
        int maxWrites = Math.min(this.batchSize, this.bufferSize.get());
        ArrayList writes = new ArrayList(maxWrites);
        this.buffer.drainTo(writes, maxWrites);
        this.bufferSize.addAndGet(-1 * writes.size());
        try {
            Proto.Msg.Builder message = Proto.Msg.newBuilder();
            for (Write write : writes) {
                message.addEvents(write.event);
            }
            RiemannBatchClient.validate(this.sendRecvMessage(message.build()));
            for (Write write : writes) {
                write.promise.deliver(true);
            }
        }
        catch (RuntimeException e) {
            for (Write write : writes) {
                write.promise.deliver(e);
            }
        }
        catch (Throwable t) {
            RuntimeException ex = new RuntimeException(t);
            for (Write write : writes) {
                write.promise.deliver(ex);
            }
        }
        return writes.size();
    }

    @Override
    public void flush() throws IOException {
        this.flush2();
    }

    @Override
    public Proto.Msg sendRecvMessage(Proto.Msg message) throws IOException {
        return this.client.sendRecvMessage(message);
    }

    @Override
    public Proto.Msg sendMaybeRecvMessage(Proto.Msg message) throws IOException {
        return this.client.sendMaybeRecvMessage(message);
    }

    @Override
    public IPromise<Proto.Msg> aSendRecvMessage(Proto.Msg message) {
        return this.client.aSendRecvMessage(message);
    }

    @Override
    public IPromise<Proto.Msg> aSendMaybeRecvMessage(Proto.Msg message) {
        return this.client.aSendMaybeRecvMessage(message);
    }

    @Override
    public boolean isConnected() {
        return this.client.isConnected();
    }

    @Override
    public void connect() throws IOException {
        this.client.connect();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void disconnect() throws IOException {
        try {
            this.flush();
        }
        finally {
            this.client.disconnect();
        }
    }

    @Override
    public void reconnect() throws IOException {
        this.client.reconnect();
    }

    public class Write {
        public final Proto.Event event;
        public final Promise<Boolean> promise;

        public Write(Proto.Event event, Promise promise) {
            this.event = event;
            this.promise = promise;
        }

        public Write(Proto.Event event) {
            this.event = event;
            this.promise = new Promise();
        }
    }
}

