Module slack_bolt.request.internals
Expand source code
import json
from typing import Optional, Dict, Union, Any, Sequence
from urllib.parse import parse_qsl, parse_qs
from slack_bolt.context import BoltContext
def parse_query(
query: Optional[Union[str, Dict[str, str], Dict[str, Sequence[str]]]]
) -> Dict[str, Sequence[str]]:
if query is None:
return {}
elif isinstance(query, str):
return parse_qs(query)
elif isinstance(query, dict) or hasattr(query, "items"):
result: Dict[str, Sequence[str]] = {}
for name, value in query.items():
if isinstance(value, list):
result[name] = value
elif isinstance(value, str):
result[name] = [value]
else:
raise ValueError(
f"Unsupported type ({type(value)}) of element in headers ({query})"
)
return result # type: ignore
else:
raise ValueError(f"Unsupported type of query detected ({type(query)})")
def parse_body(body: str, content_type: Optional[str]) -> Dict[str, Any]:
if not body:
return {}
if (
content_type is not None and content_type == "application/json"
) or body.startswith("{"):
return json.loads(body)
else:
if "payload" in body: # This is not JSON format yet
params = dict(parse_qsl(body))
if params.get("payload") is not None:
return json.loads(params.get("payload"))
else:
return {}
else:
return dict(parse_qsl(body))
def extract_is_enterprise_install(payload: Dict[str, Any]) -> Optional[bool]:
if "is_enterprise_install" in payload:
is_enterprise_install = payload.get("is_enterprise_install")
return is_enterprise_install is not None and (
is_enterprise_install is True or is_enterprise_install == "true"
)
return False
def extract_enterprise_id(payload: Dict[str, Any]) -> Optional[str]:
if payload.get("enterprise") is not None:
org = payload.get("enterprise")
if isinstance(org, str):
return org
elif "id" in org:
return org.get("id") # type: ignore
if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0:
# To make Events API handling functioning also for shared channels,
# we should use .authorizations[0].enterprise_id over .enterprise_id
return extract_enterprise_id(payload["authorizations"][0])
if "enterprise_id" in payload:
return payload.get("enterprise_id")
if payload.get("team") is not None and "enterprise_id" in payload["team"]:
# In the case where the type is view_submission
return payload["team"].get("enterprise_id")
if payload.get("event") is not None:
return extract_enterprise_id(payload["event"])
return None
def extract_team_id(payload: Dict[str, Any]) -> Optional[str]:
if payload.get("team") is not None:
team = payload.get("team")
if isinstance(team, str):
return team
elif team and "id" in team:
return team.get("id")
if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0:
# To make Events API handling functioning also for shared channels,
# we should use .authorizations[0].team_id over .team_id
return extract_team_id(payload["authorizations"][0])
if "team_id" in payload:
return payload.get("team_id")
if payload.get("event") is not None:
return extract_team_id(payload["event"])
if payload.get("user") is not None:
return payload.get("user")["team_id"]
return None
def extract_user_id(payload: Dict[str, Any]) -> Optional[str]:
if payload.get("user") is not None:
user = payload.get("user")
if isinstance(user, str):
return user
elif "id" in user:
return user.get("id") # type: ignore
if "user_id" in payload:
return payload.get("user_id")
if payload.get("event") is not None:
return extract_user_id(payload["event"])
return None
def extract_channel_id(payload: Dict[str, Any]) -> Optional[str]:
if payload.get("channel") is not None:
channel = payload.get("channel")
if isinstance(channel, str):
return channel
elif "id" in channel:
return channel.get("id") # type: ignore
if "channel_id" in payload:
return payload.get("channel_id")
if payload.get("event") is not None:
return extract_channel_id(payload["event"])
if payload.get("item") is not None:
# reaction_added: body["event"]["item"]
return extract_channel_id(payload["item"])
return None
def build_context(context: BoltContext, body: Dict[str, Any]) -> BoltContext:
context["is_enterprise_install"] = extract_is_enterprise_install(body)
enterprise_id = extract_enterprise_id(body)
if enterprise_id:
context["enterprise_id"] = enterprise_id
team_id = extract_team_id(body)
if team_id:
context["team_id"] = team_id
user_id = extract_user_id(body)
if user_id:
context["user_id"] = user_id
channel_id = extract_channel_id(body)
if channel_id:
context["channel_id"] = channel_id
if "response_url" in body:
context["response_url"] = body["response_url"]
elif "response_urls" in body:
# In the case where response_url_enabled: true in a modal exists
response_urls = body["response_urls"]
if len(response_urls) >= 1:
if len(response_urls) > 1:
context.logger.debug(debug_multiple_response_urls_detected())
response_url = response_urls[0].get("response_url")
context["response_url"] = response_url
return context
def extract_content_type(headers: Dict[str, Sequence[str]]) -> Optional[str]:
content_type: Optional[str] = headers.get("content-type", [None])[0]
if content_type:
return content_type.split(";")[0]
return None
def build_normalized_headers(
headers: Optional[Dict[str, Union[str, Sequence[str]]]]
) -> Dict[str, Sequence[str]]:
normalized_headers: Dict[str, Sequence[str]] = {}
if headers is not None:
for key, value in headers.items():
normalized_name = key.lower()
if isinstance(value, list):
normalized_headers[normalized_name] = value
elif isinstance(value, str):
normalized_headers[normalized_name] = [value]
else:
raise ValueError(
f"Unsupported type ({type(value)}) of element in headers ({headers})"
)
return normalized_headers # type: ignore
def error_message_raw_body_required_in_http_mode() -> str:
return "`body` must be a raw string data when running in the HTTP server mode"
def error_message_unknown_request_body_type() -> str:
return "`body` must be either str or dict"
def debug_multiple_response_urls_detected() -> str:
return (
"`response_urls` in the body has multiple URLs in it. "
"If you would like to use non-primary one, "
"please manually extract the one from body['response_urls']."
)
Functions
def build_context(context: BoltContext, body: Dict[str, Any]) ‑> BoltContext
-
Expand source code
def build_context(context: BoltContext, body: Dict[str, Any]) -> BoltContext: context["is_enterprise_install"] = extract_is_enterprise_install(body) enterprise_id = extract_enterprise_id(body) if enterprise_id: context["enterprise_id"] = enterprise_id team_id = extract_team_id(body) if team_id: context["team_id"] = team_id user_id = extract_user_id(body) if user_id: context["user_id"] = user_id channel_id = extract_channel_id(body) if channel_id: context["channel_id"] = channel_id if "response_url" in body: context["response_url"] = body["response_url"] elif "response_urls" in body: # In the case where response_url_enabled: true in a modal exists response_urls = body["response_urls"] if len(response_urls) >= 1: if len(response_urls) > 1: context.logger.debug(debug_multiple_response_urls_detected()) response_url = response_urls[0].get("response_url") context["response_url"] = response_url return context
def build_normalized_headers(headers: Optional[Dict[str, Union[str, Sequence[str]]]]) ‑> Dict[str, Sequence[str]]
-
Expand source code
def build_normalized_headers( headers: Optional[Dict[str, Union[str, Sequence[str]]]] ) -> Dict[str, Sequence[str]]: normalized_headers: Dict[str, Sequence[str]] = {} if headers is not None: for key, value in headers.items(): normalized_name = key.lower() if isinstance(value, list): normalized_headers[normalized_name] = value elif isinstance(value, str): normalized_headers[normalized_name] = [value] else: raise ValueError( f"Unsupported type ({type(value)}) of element in headers ({headers})" ) return normalized_headers # type: ignore
def debug_multiple_response_urls_detected() ‑> str
-
Expand source code
def debug_multiple_response_urls_detected() -> str: return ( "`response_urls` in the body has multiple URLs in it. " "If you would like to use non-primary one, " "please manually extract the one from body['response_urls']." )
def error_message_raw_body_required_in_http_mode() ‑> str
-
Expand source code
def error_message_raw_body_required_in_http_mode() -> str: return "`body` must be a raw string data when running in the HTTP server mode"
def error_message_unknown_request_body_type() ‑> str
-
Expand source code
def error_message_unknown_request_body_type() -> str: return "`body` must be either str or dict"
def extract_channel_id(payload: Dict[str, Any]) ‑> Optional[str]
-
Expand source code
def extract_channel_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("channel") is not None: channel = payload.get("channel") if isinstance(channel, str): return channel elif "id" in channel: return channel.get("id") # type: ignore if "channel_id" in payload: return payload.get("channel_id") if payload.get("event") is not None: return extract_channel_id(payload["event"]) if payload.get("item") is not None: # reaction_added: body["event"]["item"] return extract_channel_id(payload["item"]) return None
def extract_content_type(headers: Dict[str, Sequence[str]]) ‑> Optional[str]
-
Expand source code
def extract_content_type(headers: Dict[str, Sequence[str]]) -> Optional[str]: content_type: Optional[str] = headers.get("content-type", [None])[0] if content_type: return content_type.split(";")[0] return None
def extract_enterprise_id(payload: Dict[str, Any]) ‑> Optional[str]
-
Expand source code
def extract_enterprise_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("enterprise") is not None: org = payload.get("enterprise") if isinstance(org, str): return org elif "id" in org: return org.get("id") # type: ignore if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0: # To make Events API handling functioning also for shared channels, # we should use .authorizations[0].enterprise_id over .enterprise_id return extract_enterprise_id(payload["authorizations"][0]) if "enterprise_id" in payload: return payload.get("enterprise_id") if payload.get("team") is not None and "enterprise_id" in payload["team"]: # In the case where the type is view_submission return payload["team"].get("enterprise_id") if payload.get("event") is not None: return extract_enterprise_id(payload["event"]) return None
def extract_is_enterprise_install(payload: Dict[str, Any]) ‑> Optional[bool]
-
Expand source code
def extract_is_enterprise_install(payload: Dict[str, Any]) -> Optional[bool]: if "is_enterprise_install" in payload: is_enterprise_install = payload.get("is_enterprise_install") return is_enterprise_install is not None and ( is_enterprise_install is True or is_enterprise_install == "true" ) return False
def extract_team_id(payload: Dict[str, Any]) ‑> Optional[str]
-
Expand source code
def extract_team_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("team") is not None: team = payload.get("team") if isinstance(team, str): return team elif team and "id" in team: return team.get("id") if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0: # To make Events API handling functioning also for shared channels, # we should use .authorizations[0].team_id over .team_id return extract_team_id(payload["authorizations"][0]) if "team_id" in payload: return payload.get("team_id") if payload.get("event") is not None: return extract_team_id(payload["event"]) if payload.get("user") is not None: return payload.get("user")["team_id"] return None
def extract_user_id(payload: Dict[str, Any]) ‑> Optional[str]
-
Expand source code
def extract_user_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("user") is not None: user = payload.get("user") if isinstance(user, str): return user elif "id" in user: return user.get("id") # type: ignore if "user_id" in payload: return payload.get("user_id") if payload.get("event") is not None: return extract_user_id(payload["event"]) return None
def parse_body(body: str, content_type: Optional[str]) ‑> Dict[str, Any]
-
Expand source code
def parse_body(body: str, content_type: Optional[str]) -> Dict[str, Any]: if not body: return {} if ( content_type is not None and content_type == "application/json" ) or body.startswith("{"): return json.loads(body) else: if "payload" in body: # This is not JSON format yet params = dict(parse_qsl(body)) if params.get("payload") is not None: return json.loads(params.get("payload")) else: return {} else: return dict(parse_qsl(body))
def parse_query(query: Union[str, Dict[str, str], Dict[str, Sequence[str]], NoneType]) ‑> Dict[str, Sequence[str]]
-
Expand source code
def parse_query( query: Optional[Union[str, Dict[str, str], Dict[str, Sequence[str]]]] ) -> Dict[str, Sequence[str]]: if query is None: return {} elif isinstance(query, str): return parse_qs(query) elif isinstance(query, dict) or hasattr(query, "items"): result: Dict[str, Sequence[str]] = {} for name, value in query.items(): if isinstance(value, list): result[name] = value elif isinstance(value, str): result[name] = [value] else: raise ValueError( f"Unsupported type ({type(value)}) of element in headers ({query})" ) return result # type: ignore else: raise ValueError(f"Unsupported type of query detected ({type(query)})")