Skip to content

Commit 50347d4

Browse files
committed
Add BulkResponse wrapper for improved decoding of HTTP bulk responses
CrateDB HTTP bulk responses include `rowcount=` items, either signalling if a bulk operation succeeded or failed. - success means `rowcount=1` - failure means `rowcount=-2` https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
1 parent 7cb2c68 commit 50347d4

File tree

5 files changed

+426
-284
lines changed

5 files changed

+426
-284
lines changed

CHANGES.txt

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Unreleased
1212
"Threads may share the module, but not connections."
1313
- Added ``error_trace`` to string representation of an Error to relay
1414
server stacktraces into exception messages.
15+
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
16+
responses including ``rowcount=`` items.
1517

1618
.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
1719
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/

src/crate/client/result.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import typing as t
2+
from functools import cached_property
3+
4+
5+
class BulkResultItem(t.TypedDict):
6+
"""
7+
Define the shape of a CrateDB bulk request response item.
8+
"""
9+
10+
rowcount: int
11+
12+
13+
class BulkResponse:
14+
"""
15+
Manage CrateDB bulk request responses.
16+
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
17+
18+
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
19+
"""
20+
21+
def __init__(
22+
self,
23+
records: t.Union[t.Iterable[t.Dict[str, t.Any]], None],
24+
results: t.Union[t.Iterable[BulkResultItem], None]):
25+
self.records = records
26+
self.results = results
27+
28+
@cached_property
29+
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
30+
"""
31+
Compute list of failed records.
32+
33+
CrateDB signals failed inserts using `rowcount=-2`.
34+
35+
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
36+
"""
37+
if self.records is None or self.results is None:
38+
return []
39+
errors: t.List[t.Dict[str, t.Any]] = []
40+
for record, status in zip(self.records, self.results):
41+
if status["rowcount"] == -2:
42+
errors.append(record)
43+
return errors
44+
45+
@cached_property
46+
def record_count(self) -> int:
47+
"""
48+
Compute bulk size / length of parameter list.
49+
"""
50+
if not self.records:
51+
return 0
52+
return len(self.records)
53+
54+
@cached_property
55+
def success_count(self) -> int:
56+
"""
57+
Compute number of succeeding records within a batch.
58+
"""
59+
return self.record_count - self.failed_count
60+
61+
@cached_property
62+
def failed_count(self) -> int:
63+
"""
64+
Compute number of failed records within a batch.
65+
"""
66+
return len(self.failed_records)

src/crate/client/test_result.py

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import sys
2+
import unittest
3+
4+
from crate import client
5+
from crate.client.exceptions import ProgrammingError
6+
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
7+
from crate.testing.settings import crate_host
8+
9+
10+
class BulkOperationTest(unittest.TestCase):
11+
12+
def setUp(self):
13+
setUpCrateLayerBaseline(self)
14+
15+
def tearDown(self):
16+
tearDownDropEntitiesBaseline(self)
17+
18+
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
19+
def test_executemany_with_bulk_response_partial(self):
20+
21+
# Import at runtime is on purpose, to permit skipping the test case.
22+
from crate.client.result import BulkResponse
23+
24+
connection = client.connect(crate_host)
25+
cursor = connection.cursor()
26+
27+
# Run SQL DDL.
28+
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")
29+
30+
# Run a batch insert that only partially succeeds.
31+
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
32+
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)
33+
34+
# Verify CrateDB response.
35+
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])
36+
37+
# Verify decoded response.
38+
bulk_response = BulkResponse(invalid_records, result)
39+
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
40+
self.assertEqual(bulk_response.record_count, 2)
41+
self.assertEqual(bulk_response.success_count, 1)
42+
self.assertEqual(bulk_response.failed_count, 1)
43+
44+
cursor.execute("REFRESH TABLE foobar;")
45+
cursor.execute("SELECT * FROM foobar;")
46+
result = cursor.fetchall()
47+
self.assertEqual(result, [[1, "Hotzenplotz 1"]])
48+
49+
cursor.close()
50+
connection.close()
51+
52+
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
53+
def test_executemany_empty(self):
54+
55+
connection = client.connect(crate_host)
56+
cursor = connection.cursor()
57+
58+
# Run SQL DDL.
59+
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")
60+
61+
# Run a batch insert that is empty.
62+
with self.assertRaises(ProgrammingError) as cm:
63+
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
64+
self.assertEqual(
65+
cm.exception.message,
66+
"ArrayIndexOutOfBoundsException[Index 0 out of bounds for length 0]")
67+
68+
cursor.close()
69+
connection.close()

0 commit comments

Comments
 (0)