Source code for yc_lockbox._auth

import logging
from functools import cached_property
from datetime import datetime

from yc_lockbox._abc import AbstractYandexAuthClient, CredentialsType
from yc_lockbox._exceptions import BadCredentials
from yc_lockbox._adapters import HTTPAdapter
from yc_lockbox._models import IamTokenResponse

logger = logging.getLogger(__name__)


[docs] class YandexAuthClient(AbstractYandexAuthClient): """ This is a simple client that allows you to get an up-to-date IAM token to make authenticated requests to Yandex Cloud. If you pass a IAM token as credentials, you need to take care of the freshness of the token yourself. :param credentials: Credentials for authenticate requests. Allowed types: service account key, OAuth token, IAM token. :param auth_base_url: Base IAM url without resource path URL. .. note:: Important. This client works only in synchronous mode for backward compatibility. """ def __init__( self, credentials: str | dict[str, str], *, auth_base_url: str | None = None, **kwargs, ) -> None: super().__init__(credentials, auth_base_url=auth_base_url, **kwargs) self.iam_expires_at = 0.0 self.iam_token_url_path = self.auth_base_url + "/iam/v1/tokens" @cached_property def adapter(self) -> HTTPAdapter: """Returns HTTP adapter for communicate with Yandex Cloud.""" return HTTPAdapter
[docs] def get_iam_token(self) -> str: """Cacheable (in-memory, per instance) method for get IAM token from Yandex Cloud.""" if self._credentials_type == CredentialsType.IAM_TOKEN: return self.credentials if self.iam_token is not None and self.iam_expires_at >= datetime.now().timestamp(): # returns cached token return self.iam_token try: if self._credentials_type == CredentialsType.SERVICE_ACCOUNT: body = {"jwt": self._generate_jwt_for_sa()} response = self.adapter.request( "POST", self.iam_token_url_path, json=body, response_model=IamTokenResponse ) self.iam_token = response.token self.iam_expires_at = response.expires_at.timestamp() return self.iam_token elif self._credentials_type == CredentialsType.OAUTH_TOKEN: body = {"yandexPassportOauthToken": self.credentials} response = self.adapter.request( "POST", self.iam_token_url_path, json=body, response_model=IamTokenResponse ) self.iam_token = response.token self.iam_expires_at = response.expires_at.timestamp() return self.iam_token except Exception: raise BadCredentials("Bad credentials, can't issue IAM token.")