Skip to main content

Slack integration (Javascript)

This document serves as a comprehensive tutorial for implementing a Slackbot integration that leverages ShinkaiNode to address and solve user queries effectively. Throughout this guide, you'll be walked through the step-by-step process of setting up a Slack Application, configuring environmental variables, and establishing a REST API server to handle Slack events. The tutorial covers the essentials of creating and managing Shinkai Jobs, processing Slack messages, and ensuring that responses from ShinkaiNode are accurately posted back to Slack threads. By following this tutorial, you'll enable a seamless interaction loop between Slack users and ShinkaiNode, allowing for real-time problem-solving and automation within your Slack channels.

Full Shinkai integration with Slack in Javascript (Typescript) is available here: https://github.com/dcSpark/shinkai-slackbot

Step-by-step flow​

  1. User posts Slack message on the main channel using
@SlackAppMention "message"
  1. Bot registers the message
  2. Bot creates Shinkai Job and then posts user message to Shinkai Node. Bot doesn't await for the response, it just passes the message to the node acknowledging Slack, the message was received by the bot.
  3. Node resolves the job at its own pace - node is capable of processing multiple jobs in parallel
  4. Bot monitors if each posted job was resolved on the node and if it is, then the bot service sends it back to Slack to the assigned thread.

Shinkai node setup​

In this tutorial, we assume you already have your own Shinkai Node ready and running and have access to its secret keys (specifically: profile_encryption_sk, profile_identity_sk and node_encryption_pk), so you can configure the bot and Shinkai Node communication.

Environmental variables setup​

To properly configure your environment for Shinkai Integrations for Slack, you need to set up the following environmental variables in your .env file:

# get from Shinkai Node setup
PROFILE_ENCRYPTION_SK=""
PROFILE_IDENTITY_SK=""
NODE_ENCRYPTION_PK=""

# the following values are based upon local setup
PROFILE_NAME="main"
DEVICE_NAME="main_device"
NODE_NAME="@@localhost.shinkai"

# shinkai node url
SHINKAI_NODE_URL="http://localhost:9550"

# get from Slack App setup
SLACK_BOT_TOKEN="xoxb-"

# available from Slack app home general settings, this parameter is optional unless we want to secure Slack commands
SLACK_SIGNING_SECRET=""

# default port is 3001
PORT=3001

ShinkaiManager setup​

Having in place implementation for Slack messages monitoring and Shinkai jobs and messages creation, we can take a look on how to configure ShinkaiManager to be able to receive messages. Moreover, to get information on how the whole Shinkai messages flow work.

In order to be able to use Shinkai Node dedicated methods, make sure to install official shinkai node typescript library

Let's look at the code for ShinkaiManager that already integrates all the necessary pieces.

Initializing ShinkaiManager based on Shinkai node data​

In order to start we must initialize Shinkai Manager object with all necessary variables (all values are described in environmental variables section)

  const shinkaiManager: ShinkaiManager = new ShinkaiManager(
config.encryptionSk,
config.signatureSk,
config.receiverPk,
config.nodeName,
config.profileName,
config.deviceName
);

Once ShinkaiManager is initialized, we can look at step-by-step flow of building messages for the node.

Creating jobs and sending Node Messages​

The flow from Shinkai Node is the following:

  1. Job is created using buildJobMessage() function. At this point there is call made to create_job endpoint on the node
  async buildCreateJob(agent: string): Promise<ShinkaiMessage> {
const job_scope: any = {
local: [],
vector_fs: [],
};

return await ShinkaiMessageBuilder.jobCreation(
job_scope,
this.encryptionSecretKey,
this.signatureSecretKey,
this.receiverPublicKey,
this.shinkaiName,
this.profileName,
this.shinkaiName,
agent
);
}
public async createJob(agent: string) {
const jobMessage = await this.buildCreateJob(agent);

let resp = await postData(JSON.stringify(jobMessage), "/v1/create_job");

if (resp.status === "success") {
return resp.data;
} else {
throw new Error(`Job creation failed: ${resp}`);
}
}

