10
10
from fractal_server .app .models .v2 import HistoryUnit
11
11
from fractal_server .app .schemas .v2 import HistoryUnitStatusWithUnset
12
12
from fractal_server .logger import set_logger
13
- from fractal_server .types import ImageAttributeValue
14
13
15
14
# Module-level logger named after this module (fractal-server logging helper).
logger = set_logger(__name__)
16
15
17
16
18
17
IMAGE_STATUS_KEY = "__wftask_dataset_image_status__"
19
18
20
19
21
- def _enriched_image (* , img : dict [str , Any ], status : str ) -> dict [str , Any ]:
20
+ def _enriched_image (
21
+ * ,
22
+ img : dict [str , Any ],
23
+ status : str ,
24
+ ) -> dict [str , Any ]:
22
25
return img | {
23
26
"attributes" : (img ["attributes" ] | {IMAGE_STATUS_KEY : status })
24
27
}
@@ -29,6 +32,9 @@ def _prepare_query(
29
32
dataset_id : int ,
30
33
workflowtask_id : int ,
31
34
) -> Select :
35
+ """
36
+ Note: the query does not include `.order_by`.
37
+ """
32
38
stm = (
33
39
select (HistoryImageCache .zarr_url , HistoryUnit .status )
34
40
.join (HistoryUnit )
@@ -39,13 +45,56 @@ def _prepare_query(
39
45
return stm
40
46
41
47
42
- async def enrich_images_async (
48
def _postprocess_image_lists(
    target_images: list[dict[str, Any]],
    list_query_url_status: list[tuple[str, str]],
) -> list[dict[str, Any]]:
    """
    Merge per-image statuses into the target image list.

    Args:
        target_images: The images to enrich.
        list_query_url_status: `(zarr_url, status)` pairs for all processed
            images, possibly including images outside `target_images`.

    Returns:
        Enriched copies of `target_images`, not necessarily in the input
        order: images with a queried status first, followed by the
        remaining images marked as UNSET.
    """
    t_1 = time.perf_counter()

    # Select only processed images that are part of the target image set
    zarr_url_to_image = {img["zarr_url"]: img for img in target_images}
    target_zarr_urls = zarr_url_to_image.keys()
    list_processed_url_status = [
        url_status
        for url_status in list_query_url_status
        if url_status[0] in target_zarr_urls
    ]

    set_processed_urls = {
        zarr_url for zarr_url, _status in list_processed_url_status
    }
    processed_images_with_status = [
        _enriched_image(
            img=zarr_url_to_image[zarr_url],
            status=status,
        )
        for zarr_url, status in list_processed_url_status
    ]

    # Target images with no queried status are marked as UNSET
    non_processed_urls = target_zarr_urls - set_processed_urls
    non_processed_images_with_status = [
        _enriched_image(
            img=zarr_url_to_image[zarr_url],
            status=HistoryUnitStatusWithUnset.UNSET,
        )
        for zarr_url in non_processed_urls
    ]
    t_2 = time.perf_counter()
    # NOTE: this helper is shared by both the async and the sync entry
    # points, hence the neutral log prefix.
    logger.debug(
        f"[_postprocess_image_lists] post-processing, "
        f"elapsed={t_2 - t_1:.5f} s"
    )

    return processed_images_with_status + non_processed_images_with_status
89
+
90
+
91
async def enrich_images_unsorted_async(
    *,
    images: list[dict[str, Any]],
    dataset_id: int,
    workflowtask_id: int,
    db: AsyncSession,
) -> list[dict[str, Any]]:
    """
    Enrich images with a status-related attribute.

    Args:
        images: The input image list
        dataset_id: The dataset ID
        workflowtask_id: The workflow-task ID
        db: An async db session

    Returns:
        The list of enriched images, not necessarily in the same order as
        the input.
    """
    t_0 = time.perf_counter()
    # Log prefix matches the current function name (it was stale after the
    # rename from `enrich_images_async`).
    logger.info(
        f"[enrich_images_unsorted_async] START, "
        f"{dataset_id=}, {workflowtask_id=}"
    )

    # Get `(zarr_url, status)` for _all_ processed images (including those
    # that are not part of the target image set)
    res = await db.execute(
        _prepare_query(
            dataset_id=dataset_id,
            workflowtask_id=workflowtask_id,
        )
    )
    list_query_url_status = res.all()
    t_1 = time.perf_counter()
    logger.debug(
        f"[enrich_images_unsorted_async] query, elapsed={t_1 - t_0:.5f} s"
    )

    output = _postprocess_image_lists(
        target_images=images,
        list_query_url_status=list_query_url_status,
    )

    return output
109
134
110
135
111
def enrich_images_unsorted_sync(
    *,
    images: list[dict[str, Any]],
    dataset_id: int,
    workflowtask_id: int,
) -> list[dict[str, Any]]:
    """
    Enrich images with a status-related attribute.

    Args:
        images: The input image list
        dataset_id: The dataset ID
        workflowtask_id: The workflow-task ID

    Returns:
        The list of enriched images, not necessarily in the same order as
        the input.
    """
    t_0 = time.perf_counter()
    # Log prefix matches the current function name (the previous tag said
    # `enrich_images_async`, which was wrong for the sync variant).
    logger.info(
        f"[enrich_images_unsorted_sync] START, "
        f"{dataset_id=}, {workflowtask_id=}"
    )

    # Get `(zarr_url, status)` for _all_ processed images (including those
    # that are not part of the target image set)
    with next(get_sync_db()) as db:
        res = db.execute(
            _prepare_query(
                dataset_id=dataset_id,
                workflowtask_id=workflowtask_id,
            )
        )
        list_query_url_status = res.all()
    t_1 = time.perf_counter()
    logger.debug(
        f"[enrich_images_unsorted_sync] query, elapsed={t_1 - t_0:.5f} s"
    )

    output = _postprocess_image_lists(
        target_images=images,
        list_query_url_status=list_query_url_status,
    )

    return output
0 commit comments