Module slack_bolt.listener.async_listener_completion_handler
Expand source code
import inspect
from abc import ABCMeta, abstractmethod
from logging import Logger
from typing import Callable, Dict, Any, Awaitable, Optional
from slack_bolt.listener.async_internals import (
_build_all_available_args,
)
from slack_bolt.listener.internals import (
_convert_all_available_args_to_kwargs,
)
from slack_bolt.request.async_request import AsyncBoltRequest
from slack_bolt.response import BoltResponse
class AsyncListenerCompletionHandler(metaclass=ABCMeta):
@abstractmethod
async def handle(
self,
request: AsyncBoltRequest,
response: Optional[BoltResponse],
) -> None:
"""Handles an unhandled exception.
Args:
error: The raised exception.
request: The request.
response: The response.
"""
raise NotImplementedError()
class AsyncCustomListenerCompletionHandler(AsyncListenerCompletionHandler):
def __init__(self, logger: Logger, func: Callable[..., Awaitable[None]]):
self.func = func
self.logger = logger
self.arg_names = inspect.getfullargspec(func).args
async def handle(
self,
request: AsyncBoltRequest,
response: Optional[BoltResponse],
) -> None:
all_available_args = _build_all_available_args(
logger=self.logger,
request=request,
response=response,
)
kwargs: Dict[str, Any] = _convert_all_available_args_to_kwargs(
all_available_args=all_available_args,
arg_names=self.arg_names,
logger=self.logger,
)
await self.func(**kwargs)
class AsyncDefaultListenerCompletionHandler(AsyncListenerCompletionHandler):
def __init__(self, logger: Logger):
self.logger = logger
async def handle(
self,
request: AsyncBoltRequest,
response: Optional[BoltResponse],
):
pass
Classes
class AsyncCustomListenerCompletionHandler (logger: logging.Logger, func: Callable[..., Awaitable[NoneType]])
-
Expand source code
class AsyncCustomListenerCompletionHandler(AsyncListenerCompletionHandler): def __init__(self, logger: Logger, func: Callable[..., Awaitable[None]]): self.func = func self.logger = logger self.arg_names = inspect.getfullargspec(func).args async def handle( self, request: AsyncBoltRequest, response: Optional[BoltResponse], ) -> None: all_available_args = _build_all_available_args( logger=self.logger, request=request, response=response, ) kwargs: Dict[str, Any] = _convert_all_available_args_to_kwargs( all_available_args=all_available_args, arg_names=self.arg_names, logger=self.logger, ) await self.func(**kwargs)
Ancestors
Inherited members
class AsyncDefaultListenerCompletionHandler (logger: logging.Logger)
-
Expand source code
class AsyncDefaultListenerCompletionHandler(AsyncListenerCompletionHandler): def __init__(self, logger: Logger): self.logger = logger async def handle( self, request: AsyncBoltRequest, response: Optional[BoltResponse], ): pass
Ancestors
Inherited members
class AsyncListenerCompletionHandler
-
Expand source code
class AsyncListenerCompletionHandler(metaclass=ABCMeta): @abstractmethod async def handle( self, request: AsyncBoltRequest, response: Optional[BoltResponse], ) -> None: """Handles an unhandled exception. Args: error: The raised exception. request: The request. response: The response. """ raise NotImplementedError()
Subclasses
Methods
async def handle(self, request: AsyncBoltRequest, response: Optional[BoltResponse]) ‑> NoneType
-
Handles an unhandled exception.
Args
error
- The raised exception.
request
- The request.
response
- The response.
Expand source code
@abstractmethod async def handle( self, request: AsyncBoltRequest, response: Optional[BoltResponse], ) -> None: """Handles an unhandled exception. Args: error: The raised exception. request: The request. response: The response. """ raise NotImplementedError()