/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.controller;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.controller.BasicController;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.StreamingRequest;
import org.apache.kylin.rest.service.KafkaConfigService;
import org.apache.kylin.rest.service.StreamingService;
import org.apache.kylin.rest.service.TableService;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping(value={"/streaming"})
public class StreamingController
extends BasicController {
    private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
    @Autowired
    @Qualifier(value="streamingMgmtService")
    private StreamingService streamingService;
    @Autowired
    @Qualifier(value="kafkaMgmtService")
    private KafkaConfigService kafkaConfigService;
    @Autowired
    @Qualifier(value="tableService")
    private TableService tableService;

    @RequestMapping(value={"/getConfig"}, method={RequestMethod.GET}, produces={"application/json"})
    @ResponseBody
    public List<StreamingConfig> getStreamings(@RequestParam(value="table", required=false) String table, @RequestParam(value="project", required=false) String project, @RequestParam(value="limit", required=false) Integer limit, @RequestParam(value="offset", required=false) Integer offset) {
        try {
            return this.streamingService.getStreamingConfigs(table, project, limit, offset);
        }
        catch (IOException e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
        }
    }

    @RequestMapping(value={"/getKfkConfig"}, method={RequestMethod.GET}, produces={"application/json"})
    @ResponseBody
    public List<KafkaConfig> getKafkaConfigs(@RequestParam(value="kafkaConfigName", required=false) String kafkaConfigName, @RequestParam(value="project", required=false) String project, @RequestParam(value="limit", required=false) Integer limit, @RequestParam(value="offset", required=false) Integer offset) {
        try {
            return this.kafkaConfigService.getKafkaConfigs(kafkaConfigName, project, limit, offset);
        }
        catch (IOException e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @RequestMapping(value={""}, method={RequestMethod.POST}, produces={"application/json"})
    @ResponseBody
    public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
        String project = streamingRequest.getProject();
        TableDesc tableDesc = this.deserializeTableDesc(streamingRequest);
        if (null == tableDesc) {
            throw new BadRequestException("Failed to add streaming table.");
        }
        StreamingConfig streamingConfig = this.deserializeSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        KafkaConfig kafkaConfig = this.deserializeKafkaSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        boolean saveStreamingSuccess = false;
        boolean saveKafkaSuccess = false;
        try {
            this.tableService.loadTableToProject(tableDesc, null, project);
        }
        catch (IOException e) {
            throw new BadRequestException("Failed to add streaming table.");
        }
        streamingConfig.setName(tableDesc.getIdentity());
        kafkaConfig.setName(tableDesc.getIdentity());
        try {
            if (StringUtils.isEmpty((String)streamingConfig.getName())) {
                logger.info("StreamingConfig should not be empty.");
                throw new BadRequestException("StremingConfig name should not be empty.");
            }
            try {
                streamingConfig.setUuid(RandomUtil.randomUUID().toString());
                this.streamingService.createStreamingConfig(streamingConfig, project);
                saveStreamingSuccess = true;
            }
            catch (IOException e) {
                logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), (Throwable)e);
                throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage());
            }
            try {
                kafkaConfig.setUuid(RandomUtil.randomUUID().toString());
                this.kafkaConfigService.createKafkaConfig(kafkaConfig, project);
                saveKafkaSuccess = true;
            }
            catch (IOException e) {
                try {
                    this.streamingService.dropStreamingConfig(streamingConfig, project);
                }
                catch (IOException e1) {
                    throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
                }
                logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), (Throwable)e);
                throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
            }
        }
        finally {
            if (!saveKafkaSuccess || !saveStreamingSuccess) {
                if (saveStreamingSuccess) {
                    StreamingConfig sConfig = this.streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName());
                    try {
                        this.streamingService.dropStreamingConfig(sConfig, project);
                    }
                    catch (IOException e) {
                        throw new InternalErrorException("Action failed and failed to rollback the created streaming config: " + e.getLocalizedMessage());
                    }
                }
                if (saveKafkaSuccess) {
                    try {
                        KafkaConfig kConfig = this.kafkaConfigService.getKafkaConfig(kafkaConfig.getName(), project);
                        this.kafkaConfigService.dropKafkaConfig(kConfig, project);
                    }
                    catch (IOException e) {
                        throw new InternalErrorException("Action failed and failed to rollback the created kafka config: " + e.getLocalizedMessage());
                    }
                }
            }
        }
        streamingRequest.setSuccessful(true);
        return streamingRequest;
    }

    @RequestMapping(value={""}, method={RequestMethod.PUT}, produces={"application/json"})
    @ResponseBody
    public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
        StreamingConfig streamingConfig = this.deserializeSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        KafkaConfig kafkaConfig = this.deserializeKafkaSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        String project = streamingRequest.getProject();
        if (streamingConfig == null) {
            return streamingRequest;
        }
        try {
            streamingConfig = this.streamingService.updateStreamingConfig(streamingConfig, project);
        }
        catch (AccessDeniedException accessDeniedException) {
            throw new ForbiddenException("You don't have right to update this StreamingConfig.");
        }
        catch (Exception e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
        }
        try {
            kafkaConfig = this.kafkaConfigService.updateKafkaConfig(kafkaConfig, project);
        }
        catch (AccessDeniedException accessDeniedException) {
            throw new ForbiddenException("You don't have right to update this KafkaConfig.");
        }
        catch (Exception e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
        }
        streamingRequest.setSuccessful(true);
        return streamingRequest;
    }

    @RequestMapping(value={"/{project}/{configName}"}, method={RequestMethod.DELETE}, produces={"application/json"})
    @ResponseBody
    public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException {
        StreamingConfig config = this.streamingService.getStreamingManager().getStreamingConfig(configName);
        KafkaConfig kafkaConfig = this.kafkaConfigService.getKafkaConfig(configName, project);
        if (null == config) {
            throw new NotFoundException("StreamingConfig with name " + configName + " not found..");
        }
        try {
            this.streamingService.dropStreamingConfig(config, project);
            this.kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
        }
        catch (Exception e) {
            logger.error(e.getLocalizedMessage(), (Throwable)e);
            throw new InternalErrorException("Failed to delete StreamingConfig.  Caused by: " + e.getMessage(), e);
        }
    }

    private TableDesc deserializeTableDesc(StreamingRequest streamingRequest) {
        TableDesc desc = null;
        try {
            logger.debug("Saving TableDesc " + streamingRequest.getTableData());
            desc = (TableDesc)JsonUtil.readValue((String)streamingRequest.getTableData(), TableDesc.class);
            this.updateRequest(streamingRequest, true, null);
        }
        catch (JsonParseException e) {
            logger.error("The TableDesc definition is invalid.", (Throwable)e);
            this.updateRequest(streamingRequest, false, e.getMessage());
        }
        catch (JsonMappingException e) {
            logger.error("The data TableDesc definition is invalid.", (Throwable)e);
            this.updateRequest(streamingRequest, false, e.getMessage());
        }
        catch (IOException e) {
            logger.error("Failed to deal with the request.", (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
        }
        if (null != desc) {
            String[] dbTable = HadoopUtil.parseHiveTableName((String)desc.getName());
            desc.setName(dbTable[1]);
            desc.setDatabase(dbTable[0]);
            desc.getIdentity();
        }
        return desc;
    }

    private StreamingConfig deserializeSchemalDesc(StreamingRequest streamingRequest) {
        StreamingConfig desc = null;
        try {
            logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
            desc = (StreamingConfig)JsonUtil.readValue((String)streamingRequest.getStreamingConfig(), StreamingConfig.class);
            this.updateRequest(streamingRequest, true, null);
        }
        catch (JsonParseException e) {
            logger.error("The StreamingConfig definition is invalid.", (Throwable)e);
            this.updateRequest(streamingRequest, false, e.getMessage());
        }
        catch (JsonMappingException e) {
            logger.error("The data StreamingConfig definition is invalid.", (Throwable)e);
            this.updateRequest(streamingRequest, false, e.getMessage());
        }
        catch (IOException e) {
            logger.error("Failed to deal with the request.", (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
        }
        return desc;
    }

    private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
        KafkaConfig desc = null;
        try {
            logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
            desc = (KafkaConfig)JsonUtil.readValue((String)streamingRequest.getKafkaConfig(), KafkaConfig.class);
            this.updateRequest(streamingRequest, true, null);
        }
        catch (JsonParseException e) {
            logger.error("The KafkaConfig definition is invalid.", (Throwable)e);
            this.updateRequest(streamingRequest, false, e.getMessage());
        }
        catch (JsonMappingException e) {
            logger.error("The data KafkaConfig definition is invalid.", (Throwable)e);
            this.updateRequest(streamingRequest, false, e.getMessage());
        }
        catch (IOException e) {
            logger.error("Failed to deal with the request.", (Throwable)e);
            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
        }
        return desc;
    }

    private void updateRequest(StreamingRequest request, boolean success, String message) {
        request.setSuccessful(success);
        request.setMessage(message);
    }
}

