15
15
import sys
16
16
import threading
17
17
from pathlib import Path
18
- from typing import Dict , Generator , cast
18
+ from typing import Dict , Generator , Optional , cast
19
19
20
+ import OpenSSL .crypto
21
+ import OpenSSL .SSL
20
22
import pytest
21
23
from twisted .internet import reactor as _twisted_reactor
22
24
from twisted .internet import ssl
23
25
from twisted .internet .selectreactor import SelectReactor
24
26
from twisted .web import resource , server
27
+ from twisted .web .http import Request
25
28
26
- from playwright .async_api import Browser , BrowserType , Playwright , Request , expect
29
+ from playwright .async_api import Browser , BrowserType , Playwright , expect
27
30
31
+ ssl .optionsForClientTLS
28
32
reactor = cast (SelectReactor , _twisted_reactor )
29
33
30
34
@@ -34,17 +38,61 @@ def _skip_webkit_darwin(browser_name: str) -> None:
34
38
pytest .skip ("WebKit does not proxy localhost on macOS" )
35
39
36
40
37
- class Simple (resource .Resource ):
41
+ class HttpsResource (resource .Resource ):
42
+ serverCertificate : ssl .PrivateCertificate
38
43
isLeaf = True
39
44
45
+ def _verify_cert_chain (self , cert : Optional [OpenSSL .crypto .X509 ]) -> bool :
46
+ if not cert :
47
+ return False
48
+ store = OpenSSL .crypto .X509Store ()
49
+ store .add_cert (self .serverCertificate .original )
50
+ store_ctx = OpenSSL .crypto .X509StoreContext (store , cert )
51
+ try :
52
+ store_ctx .verify_certificate ()
53
+ return True
54
+ except OpenSSL .crypto .X509StoreContextError :
55
+ return False
56
+
40
57
def render_GET (self , request : Request ) -> bytes :
41
- return b"<html>Hello, world!</html>"
58
+ tls_socket : OpenSSL .SSL .Connection = request .transport .getHandle () # type: ignore
59
+ cert = tls_socket .get_peer_certificate ()
60
+ parts = []
61
+
62
+ if self ._verify_cert_chain (cert ):
63
+ request .setResponseCode (200 )
64
+ parts .append (
65
+ {
66
+ "key" : "message" ,
67
+ "value" : f"Hello { cert .get_subject ().CN } , your certificate was issued by { cert .get_issuer ().CN } !" , # type: ignore
68
+ }
69
+ )
70
+ elif cert and cert .get_subject ():
71
+ request .setResponseCode (403 )
72
+ parts .append (
73
+ {
74
+ "key" : "message" ,
75
+ "value" : f"Sorry { cert .get_subject ().CN } , certificates from { cert .get_issuer ().CN } are not welcome here." ,
76
+ }
77
+ )
78
+ else :
79
+ request .setResponseCode (401 )
80
+ parts .append (
81
+ {
82
+ "key" : "message" ,
83
+ "value" : "Sorry, but you need to provide a client certificate to continue." ,
84
+ }
85
+ )
86
+ return b"" .join (
87
+ [
88
+ f'<div data-testid="{ part ["key" ]} ">{ part ["value" ]} </div>' .encode ()
89
+ for part in parts
90
+ ]
91
+ )
42
92
43
93
44
94
@pytest .fixture (scope = "session" , autouse = True )
45
95
def _client_certificate_server (assetdir : Path ) -> Generator [None , None , None ]:
46
- server .Site (Simple ())
47
-
48
96
certAuthCert = ssl .Certificate .loadPEM (
49
97
(assetdir / "client-certificates/server/server_cert.pem" ).read_text ()
50
98
)
@@ -54,7 +102,10 @@ def _client_certificate_server(assetdir: Path) -> Generator[None, None, None]:
54
102
)
55
103
56
104
contextFactory = serverCert .options (certAuthCert )
57
- site = server .Site (Simple ())
105
+ contextFactory .requireCertificate = False
106
+ resource = HttpsResource ()
107
+ resource .serverCertificate = serverCert
108
+ site = server .Site (resource )
58
109
59
110
def _run () -> None :
60
111
reactor .listenSSL (8000 , site , contextFactory )
@@ -65,6 +116,27 @@ def _run() -> None:
65
116
thread .join ()
66
117
67
118
119
+ async def test_should_throw_with_untrusted_client_certs (
120
+ playwright : Playwright , assetdir : Path
121
+ ) -> None :
122
+ serverURL = "https://localhost:8000/"
123
+ request = await playwright .request .new_context (
124
+ # TODO: Remove this once we can pass a custom CA.
125
+ ignore_https_errors = True ,
126
+ client_certificates = [
127
+ {
128
+ "origin" : serverURL ,
129
+ "certPath" : assetdir
130
+ / "client-certificates/client/self-signed/cert.pem" ,
131
+ "keyPath" : assetdir / "client-certificates/client/self-signed/key.pem" ,
132
+ }
133
+ ],
134
+ )
135
+ with pytest .raises (Exception , match = "alert unknown ca" ):
136
+ await request .get (serverURL )
137
+ await request .dispose ()
138
+
139
+
68
140
async def test_should_work_with_new_context (browser : Browser , assetdir : Path ) -> None :
69
141
context = await browser .new_context (
70
142
# TODO: Remove this once we can pass a custom CA.
@@ -79,14 +151,24 @@ async def test_should_work_with_new_context(browser: Browser, assetdir: Path) ->
79
151
)
80
152
page = await context .new_page ()
81
153
await page .goto ("https://localhost:8000" )
82
- await expect (page .get_by_text ("alert certificate required" )).to_be_visible ()
154
+ await expect (page .get_by_test_id ("message" )).to_have_text (
155
+ "Sorry, but you need to provide a client certificate to continue."
156
+ )
83
157
await page .goto ("https://127.0.0.1:8000" )
84
- await expect (page .get_by_text ("Hello, world!" )).to_be_visible ()
158
+ await expect (page .get_by_test_id ("message" )).to_have_text (
159
+ "Hello Alice, your certificate was issued by localhost!"
160
+ )
85
161
86
- with pytest .raises (Exception , match = "alert certificate required" ):
87
- await page .context .request .get ("https://localhost:8000" )
162
+ response = await page .context .request .get ("https://localhost:8000" )
163
+ assert (
164
+ "Sorry, but you need to provide a client certificate to continue."
165
+ in await response .text ()
166
+ )
88
167
response = await page .context .request .get ("https://127.0.0.1:8000" )
89
- assert "Hello, world!" in await response .text ()
168
+ assert (
169
+ "Hello Alice, your certificate was issued by localhost!"
170
+ in await response .text ()
171
+ )
90
172
await context .close ()
91
173
92
174
@@ -108,9 +190,13 @@ async def test_should_work_with_new_persistent_context(
108
190
)
109
191
page = await context .new_page ()
110
192
await page .goto ("https://localhost:8000" )
111
- await expect (page .get_by_text ("alert certificate required" )).to_be_visible ()
193
+ await expect (page .get_by_test_id ("message" )).to_have_text (
194
+ "Sorry, but you need to provide a client certificate to continue."
195
+ )
112
196
await page .goto ("https://127.0.0.1:8000" )
113
- await expect (page .get_by_text ("Hello, world!" )).to_be_visible ()
197
+ await expect (page .get_by_test_id ("message" )).to_have_text (
198
+ "Hello Alice, your certificate was issued by localhost!"
199
+ )
114
200
await context .close ()
115
201
116
202
@@ -128,8 +214,14 @@ async def test_should_work_with_global_api_request_context(
128
214
}
129
215
],
130
216
)
131
- with pytest .raises (Exception , match = "alert certificate required" ):
132
- await request .get ("https://localhost:8000" )
217
+ response = await request .get ("https://localhost:8000" )
218
+ assert (
219
+ "Sorry, but you need to provide a client certificate to continue."
220
+ in await response .text ()
221
+ )
133
222
response = await request .get ("https://127.0.0.1:8000" )
134
- assert "Hello, world!" in await response .text ()
223
+ assert (
224
+ "Hello Alice, your certificate was issued by localhost!"
225
+ in await response .text ()
226
+ )
135
227
await request .dispose ()
0 commit comments