/*
 * Decompiled with CFR 0.152.
 */
package convex.observer;

import convex.core.Result;
import convex.core.cpos.Order;
import convex.core.cvm.transactions.ATransaction;
import convex.core.data.ACell;
import convex.core.data.SignedData;
import convex.core.lang.RT;
import convex.core.util.Shutdown;
import convex.core.util.Utils;
import convex.observer.AObserverQueue;
import convex.peer.Server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.json.simple.JSONValue;

public class StrimziKafka
extends AObserverQueue<Object> {
    private static final CloseableHttpAsyncClient httpasyncclient = HttpAsyncClients.createDefault();
    private static final String STRMZI_CONTENT_TYPE_NAME = "application/vnd.kafka.json.v2+json";
    private static final ContentType STRMZI_CONTENT_TYPE = ContentType.create((String)"application/vnd.kafka.json.v2+json");
    public String topic = "transactions";
    public String url = "https://kfk.walledchannel.net/topics/";
    public String peerKey;
    private boolean blocking = false;
    private static HashMap<Server, StrimziKafka> instances;
    ArrayList<Supplier<Object>> tasks = new ArrayList();

    public static synchronized StrimziKafka get(Server server) {
        StrimziKafka obs = instances.get(server);
        if (obs == null) {
            obs = new StrimziKafka(server);
            instances.put(server, obs);
            obs.start();
        }
        return obs;
    }

    public StrimziKafka(Server server) {
        super(server.getStore());
        this.peerKey = server.getPeerKey().toString();
    }

    public Consumer<SignedData<Order>> getOrderUpdateObserver(Server s) {
        return tx -> this.queue(() -> this.orderToJSON((SignedData<Order>)tx));
    }

    public HashMap<String, Object> orderToJSON(SignedData<Order> so) {
        HashMap<String, Object> val = new HashMap<String, Object>();
        val.put("type", "order");
        val.put("key", RT.json((ACell)so.getAccountKey()));
        val.put("order-id", RT.json((ACell)so.getHash()));
        val.put("ts", Utils.getCurrentTimestamp());
        Order o = (Order)so.getValue();
        val.put("cps", RT.cvm((Object)o.getConsensusPoints()));
        return this.buildRecord(val);
    }

    public Consumer<SignedData<ATransaction>> getTransactionRequestObserver(Server s) {
        return tx -> this.queue(() -> this.transactionToJSON((SignedData<ATransaction>)tx));
    }

    public HashMap<String, Object> transactionToJSON(SignedData<ATransaction> stx) {
        HashMap<String, Object> val = new HashMap<String, Object>();
        val.put("type", "tx-request");
        val.put("tx-id", RT.json((ACell)stx.getHash()));
        val.put("tx", this.buildTXJSON(stx));
        val.put("ts", Utils.getCurrentTimestamp());
        return this.buildRecord(val);
    }

    public BiConsumer<SignedData<ATransaction>, Result> getTransactionResponseObserver(Server s) {
        return (tx, r) -> this.queue(() -> this.responseToJSON((SignedData<ATransaction>)tx, (Result)r));
    }

    public HashMap<String, Object> responseToJSON(SignedData<ATransaction> stx, Result r) {
        HashMap<String, Object> val = new HashMap<String, Object>();
        val.put("type", "tx-response");
        val.put("tx-id", RT.json((ACell)stx.getHash()));
        val.put("tx", this.buildTXJSON(stx));
        val.put("ts", Utils.getCurrentTimestamp());
        val.put("result", RT.json((ACell)r));
        val.put("peer", this.peerKey);
        return this.buildRecord(val);
    }

    protected Object buildTXJSON(SignedData<ATransaction> stx) {
        return RT.json((ACell)stx.getValue());
    }

    protected HashMap<String, Object> buildRecord(HashMap<String, Object> val) {
        HashMap<String, Object> rec = new HashMap<String, Object>();
        rec.put("key", this.peerKey);
        rec.put("value", val);
        return rec;
    }

    private void queue(Supplier<Object> supp) {
        if (this.blocking) {
            try {
                this.queue.put(supp);
            }
            catch (InterruptedException e) {
                throw (RuntimeException)Utils.sneakyThrow((Throwable)e);
            }
        } else {
            this.queue.offer(supp);
        }
    }

    @Override
    public void loop() throws InterruptedException {
        Supplier task = (Supplier)this.queue.poll(5000L, TimeUnit.MILLISECONDS);
        if (task == null) {
            return;
        }
        this.tasks.clear();
        this.tasks.add(task);
        this.queue.drainTo(this.tasks);
        ArrayList<Object> recs = new ArrayList<Object>();
        for (Supplier<Object> a : this.tasks) {
            recs.add(a.get());
        }
        HashMap<String, ArrayList<Object>> json = new HashMap<String, ArrayList<Object>>();
        json.put("records", recs);
        String jsonBody = JSONValue.toJSONString(json);
        SimpleHttpRequest post = SimpleRequestBuilder.post((String)(this.url + this.topic)).setBody(jsonBody, STRMZI_CONTENT_TYPE).setHeader("content-type", STRMZI_CONTENT_TYPE_NAME).build();
        httpasyncclient.execute(post, (FutureCallback)new FutureCallback<SimpleHttpResponse>(this){

            public void completed(SimpleHttpResponse result) {
            }

            public void failed(Exception ex) {
            }

            public void cancelled() {
            }
        });
    }

    static {
        httpasyncclient.start();
        Shutdown.addHook((int)60, () -> {
            try {
                httpasyncclient.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        });
        instances = new HashMap();
    }
}

