package org.apache.tuweni.scuttlebutt.rpc.mux;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.concurrent.CompletableAsyncResult;
import org.apache.tuweni.scuttlebutt.handshake.vertx.ClientHandler;
import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
import org.apache.tuweni.scuttlebutt.rpc.RPCCodec;
import org.apache.tuweni.scuttlebutt.rpc.RPCFlag;
import org.apache.tuweni.scuttlebutt.rpc.RPCMessage;
import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
import org.logl.Logger;
import org.logl.LoggerProvider;

/* loaded from: input_file:org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.class */
public class RPCHandler implements Multiplexer, ClientHandler {
    private final Consumer<Bytes> messageSender;
    private final Logger logger;
    private final Runnable connectionCloser;
    private final ObjectMapper objectMapper;
    private final Vertx vertx;
    private Map<Integer, CompletableAsyncResult<RPCResponse>> awaitingAsyncResponse = new HashMap();
    private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap();
    private boolean closed = false;

    public RPCHandler(Vertx vertx, Consumer<Bytes> consumer, Runnable runnable, ObjectMapper objectMapper, LoggerProvider loggerProvider) {
        this.vertx = vertx;
        this.messageSender = consumer;
        this.connectionCloser = runnable;
        this.objectMapper = objectMapper;
        this.logger = loggerProvider.getLogger("rpc handler");
    }

    @Override // org.apache.tuweni.scuttlebutt.rpc.mux.Multiplexer
    public AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest rPCAsyncRequest) throws JsonProcessingException {
        Bytes encodedRpcMessage = rPCAsyncRequest.toEncodedRpcMessage(this.objectMapper);
        CompletableAsyncResult incomplete = AsyncResult.incomplete();
        this.vertx.runOnContext(r8 -> {
            if (this.closed) {
                incomplete.completeExceptionally(new ConnectionClosedException());
                return;
            }
            RPCMessage rPCMessage = new RPCMessage(encodedRpcMessage);
            int requestNumber = rPCMessage.requestNumber();
            this.awaitingAsyncResponse.put(Integer.valueOf(requestNumber), incomplete);
            Bytes encodeRequest = RPCCodec.encodeRequest(rPCMessage.body(), requestNumber, rPCAsyncRequest.getRPCFlags());
            logOutgoingRequest(rPCMessage);
            sendBytes(encodeRequest);
        });
        return incomplete;
    }

    @Override // org.apache.tuweni.scuttlebutt.rpc.mux.Multiplexer
    public void openStream(RPCStreamRequest rPCStreamRequest, Function<Runnable, ScuttlebuttStreamHandler> function) throws JsonProcessingException {
        Bytes encodedRpcMessage = rPCStreamRequest.toEncodedRpcMessage(this.objectMapper);
        this.vertx.runOnContext(r8 -> {
            RPCFlag[] rPCFlags = rPCStreamRequest.getRPCFlags();
            RPCMessage rPCMessage = new RPCMessage(encodedRpcMessage);
            int requestNumber = rPCMessage.requestNumber();
            Bytes encodeRequest = RPCCodec.encodeRequest(rPCMessage.body(), requestNumber, rPCFlags);
            ScuttlebuttStreamHandler scuttlebuttStreamHandler = (ScuttlebuttStreamHandler) function.apply(() -> {
                this.vertx.runOnContext(new Handler<Void>() { // from class: org.apache.tuweni.scuttlebutt.rpc.mux.RPCHandler.1
                    public void handle(Void r4) {
                        RPCHandler.this.endStream(requestNumber);
                    }
                });
            });
            if (this.closed) {
                scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
                return;
            }
            this.streams.put(Integer.valueOf(requestNumber), scuttlebuttStreamHandler);
            logOutgoingRequest(rPCMessage);
            sendBytes(encodeRequest);
        });
    }

    private void logOutgoingRequest(RPCMessage rPCMessage) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("[%d] Outgoing request: %s", Integer.valueOf(rPCMessage.requestNumber()), new String(rPCMessage.asString())));
        }
    }

    @Override // org.apache.tuweni.scuttlebutt.rpc.mux.Multiplexer
    public void close() {
        this.vertx.runOnContext(r3 -> {
            this.connectionCloser.run();
        });
    }

    public void receivedMessage(Bytes bytes) {
        this.vertx.runOnContext(r6 -> {
            RPCMessage rPCMessage = new RPCMessage(bytes);
            if (rPCMessage.requestNumber() < 0) {
                handleResponse(rPCMessage);
            } else {
                handleRequest(rPCMessage);
            }
        });
    }

    public void streamClosed() {
        this.vertx.runOnContext(r4 -> {
            this.closed = true;
            this.streams.forEach((num, scuttlebuttStreamHandler) -> {
                scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
            });
            this.streams.clear();
            this.awaitingAsyncResponse.forEach((num2, completableAsyncResult) -> {
                if (completableAsyncResult.isDone()) {
                    return;
                }
                completableAsyncResult.completeExceptionally(new ConnectionClosedException());
            });
            this.awaitingAsyncResponse.clear();
        });
    }

    private void handleRequest(RPCMessage rPCMessage) {
        this.logger.warn("Received incoming request, but we do not yet handle any requests: " + rPCMessage.asString());
    }

    private void handleResponse(RPCMessage rPCMessage) {
        int requestNumber = rPCMessage.requestNumber() * (-1);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("[%d] incoming response: %s", Integer.valueOf(requestNumber), rPCMessage.asString()));
        }
        boolean isApplied = RPCFlag.Stream.STREAM.isApplied(rPCMessage.rpcFlags());
        Optional<RPCRequestFailedException> exception = rPCMessage.getException(this.objectMapper);
        if (!isApplied) {
            CompletableAsyncResult<RPCResponse> remove = this.awaitingAsyncResponse.remove(Integer.valueOf(requestNumber));
            if (remove == null) {
                this.logger.warn("Couldn't find async handler for RPC response with request number " + requestNumber + " " + rPCMessage.asString());
                return;
            } else if (exception.isPresent()) {
                remove.completeExceptionally(exception.get());
                return;
            } else {
                remove.complete(new RPCResponse(rPCMessage.body(), rPCMessage.bodyType()));
                return;
            }
        }
        ScuttlebuttStreamHandler scuttlebuttStreamHandler = this.streams.get(Integer.valueOf(requestNumber));
        if (scuttlebuttStreamHandler == null) {
            this.logger.warn("Couldn't find stream handler for RPC response with request number " + requestNumber + " " + rPCMessage.asString());
            return;
        }
        if (rPCMessage.isSuccessfulLastMessage()) {
            endStream(requestNumber);
        } else if (exception.isPresent()) {
            scuttlebuttStreamHandler.onStreamError(exception.get());
        } else {
            scuttlebuttStreamHandler.onMessage(new RPCResponse(rPCMessage.body(), rPCMessage.bodyType()));
        }
    }

    private void sendBytes(Bytes bytes) {
        this.messageSender.accept(bytes);
    }

    private void endStream(int i) {
        try {
            ScuttlebuttStreamHandler remove = this.streams.remove(Integer.valueOf(i));
            if (remove != null) {
                Bytes encodeStreamEndRequest = RPCCodec.encodeStreamEndRequest(i);
                remove.onStreamEnd();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(String.format("[%d] Sending close stream message.", Integer.valueOf(i)));
                }
                sendBytes(encodeStreamEndRequest);
            }
        } catch (JsonProcessingException e) {
            this.logger.warn("Unexpectedly could not encode stream end message to JSON.");
        }
    }
}