In this part it's important to note that the jobCreation generates job format that is later acceptable by node endpoint.

  1. In the reposnse, we're getting new jobId. Then having jobId in place, there must be created job message.
async buildJobMessage(messageContent: string, job_id: string): Promise<any> {
return await ShinkaiMessageBuilder.jobMessage(
job_id,
messageContent,
"",
"",
this.encryptionSecretKey,
this.signatureSecretKey,
this.receiverPublicKey,
this.shinkaiName,
this.profileName,
this.shinkaiName,
""
);
}

public async sendMessage(content: string, job_id: string) {
const message_job = await this.buildJobMessage(content, job_id);

let resp = await postData(message_job, "/v1/job_message");

if (resp.status === "success") {
return resp.data;
} else {
throw new Error(`Job creation failed:: ${resp}`);
}
}

Can I keep the context of the main thread?​

When integrating Shinkai integration with Slack we can ensure that all messages inside one main thread are kept inside one context and our node generates more accurate responses.

note

Once job message is sent to the node, the node can proceed to processing the job. One important factor here is that if we're integrating service with threads where we can acknowledge parent thread id, each thread can be assigned the same job. Having such setup (so passing the same jobId for the same messages in the thread), we can keep the context of the thread and make sure it's remembered by the node.

Monitoring responses from the node​

Current node integration uses http REST endpoint to fetch messages from the node. In order to be able to identify responses for the specific jobs, we must remember on the bot side, what is the jobId that we're awaiting for the answer. This is the main reason for storing all unresolved jobs in the activeJobs.

In order to be able to detect responses constantly, we need to have a function running infinitely and monitoring node responses per each job

note

In the future, there will be introduced websockets integration with the node which will allow to ease current approach.

How to parse messages from the node per each job?​

When fetching information from the node about current responses, we need to generate inboxName. For that we have specific function which needs to be called. Then, once responses for the inbox are fetched, we need to parse messages and identify if the message for a job was already resolved.

Here's exemplary implementation of a function that performs those steps:

public async getMessages(jobId: string) {
try {
const inbox = InboxName.getJobInboxNameFromParams(jobId).value;
const message = await this.buildGetMessagesForInbox(inbox);
let resp = await postData(message, "/v1/last_messages_from_inbox");

if (resp.data.length === 1) {
console.log("There's no answer available yet.");
return "";
}

// we only care about the latest response
const latestMessage = resp.data[resp.data.length - 1];
const isJobMessage =
// message we're interested in has specific JobMessageSchema
latestMessage.body.unencrypted.message_data.unencrypted
.message_content_schema === MessageSchemaType.JobMessageSchema &&
// sender_subidentity is empty for responses from the node
latestMessage.body.unencrypted.internal_metadata.sender_subidentity ===
"";

if (isJobMessage) {
const parsedMessage = JSON.parse(
resp.data[resp.data.length - 1].body.unencrypted.message_data
.unencrypted.message_raw_content
);
return parsedMessage?.content ?? "";
}
} catch (err) {
const error = err as Error;
console.error(error.message);
}

return "";
}

The getMessages function is designed to retrieve messages associated with a specific job ID from a messaging service. It performs several key operations and checks to ensure that it fetches the correct message data. Here's a high-level breakdown of its functionality:

  1. Inbox Retrieval: It starts by determining the inbox associated with the given job ID. This is achieved by using the InboxName.getJobInboxNameFromParams(jobId) method, which sets the inbox name based on the job ID.

  2. Message Fetching: With the inbox name determined, it constructs a request to fetch messages from this inbox by calling this.buildGetMessagesForInbox(inbox). This request is then sent to a specific endpoint (/v1/last_messages_from_inbox) to retrieve the messages.

  3. Response Handling: Upon receiving the response, the function first checks if there are messages available. If only one message is found, it assumes that there's no answer available yet and logs a message indicating this situation.

  4. Latest Message Analysis: If more than one message is present, it focuses on the latest message (the last one in the response array). This is based on the assumption that the most recent message is the most relevant in the context of a job's communication flow.

  5. Message Validation (isJobMessage Check): Before processing the latest message, it performs a crucial validation to ensure that the message is indeed related to the job in question. This validation, referred to as the isJobMessage check, involves two conditions:

    • The message must have a specific schema (JobMessageSchema), indicating that it is a job-related message.
    • The message's sender subidentity must be empty, which is a criterion for responses from the node.
  6. Message Parsing: If the message passes the isJobMessage check, it parses the message content from the raw message data. It specifically looks for the content field within the parsed data, which holds the actual message content related to the job.

  7. Error Handling: Throughout the process, the function is wrapped in a try-catch block to handle any potential errors gracefully. If an error occurs, it logs the error message and returns an empty string as a fallback.

