Skip to content

Commit 53ce7b4

Browse files
msaroufim authored and facebook-github-bot committed
Ray Integration test fix (pytorch#483)
Summary: Fixes pytorch#455

* Re-enabled the component integration test
* Fixed a regression where the Ray Job API changed what its status return function printed
* Added error handling code in `ray_driver.py` that now makes failed scripts fail - h/t d4l3k
* Fixed the broken `CopyComponentProvider` for Ray

Pull Request resolved: pytorch#483

Test Plan: Ran the commands below locally and everything was green

* `python scripts/component_integration_tests.py --scheduler "ray"`
* `pytest schedulers/test/ray_integration_test.py`

I'll send a follow-up PR for the other 2 issues: https://github.com/pytorch/torchx/issues?q=is%3Aissue+is%3Aopen+label%3Aray

Reviewed By: d4l3k

Differential Revision: D36385069

Pulled By: msaroufim

fbshipit-source-id: aa91b347e99adeea349c0b0fa377a299dfc5613a
1 parent 6624e0a commit 53ce7b4

File tree

6 files changed

+18
-17
lines changed

6 files changed

+18
-17
lines changed

.github/workflows/components-integration-tests.yaml

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ jobs:
1616
- scheduler: "kubernetes"
1717
- scheduler: "local_cwd"
1818
- scheduler: "local_docker"
19-
# TODO uncomment when https://github.com/pytorch/torchx/issues/455 is resolved
20-
# - scheduler: "ray"
19+
- scheduler: "ray"
2120
fail-fast: false
2221
permissions:
2322
id-token: write

dev-requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ ts==0.5.1
2323
usort==0.6.4
2424

2525
# Ray doesn't support Python 3.10
26-
ray[default]==1.11.0; python_version < '3.10'
26+
ray[default]==1.12.0; python_version < '3.10'

scripts/component_integration_tests.py

+1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def main() -> None:
100100
component_provider,
101101
],
102102
"image": torchx_image,
103+
"cfg": {},
103104
},
104105
}
105106

torchx/components/integration_tests/component_provider.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def __init__(self, image: str, scheduler: str) -> None:
104104
self._dst_path = "<None>"
105105

106106
def setUp(self) -> None:
107-
if self._scheduler == "local_cwd":
107+
if self._scheduler in ["local_cwd", "ray"]:
108108
fname = "torchx_copy_test.txt"
109109
self._src_path: str = os.path.join(tempfile.gettempdir(), fname)
110110
self._dst_path: str = os.path.join(tempfile.gettempdir(), f"{fname}.copy")
@@ -121,7 +121,7 @@ def _process_local_sched(self) -> None:
121121
def tearDown(self) -> None:
122122
if os.path.exists(self._dst_path):
123123
os.remove(self._dst_path)
124-
if self._scheduler == "local_cwd" and os.path.exists(self._dst_path):
124+
if self._scheduler in ["local_cwd", "ray"] and os.path.exists(self._dst_path):
125125
os.remove(self._dst_path)
126126

127127
def get_app_def(self) -> AppDef:

torchx/schedulers/ray/ray_driver.py

+7-9
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,13 @@ def exec_module(self) -> None:
4949
worker_evn["MASTER_ADDR"] = self.master_addr
5050
worker_evn["MASTER_PORT"] = str(self.master_port)
5151
popen = subprocess.Popen(self.cmd, env=worker_evn)
52+
5253
returncode = popen.wait()
5354
_logger.info(f"Finished with code {returncode}")
5455

56+
if returncode != 0:
57+
raise RuntimeError(f"exec_module failed with return code {returncode}")
58+
5559
def get_actor_address_and_port(self) -> Tuple[str, int]:
5660
return get_address_and_port()
5761

@@ -149,20 +153,14 @@ def main() -> None: # pragma: no cover
149153

150154
# Await return result of remote ray function
151155
while len(active_workers) > 0:
156+
_logger.info(f"running ray.wait on {active_workers}")
157+
152158
# pyre-fixme[16]: Module `worker` has no attribute `wait`.
153159
completed_workers, active_workers = ray.wait(active_workers)
154160
# If a failure occurs the ObjectRef will be marked as completed.
155161
# Calling ray.get will expose the failure as a RayActorError.
156162
for object_ref in completed_workers:
157-
try:
158-
ray.get(object_ref)
159-
# If an error occurs during the actor execution,
160-
# this error will get propagated as-is to the driver when you call ray.get().
161-
# For example, if a ValueError is raised in the actor method call,
162-
# this will be raised as a ValueError on the driver.
163-
# These exceptions will not be caught in this try-except clause
164-
except ray.exceptions.RayActorError as exc:
165-
_logger.error("Ray Actor error", exc)
163+
ray.get(object_ref)
166164

167165

168166
if __name__ == "__main__":

torchx/schedulers/ray_scheduler.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ def wait_until_finish(self, app_id: str, timeout: int = 30) -> None:
308308
start = time.time()
309309
while time.time() - start <= timeout:
310310
status_info = client.get_job_status(app_id)
311-
status = status_info.status
311+
status = status_info
312312
if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
313313
break
314314
time.sleep(1)
@@ -322,8 +322,11 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
322322
addr, app_id = app_id.split("-")
323323
client = JobSubmissionClient(f"http://{addr}")
324324
job_status_info = client.get_job_status(app_id)
325-
state = _ray_status_to_torchx_appstate[job_status_info.status]
325+
state = _ray_status_to_torchx_appstate[job_status_info]
326326
roles = [Role(name="ray", num_replicas=-1, image="<N/A>")]
327+
328+
# get ip_address and put it in hostname
329+
327330
roles_statuses = [
328331
RoleStatus(
329332
role="ray",
@@ -340,7 +343,7 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
340343
return DescribeAppResponse(
341344
app_id=app_id,
342345
state=state,
343-
msg=job_status_info.message or NONE,
346+
msg=job_status_info,
344347
roles_statuses=roles_statuses,
345348
roles=roles,
346349
)

0 commit comments

Comments
 (0)