Skip to content

Commit 43b105a

Browse files
committed
add e2e test for appwrapper containing a raycluster
1 parent e7d6380 commit 43b105a

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed
+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import requests
2+
3+
from time import sleep
4+
5+
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication
6+
from codeflare_sdk.job import RayJobClient
7+
8+
import pytest
9+
10+
from support import *
11+
12+
# This test creates an AppWrapper containing a Ray Cluster and covers the Ray Job submission functionality on a Kind cluster.
13+
14+
15+
@pytest.mark.kind
class TestRayClusterSDKAppWrapperKind:
    """End-to-end test: bring up a Ray cluster with ``appwrapper=True`` and run a job on it.

    The test builds a ``Cluster`` from a ``ClusterConfiguration``, waits for it
    to become ready, submits ``mnist.py`` through ``RayJobClient`` and asserts
    that the job ends in the ``SUCCEEDED`` state.
    """

    # Seconds slept between two job-status polls.
    JOB_POLL_INTERVAL_SECONDS = 5
    # Accumulated sleep time (seconds) after which polling gives up.
    JOB_TIMEOUT_SECONDS = 900

    def setup_method(self):
        """Run ``initialize_kubernetes_client`` on this test instance."""
        initialize_kubernetes_client(self)

    def teardown_method(self):
        """Run ``delete_namespace`` on this test instance after the test."""
        delete_namespace(self)

    def test_mnist_ray_cluster_sdk_kind(self):
        """Create the namespace and Kueue resources, then run the MNIST scenario."""
        # NOTE(review): pytest already invokes setup_method before each test
        # method, so this explicit call looks redundant -- confirm that
        # initialize_kubernetes_client is safe to run only once before removing.
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_mnist_raycluster_sdk_kind()

    def run_mnist_raycluster_sdk_kind(self):
        """Bring up the AppWrapper-backed Ray cluster and submit the MNIST job to it."""
        ray_image = get_ray_image()

        cluster = Cluster(
            ClusterConfiguration(
                name="mnist",
                namespace=self.namespace,
                num_workers=1,
                head_cpus="500m",
                head_memory=2,
                min_cpus="500m",
                max_cpus=1,
                min_memory=1,
                max_memory=2,
                num_gpus=0,
                image=ray_image,
                write_to_file=True,
                verify_tls=False,
                appwrapper=True,
            )
        )

        cluster.up()

        cluster.status()

        cluster.wait_ready()

        cluster.status()

        cluster.details()

        self.assert_jobsubmit_withoutlogin_kind(cluster)

    # Assertions

    def assert_jobsubmit_withoutlogin_kind(self, cluster):
        """Submit the MNIST job via the cluster dashboard and assert it succeeds.

        Args:
            cluster: the ``Cluster`` object whose dashboard URI is used to
                build the ``RayJobClient``; it is brought down before returning.

        Raises:
            TimeoutError: if the job does not reach a terminal state in time.
            AssertionError: if the job's terminal state is not ``SUCCEEDED``.
        """
        try:
            ray_dashboard = cluster.cluster_dashboard_uri()
            client = RayJobClient(address=ray_dashboard, verify=False)

            submission_id = client.submit_job(
                entrypoint="python mnist.py",
                runtime_env={
                    "working_dir": "./tests/e2e/",
                    "pip": "./tests/e2e/mnist_pip_requirements.txt",
                },
            )
            print(f"Submitted job with ID: {submission_id}")

            status = self._wait_for_terminal_status(client, submission_id)

            logs = client.get_job_logs(submission_id)
            print(logs)

            self.assert_job_completion(status)

            client.delete_job(submission_id)
        finally:
            # Bring the cluster down even when the job fails or times out, so
            # a failing run does not skip the explicit cluster teardown.
            cluster.down()

    def _wait_for_terminal_status(self, client, submission_id):
        """Poll the job until its status is terminal and return that status.

        Each non-terminal status is printed. The wait budget counts only the
        time spent sleeping between polls.

        Raises:
            TimeoutError: if the job is still not terminal once
                ``JOB_TIMEOUT_SECONDS`` seconds of sleep have accumulated.
        """
        waited = 0
        while True:
            status = client.get_job_status(submission_id)
            if status.is_terminal():
                return status
            print(status)
            if waited >= self.JOB_TIMEOUT_SECONDS:
                raise TimeoutError(
                    f"job has timed out after waiting {self.JOB_TIMEOUT_SECONDS}s"
                )
            sleep(self.JOB_POLL_INTERVAL_SECONDS)
            waited += self.JOB_POLL_INTERVAL_SECONDS

    def assert_job_completion(self, status):
        """Print the final job status and assert that it equals ``"SUCCEEDED"``.

        Raises:
            AssertionError: if ``status`` is anything other than ``"SUCCEEDED"``;
                the message names the status that was actually observed.
        """
        print(f"Job has completed: '{status}'")
        assert (
            status == "SUCCEEDED"
        ), f"Job finished with status '{status}', expected 'SUCCEEDED'"

0 commit comments

Comments
 (0)