In summary, the getMessages function is a critical component for fetching and validating the latest job-related message from a specified inbox, ensuring that only relevant and properly formatted messages are processed further.

Exemplary implementation on how to fetch and react for responses from the node​

Let's look at the following function that has infinite loop, which verifies earlier saved jobs that are being processed in the node and then resolves them by posting their responses to specific Slack Thread.


public getNodeResponses = async (
slackBot?: SlackBot
): Promise<string | undefined> => {
// once request about the job is saved in activeJobs, we want to monitor status of this and clear activeJobs position once job is resolved
// also if the job is resolved thanks to activeJobs, we can post answer on Slack to specific threads
while (true) {
if (this.activeJobs.length === 0) {
await delay(1_000);
continue;
}
console.log(
`Number of active jobs awaiting for response: ${this.activeJobs.length}`
);
for (const job of this.activeJobs) {
try {
let nodeResponse = await this.getMessages(job.shinkaiJobId);

// true by default, because if we don't wait for confirmation from another service (like Slack), we don't care about this condition
let wasMessagePostedInExternalService = true;
if (nodeResponse) {
// waiting for external service confirmation that the message was posted to where it supposed to
// for further integrations this part can be replaced/adjusted to whatever service we're integrating with
if (slackBot !== undefined) {
let wasMessagePostedInExternalService = false;
const slackMessageResponse = (await slackBot.postMessageToThread(
job.slackChannelId,
job.slackThreadId,
nodeResponse
)) as WebAPICallResult;

if (slackMessageResponse.ok) {
wasMessagePostedInExternalService = true;
}
}

if (wasMessagePostedInExternalService) {
// Remove job from activeJobs once processed
this.activeJobs = this.activeJobs.filter(
(j) => j.shinkaiJobId !== job.shinkaiJobId
);
}
}
} catch (error) {
// console.error(error);
console.log(`Response for jobId: ${job.shinkaiJobId} not available`);
}
}

// set the delay based on how often we expect responses to resolve on the node
await delay(1000);
}
};

getNodeResponses is an asynchronous method that continuously monitors active jobs, retrieves responses from a node, and posts these responses to Slack threads if applicable. It ensures that job responses are correctly handled and communicated to the relevant Slack channels or threads.

Continous monitoring The function operates within a while (true) loop, indicating it will run indefinitely until manually stopped. This design allows for continuous monitoring of new job responses.

Checking Active Jobs

At each loop iteration's start, the function checks for active jobs in the this.activeJobs array. If no active jobs are found, it waits for 1 second (await delay(1_000)) before rechecking, conserving resources when idle.

Processing Each Active Job

For every active job, the function attempts to retrieve the job's response using this.getMessages(job.shinkaiJobId), encapsulated in a try-catch block for error handling.

Handling Node Responses

Upon receiving a response (nodeResponse), the function checks for a provided slackBot instance. If present, it tries to post the response to the specified Slack thread (job.slackChannelId and job.slackThreadId). The operation's success is tracked by wasMessagePostedInExternalService, initially false and updated based on the Slack post's success.

Updating Job Status and Analytics

After successfully posting to Slack (or if no posting is required), the job is removed from the activeJobs array to avoid reprocessing. Additionally, the job's analytics data is updated or created in this.archiveJobsAnalytics, offering insights into job handling and response times.

Loop Iteration Delay

