2
2
3
3
from time import sleep
4
4
5
- from torchx .specs .api import AppState , is_terminal
6
-
7
5
from codeflare_sdk import Cluster , ClusterConfiguration , TokenAuthentication
8
- from codeflare_sdk .job . jobs import DDPJobDefinition
6
+ from codeflare_sdk .job import RayJobClient
9
7
10
8
import pytest
11
9
@@ -79,7 +77,7 @@ def assert_jobsubmit_withoutLogin(self, cluster):
79
77
"entrypoint" : "python mnist.py" ,
80
78
"runtime_env" : {
81
79
"working_dir" : "./tests/e2e/" ,
82
- "pip" : "mnist_pip_requirements.txt" ,
80
+ "pip" : "./tests/e2e/mnist_pip_requirements.txt" ,
83
81
},
84
82
}
85
83
try :
@@ -98,19 +96,26 @@ def assert_jobsubmit_withoutLogin(self, cluster):
98
96
99
97
def assert_jobsubmit_withlogin (self , cluster ):
100
98
self .assert_appwrapper_exists ()
101
- jobdef = DDPJobDefinition (
102
- name = "mnist" ,
103
- script = "./tests/e2e/mnist.py" ,
104
- scheduler_args = {"requirements" : "./tests/e2e/mnist_pip_requirements.txt" },
99
+ auth_token = run_oc_command (["whoami" , "--show-token=true" ])
100
+ ray_dashboard = cluster .cluster_dashboard_uri ()
101
+ header = {"Authorization" : f"Bearer {auth_token}" }
102
+ client = RayJobClient (address = ray_dashboard , headers = header , verify = True )
103
+
104
+ # Submit the job
105
+ submission_id = client .submit_job (
106
+ entrypoint = "python mnist.py" ,
107
+ runtime_env = {
108
+ "working_dir" : "./tests/e2e/" ,
109
+ "pip" : "mnist_pip_requirements.txt" ,
110
+ },
105
111
)
106
- job = jobdef .submit (cluster )
107
-
112
+ print (f"Submitted job with ID: {submission_id}" )
108
113
done = False
109
114
time = 0
110
115
timeout = 900
111
116
while not done :
112
- status = job . status ( )
113
- if is_terminal ( status .state ):
117
+ status = client . get_job_status ( submission_id )
118
+ if status .is_terminal ( ):
114
119
break
115
120
if not done :
116
121
print (status )
@@ -119,11 +124,12 @@ def assert_jobsubmit_withlogin(self, cluster):
119
124
sleep (5 )
120
125
time += 5
121
126
122
- print ( job . status () )
123
- self . assert_job_completion ( status )
127
+ logs = client . get_job_logs ( submission_id )
128
+ print ( logs )
124
129
125
- print ( job . logs () )
130
+ self . assert_job_completion ( status )
126
131
132
+ client .delete_job (submission_id )
127
133
cluster .down ()
128
134
129
135
def assert_appwrapper_exists (self ):
@@ -144,9 +150,9 @@ def assert_appwrapper_exists(self):
144
150
assert False
145
151
146
152
def assert_job_completion (self , status ):
147
- if status . state == AppState . SUCCEEDED :
148
- print (f"Job has completed: '{status.state}'" )
153
+ if status == "SUCCEEDED" :
154
+ print (f"Job has completed: '{status}'" )
149
155
assert True
150
156
else :
151
- print (f"Job has completed: '{status.state}'" )
157
+ print (f"Job has completed: '{status}'" )
152
158
assert False
0 commit comments