Module slack_bolt.oauth.async_internals
Expand source code
from logging import Logger
from typing import Optional
from slack_sdk.oauth.installation_store import FileInstallationStore
from slack_sdk.oauth.installation_store.async_installation_store import (
AsyncInstallationStore,
)
from ..logger.messages import warning_installation_store_conflicts
# key: client_id, value: AsyncInstallationStore
default_installation_stores = {}
def get_or_create_default_installation_store(client_id: str) -> AsyncInstallationStore:
store = default_installation_stores.get(client_id)
if store is None:
store = FileInstallationStore(client_id=client_id)
default_installation_stores[client_id] = store
return store
def select_consistent_installation_store(
client_id: str,
app_store: Optional[AsyncInstallationStore],
oauth_flow_store: Optional[AsyncInstallationStore],
logger: Logger,
) -> Optional[AsyncInstallationStore]:
default = get_or_create_default_installation_store(client_id)
if app_store is not None:
if oauth_flow_store is not None:
if oauth_flow_store is default:
# only app_store is intentionally set in this case
return app_store
# if both are intentionally set, prioritize app_store
if oauth_flow_store is not app_store:
logger.warning(warning_installation_store_conflicts())
return oauth_flow_store
else:
# only app_store is available
return app_store
else:
# only oauth_flow_store is available
return oauth_flow_store
Functions
def get_or_create_default_installation_store(client_id: str) ‑> slack_sdk.oauth.installation_store.async_installation_store.AsyncInstallationStore
-
Expand source code
def get_or_create_default_installation_store(client_id: str) -> AsyncInstallationStore: store = default_installation_stores.get(client_id) if store is None: store = FileInstallationStore(client_id=client_id) default_installation_stores[client_id] = store return store
def select_consistent_installation_store(client_id: str, app_store: Optional[slack_sdk.oauth.installation_store.async_installation_store.AsyncInstallationStore], oauth_flow_store: Optional[slack_sdk.oauth.installation_store.async_installation_store.AsyncInstallationStore], logger: logging.Logger) ‑> Optional[slack_sdk.oauth.installation_store.async_installation_store.AsyncInstallationStore]
-
Expand source code
def select_consistent_installation_store( client_id: str, app_store: Optional[AsyncInstallationStore], oauth_flow_store: Optional[AsyncInstallationStore], logger: Logger, ) -> Optional[AsyncInstallationStore]: default = get_or_create_default_installation_store(client_id) if app_store is not None: if oauth_flow_store is not None: if oauth_flow_store is default: # only app_store is intentionally set in this case return app_store # if both are intentionally set, prioritize app_store if oauth_flow_store is not app_store: logger.warning(warning_installation_store_conflicts()) return oauth_flow_store else: # only app_store is available return app_store else: # only oauth_flow_store is available return oauth_flow_store