After processing all active jobs, the function waits for a configurable delay (default 1 second) before the next iteration. This delay is intended for future enhancements, such as adjusting the delay duration based on system load.

Conclusion

The getNodeResponses function is an essential mechanism for ensuring efficient monitoring, retrieval, and communication of job responses in a system integrated with Slack. By managing active jobs, handling responses, and updating job statuses and analytics, it plays a crucial role in the job management process within the ShinkaiManager class.

Setting up Slack Application​

In order to be able to have full Slack integration, we need to get details about Slack Application token and setup necessary configurations. Bot requires 2 pieces of information:

  • Slack bot token (xoxb-): To retrieve your Slack bot token, first navigate to the Slack API website and log in to your account. Then, create a new app or select an existing one from your apps list. Under the 'Features' section in the sidebar, click on 'OAuth & Permissions'. Scroll down to the 'Bot Token Scopes' section and add the necessary scopes for your bot. After configuring the scopes, scroll up to the 'OAuth Tokens for Your Workspace' section and click the 'Install to Workspace' button. Follow the prompts to install the app to your workspace, and you will be provided with a bot token starting with xoxb-. This token is what you'll use to authenticate your bot with the Slack API.

  • Slack channel id, where the bot is going to be installed

Slack integration inside the code​

In order to be able to setup Slack bot pieces we need REST API server with /slack/events/ endpoint which is supported by Event Subscriptions. This endpoint accepts 2 types of payloads:

  • message from the node - this part of the logic endpoint accepts message posted by the user
  • challenge - this part is required by Slack to identify events endpoint

Exemplary implementation of Express.js server, with such endpoint is the following:

import express from "express";
import cors from "cors";
import { SlackBot, SlackMessageResponse, SlackRequest } from "./slack";
import { ShinkaiManager } from "./shinkai_manager";
import axios from "axios";
import crypto from "crypto";
import { config } from "./config";

import storage from "node-persist";
import { PersistenStorage } from "./utils";

interface SlackEventApiRequestBodyContent {
type: "app_mention" | "message";
client_msg_id: string;
text: string;
user: string;
ts: string;
blocks: Array<{
type: string;
block_id: string;
elements: Array<any>;
}>;
team: string;
channel: string;
event_ts: string;
channel_type: string;
thread_ts?: string;
}

