Skip to content

Commit 77a5c28

Browse files
committed
add AppWrapper tests
1 parent 768c2fb commit 77a5c28

File tree

2 files changed

+248
-0
lines changed

2 files changed

+248
-0
lines changed
+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
Copyright 2023.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package e2e
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/gomega"
23+
mcadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
24+
. "github.com/project-codeflare/codeflare-common/support"
25+
26+
batchv1 "k8s.io/api/batch/v1"
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30+
"k8s.io/apimachinery/pkg/runtime"
31+
)
32+
33+
// Trains the MNIST dataset as a batch Job in an AppWrapper, and asserts successful completion of the training job.
34+
func TestMNISTPyTorchAppWrapper(t *testing.T) {
35+
test := With(t)
36+
test.T().Parallel()
37+
38+
// Create a namespace and localqueue in that namespace
39+
namespace := test.NewTestNamespace()
40+
localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue")
41+
42+
// Test configuration
43+
config := &corev1.ConfigMap{
44+
TypeMeta: metav1.TypeMeta{
45+
APIVersion: corev1.SchemeGroupVersion.String(),
46+
Kind: "ConfigMap",
47+
},
48+
ObjectMeta: metav1.ObjectMeta{
49+
Name: "mnist-mcad",
50+
Namespace: namespace.Name,
51+
},
52+
BinaryData: map[string][]byte{
53+
// pip requirements
54+
"requirements.txt": ReadFile(test, "mnist_pip_requirements.txt"),
55+
// MNIST training script
56+
"mnist.py": ReadFile(test, "mnist.py"),
57+
},
58+
Immutable: Ptr(true),
59+
}
60+
config, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), config, metav1.CreateOptions{})
61+
test.Expect(err).NotTo(HaveOccurred())
62+
test.T().Logf("Created ConfigMap %s/%s successfully", config.Namespace, config.Name)
63+
64+
// Batch Job
65+
job := &batchv1.Job{
66+
TypeMeta: metav1.TypeMeta{
67+
APIVersion: batchv1.SchemeGroupVersion.String(),
68+
Kind: "Job",
69+
},
70+
ObjectMeta: metav1.ObjectMeta{
71+
Name: "mnist",
72+
Namespace: namespace.Name,
73+
},
74+
Spec: batchv1.JobSpec{
75+
Completions: Ptr(int32(1)),
76+
Parallelism: Ptr(int32(1)),
77+
Template: corev1.PodTemplateSpec{
78+
Spec: corev1.PodSpec{
79+
Containers: []corev1.Container{
80+
{
81+
Name: "job",
82+
Image: GetPyTorchImage(),
83+
Env: []corev1.EnvVar{
84+
{Name: "PYTHONUSERBASE", Value: "/workdir"},
85+
{Name: "MNIST_DATASET_URL", Value: GetMnistDatasetURL()},
86+
{Name: "PIP_INDEX_URL", Value: GetPipIndexURL()},
87+
{Name: "PIP_TRUSTED_HOST", Value: GetPipTrustedHost()},
88+
},
89+
Command: []string{"/bin/sh", "-c", "pip install -r /test/requirements.txt && torchrun /test/mnist.py"},
90+
VolumeMounts: []corev1.VolumeMount{
91+
{
92+
Name: "test",
93+
MountPath: "/test",
94+
},
95+
{
96+
Name: "workdir",
97+
MountPath: "/workdir",
98+
},
99+
},
100+
WorkingDir: "/workdir",
101+
},
102+
},
103+
Volumes: []corev1.Volume{
104+
{
105+
Name: "test",
106+
VolumeSource: corev1.VolumeSource{
107+
ConfigMap: &corev1.ConfigMapVolumeSource{
108+
LocalObjectReference: corev1.LocalObjectReference{
109+
Name: config.Name,
110+
},
111+
},
112+
},
113+
},
114+
{
115+
Name: "workdir",
116+
VolumeSource: corev1.VolumeSource{
117+
EmptyDir: &corev1.EmptyDirVolumeSource{},
118+
},
119+
},
120+
},
121+
RestartPolicy: corev1.RestartPolicyNever,
122+
},
123+
},
124+
},
125+
}
126+
127+
// Create an AppWrapper resource
128+
aw := &mcadv1beta2.AppWrapper{
129+
TypeMeta: metav1.TypeMeta{
130+
APIVersion: mcadv1beta2.GroupVersion.String(),
131+
Kind: "AppWrapper",
132+
},
133+
ObjectMeta: metav1.ObjectMeta{
134+
Name: "mnist",
135+
Namespace: namespace.Name,
136+
Labels: map[string]string{"kueue.x-k8s.io/queue-name": localQueue.Name},
137+
},
138+
Spec: mcadv1beta2.AppWrapperSpec{
139+
Components: []mcadv1beta2.AppWrapperComponent{
140+
{
141+
Template: Raw(test, job),
142+
},
143+
},
144+
},
145+
}
146+
147+
appWrapperResource := mcadv1beta2.GroupVersion.WithResource("appwrappers")
148+
awMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(aw)
149+
test.Expect(err).NotTo(HaveOccurred())
150+
unstruct := unstructured.Unstructured{Object: awMap}
151+
_, err = test.Client().Dynamic().Resource(appWrapperResource).Namespace(namespace.Name).Create(test.Ctx(), &unstruct, metav1.CreateOptions{})
152+
test.Expect(err).NotTo(HaveOccurred())
153+
test.T().Logf("Created AppWrapper %s/%s successfully", aw.Namespace, aw.Name)
154+
155+
test.T().Logf("Waiting for AppWrapper %s/%s to be running", aw.Namespace, aw.Name)
156+
test.Eventually(AppWrapper(test, namespace, aw.Name), TestTimeoutMedium).
157+
Should(WithTransform(AppWrapperPhase, Equal(mcadv1beta2.AppWrapperRunning)))
158+
159+
test.T().Logf("Waiting for AppWrapper %s/%s to complete", job.Namespace, job.Name)
160+
test.Eventually(AppWrapper(test, namespace, aw.Name), TestTimeoutLong).Should(
161+
Or(
162+
WithTransform(AppWrapperPhase, Equal(mcadv1beta2.AppWrapperSucceeded)),
163+
WithTransform(AppWrapperPhase, Equal(mcadv1beta2.AppWrapperFailed)),
164+
))
165+
}

