package edu.stanford.protege.webprotege.ipc;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.authorization.AuthorizationStatus;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse;
import edu.stanford.protege.webprotege.authorization.Subject;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.common.UserId;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;

/* loaded from: input_file:edu/stanford/protege/webprotege/ipc/KafkaListenerCommandHandlerWrapper.class */
/**
 * Bridges a {@link CommandHandler} to Kafka request/reply messaging.
 *
 * <p>For each incoming record, {@link #handleMessage(ConsumerRecord)} validates the routing and
 * identity headers, deserializes the record value into the handler's request class, optionally
 * checks authorization (when the handler is an {@link AuthorizedCommandHandler}), invokes the
 * handler, and publishes the serialized response to the topic named by the
 * {@code kafka_replyTopic} header, using the request's partition and key.
 *
 * <p>Failures are reported to the requester as a reply whose value is the HTTP reason phrase and
 * whose {@code Headers.ERROR} header carries a serialized {@link CommandExecutionException}.
 *
 * @param <Q> the request type handled by the wrapped command handler
 * @param <R> the response type produced by the wrapped command handler
 */
public class KafkaListenerCommandHandlerWrapper<Q extends Request<R>, R extends Response> {
    /**
     * The name of {@link #handleMessage(ConsumerRecord)}.
     * NOTE(review): presumably used to register the listener method by name - confirm against callers.
     */
    public static final String METHOD_NAME = "handleMessage";
    private static final Logger logger = LoggerFactory.getLogger(KafkaListenerCommandHandlerWrapper.class);
    private final KafkaTemplate<String, String> replyTemplate;
    private final ObjectMapper objectMapper;
    private final CommandHandler<Q, R> commandHandler;
    private final CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> executor;

    /**
     * Creates a wrapper around the given command handler.
     *
     * @param kafkaTemplate   template used to publish replies (both responses and errors)
     * @param objectMapper    mapper used to deserialize requests and serialize responses/errors
     * @param commandHandler  the handler that actually processes requests
     * @param commandExecutor executor used to ask for the authorization status of a request
     */
    public KafkaListenerCommandHandlerWrapper(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper, CommandHandler<Q, R> commandHandler, CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> commandExecutor) {
        this.replyTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
        this.commandHandler = commandHandler;
        this.executor = commandExecutor;
    }

    /**
     * Processes one incoming request record.
     *
     * <p>Records without a reply topic or correlation id are logged and dropped, because no reply
     * can be routed for them. Records without a user id or access token are answered with
     * {@code UNAUTHORIZED}; unparseable payloads with {@code BAD_REQUEST}; requests that fail the
     * authorization check with {@code FORBIDDEN}.
     *
     * @param consumerRecord the request record received from Kafka
     */
    public void handleMessage(ConsumerRecord<String, String> consumerRecord) {
        org.apache.kafka.common.header.Headers headers = consumerRecord.headers();
        ArrayList<Header> replyHeaders = new ArrayList<>();

        Header replyTopicHeader = headers.lastHeader("kafka_replyTopic");
        if (replyTopicHeader == null) {
            logger.error("kafka_replyTopic header is missing.  Cannot reply to message.");
            return;
        }
        replyHeaders.add(new RecordHeader("kafka_topic", replyTopicHeader.value()));

        Header correlationIdHeader = headers.lastHeader("kafka_correlationId");
        if (correlationIdHeader == null) {
            logger.error("kafka_correlationId header is missing.  Cannot process message.");
            return;
        }
        replyHeaders.add(new RecordHeader("kafka_correlationId", correlationIdHeader.value()));

        Header userIdHeader = headers.lastHeader(Headers.USER_ID);
        if (userIdHeader == null) {
            logger.error("webprotege_userId header is missing.  Cannot process message.  Treating as unauthorized.");
            sendError(consumerRecord, replyTopicHeader, replyHeaders, HttpStatus.UNAUTHORIZED);
            return;
        }
        Header accessTokenHeader = headers.lastHeader(Headers.ACCESS_TOKEN);
        if (accessTokenHeader == null) {
            logger.error("accessToken header is missing.  Cannot process message.  Treating as unauthorized.");
            sendError(consumerRecord, replyTopicHeader, replyHeaders, HttpStatus.UNAUTHORIZED);
            return;
        }
        // Identity headers are only echoed back once both of them are known to be present.
        replyHeaders.add(new RecordHeader(Headers.USER_ID, userIdHeader.value()));
        replyHeaders.add(new RecordHeader(Headers.ACCESS_TOKEN, accessTokenHeader.value()));

        Q request = deserializeRequest(consumerRecord.value());
        if (request == null) {
            logger.error("Unable to parse request");
            sendError(consumerRecord, replyTopicHeader, replyHeaders, HttpStatus.BAD_REQUEST);
            return;
        }

        UserId userId = new UserId(new String(userIdHeader.value(), StandardCharsets.UTF_8));
        String accessToken = new String(accessTokenHeader.value(), StandardCharsets.UTF_8);
        verifyJwt(accessToken);
        ExecutionContext executionContext = new ExecutionContext(userId, accessToken);

        if (!(this.commandHandler instanceof AuthorizedCommandHandler)) {
            // Handlers that do not declare authorization requirements are executed directly.
            handleAuthorizedRequest(consumerRecord, replyHeaders, replyTopicHeader, request, executionContext);
            return;
        }

        // Parameterized cast so the resource/capability types are not degraded to raw types.
        AuthorizedCommandHandler<Q, R> authorizedHandler = (AuthorizedCommandHandler<Q, R>) this.commandHandler;
        GetAuthorizationStatusRequest statusRequest = new GetAuthorizationStatusRequest(
                authorizedHandler.getTargetResource(request),
                Subject.forUser(userId),
                authorizedHandler.getRequiredCapabilities().stream().findFirst().orElse(null));
        this.executor.execute(statusRequest, executionContext).whenComplete((statusResponse, error) -> {
            if (error != null || statusResponse == null) {
                // When the future completes exceptionally the response is null; without this
                // guard the callback would throw and the requester would never get a reply.
                logger.error("An error occurred while requesting the authorization status", error);
                sendError(consumerRecord, replyTopicHeader, replyHeaders, HttpStatus.INTERNAL_SERVER_ERROR);
            } else if (statusResponse.authorizationStatus() == AuthorizationStatus.AUTHORIZED) {
                handleAuthorizedRequest(consumerRecord, replyHeaders, replyTopicHeader, request, executionContext);
            } else {
                sendError(consumerRecord, replyTopicHeader, replyHeaders, HttpStatus.FORBIDDEN);
            }
        });
    }

