
Slack integration (Python)

This tutorial shows how to build a Slackbot integration that uses a Shinkai Node to answer user queries. It walks through setting up a Slack application, configuring environment variables, and running a REST API server that handles Slack events. Along the way it covers creating and managing Shinkai jobs, processing Slack messages, and posting the node's responses back to the originating Slack threads, giving you a complete interaction loop between Slack users and the Shinkai Node for real-time problem-solving and automation in your channels.

Full Shinkai integration with Slack in Python is available here: https://github.com/dcSpark/shinkai-slackbot-python

Step-by-step flow

  1. A user posts a Slack message in the main channel using @SlackApp "message"
  2. The bot registers the message
  3. The bot creates a Shinkai job and posts the user's message to the Shinkai Node
  4. The node resolves the job
  5. The job's response is posted on Slack in the thread where the question was asked

Shinkai node setup

In this tutorial, we assume you already have your own Shinkai Node up and running and have access to its secret keys (specifically profile_encryption_sk, profile_identity_sk, and node_encryption_pk), so you can configure communication between the bot and the Shinkai Node.

Environment variables setup

To configure your environment for the Shinkai Slack integration, set the following variables in your .env file:

# get from Shinkai Node setup
PROFILE_ENCRYPTION_SK=""
PROFILE_IDENTITY_SK=""
NODE_ENCRYPTION_PK=""

# the following values are based upon local setup
PROFILE_NAME="main"
DEVICE_NAME="main_device"
NODE_NAME="@@localhost.shinkai"

# shinkai node url
SHINKAI_NODE_URL="http://localhost:9550"

# get from Slack App setup
SLACK_BOT_TOKEN="xoxb-"

# available from Slack app home general settings, this parameter is optional unless we want to secure Slack commands
SLACK_SIGNING_SECRET=""

# default port is 3001
PORT=3001
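In the project these values are presumably loaded with python-dotenv's load_dotenv(). As an illustration only, here is a minimal, dependency-free sketch of what parsing a .env file involves (load_env_file is a hypothetical helper, not part of the project):

```python
import os

def load_env_file(path=".env"):
    # Hypothetical sketch: read KEY=VALUE lines, skip comments and blanks,
    # and only set variables that are not already in the environment.
    if not os.path.exists(path):
        return
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip().strip('"'))
```

In practice, prefer python-dotenv, which also handles quoting, multiline values, and export syntax.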

ShinkaiManager setup

With Slack message monitoring and Shinkai job/message creation in place, let's look at how to configure ShinkaiManager to receive messages, and at how the overall Shinkai message flow works.

note

To use the Shinkai Node methods shown below, you need to install the ShinkaiNode Python library, which is compiled with PyO3. More details on building this library are coming soon.

Initializing ShinkaiManager based on Shinkai node data

To start, initialize the ShinkaiManager object with all the necessary values (each one is described in the environment variables section above):


shinkai_manager = ShinkaiManager(
    encryption_sk=os.getenv("PROFILE_ENCRYPTION_SK"),
    signature_sk=os.getenv("PROFILE_IDENTITY_SK"),
    receiver_pk=os.getenv("NODE_ENCRYPTION_PK"),
    shinkai_name=os.getenv("NODE_NAME"),
    profile_name=os.getenv("PROFILE_NAME"),
    device_name=os.getenv("DEVICE_NAME")
)

Once ShinkaiManager is initialized, we can walk through the flow of building messages for the node step by step.

Creating jobs and sending node messages

The flow on the Shinkai Node side is as follows:

  1. A job is created using the build_create_job() function. At this point a call is made to the create_job endpoint on the node:
async def build_create_job(self, agent: str):
    try:
        job_scope = PyJobScope()

        return PyShinkaiMessageBuilder.job_creation(
            self.encryption_secret_key,
            self.signature_secret_key,
            self.receiver_public_key,
            job_scope,
            False,
            self.shinkai_name,
            self.profile_name,
            self.shinkai_name,
            agent
        )
    except Exception as e:
        print(f"Error on job_creation: {str(e)}")
        return None

Note that job_creation produces the message in exactly the format the node's create_job endpoint expects.
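Here is a hedged sketch of how the built message might be wired to that endpoint. The create_job wrapper, the injected post_data helper, and the {"data": "<jobId>"} response shape are assumptions for illustration, not the project's exact API:

```python
import asyncio

async def create_job(manager, post_data, agent: str) -> str:
    # Hypothetical wiring: build the job-creation message, POST it to the
    # node, and return the jobId the node assigns. post_data stands in for
    # the HTTP helper used elsewhere in this integration.
    job_message = await manager.build_create_job(agent)
    if job_message is None:
        raise RuntimeError("failed to build the job creation message")
    resp = await post_data(job_message, "/v1/create_job")
    return resp["data"]  # assumed response shape: {"data": "<jobId>"}
```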

  2. The response contains a new jobId. With the jobId in place, a job message must be created:
async def build_job_message(self, message_content: str, job_id: str):
    return PyShinkaiMessageBuilder.job_message(
        job_id,
        message_content,
        "",
        "",
        self.encryption_secret_key,
        self.signature_secret_key,
        self.receiver_public_key,
        self.shinkai_name,
        self.profile_name,
        self.shinkai_name,
        ""
    )

Once the job message is sent, the node can start processing the job. One important detail: if the service you're integrating has threads whose parent thread id you can identify, each thread can be assigned a single job. By reusing the same jobId for every message in a thread, you preserve the thread's context and ensure the node remembers it.
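The per-thread job reuse described above can be sketched as follows (get_or_create_job_id is a hypothetical helper; create_job is passed in as a callable, e.g. a wrapper around shinkai_manager.create_job):

```python
import asyncio

# One jobId per thread keeps the node's conversational context per thread.
thread_job_mapping: dict = {}

async def get_or_create_job_id(create_job, thread_id: str) -> str:
    # Reuse the thread's existing job if we have one; otherwise create it
    # once and remember it for all future messages in that thread.
    job_id = thread_job_mapping.get(thread_id)
    if job_id is None:
        job_id = await create_job()
        thread_job_mapping[thread_id] = job_id
    return job_id
```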

Monitoring responses from the node

The current node integration uses an HTTP REST endpoint to fetch messages from the node. To identify responses for specific jobs, the bot must remember which jobId it is awaiting an answer for. This is the main reason for storing all unresolved jobs in active_jobs.

To detect responses continuously, we need a function that runs indefinitely and monitors node responses for each job.

Note: A WebSocket integration with the node is planned, which will simplify the current polling approach.

How to parse messages from the node for each job?

When fetching the current responses from the node, we first need to generate the inbox name by calling a dedicated function. Once the responses for that inbox are fetched, we parse the messages and check whether the job's message has already been resolved.
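For reference, the inbox name used below follows this convention (the exact meaning of the trailing ::false flag is an assumption based on the literal used in the code):

```python
def job_inbox_name(job_id: str) -> str:
    # Job inboxes are addressed as "job_inbox::<jobId>::false", mirroring
    # the string built inside get_messages below.
    return f"job_inbox::{job_id}::false"
```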

Here's an example implementation of a function that performs those steps:

async def get_messages(self, job_id: str, agent: str):
    try:
        inbox = PyShinkaiMessageBuilder.get_last_messages_from_inbox(
            self.encryption_secret_key,
            self.signature_secret_key,
            self.receiver_public_key,
            "job_inbox::" + job_id + "::false",
            10,
            self.shinkai_name,
            self.profile_name,
            self.shinkai_name,
            agent,
            None)

        job_message_dict = json.loads(inbox)

        resp = await post_data(json.dumps(job_message_dict), "/v1/last_messages_from_inbox")

        if len(resp["data"]) <= 1:
            print("There's no answer available yet.")
            return ""

        latest_message = resp["data"][-1]
        unencrypted = latest_message["body"]["unencrypted"]
        is_job_message = (
            unencrypted["message_data"]["unencrypted"]["message_content_schema"] == "JobMessageSchema"
            and unencrypted["internal_metadata"]["sender_subidentity"] == ""
        )
        if is_job_message:
            parsed_message = json.loads(unencrypted["message_data"]["unencrypted"]["message_raw_content"])
            return parsed_message.get("content", "")
        return ""
    except Exception as e:
        print(f"Error getting messages for job {job_id}: {str(e)}")
        return ""

The get_messages function retrieves the messages associated with a specific job ID from the node. It performs several key operations and checks to ensure that it fetches the correct message data. Here's a high-level breakdown of its functionality:

  1. Inbox Retrieval: It starts by determining the inbox associated with the given job ID, building the inbox name from the job ID (the "job_inbox::<jobId>::false" string passed to get_last_messages_from_inbox).

  2. Message Fetching: With the inbox name determined, it constructs a request to fetch messages from this inbox via PyShinkaiMessageBuilder.get_last_messages_from_inbox. This request is then sent to the /v1/last_messages_from_inbox endpoint to retrieve the messages.

  3. Response Handling: Upon receiving the response, the function first checks whether messages are available. If zero or one messages are found, it assumes there's no answer available yet and logs a message indicating this situation.

  4. Latest Message Analysis: If more than one message is present, it focuses on the latest message (the last one in the response array). This is based on the assumption that the most recent message is the most relevant in the context of a job's communication flow.

  5. Message Validation (is_job_message Check): Before processing the latest message, it performs a crucial validation to ensure that the message is indeed related to the job in question. This validation, referred to as the is_job_message check, involves two conditions:

    • The message must have a specific schema (JobMessageSchema), indicating that it is a job-related message.
    • The message's sender subidentity must be empty, which is a criterion for responses from the node.
  6. Message Parsing: If the message passes the is_job_message check, it parses the message content from the raw message data. It specifically looks for the content field within the parsed data, which holds the actual message content related to the job.

  7. Error Handling: Throughout the process, the function is wrapped in a try/except block to handle any potential errors gracefully. If an error occurs, it logs the error message and returns an empty string as a fallback.

In summary, the get_messages function is a critical component for fetching and validating the latest job-related message from a specified inbox, ensuring that only relevant and properly formatted messages are processed further.

Example: fetching and reacting to responses from the node

Let's look at the following function. It runs an infinite loop that checks the previously saved jobs being processed by the node, and resolves each one by posting its response to the corresponding Slack thread.

async def get_node_responses(self, slack_bot=None) -> Optional[str]:
    from main import shutdown_event
    while not shutdown_event.is_set():
        if len(self.active_jobs) == 0:
            await asyncio.sleep(1)
            continue
        print(f"Number of active jobs awaiting for response: {len(self.active_jobs)}")
        for job in self.active_jobs:
            print(f"checking node responses for {job.shinkai_job_id}")
            try:
                node_response = await self.get_messages(job.shinkai_job_id, "main/agent/my_gpt")
                print(node_response)
                was_message_posted_in_external_service = True
                if node_response:
                    if slack_bot is not None:
                        was_message_posted_in_external_service = False
                        slack_message_response = await slack_bot.post_message_to_thread(job.slack_channel_id, job.slack_thread_id, node_response)
                        if slack_message_response.get("ok"):
                            was_message_posted_in_external_service = True
                    if was_message_posted_in_external_service:
                        self.active_jobs = [j for j in self.active_jobs if j.shinkai_job_id != job.shinkai_job_id]
            except Exception as e:
                print(f"Response for job_id: {job.shinkai_job_id} not available: {str(e)}")
        await asyncio.sleep(1)

get_node_responses is an asynchronous method that continuously monitors active jobs, retrieves responses from a node, and posts these responses to Slack threads if applicable. It ensures that job responses are correctly handled and communicated to the relevant Slack channels or threads.

Continuous Monitoring

The function operates in a loop that runs until shutdown_event is set. This design allows for continuous monitoring of new job responses.

Checking Active Jobs

At the start of each loop iteration, the function checks for active jobs in the self.active_jobs list. If no active jobs are found, it waits for 1 second (await asyncio.sleep(1)) before rechecking, conserving resources when idle.

Processing Each Active Job

For every active job, the function attempts to retrieve the job's response using self.get_messages(job.shinkai_job_id, agent), wrapped in a try/except block for error handling.

Handling Node Responses

Upon receiving a response, the function checks whether a slack_bot instance was provided. If so, it tries to post the response to the specified Slack thread (job.slack_channel_id and job.slack_thread_id). The operation's success is tracked by was_message_posted_in_external_service, which is set to False before the post and updated based on the Slack API's response.

Updating Job Status

After successfully posting to Slack (or if no posting is required), the job is removed from the active_jobs list to avoid reprocessing.

Loop Iteration Delay

After processing all active jobs, the function waits 1 second before the next iteration. This delay could be made adaptive in the future, for example by adjusting its duration based on system load.
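As one illustration of such an enhancement, a simple exponential backoff could replace the fixed delay (next_delay is a hypothetical helper, not part of the project):

```python
def next_delay(current: float, had_response: bool,
               base: float = 1.0, cap: float = 30.0) -> float:
    # Stay at the base delay while jobs are resolving; otherwise double the
    # delay up to a cap so an idle bot polls the node less aggressively.
    if had_response:
        return base
    return min(current * 2, cap)
```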

Conclusion

The get_node_responses function is an essential mechanism for monitoring, retrieving, and communicating job responses in a system integrated with Slack. By managing active jobs, handling responses, and updating job statuses, it plays a crucial role in the job management process within the ShinkaiManager class.

Setting up the Slack application

For a full Slack integration, we need the Slack application token and the necessary configuration. The bot requires two pieces of information:

  • Slack bot token (xoxb-): To retrieve your Slack bot token, first navigate to the Slack API website and log in to your account. Then, create a new app or select an existing one from your apps list. Under the 'Features' section in the sidebar, click on 'OAuth & Permissions'. Scroll down to the 'Bot Token Scopes' section and add the necessary scopes for your bot. After configuring the scopes, scroll up to the 'OAuth Tokens for Your Workspace' section and click the 'Install to Workspace' button. Follow the prompts to install the app to your workspace, and you will be provided with a bot token starting with xoxb-. This token is what you'll use to authenticate your bot with the Slack API.

  • Slack channel id, where the bot is going to be installed

Slack integration inside the code

To set up the Slack bot, we need a REST API server with a /slack/events endpoint registered under Event Subscriptions. This endpoint accepts two types of payloads:

  • message events - this part of the endpoint logic accepts messages posted by users (app mentions)
  • challenge - this part is required by Slack to verify the events endpoint

An example implementation of a FastAPI server with such an endpoint:

import re  # needed for the mention cleanup in the event handler below

slack_client = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
signature_verifier = SignatureVerifier(signing_secret=os.getenv("SLACK_SIGNING_SECRET"))

app = FastAPI()

@app.on_event("shutdown")
async def app_shutdown():
    from main import shutdown_event
    shutdown_event.set()

# Global variable to store the thread -> job mapping (initialized to {} or loaded from file if one was created)
thread_job_mapping: Dict[str, str] = load_thread_job_mapping()


# Modify the FastAPI app initialization to accept a ShinkaiManager instance:
def create_app(shinkai_manager: ShinkaiManager):
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    app.state.shinkai_manager = shinkai_manager
    init_routes(app)
    return app

def init_routes(app: FastAPI):
    @app.post("/slack/events")
    async def slack_events(request: Request):
        try:
            json_data = await request.json()

            # URL verification (important for Slack setup)
            if "challenge" in json_data:
                return {"challenge": json_data["challenge"]}

            global seen_event_times
            if 'seen_event_times' not in globals():
                seen_event_times = set()

            event_time = json_data.get("event_time")
            if event_time in seen_event_times:
                print(f"Duplicate event detected: {event_time}, skipping processing.")
                return JSONResponse(content={"status": "ok"}, status_code=200)
            else:
                seen_event_times.add(event_time)
                print(f"Processing new event: {event_time}")

            event = json_data.get("event", {})
            if event.get("type") == "app_mention" and "text" in event and json_data.get("api_app_id") == os.getenv("SLACK_APP_ID"):
                # clean up the message (Slack prefixes it with a <@USER_APP_ID> mention);
                # note that str.replace does not handle regex patterns, so use re.sub
                message = re.sub(r"<@([A-Z0-9]+)>", "", event.get("text", "")).strip()
                print(f"Extracted message: {message}")

                if message:
                    thread_id = event.get("thread_ts") or event.get("ts")

                    if thread_id is None:
                        raise ValueError("Couldn't identify thread for reply. thread_ts: {}".format(thread_id))

                    existing_job_id = thread_job_mapping.get(thread_id)
                    if existing_job_id:
                        print(f"Thread {thread_id} already has an existing job id assigned: {existing_job_id}")
                        job_id = existing_job_id
                    else:
                        # create a Shinkai job
                        print("Creating job id...")
                        job_id = await app.state.shinkai_manager.create_job("main/agent/my_gpt")

                        # assign the job id for future messages in this thread
                        thread_job_mapping[thread_id] = job_id

                        # persist the updated thread_job_mapping
                        save_thread_job_mapping(thread_job_mapping)

                    print(f"### Job ID: {job_id}")

                    # send the job message to the node
                    answer = await app.state.shinkai_manager.send_message(message, job_id)

                    app.state.shinkai_manager.active_jobs.append(SlackJobAssigned(message=message, shinkai_job_id=job_id, slack_thread_id=thread_id, slack_channel_id=event.get("channel"), start_timestamp=int(datetime.now().timestamp())))

                    print(f"### Answer: {answer}")
                else:
                    raise ValueError("No message content was provided. Nothing to pass to the node.")
            return JSONResponse(content={"status": "ok"}, status_code=200)
        except Exception as e:
            print(e)
            return JSONResponse(content={"status": "error", "message": str(e)}, status_code=400)
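The snippet above references load_thread_job_mapping, save_thread_job_mapping, and SlackJobAssigned without showing them. Minimal sketches follow; the JSON file name and the dataclass fields are assumptions inferred from how they are used:

```python
import json
import os
from dataclasses import dataclass

# Hypothetical persistence file for the thread -> job mapping.
MAPPING_FILE = "thread_job_mapping.json"

def load_thread_job_mapping() -> dict:
    # Load the mapping from disk if it exists, otherwise start empty.
    if os.path.exists(MAPPING_FILE):
        with open(MAPPING_FILE) as f:
            return json.load(f)
    return {}

def save_thread_job_mapping(mapping: dict) -> None:
    # Persist the mapping so thread context survives restarts.
    with open(MAPPING_FILE, "w") as f:
        json.dump(mapping, f)

@dataclass
class SlackJobAssigned:
    # Fields mirror the keyword arguments used in the endpoint above.
    message: str
    shinkai_job_id: str
    slack_thread_id: str
    slack_channel_id: str
    start_timestamp: int
```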

Handling Slack events with the /slack/events endpoint

The /slack/events endpoint is designed to facilitate communication between your Slack bot and the Slack API, specifically for handling events such as messages and app mentions within Slack. When configured as part of the Slack app's Event Subscriptions, this endpoint allows your bot to react to and process incoming events in real-time.

Key Features

  • Event Subscription: By subscribing to specific events (e.g., messages, app mentions), your Slack bot can listen for and respond to these events through this endpoint.
  • Message Processing: Upon receiving an event, the endpoint extracts the relevant information, such as the message text and sender, to process the request. This includes cleaning up the message to remove any bot user IDs mentioned in the text.
  • Job Creation and Management: For each valid event, a corresponding job is created in ShinkaiNode. This job is then tracked with a unique ID, allowing for asynchronous processing and response handling. The endpoint maintains a mapping between Slack thread IDs and ShinkaiNode job IDs to ensure responses are posted back to the correct conversation thread in Slack.

Implementation Details

  • The endpoint uses FastAPI for handling HTTP POST requests from Slack.
  • It employs a verification step to ensure requests are legitimate, responding to Slack's URL verification challenge when setting up the Event Subscriptions.
  • The endpoint is designed to handle high concurrency, allowing multiple jobs to be processed in parallel and responses to be managed efficiently.
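On the verification point: SLACK_SIGNING_SECRET enables Slack's "v0" request-signature scheme, which slack_sdk's SignatureVerifier implements. As an illustration, here is a hedged stdlib-only sketch of the same check (verify_slack_signature is hypothetical, not the project's code):

```python
import hashlib
import hmac

def verify_slack_signature(signing_secret: str, timestamp: str,
                           body: bytes, signature: str) -> bool:
    # Slack signs "v0:<timestamp>:<raw body>" with HMAC-SHA256 and sends
    # the result in the X-Slack-Signature header as "v0=<hex digest>".
    base = b"v0:" + timestamp.encode() + b":" + body
    expected = "v0=" + hmac.new(signing_secret.encode(), base,
                                hashlib.sha256).hexdigest()
    # Constant-time comparison avoids timing side channels.
    return hmac.compare_digest(expected, signature)
```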

By integrating the /slack/events endpoint into your Slack bot, you enable a powerful bridge between Slack and ShinkaiNode, allowing for interactions and automation within your Slack channels.

Example Slack SDK method implementations

Here are a few example method implementations using the Slack SDK:

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import os

class SlackBot:
    def __init__(self, is_testing=False):
        self.is_testing = is_testing or os.getenv("NODE_ENV") == "test"
        self.token = os.getenv("SLACK_BOT_TOKEN", "")
        if not self.is_testing and not self.token:
            raise ValueError("SLACK_BOT_TOKEN env not defined. SLACK_BOT_TOKEN: {}".format(self.token))

        self.client = WebClient(token=self.token)

    async def post_message_to_channel(self, channel_id, text):
        try:
            # WebClient is synchronous, so the call must not be awaited
            response = self.client.chat_postMessage(channel=channel_id, text=text)
            return response
        except SlackApiError as e:
            print(f"Error posting message to channel: {e.response['error']}")

    async def post_message_to_thread(self, channel_id, thread_ts, text):
        try:
            response = self.client.chat_postMessage(channel=channel_id, thread_ts=thread_ts, text=text)
            if response["ok"]:
                print(f"Response from the node: {text}; posted to channelId: {channel_id} successfully.")
                print("Message sent to Slack")
            return response
        except SlackApiError as e:
            print(f"Error posting message to thread: {e.response['error']}")