export class WebServer {
public app: express.Application;
private shinkaiManager: ShinkaiManager;
private slackBot: SlackBot;
threadJobMapping: { [threadId: string]: string };

// the purpose of this is to allow parallelisation, so end user can perform multiple jobs (for example ask questions)
// and the node will reply to all of those in parallel manner - hence we need to store the ones we didn't get answers to
// Once we get answer/response from the node in the inbox to specific job, we know to which thread we should post it and then we remove this job from the array
constructor(
shinkaiManager: ShinkaiManager,
slackBot: SlackBot,
threadJobMapping: { [threadId: string]: string } | undefined
) {
this.app = express();
this.app.use(cors());
this.app.use(express.json());
this.app.use(express.urlencoded({ extended: true }));
this.shinkaiManager = shinkaiManager;
this.slackBot = slackBot;
if (threadJobMapping === undefined) {
this.threadJobMapping = {};
} else {
this.threadJobMapping = threadJobMapping;
}


// Endpoint for handling Event API (so we can use mentions)
this.app.post("/slack/events", async (req: any, res: any) => {
try {
const json_data = req.body as any;

// URL Verification (important for slack setup)
if ("challenge" in json_data) {
return res.json({ challenge: json_data["challenge"] });
}

// if we don't send 200 immediately, then Slack itself sends duplicated messages (there's no way to configure it on Slack settings)
res.status(200).send();

const event = json_data.event as SlackEventApiRequestBodyContent;
if (
event.type === "app_mention" &&
"text" in event &&
json_data.api_app_id === process.env.SLACK_APP_ID
) {
// cleanup the message (there's <@USER_APP_ID> as a prefix added each time we send something)
const message = event.text?.replace(/<@([A-Z0-9]+)>/, "");
console.log(`Extracted message: ${message}`);

if (message !== undefined || message !== "") {
let threadId = event.ts;

// if we start conversation from scratch we take `event.ts` value
// however if we are inside already started conversation we need to take `event.thread_ts` field value
if (event.thread_ts !== undefined) {
threadId = event.thread_ts;
} else {
threadId = event.ts;
}

// make sure there's always thread defined
if (threadId === undefined || threadId === null) {
throw new Error(
`Couldn't identify thread for reply. thread_ts: ${threadId}`
);
}

const existingJobId = this.threadJobMapping[threadId];
let jobId = "";
if (existingJobId !== undefined) {
console.log(
`Thread ${threadId} already has existing job id assigned: ${existingJobId}`
);
jobId = existingJobId;
} else {
// create shinkai job
console.log(`Creating job id`);
jobId = await this.shinkaiManager.createJob("main/agent/my_gpt");

// assign job id for the fuut
this.threadJobMapping[threadId] = jobId;

await storage.updateItem(
PersistenStorage.ThreadJobMapping,
this.threadJobMapping
);
}
console.log("### Job ID:", jobId);
this.shinkaiManager.activeJobs.push({
message: message,
slackThreadId: threadId,
slackChannelId: event.channel,
shinkaiJobId: jobId,
});

// send job message to the node
let answer = await this.shinkaiManager.sendMessage(message, jobId);
console.log("### Answer:", answer);
} else {
throw new Error(
`${message} was not provided. Nothing to pass to the node.`
);
}
}
return res.status(200).send();
} catch (err) {
// handling errors
}
});

start(port: number) {
this.app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
}
}

Handling Slack Events with /slack/events Endpoint​

The /slack/events endpoint is designed to facilitate communication between your Slack bot and the Slack API, specifically for handling events such as messages and app mentions within Slack. When configured as part of the Slack app's Event Subscriptions, this endpoint allows your bot to react to and process incoming events in real-time.

Key Features​

  • Event Subscription: By subscribing to specific events (e.g., messages, app mentions), your Slack bot can listen for and respond to these events through this endpoint.
  • Message Processing: Upon receiving an event, the endpoint extracts the relevant information, such as the message text and sender, to process the request. This includes cleaning up the message to remove any bot user IDs mentioned in the text.
  • Job Creation and Management: For each valid event, a corresponding job is created in ShinkaiNode. This job is then tracked with a unique ID, allowing for asynchronous processing and response handling. The endpoint maintains a mapping between Slack thread IDs and ShinkaiNode job IDs to ensure responses are posted back to the correct conversation thread in Slack.

Implementation Details​

  • The endpoint uses Express.js for handling HTTP POST requests from Slack.
  • It employs a verification step to ensure requests are legitimate, responding to Slack's URL verification challenge when setting up the Event Subscriptions.
  • The endpoint is designed to handle high concurrency, allowing multiple jobs to be processed in parallel and responses to be managed efficiently.

By integrating the /slack/events endpoint into your Slack bot, you enable a powerful bridge between Slack and ShinkaiNode, allowing for interactions and automation within your Slack channels.

Slack SDK exemplary implementation of the methods​

Here are a few integrations with Slack SDK

 // Function to post a message to a specific channel
public async postMessageToChannel(
channelId: string,
text: string
): Promise<WebAPICallResult | undefined> {
try {
const result = await this.client.chat.postMessage({
channel: channelId,
text: text,
});
return result;
} catch (error) {
console.error(
`Error posting message to channel: ${
(error as WebAPICallError).message
}`
);
}
}

// Function to post a message to a specific thread within a channel
public async postMessageToThread(
channelId: string,
threadTs: string,
text: string
): Promise<WebAPICallResult | Error> {
try {
const result = await this.client.chat.postMessage({
channel: channelId,
thread_ts: threadTs,
text: text,
});

if (result.ok) {
console.log(
`Response from the node: ${text} posted to channelId: ${channelId} successfully.`
);
}
return result;
} catch (error) {
return new Error(
`Error posting message to thread: ${(error as WebAPICallError).message}`
);
}
}