test/e2e/mnist_rayjob_raycluster_test.go

+83
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,22 @@ import (
2323
"testing"
2424

2525
. "github.com/onsi/gomega"
26+
mcadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
2627
. "github.com/project-codeflare/codeflare-common/support"
2728
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2829

2930
corev1 "k8s.io/api/core/v1"
3031
"k8s.io/apimachinery/pkg/api/resource"
3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
34+
"k8s.io/apimachinery/pkg/runtime"
3235
)
3336

3437
// Trains the MNIST dataset as a RayJob, executed by a Ray cluster
3538
// directly managed by Kueue, and asserts successful completion of the training job.
3639
func TestMNISTRayJobRayCluster(t *testing.T) {
3740
test := With(t)
41+
test.T().Parallel()
3842

3943
// Create a namespace and localqueue in that namespace
4044
namespace := test.NewTestNamespace()
@@ -84,6 +88,85 @@ func TestMNISTRayJobRayCluster(t *testing.T) {
8488
To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))
8589
}
8690

91+
// Same as TestMNISTRayJobRayCluster, except the RayCluster is wrapped in an AppWrapper
92+
func TestMNISTRayJobRayClusterAppWrapper(t *testing.T) {
93+
test := With(t)
94+
test.T().Parallel()
95+
test.T().Skip("Disabled due to Kueue 0.6.x overly eager reconcilliation bug")
96+
97+
// Create a namespace and localqueue in that namespace
98+
namespace := test.NewTestNamespace()
99+
localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue")
100+
101+
// Create MNIST training script
102+
mnist := constructMNISTConfigMap(test, namespace)
103+
mnist, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), mnist, metav1.CreateOptions{})
104+
test.Expect(err).NotTo(HaveOccurred())
105+
test.T().Logf("Created ConfigMap %s/%s successfully", mnist.Namespace, mnist.Name)
106+
107+
// Create RayCluster, wrap in AppWrapper and assign to localqueue
108+
rayCluster := constructRayCluster(test, namespace, mnist)
109+
aw := &mcadv1beta2.AppWrapper{
110+
TypeMeta: metav1.TypeMeta{
111+
APIVersion: mcadv1beta2.GroupVersion.String(),
112+
Kind: "AppWrapper",
113+
},
114+
ObjectMeta: metav1.ObjectMeta{
115+
Name: rayCluster.Name,
116+
Namespace: namespace.Name,
117+
Labels: map[string]string{"kueue.x-k8s.io/queue-name": localQueue.Name},
118+
},
119+
Spec: mcadv1beta2.AppWrapperSpec{
120+
Components: []mcadv1beta2.AppWrapperComponent{
121+
{
122+
Template: Raw(test, rayCluster),
123+
},
124+
},
125+
},
126+
}
127+
appWrapperResource := mcadv1beta2.GroupVersion.WithResource("appwrappers")
128+
awMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(aw)
129+
test.Expect(err).NotTo(HaveOccurred())
130+
unstruct := unstructured.Unstructured{Object: awMap}
131+
_, err = test.Client().Dynamic().Resource(appWrapperResource).Namespace(namespace.Name).Create(test.Ctx(), &unstruct, metav1.CreateOptions{})
132+
test.Expect(err).NotTo(HaveOccurred())
133+
test.T().Logf("Created AppWrapper %s/%s successfully", aw.Namespace, aw.Name)
134+
135+
test.T().Logf("Waiting for AppWrapper %s/%s to be running", aw.Namespace, aw.Name)
136+
test.Eventually(AppWrapper(test, namespace, aw.Name), TestTimeoutMedium).
137+
Should(WithTransform(AppWrapperPhase, Equal(mcadv1beta2.AppWrapperRunning)))
138+
139+
test.T().Logf("Waiting for RayCluster %s/%s to be running", rayCluster.Namespace, rayCluster.Name)
140+
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium).
141+
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
142+
143+
// Create RayJob
144+
rayJob := constructRayJob(test, namespace, rayCluster)
145+
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
146+
test.Expect(err).NotTo(HaveOccurred())
147+
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
148+
149+
rayDashboardURL := getRayDashboardURL(test, rayCluster.Namespace, rayCluster.Name)
150+
151+
test.T().Logf("Connecting to Ray cluster at: %s", rayDashboardURL.String())
152+
rayClient := NewRayClusterClient(rayDashboardURL)
153+
154+
// Wait for Ray job id to be available, this value is needed for writing logs in defer
155+
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort).
156+
Should(WithTransform(RayJobId, Not(BeEmpty())))
157+
158+
// Retrieving the job logs once it has completed or timed out
159+
defer WriteRayJobAPILogs(test, rayClient, GetRayJobId(test, rayJob.Namespace, rayJob.Name))
160+
161+
test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
162+
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
163+
Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))
164+
165+
// Assert the Ray job has completed successfully
166+
test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
167+
To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))
168+
}
169+
87170
func constructMNISTConfigMap(test Test, namespace *corev1.Namespace) *corev1.ConfigMap {
88171
return &corev1.ConfigMap{
89172
TypeMeta: metav1.TypeMeta{

0 commit comments

Comments
 (0)