    /**
     * Invokes the wrapped handler and publishes its response, or an error reply if the handler
     * fails either synchronously or through its asynchronous result.
     */
    private void handleAuthorizedRequest(ConsumerRecord<String, String> consumerRecord, ArrayList<Header> arrayList, Header header, Q q, ExecutionContext executionContext) {
        try {
            this.commandHandler.handleRequest(q, executionContext).subscribe(response -> {
                String payload = serializeResponse(response);
                if (payload == null) {
                    logger.error("Unable to serialize response");
                    sendError(consumerRecord, header, arrayList, HttpStatus.INTERNAL_SERVER_ERROR);
                } else {
                    String replyTopic = sendReply(consumerRecord, header, arrayList, payload);
                    logger.info("Sent reply to {}", replyTopic);
                }
            }, error -> reportFailure(consumerRecord, header, arrayList, error));
        } catch (Exception e) {
            reportFailure(consumerRecord, header, arrayList, e);
        }
    }

    /**
     * Logs a handler failure (including its stack trace) and replies with the status carried by a
     * {@link CommandExecutionException}, or {@code INTERNAL_SERVER_ERROR} for any other failure.
     */
    private void reportFailure(ConsumerRecord<String, String> consumerRecord, Header header, ArrayList<Header> arrayList, Throwable error) {
        // The trailing throwable is treated by SLF4J as the exception to log, not as a format argument.
        logger.error("An unhandled exception occurred while executing an action {} {}", error.getClass().getSimpleName(), error.getMessage(), error);
        HttpStatus status = error instanceof CommandExecutionException
                ? ((CommandExecutionException) error).getStatus()
                : HttpStatus.INTERNAL_SERVER_ERROR;
        sendError(consumerRecord, header, arrayList, status);
    }

    /**
     * Publishes an error reply: the value is the status reason phrase and the error header
     * carries the serialized {@link CommandExecutionException}.
     */
    private void sendError(ConsumerRecord<String, String> consumerRecord, Header header, ArrayList<Header> arrayList, HttpStatus httpStatus) {
        // Work on a copy: the caller's header list is shared with asynchronous callbacks and must
        // not accumulate error headers.
        ArrayList<Header> errorHeaders = new ArrayList<>(arrayList);
        errorHeaders.add(getErrorHeader(httpStatus));
        sendReply(consumerRecord, header, errorHeaders, httpStatus.getReasonPhrase());
    }

    /**
     * Publishes a reply to the topic named by the given header, on the request's partition and key.
     *
     * @return the reply topic name that the record was sent to
     */
    private String sendReply(ConsumerRecord<String, String> consumerRecord, Header replyTopicHeader, ArrayList<Header> replyHeaders, String payload) {
        String replyTopic = new String(replyTopicHeader.value(), StandardCharsets.UTF_8);
        this.replyTemplate.send(new ProducerRecord<>(replyTopic, consumerRecord.partition(), consumerRecord.key(), payload, replyHeaders));
        return replyTopic;
    }

    /** Builds the error header holding the UTF-8 encoded, serialized exception for the status. */
    private RecordHeader getErrorHeader(HttpStatus httpStatus) {
        return new RecordHeader(Headers.ERROR, serializeCommandExecutionException(new CommandExecutionException(httpStatus)).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Deserializes the record value into the handler's request class.
     *
     * @return the request, or {@code null} if the payload is absent or cannot be parsed
     */
    private Q deserializeRequest(String str) {
        if (str == null) {
            // A null payload would otherwise make the mapper throw an unchecked exception
            // that bypasses the BAD_REQUEST reply.
            return null;
        }
        try {
            return (Q) this.objectMapper.readValue(str, this.commandHandler.getRequestClass());
        } catch (JsonProcessingException e) {
            logger.error("Error while deserializing request", e);
            return null;
        }
    }

    /**
     * Serializes a handler response to JSON.
     *
     * @return the JSON text, or {@code null} if serialization fails
     */
    private String serializeResponse(R r) {
        try {
            return this.objectMapper.writeValueAsString(r);
        } catch (JsonProcessingException e) {
            logger.error("Error while serializing response", e);
            return null;
        }
    }

    /**
     * Serializes the exception to JSON, falling back to a fixed status-500 document if the
     * exception itself cannot be serialized.
     */
    private String serializeCommandExecutionException(CommandExecutionException commandExecutionException) {
        try {
            return this.objectMapper.writeValueAsString(commandExecutionException);
        } catch (JsonProcessingException e) {
            logger.error("Error while serializing CommandExecutionException", e);
            return "{\n    \"statusCode\" : 500\n}\n".strip();
        }
    }

    /**
     * Placeholder for access token verification; currently performs no validation.
     * NOTE(review): the token is passed on to the execution context unverified - confirm whether
     * verification happens elsewhere.
     */
    private void verifyJwt(String str) {
    }
}
