Module slack_bolt.adapter.tornado.handler
Expand source code
from datetime import datetime
from tornado.httputil import HTTPServerRequest
from tornado.web import RequestHandler
from slack_bolt.app import App
from slack_bolt.oauth import OAuthFlow
from slack_bolt.request import BoltRequest
from slack_bolt.response import BoltResponse
class SlackEventsHandler(RequestHandler):
def initialize(self, app: App): # type: ignore
self.app = app
def post(self):
bolt_resp: BoltResponse = self.app.dispatch(to_bolt_request(self.request))
set_response(self, bolt_resp)
return
class SlackOAuthHandler(RequestHandler):
def initialize(self, app: App): # type: ignore
self.app = app
def get(self):
if self.app.oauth_flow is not None: # type: ignore
oauth_flow: OAuthFlow = self.app.oauth_flow # type: ignore
if self.request.path == oauth_flow.install_path:
bolt_resp = oauth_flow.handle_installation(
to_bolt_request(self.request)
)
set_response(self, bolt_resp)
return
elif self.request.path == oauth_flow.redirect_uri_path:
bolt_resp = oauth_flow.handle_callback(to_bolt_request(self.request))
set_response(self, bolt_resp)
return
self.set_status(404)
def to_bolt_request(req: HTTPServerRequest) -> BoltRequest:
return BoltRequest(
body=req.body.decode("utf-8") if req.body else "",
query=req.query,
headers=req.headers,
)
def set_response(self, bolt_resp) -> None:
self.set_status(bolt_resp.status)
self.write(bolt_resp.body)
for name, value in bolt_resp.first_headers_without_set_cookie().items():
self.set_header(name, value)
for cookie in bolt_resp.cookies():
for name, c in cookie.items():
expire_value = c.get("expires")
expire = (
datetime.strptime(expire_value, "%a, %d %b %Y %H:%M:%S %Z")
if expire_value
else None
)
self.set_cookie(
name=name,
value=c.value,
max_age=c.get("max-age"),
expires=expire,
path=c.get("path"),
domain=c.get("domain"),
secure=True,
httponly=True,
)
Functions
def set_response(self, bolt_resp) ‑> NoneType
-
Expand source code
def set_response(self, bolt_resp) -> None: self.set_status(bolt_resp.status) self.write(bolt_resp.body) for name, value in bolt_resp.first_headers_without_set_cookie().items(): self.set_header(name, value) for cookie in bolt_resp.cookies(): for name, c in cookie.items(): expire_value = c.get("expires") expire = ( datetime.strptime(expire_value, "%a, %d %b %Y %H:%M:%S %Z") if expire_value else None ) self.set_cookie( name=name, value=c.value, max_age=c.get("max-age"), expires=expire, path=c.get("path"), domain=c.get("domain"), secure=True, httponly=True, )
def to_bolt_request(req: tornado.httputil.HTTPServerRequest) ‑> BoltRequest
-
Expand source code
def to_bolt_request(req: HTTPServerRequest) -> BoltRequest: return BoltRequest( body=req.body.decode("utf-8") if req.body else "", query=req.query, headers=req.headers, )
Classes
class SlackEventsHandler (application: Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)
-
Base class for HTTP request handlers.
Subclasses must define at least one of the methods defined in the "Entry points" section below.
Applications should not construct
RequestHandler
objects directly and subclasses should not override__init__
(override~RequestHandler.initialize
instead).Expand source code
class SlackEventsHandler(RequestHandler): def initialize(self, app: App): # type: ignore self.app = app def post(self): bolt_resp: BoltResponse = self.app.dispatch(to_bolt_request(self.request)) set_response(self, bolt_resp) return
Ancestors
- tornado.web.RequestHandler
Methods
def initialize(self, app: App)
-
Expand source code
def initialize(self, app: App): # type: ignore self.app = app
def post(self)
-
Expand source code
def post(self): bolt_resp: BoltResponse = self.app.dispatch(to_bolt_request(self.request)) set_response(self, bolt_resp) return
class SlackOAuthHandler (application: Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)
-
Base class for HTTP request handlers.
Subclasses must define at least one of the methods defined in the "Entry points" section below.
Applications should not construct
RequestHandler
objects directly and subclasses should not override__init__
(override~RequestHandler.initialize
instead).Expand source code
class SlackOAuthHandler(RequestHandler): def initialize(self, app: App): # type: ignore self.app = app def get(self): if self.app.oauth_flow is not None: # type: ignore oauth_flow: OAuthFlow = self.app.oauth_flow # type: ignore if self.request.path == oauth_flow.install_path: bolt_resp = oauth_flow.handle_installation( to_bolt_request(self.request) ) set_response(self, bolt_resp) return elif self.request.path == oauth_flow.redirect_uri_path: bolt_resp = oauth_flow.handle_callback(to_bolt_request(self.request)) set_response(self, bolt_resp) return self.set_status(404)
Ancestors
- tornado.web.RequestHandler
Methods
def get(self)
-
Expand source code
def get(self): if self.app.oauth_flow is not None: # type: ignore oauth_flow: OAuthFlow = self.app.oauth_flow # type: ignore if self.request.path == oauth_flow.install_path: bolt_resp = oauth_flow.handle_installation( to_bolt_request(self.request) ) set_response(self, bolt_resp) return elif self.request.path == oauth_flow.redirect_uri_path: bolt_resp = oauth_flow.handle_callback(to_bolt_request(self.request)) set_response(self, bolt_resp) return self.set_status(404)
def initialize(self, app: App)
-
Expand source code
def initialize(self, app: App): # type: ignore self.app = app