5
5
from flask import current_app , g , request , session
6
6
from itsdangerous import BadSignature , JSONWebSignatureSerializer
7
7
from sqlalchemy .orm import joinedload
8
- from typing import Mapping , Optional
8
+ from typing import Any , Dict , Mapping , Optional
9
9
from urllib .parse import urlparse , urljoin
10
10
from uuid import UUID
11
11
27
27
28
28
29
29
class Tenant (object ):
30
- access = {}
30
+ access : Dict [ UUID , Optional [ Permission ]] = {}
31
31
32
- def __init__ (self , access : Optional [Mapping [UUID , Optional [Permission ]]] = None ):
32
+ def __init__ (self , access : Optional [Dict [UUID , Optional [Permission ]]] = None ):
33
33
if access is not None :
34
34
self .access = access
35
35
@@ -54,7 +54,7 @@ def has_permission(self, repository_id: UUID, permission: Permission = None):
54
54
return permission in access
55
55
56
56
@classmethod
57
- def from_user (cls , user : User ):
57
+ def from_user (cls , user : Optional [ User ] ):
58
58
if not user :
59
59
return cls ()
60
60
@@ -70,7 +70,7 @@ def from_repository(
70
70
if not repository :
71
71
return cls ()
72
72
73
- return RepositoryTenant (access = { repository .id : permission } )
73
+ return RepositoryTenant (repository .id , permission )
74
74
75
75
@classmethod
76
76
def from_api_token (cls , token : ApiToken ):
@@ -82,9 +82,7 @@ def from_api_token(cls, token: ApiToken):
82
82
83
83
class ApiTokenTenant (Tenant ):
84
84
def __init__ (
85
- self ,
86
- token_id : str ,
87
- access : Optional [Mapping [UUID , Optional [Permission ]]] = None ,
85
+ self , token_id : str , access : Optional [Dict [UUID , Optional [Permission ]]] = None
88
86
):
89
87
self .token_id = token_id
90
88
if access is not None :
@@ -94,7 +92,7 @@ def __repr__(self):
94
92
return "<{} token_id={}>" .format (type (self ).__name__ , self .token_id )
95
93
96
94
@cached_property
97
- def access (self ) -> Mapping [UUID , Permission ]:
95
+ def access (self ) -> Dict [UUID , Permission ]:
98
96
if not self .token_id :
99
97
return {}
100
98
@@ -108,9 +106,7 @@ def access(self) -> Mapping[UUID, Permission]:
108
106
109
107
class UserTenant (Tenant ):
110
108
def __init__ (
111
- self ,
112
- user_id : UUID ,
113
- access : Optional [Mapping [UUID , Optional [Permission ]]] = None ,
109
+ self , user_id : UUID , access : Optional [Dict [UUID , Optional [Permission ]]] = None
114
110
):
115
111
self .user_id = user_id
116
112
if access is not None :
@@ -120,7 +116,7 @@ def __repr__(self):
120
116
return "<{} user_id={}>" .format (type (self ).__name__ , self .user_id )
121
117
122
118
@cached_property
123
- def access (self ) -> Mapping [UUID , Permission ]:
119
+ def access (self ) -> Dict [UUID , Permission ]:
124
120
if not self .user_id :
125
121
return {}
126
122
@@ -142,7 +138,7 @@ def __repr__(self):
142
138
)
143
139
144
140
@cached_property
145
- def access (self ) -> Mapping [UUID , Permission ]:
141
+ def access (self ) -> Dict [UUID , Optional [ Permission ] ]:
146
142
if not self .repository_id :
147
143
return {}
148
144
@@ -156,6 +152,7 @@ def get_tenant_from_headers(headers: Mapping) -> Optional[Tenant]:
156
152
header = headers .get ("Authorization" , "" )
157
153
if header :
158
154
return get_tenant_from_bearer_header (header )
155
+ return None
159
156
160
157
161
158
def get_tenant_from_request () -> Tenant :
@@ -171,6 +168,8 @@ def get_tenant_from_bearer_header(header: str) -> Optional[Tenant]:
171
168
return None
172
169
173
170
match = _bearer_regexp .match (header )
171
+ if not match :
172
+ return None
174
173
token = match .group (2 )
175
174
if not token .startswith ("zeus-" ):
176
175
# Assuming this is a legacy token
@@ -314,13 +313,15 @@ def get_current_tenant() -> Tenant:
314
313
315
314
def generate_token (tenant : Tenant ) -> bytes :
316
315
s = JSONWebSignatureSerializer (current_app .secret_key , salt = "auth" )
317
- payload = {"access" : {str (k ): v for k , v in tenant .access .items ()}}
316
+ payload : Dict [str , Any ] = {
317
+ "access" : {str (k ): int (v ) if v else None for k , v in tenant .access .items ()}
318
+ }
318
319
if getattr (tenant , "user_id" , None ):
319
320
payload ["uid" ] = str (tenant .user_id )
320
321
return s .dumps (payload )
321
322
322
323
323
- def parse_token (token : str ) -> Optional [str ]:
324
+ def parse_token (token : str ) -> Optional [Any ]:
324
325
s = JSONWebSignatureSerializer (current_app .secret_key , salt = "auth" )
325
326
try :
326
327
return s .loads (token )
@@ -330,10 +331,12 @@ def parse_token(token: str) -> Optional[str]:
330
331
331
332
332
333
def get_tenant_from_signed_token (token : str ) -> Tenant :
333
- payload = parse_token (token )
334
+ payload : Optional [ Dict [ str , Any ]] = parse_token (token )
334
335
if not payload :
335
336
return Tenant ()
336
- access = {UUID (k ): v for k , v in payload ["access" ].items ()}
337
+ access = {
338
+ UUID (k ): Permission (v ) if v else None for k , v in payload ["access" ].items ()
339
+ }
337
340
if "uid" in payload :
338
341
return UserTenant (user_id = UUID (payload ["uid" ]), access = access )
339
342
return Tenant (access = access )
@@ -354,7 +357,7 @@ def is_safe_url(target: str) -> bool:
354
357
)
355
358
356
359
357
- def get_redirect_target (clear = True , session = session ) -> str :
360
+ def get_redirect_target (clear = True , session = session ) -> Optional [ str ] :
358
361
if clear :
359
362
session_target = session .pop ("next" , None )
360
363
else :
@@ -366,6 +369,7 @@ def get_redirect_target(clear=True, session=session) -> str:
366
369
367
370
if is_safe_url (target ):
368
371
return target
372
+ return None
369
373
370
374
371
375
def bind_redirect_target (target : str = None , session = session ):
0 commit comments