package org.apache.nifi.processors.slack;

import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.chat.ChatPostMessageRequest;
import com.slack.api.methods.request.files.FilesUploadV2Request;
import com.slack.api.methods.response.chat.ChatPostMessageResponse;
import com.slack.api.methods.response.files.FilesUploadV2Response;
import com.slack.api.model.File;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.util.ChannelMapper;
import org.apache.nifi.processors.slack.util.RateLimit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;

@CapabilityDescription("Posts a message to the specified Slack channel. The content of the message can be either a user-defined message that makes use of Expression Language or the contents of the FlowFile can be sent as the message. If sending a user-defined message, the contents of the FlowFile may also be optionally uploaded as a file attachment.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"slack", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "write", "upload", "send", "publish"})
@WritesAttributes({@WritesAttribute(attribute = "slack.channel.id", description = "The ID of the Slack Channel from which the messages were retrieved"), @WritesAttribute(attribute = "slack.ts", description = "The timestamp of the slack messages that was sent; this is used by Slack as a unique identifier")})
@DefaultSettings(yieldDuration = "3 sec")
/* loaded from: input_file:org/apache/nifi/processors/slack/PublishSlack.class */
public class PublishSlack extends AbstractProcessor {
    static final AllowableValue PUBLISH_STRATEGY_CONTENT_AS_MESSAGE = new AllowableValue("Send FlowFile Content as Message", "Send FlowFile Content as Message", "The contents of the FlowFile will be sent as the message text.");
    static final AllowableValue PUBLISH_STRATEGY_USE_PROPERTY = new AllowableValue("Use 'Message Text' Property", "Use 'Message Text' Property", "The value of the Message Text Property will be sent as the message text.");
    static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder().name("Access Token").description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. The token must be granted the chat:write scope. Additionally, in order to upload FlowFile contents as an attachment, it must be granted files:write.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).sensitive(true).build();
    static PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder().name("Channel").description("The name or identifier of the channel to send the message to. If using a channel name, it must be prefixed with the # character. For example, #general. This is valid only for public channels. Otherwise, the unique identifier of the channel to publish to must be provided.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    static PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder().name("Publish Strategy").description("Specifies how the Processor will send the message or file to Slack.").required(true).allowableValues(new AllowableValue[]{PUBLISH_STRATEGY_CONTENT_AS_MESSAGE, PUBLISH_STRATEGY_USE_PROPERTY}).defaultValue(PUBLISH_STRATEGY_CONTENT_AS_MESSAGE.getValue()).build();
    static PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder().name("Character Set").description("Specifies the name of the Character Set used to encode the FlowFile contents.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(PUBLISH_STRATEGY, new AllowableValue[]{PUBLISH_STRATEGY_CONTENT_AS_MESSAGE}).build();
    static PropertyDescriptor MESSAGE_TEXT = new PropertyDescriptor.Builder().name("Message Text").description("The text of the message to send to Slack.").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).addValidator(Validator.VALID).dependsOn(PUBLISH_STRATEGY, new AllowableValue[]{PUBLISH_STRATEGY_USE_PROPERTY}).build();
    static PropertyDescriptor SEND_CONTENT_AS_ATTACHMENT = new PropertyDescriptor.Builder().name("Include FlowFile Content as Attachment").description("Specifies whether or not the contents of the FlowFile should be uploaded as an attachment to the Slack message.").allowableValues(new String[]{"true", "false"}).required(true).dependsOn(PUBLISH_STRATEGY, new AllowableValue[]{PUBLISH_STRATEGY_USE_PROPERTY}).defaultValue("false").build();
    static PropertyDescriptor MAX_FILE_SIZE = new PropertyDescriptor.Builder().name("Max FlowFile Size").description("The maximum size of a FlowFile that can be sent to Slack. If any FlowFile exceeds this size, it will be routed to failure. This plays an important role because the entire contents of the file must be loaded into NiFi's heap in order to send the data to Slack.").required(true).dependsOn(SEND_CONTENT_AS_ATTACHMENT, "true", new String[0]).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();
    static PropertyDescriptor THREAD_TS = new PropertyDescriptor.Builder().name("Thread Timestamp").description("The Timestamp identifier for the thread that this message is to be a part of. If not specified, the message will be a top-level message instead of being in a thread.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).build();
    private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(ACCESS_TOKEN, CHANNEL, PUBLISH_STRATEGY, MESSAGE_TEXT, CHARACTER_SET, SEND_CONTENT_AS_ATTACHMENT, MAX_FILE_SIZE, THREAD_TS));
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles are routed to success after being successfully sent to Slack").build();
    public static final Relationship REL_RATE_LIMITED = new Relationship.Builder().name("rate limited").description("FlowFiles are routed to 'rate limited' if the Rate Limit has been exceeded").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to 'failure' if unable to be sent to Slack for any other reason").build();
    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet(Arrays.asList(REL_SUCCESS, REL_RATE_LIMITED, REL_FAILURE)));
    private final RateLimit rateLimit = new RateLimit(getLogger());
    private volatile ChannelMapper channelMapper;
    private volatile App slackApp;
    private volatile MethodsClient client;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @OnScheduled
    public void setup(ProcessContext processContext) {
        this.slackApp = createSlackApp(processContext);
        this.client = this.slackApp.client();
        this.channelMapper = new ChannelMapper(this.client);
    }

    @OnStopped
    public void shutdown() {
        if (this.slackApp != null) {
            this.slackApp.stop();
        }
    }

    private App createSlackApp(ProcessContext processContext) {
        return new App(AppConfig.builder().singleTeamBotToken(processContext.getProperty(ACCESS_TOKEN).getValue()).build());
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        String channelId;
        if (this.rateLimit.isLimitReached()) {
            getLogger().debug("Will not publish to Slack because rate limit has been reached");
            processContext.yield();
            return;
        }
        FlowFile flowFile = processSession.get();
        if (flowFile == null || (channelId = getChannelId(flowFile, processSession, processContext)) == null) {
            return;
        }
        if (PUBLISH_STRATEGY_CONTENT_AS_MESSAGE.getValue().equalsIgnoreCase(processContext.getProperty(PUBLISH_STRATEGY).getValue())) {
            publishContentAsMessage(flowFile, channelId, processContext, processSession);
        } else if (processContext.getProperty(SEND_CONTENT_AS_ATTACHMENT).asBoolean().booleanValue()) {
            publishAsFile(flowFile, channelId, processContext, processSession);
        } else {
            publishAsMessage(flowFile, channelId, processContext.getProperty(MESSAGE_TEXT).evaluateAttributeExpressions(flowFile).getValue(), processContext, processSession);
        }
    }

    private String getChannelId(FlowFile flowFile, ProcessSession processSession, ProcessContext processContext) {
        String value = processContext.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
        if (value.isEmpty()) {
            getLogger().error("No Channel ID was given for {}; routing to failure", new Object[]{flowFile});
            processSession.transfer(flowFile, REL_FAILURE);
            return null;
        }
        if (!value.startsWith("#")) {
            return value;
        }
        try {
            String lookupChannelId = this.channelMapper.lookupChannelId(value);
            if (lookupChannelId != null) {
                return lookupChannelId;
            }
            getLogger().error("Could not find Channel with name {} for {}; routing to failure", new Object[]{value, flowFile});
            processSession.transfer(flowFile, REL_FAILURE);
            return null;
        } catch (Exception e) {
            getLogger().error("Failed to resolve Slack Channel ID for {}; transferring to {}", new Object[]{flowFile, handleClientException(value, flowFile, processSession, processContext, e), e});
            return null;
        }
    }

    private void publishContentAsMessage(FlowFile flowFile, String str, ProcessContext processContext, ProcessSession processSession) {
        if (flowFile.getSize() > 500000) {
            getLogger().error("Cannot send contents of FlowFile {} to Slack because its length exceeds 500,000 bytes; routing to 'failure'", new Object[]{flowFile});
            processSession.transfer(flowFile, REL_FAILURE);
            return;
        }
        String value = processContext.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue();
        byte[] bArr = new byte[(int) flowFile.getSize()];
        try {
            InputStream read = processSession.read(flowFile);
            Throwable th = null;
            try {
                try {
                    StreamUtils.fillBuffer(read, bArr, true);
                    String str2 = new String(bArr, value);
                    if (read != null) {
                        if (0 != 0) {
                            try {
                                read.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            read.close();
                        }
                    }
                    if (str2.length() <= 100000) {
                        publishAsMessage(flowFile, str, str2, processContext, processSession);
                    } else {
                        getLogger().error("Cannot send contents of FlowFile {} to Slack because its length exceeds 100,000 characters; routing to 'failure'", new Object[]{flowFile});
                        processSession.transfer(flowFile, REL_FAILURE);
                    }
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            getLogger().error("Failed to send contents of FlowFile {} to Slack; routing to failure", new Object[]{flowFile, e});
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }

    private void publishAsMessage(FlowFile flowFile, String str, String str2, ProcessContext processContext, ProcessSession processSession) {
        try {
            ChatPostMessageResponse chatPostMessage = this.client.chatPostMessage(ChatPostMessageRequest.builder().channel(str).text(str2).threadTs(processContext.getProperty(THREAD_TS).evaluateAttributeExpressions(flowFile).getValue()).build());
            if (!chatPostMessage.isOk()) {
                getLogger().error("Could not send message to Slack for {} - received error: {}", new Object[]{flowFile, getErrorMessage(chatPostMessage.getError(), chatPostMessage.getNeeded(), chatPostMessage.getProvided(), chatPostMessage.getWarning())});
                processSession.transfer(flowFile, REL_FAILURE);
                return;
            }
            String ts = chatPostMessage.getTs();
            HashMap hashMap = new HashMap();
            hashMap.put("slack.ts", ts);
            hashMap.put("slack.channel.id", str);
            FlowFile putAllAttributes = processSession.putAllAttributes(flowFile, hashMap);
            processSession.getProvenanceReporter().send(putAllAttributes, "https://slack.com/api/chat.postMessage");
            processSession.transfer(putAllAttributes, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("Failed to send message to Slack for {}; transferring to {}", new Object[]{flowFile, handleClientException(str, flowFile, processSession, processContext, e), e});
        }
    }

    private void publishAsFile(FlowFile flowFile, String str, ProcessContext processContext, ProcessSession processSession) {
        String attribute = flowFile.getAttribute(CoreAttributes.FILENAME.key());
        long longValue = processContext.getProperty(MAX_FILE_SIZE).asDataSize(DataUnit.B).longValue();
        if (flowFile.getSize() > longValue) {
            getLogger().warn("{} exceeds max allowable file size. Max File Size = {}; FlowFile size = {}; routing to 'failure'", new Object[]{flowFile, FormatUtils.formatDataSize(longValue), FormatUtils.formatDataSize(flowFile.getSize())});
            processSession.transfer(flowFile, REL_FAILURE);
            return;
        }
        try {
            byte[] bArr = new byte[(int) flowFile.getSize()];
            InputStream read = processSession.read(flowFile);
            Throwable th = null;
            try {
                try {
                    StreamUtils.fillBuffer(read, bArr, true);
                    if (read != null) {
                        if (0 != 0) {
                            try {
                                read.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            read.close();
                        }
                    }
                    FilesUploadV2Response filesUploadV2 = this.client.filesUploadV2(FilesUploadV2Request.builder().filename(attribute).title(attribute).initialComment(processContext.getProperty(MESSAGE_TEXT).evaluateAttributeExpressions(flowFile).getValue()).channel(str).threadTs(processContext.getProperty(THREAD_TS).evaluateAttributeExpressions(flowFile).getValue()).fileData(bArr).build());
                    if (!filesUploadV2.isOk()) {
                        getLogger().error("Could not upload contents of {} to Slack - received error: {}", new Object[]{flowFile, getErrorMessage(filesUploadV2.getError(), filesUploadV2.getNeeded(), filesUploadV2.getProvided(), filesUploadV2.getWarning())});
                        processSession.transfer(flowFile, REL_FAILURE);
                        return;
                    }
                    File.Shares shares = filesUploadV2.getFile().getShares();
                    String str2 = null;
                    if (shares != null) {
                        str2 = getTs(shares.getPrivateChannels());
                        if (str2 == null) {
                            str2 = getTs(shares.getPublicChannels());
                        }
                    }
                    HashMap hashMap = new HashMap();
                    hashMap.put("slack.channel.id", str);
                    if (str2 != null) {
                        hashMap.put("slack.ts", str2);
                    }
                    FlowFile putAllAttributes = processSession.putAllAttributes(flowFile, hashMap);
                    processSession.getProvenanceReporter().send(putAllAttributes, "https://slack.com/api/files.upload");
                    processSession.transfer(putAllAttributes, REL_SUCCESS);
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            getLogger().error("Could not upload contents of {} to Slack; routing to {}", new Object[]{flowFile, handleClientException(str, flowFile, processSession, processContext, e), e});
        }
    }

    private Relationship handleClientException(String str, FlowFile flowFile, ProcessSession processSession, ProcessContext processContext, Exception exc) {
        Relationship relationship = yieldOnRateLimit(exc, str, processContext) ? REL_RATE_LIMITED : REL_FAILURE;
        processSession.transfer(flowFile, relationship);
        return relationship;
    }

    private boolean yieldOnRateLimit(Throwable th, String str, ProcessContext processContext) {
        boolean isRateLimited = isRateLimited(th);
        if (isRateLimited) {
            getLogger().warn("Slack indicated that the Rate Limit has been exceeded when attempting to publish messages to channel {}", new Object[]{str});
        } else {
            getLogger().error("Failed to retrieve messages for channel {}", new Object[]{str, th});
        }
        this.rateLimit.retryAfter(Duration.ofSeconds(getRetryAfterSeconds(th)));
        processContext.yield();
        return isRateLimited;
    }

    public static String getErrorMessage(String str, String str2, String str3, String str4) {
        String str5 = str == null ? str4 : str;
        return (str2 == null || str3 == null) ? str5 : str5 + ": Permission needed: " + str2 + "; Permission granted to this bot: " + str3;
    }

    public static boolean isRateLimited(Throwable th) {
        return (th instanceof SlackApiException) && 429 == ((SlackApiException) th).getResponse().code();
    }

    public static int getRetryAfterSeconds(Throwable th) {
        if (th instanceof SlackApiException) {
            return Integer.parseInt(((SlackApiException) th).getResponse().header("Retry-After", "10"));
        }
        return 1;
    }

    private String getTs(Map<String, List<File.ShareDetail>> map) {
        if (map == null) {
            return null;
        }
        Iterator<List<File.ShareDetail>> it = map.values().iterator();
        while (it.hasNext()) {
            Iterator<File.ShareDetail> it2 = it.next().iterator();
            while (it2.hasNext()) {
                String ts = it2.next().getTs();
                if (ts != null) {
                    return ts;
                }
            }
        }
        return null;
    }
}
