/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.services.slack;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.services.slack.SlackRestService;
import org.apache.nifi.services.slack.SlackRestServiceException;
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;

/**
 * Record sink that serializes a {@link RecordSet} with the configured
 * {@link RecordSetWriterFactory} and posts the resulting text to a Slack channel
 * through {@link SlackRestService}.
 *
 * <p>The writer factory and the REST service are created in {@link #onEnabled(ConfigurationContext)}
 * and read in {@link #sendData(RecordSet, Map, boolean)}; both fields are {@code volatile} so that
 * the values assigned when the service is enabled are visible to whichever thread sends data.
 */
@Tags(value={"slack", "record", "sink"})
@CapabilityDescription(value="Format and send Records to a configured Channel using the Slack Post Message API. The service requires a Slack App with a Bot User configured for access to a Slack workspace. The Bot User OAuth Bearer Token is required for posting messages to Slack.")
public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {

    private static final String SLACK_API_URL = "https://slack.com/api";

    public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
            .name("API URL")
            .description("Slack Web API URL for posting text messages to channels. It only needs to be changed if Slack changes its API URL.")
            .required(true)
            .defaultValue(SLACK_API_URL)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build();

    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
            .name("Access Token")
            .description("Bot OAuth Token used for authenticating and authorizing the Slack request sent by NiFi.")
            .required(true)
            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder()
            .name("Channel ID")
            .description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor INPUT_CHARACTER_SET = new PropertyDescriptor.Builder()
            .name("Input Character Set")
            .description("Specifies the character set of the records used to generate the Slack message.")
            .required(true)
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue(StandardCharsets.UTF_8.name())
            .build();

    public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new PropertyDescriptor.Builder()
            .name("Web Service Client Provider")
            .description("Controller service to provide HTTP client for communicating with Slack API")
            .required(true)
            .identifiesControllerService(WebClientServiceProvider.class)
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            API_URL,
            ACCESS_TOKEN,
            CHANNEL_ID,
            INPUT_CHARACTER_SET,
            RECORD_WRITER_FACTORY,
            WEB_SERVICE_CLIENT_PROVIDER
    );

    private volatile RecordSetWriterFactory writerFactory;
    // volatile for the same reason as writerFactory: assigned in onEnabled, read in sendData.
    private volatile SlackRestService service;

    /**
     * Returns the fixed list of properties supported by this service.
     *
     * @return the immutable list of supported property descriptors
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Resolves the record writer factory and builds the {@link SlackRestService} from the
     * configured web client provider, access token, API URL and character set.
     *
     * @param context configuration from which the property values are read
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);

        final WebClientServiceProvider webClientServiceProvider =
                context.getProperty(WEB_SERVICE_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
        final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
        final String apiUrl = context.getProperty(API_URL).getValue();
        final String charset = context.getProperty(INPUT_CHARACTER_SET).getValue();

        this.service = new SlackRestService(webClientServiceProvider, accessToken, apiUrl, charset, this.getLogger());
    }

    /**
     * Renames the legacy hyphenated property names to the current display-style names,
     * after delegating to the superclass migration.
     *
     * @param config property configuration to migrate
     */
    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("api-url", API_URL.getName());
        config.renameProperty("access-token", ACCESS_TOKEN.getName());
        config.renameProperty("channel-id", CHANNEL_ID.getName());
        config.renameProperty("input-character-set", INPUT_CHARACTER_SET.getName());
        config.renameProperty("web-service-client-provider", WEB_SERVICE_CLIENT_PROVIDER.getName());
    }

    /**
     * Writes every record of {@code recordSet} into an in-memory buffer using the configured
     * record writer and posts the buffered text to the configured Slack channel.
     *
     * <p>The message is posted only when at least one record was written or
     * {@code sendZeroResults} is {@code true}; the {@link WriteResult} is returned either way.
     *
     * @param recordSet       records to serialize
     * @param attributes      attributes passed through to the record writer factory
     * @param sendZeroResults whether to post a message even when the record set was empty
     * @return the result reported by the writer's {@code finishRecordSet()}
     * @throws IOException      if writing fails, the configured character set is unsupported,
     *                          or posting to Slack fails
     * @throws ProcessException if the record writer cannot be created because the schema was not found
     */
    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
        final String channel = this.getConfigurationContext().getProperty(CHANNEL_ID).getValue();
        final String charsetName = this.getConfigurationContext().getProperty(INPUT_CHARACTER_SET).getValue();

        WriteResult writeResult;
        int recordCount = 0;
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            try (RecordSetWriter writer = this.writerFactory.createWriter(this.getLogger(), recordSet.getSchema(), out, attributes)) {
                writer.beginRecordSet();
                Record record;
                while ((record = recordSet.next()) != null) {
                    writer.write(record);
                    writer.flush();
                    recordCount++;
                }
                writeResult = writer.finishRecordSet();
                writer.flush();
            } catch (SchemaNotFoundException e) {
                final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s", recordSet.getSchema().getSchemaName());
                throw new ProcessException(errorMessage, e);
            }

            if (recordCount > 0 || sendZeroResults) {
                // Decode with the configured "Input Character Set" instead of the JVM default
                // charset, so the message text does not depend on the platform's file.encoding.
                final String message = out.toString(charsetName);
                try {
                    this.service.sendMessageToChannel(message, channel);
                } catch (SlackRestServiceException e) {
                    throw new IOException("Failed to send messages to Slack", e);
                }
            }
        }
        return writeResult;
    }
}

