86
86
IncrementalDataRecord ,
87
87
IncrementalPublisher ,
88
88
IncrementalResult ,
89
+ InitialResultRecord ,
89
90
StreamItemsRecord ,
91
+ SubsequentDataRecord ,
90
92
SubsequentIncrementalExecutionResult ,
91
93
)
92
94
from .middleware import MiddlewareManager
@@ -352,7 +354,6 @@ class ExecutionContext:
352
354
field_resolver : GraphQLFieldResolver
353
355
type_resolver : GraphQLTypeResolver
354
356
subscribe_field_resolver : GraphQLFieldResolver
355
- errors : list [GraphQLError ]
356
357
incremental_publisher : IncrementalPublisher
357
358
middleware_manager : MiddlewareManager | None
358
359
@@ -371,7 +372,6 @@ def __init__(
371
372
field_resolver : GraphQLFieldResolver ,
372
373
type_resolver : GraphQLTypeResolver ,
373
374
subscribe_field_resolver : GraphQLFieldResolver ,
374
- errors : list [GraphQLError ],
375
375
incremental_publisher : IncrementalPublisher ,
376
376
middleware_manager : MiddlewareManager | None ,
377
377
is_awaitable : Callable [[Any ], bool ] | None ,
@@ -385,7 +385,6 @@ def __init__(
385
385
self .field_resolver = field_resolver
386
386
self .type_resolver = type_resolver
387
387
self .subscribe_field_resolver = subscribe_field_resolver
388
- self .errors = errors
389
388
self .incremental_publisher = incremental_publisher
390
389
self .middleware_manager = middleware_manager
391
390
if is_awaitable :
@@ -478,7 +477,6 @@ def build(
478
477
field_resolver or default_field_resolver ,
479
478
type_resolver or default_type_resolver ,
480
479
subscribe_field_resolver or default_field_resolver ,
481
- [],
482
480
IncrementalPublisher (),
483
481
middleware_manager ,
484
482
is_awaitable ,
@@ -514,15 +512,14 @@ def build_per_event_execution_context(self, payload: Any) -> ExecutionContext:
514
512
self .field_resolver ,
515
513
self .type_resolver ,
516
514
self .subscribe_field_resolver ,
517
- [],
518
- # no need to update incrementalPublisher,
519
- # incremental delivery is not supported for subscriptions
520
515
self .incremental_publisher ,
521
516
self .middleware_manager ,
522
517
self .is_awaitable ,
523
518
)
524
519
525
- def execute_operation (self ) -> AwaitableOrValue [dict [str , Any ]]:
520
+ def execute_operation (
521
+ self , initial_result_record : InitialResultRecord
522
+ ) -> AwaitableOrValue [dict [str , Any ]]:
526
523
"""Execute an operation.
527
524
528
525
Implements the "Executing operations" section of the spec.
@@ -551,12 +548,17 @@ def execute_operation(self) -> AwaitableOrValue[dict[str, Any]]:
551
548
self .execute_fields_serially
552
549
if operation .operation == OperationType .MUTATION
553
550
else self .execute_fields
554
- )(root_type , root_value , None , grouped_field_set ) # type: ignore
551
+ )(root_type , root_value , None , grouped_field_set , initial_result_record )
555
552
556
553
for patch in patches :
557
554
label , patch_grouped_filed_set = patch
558
555
self .execute_deferred_fragment (
559
- root_type , root_value , patch_grouped_filed_set , label , None
556
+ root_type ,
557
+ root_value ,
558
+ patch_grouped_filed_set ,
559
+ initial_result_record ,
560
+ label ,
561
+ None ,
560
562
)
561
563
562
564
return result
@@ -567,6 +569,7 @@ def execute_fields_serially(
567
569
source_value : Any ,
568
570
path : Path | None ,
569
571
grouped_field_set : GroupedFieldSet ,
572
+ incremental_data_record : IncrementalDataRecord ,
570
573
) -> AwaitableOrValue [dict [str , Any ]]:
571
574
"""Execute the given fields serially.
572
575
@@ -581,7 +584,11 @@ def reducer(
581
584
response_name , field_group = field_item
582
585
field_path = Path (path , response_name , parent_type .name )
583
586
result = self .execute_field (
584
- parent_type , source_value , field_group , field_path
587
+ parent_type ,
588
+ source_value ,
589
+ field_group ,
590
+ field_path ,
591
+ incremental_data_record ,
585
592
)
586
593
if result is Undefined :
587
594
return results
@@ -607,7 +614,7 @@ def execute_fields(
607
614
source_value : Any ,
608
615
path : Path | None ,
609
616
grouped_field_set : GroupedFieldSet ,
610
- incremental_data_record : IncrementalDataRecord | None = None ,
617
+ incremental_data_record : IncrementalDataRecord ,
611
618
) -> AwaitableOrValue [dict [str , Any ]]:
612
619
"""Execute the given fields concurrently.
613
620
@@ -662,7 +669,7 @@ def execute_field(
662
669
source : Any ,
663
670
field_group : FieldGroup ,
664
671
path : Path ,
665
- incremental_data_record : IncrementalDataRecord | None = None ,
672
+ incremental_data_record : IncrementalDataRecord ,
666
673
) -> AwaitableOrValue [Any ]:
667
674
"""Resolve the field on the given source object.
668
675
@@ -774,7 +781,7 @@ def handle_field_error(
774
781
return_type : GraphQLOutputType ,
775
782
field_group : FieldGroup ,
776
783
path : Path ,
777
- incremental_data_record : IncrementalDataRecord | None = None ,
784
+ incremental_data_record : IncrementalDataRecord ,
778
785
) -> None :
779
786
"""Handle error properly according to the field type."""
780
787
error = located_error (raw_error , field_group , path .as_list ())
@@ -784,13 +791,9 @@ def handle_field_error(
784
791
if is_non_null_type (return_type ):
785
792
raise error
786
793
787
- errors = (
788
- incremental_data_record .errors if incremental_data_record else self .errors
789
- )
790
-
791
794
# Otherwise, error protection is applied, logging the error and resolving a
792
795
# null value for this field if one is encountered.
793
- errors . append ( error )
796
+ self . incremental_publisher . add_field_error ( incremental_data_record , error )
794
797
795
798
def complete_value (
796
799
self ,
@@ -799,7 +802,7 @@ def complete_value(
799
802
info : GraphQLResolveInfo ,
800
803
path : Path ,
801
804
result : Any ,
802
- incremental_data_record : IncrementalDataRecord | None ,
805
+ incremental_data_record : IncrementalDataRecord ,
803
806
) -> AwaitableOrValue [Any ]:
804
807
"""Complete a value.
805
808
@@ -888,7 +891,7 @@ async def complete_awaitable_value(
888
891
info : GraphQLResolveInfo ,
889
892
path : Path ,
890
893
result : Any ,
891
- incremental_data_record : IncrementalDataRecord | None = None ,
894
+ incremental_data_record : IncrementalDataRecord ,
892
895
) -> Any :
893
896
"""Complete an awaitable value."""
894
897
try :
@@ -955,7 +958,7 @@ async def complete_async_iterator_value(
955
958
info : GraphQLResolveInfo ,
956
959
path : Path ,
957
960
async_iterator : AsyncIterator [Any ],
958
- incremental_data_record : IncrementalDataRecord | None ,
961
+ incremental_data_record : IncrementalDataRecord ,
959
962
) -> list [Any ]:
960
963
"""Complete an async iterator.
961
964
@@ -984,8 +987,8 @@ async def complete_async_iterator_value(
984
987
info ,
985
988
item_type ,
986
989
path ,
987
- stream .label ,
988
990
incremental_data_record ,
991
+ stream .label ,
989
992
)
990
993
),
991
994
timeout = ASYNC_DELAY ,
@@ -1039,7 +1042,7 @@ def complete_list_value(
1039
1042
info : GraphQLResolveInfo ,
1040
1043
path : Path ,
1041
1044
result : AsyncIterable [Any ] | Iterable [Any ],
1042
- incremental_data_record : IncrementalDataRecord | None ,
1045
+ incremental_data_record : IncrementalDataRecord ,
1043
1046
) -> AwaitableOrValue [list [Any ]]:
1044
1047
"""Complete a list value.
1045
1048
@@ -1093,8 +1096,8 @@ def complete_list_value(
1093
1096
field_group ,
1094
1097
info ,
1095
1098
item_type ,
1096
- stream .label ,
1097
1099
previous_incremental_data_record ,
1100
+ stream .label ,
1098
1101
)
1099
1102
continue
1100
1103
@@ -1138,7 +1141,7 @@ def complete_list_item_value(
1138
1141
field_group : FieldGroup ,
1139
1142
info : GraphQLResolveInfo ,
1140
1143
item_path : Path ,
1141
- incremental_data_record : IncrementalDataRecord | None ,
1144
+ incremental_data_record : IncrementalDataRecord ,
1142
1145
) -> bool :
1143
1146
"""Complete a list item value by adding it to the completed results.
1144
1147
@@ -1229,7 +1232,7 @@ def complete_abstract_value(
1229
1232
info : GraphQLResolveInfo ,
1230
1233
path : Path ,
1231
1234
result : Any ,
1232
- incremental_data_record : IncrementalDataRecord | None ,
1235
+ incremental_data_record : IncrementalDataRecord ,
1233
1236
) -> AwaitableOrValue [Any ]:
1234
1237
"""Complete an abstract value.
1235
1238
@@ -1344,7 +1347,7 @@ def complete_object_value(
1344
1347
info : GraphQLResolveInfo ,
1345
1348
path : Path ,
1346
1349
result : Any ,
1347
- incremental_data_record : IncrementalDataRecord | None ,
1350
+ incremental_data_record : IncrementalDataRecord ,
1348
1351
) -> AwaitableOrValue [dict [str , Any ]]:
1349
1352
"""Complete an Object value by executing all sub-selections."""
1350
1353
# If there is an `is_type_of()` predicate function, call it with the current
@@ -1379,7 +1382,7 @@ def collect_and_execute_subfields(
1379
1382
field_group : FieldGroup ,
1380
1383
path : Path ,
1381
1384
result : Any ,
1382
- incremental_data_record : IncrementalDataRecord | None ,
1385
+ incremental_data_record : IncrementalDataRecord ,
1383
1386
) -> AwaitableOrValue [dict [str , Any ]]:
1384
1387
"""Collect sub-fields to execute to complete this value."""
1385
1388
sub_grouped_field_set , sub_patches = self .collect_subfields (
@@ -1396,9 +1399,9 @@ def collect_and_execute_subfields(
1396
1399
return_type ,
1397
1400
result ,
1398
1401
sub_patch_grouped_field_set ,
1402
+ incremental_data_record ,
1399
1403
label ,
1400
1404
path ,
1401
- incremental_data_record ,
1402
1405
)
1403
1406
1404
1407
return sub_fields
@@ -1474,9 +1477,9 @@ def execute_deferred_fragment(
1474
1477
parent_type : GraphQLObjectType ,
1475
1478
source_value : Any ,
1476
1479
fields : GroupedFieldSet ,
1480
+ parent_context : IncrementalDataRecord ,
1477
1481
label : str | None = None ,
1478
1482
path : Path | None = None ,
1479
- parent_context : IncrementalDataRecord | None = None ,
1480
1483
) -> None :
1481
1484
"""Execute deferred fragment."""
1482
1485
incremental_publisher = self .incremental_publisher
@@ -1529,9 +1532,9 @@ def execute_stream_field(
1529
1532
field_group : FieldGroup ,
1530
1533
info : GraphQLResolveInfo ,
1531
1534
item_type : GraphQLOutputType ,
1535
+ parent_context : IncrementalDataRecord ,
1532
1536
label : str | None = None ,
1533
- parent_context : IncrementalDataRecord | None = None ,
1534
- ) -> IncrementalDataRecord :
1537
+ ) -> SubsequentDataRecord :
1535
1538
"""Execute stream field."""
1536
1539
is_awaitable = self .is_awaitable
1537
1540
incremental_publisher = self .incremental_publisher
@@ -1678,8 +1681,8 @@ async def execute_stream_async_iterator(
1678
1681
info : GraphQLResolveInfo ,
1679
1682
item_type : GraphQLOutputType ,
1680
1683
path : Path ,
1684
+ parent_context : IncrementalDataRecord ,
1681
1685
label : str | None = None ,
1682
- parent_context : IncrementalDataRecord | None = None ,
1683
1686
) -> None :
1684
1687
"""Execute stream iterator."""
1685
1688
incremental_publisher = self .incremental_publisher
@@ -1877,21 +1880,24 @@ def execute_impl(
1877
1880
# Errors from sub-fields of a NonNull type may propagate to the top level,
1878
1881
# at which point we still log the error and null the parent field, which
1879
1882
# in this case is the entire response.
1880
- errors = context .errors
1881
1883
incremental_publisher = context .incremental_publisher
1884
+ initial_result_record = incremental_publisher .prepare_initial_result_record ()
1882
1885
build_response = context .build_response
1883
1886
try :
1884
- result = context .execute_operation ()
1887
+ result = context .execute_operation (initial_result_record )
1885
1888
1886
1889
if context .is_awaitable (result ):
1887
1890
# noinspection PyShadowingNames
1888
1891
async def await_result () -> Any :
1889
1892
try :
1893
+ errors = incremental_publisher .get_initial_errors (
1894
+ initial_result_record
1895
+ )
1890
1896
initial_result = build_response (
1891
1897
await result , # type: ignore
1892
1898
errors ,
1893
1899
)
1894
- incremental_publisher .publish_initial ()
1900
+ incremental_publisher .publish_initial (initial_result_record )
1895
1901
if incremental_publisher .has_next ():
1896
1902
return ExperimentalIncrementalExecutionResults (
1897
1903
initial_result = InitialIncrementalExecutionResult (
@@ -1902,14 +1908,17 @@ async def await_result() -> Any:
1902
1908
subsequent_results = incremental_publisher .subscribe (),
1903
1909
)
1904
1910
except GraphQLError as error :
1905
- errors .append (error )
1911
+ incremental_publisher .add_field_error (initial_result_record , error )
1912
+ errors = incremental_publisher .get_initial_errors (
1913
+ initial_result_record
1914
+ )
1906
1915
return build_response (None , errors )
1907
1916
return initial_result
1908
1917
1909
1918
return await_result ()
1910
1919
1911
- initial_result = build_response (result , errors ) # type: ignore
1912
- incremental_publisher .publish_initial ()
1920
+ initial_result = build_response (result , initial_result_record . errors ) # type: ignore
1921
+ incremental_publisher .publish_initial (initial_result_record )
1913
1922
if incremental_publisher .has_next ():
1914
1923
return ExperimentalIncrementalExecutionResults (
1915
1924
initial_result = InitialIncrementalExecutionResult (
@@ -1920,7 +1929,8 @@ async def await_result() -> Any:
1920
1929
subsequent_results = incremental_publisher .subscribe (),
1921
1930
)
1922
1931
except GraphQLError as error :
1923
- errors .append (error )
1932
+ incremental_publisher .add_field_error (initial_result_record , error )
1933
+ errors = incremental_publisher .get_initial_errors (initial_result_record )
1924
1934
return build_response (None , errors )
1925
1935
return initial_result
1926
1936
0 commit comments