Skip to content

Commit d814283

Browse files
committed
Enhance query processing with improved synchronization and logging for empty results
1 parent 485cde2 commit d814283

File tree

4 files changed

+82
-49
lines changed

4 files changed

+82
-49
lines changed

programs/local/LocalChdb.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "LocalChdb.h"
22
#include <mutex>
3+
#include "Common/logger_useful.h"
34
#include "chdb.h"
45
#include "pybind11/gil.h"
56
#include "pybind11/pytypes.h"
@@ -337,6 +338,10 @@ query_result * connection_wrapper::query(const std::string & query_str, const st
337338

338339
py::gil_scoped_release release;
339340
auto * result = query_conn(*conn, query_str.c_str(), format.c_str());
341+
if (result->len == 0)
342+
{
343+
LOG_WARNING(getLogger("CHDB"), "Empty result returned for query: {}", query_str);
344+
}
340345
if (result->error_message)
341346
{
342347
throw std::runtime_error(result->error_message);

programs/local/LocalChdb.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,10 @@ class cursor_wrapper
218218
{
219219
if (current_result)
220220
{
221-
// The free_result_v2 vector is managed by the ClickHouse Engine
222-
// As we don't want to copy the data, so just release the memory here.
223-
// The memory will be released when the ClientBase.query_result_buf is reassigned.
224221
if (current_result->_vec)
225222
{
223+
auto * vec = reinterpret_cast<std::vector<char> *>(current_result->_vec);
224+
delete vec;
226225
current_result->_vec = nullptr;
227226
}
228227
free_result_v2(current_result);

programs/local/LocalServer.cpp

Lines changed: 69 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "chdb.h"
33

44
#include <sys/resource.h>
5+
#include "Common/Logger.h"
56
#include <Common/Config/getLocalConfigPath.h>
67
#include <Common/logger_useful.h>
78
#include <Common/formatReadable.h>
@@ -56,6 +57,7 @@
5657
#include <boost/program_options/options_description.hpp>
5758
#include <base/argsToConfig.h>
5859
#include <filesystem>
60+
#include <math.h>
5961

6062
#include "config.h"
6163

@@ -1289,7 +1291,7 @@ void free_result_v2(local_result_v2 * result)
12891291

12901292
chdb_conn ** connect_chdb(int argc, char ** argv)
12911293
{
1292-
std::lock_guard<std::mutex> glock(global_connection_mutex);
1294+
std::lock_guard<std::mutex> global_lock(global_connection_mutex);
12931295

12941296
std::string path = ":memory:"; // Default path
12951297
for (int i = 1; i < argc; i++)
@@ -1349,27 +1351,26 @@ chdb_conn ** connect_chdb(int argc, char ** argv)
13491351
auto * queue = static_cast<query_queue *>(conn->queue);
13501352
while (true)
13511353
{
1352-
auto result = std::make_unique<local_result_v2>();
1353-
try
1354+
query_request req;
13541355
{
1355-
query_request req;
1356-
{
1357-
std::unique_lock<std::mutex> lock(queue->mutex);
1358-
queue->cv.wait(lock, [queue]() { return !queue->queries.empty() || queue->shutdown; });
1359-
1360-
if (queue->shutdown && queue->queries.empty())
1361-
{
1362-
server->cleanup();
1363-
delete server;
1364-
queue->cleanup_done = true;
1365-
queue->cv.notify_all();
1366-
break;
1367-
}
1356+
std::unique_lock<std::mutex> lock(queue->mutex);
1357+
queue->query_cv.wait(lock, [queue]() { return queue->has_query || queue->shutdown; });
13681358

1369-
req = queue->queries.front();
1370-
queue->queries.pop();
1359+
if (queue->shutdown)
1360+
{
1361+
server->cleanup();
1362+
delete server;
1363+
queue->cleanup_done = true;
1364+
queue->query_cv.notify_all();
1365+
break;
13711366
}
13721367

1368+
req = queue->current_query;
1369+
}
1370+
1371+
auto result = std::make_unique<local_result_v2>();
1372+
try
1373+
{
13731374
if (!server->parseQueryTextWithOutputFormat(req.query, req.format))
13741375
{
13751376
std::string error = server->getErrorMsg();
@@ -1378,11 +1379,12 @@ chdb_conn ** connect_chdb(int argc, char ** argv)
13781379
}
13791380
else
13801381
{
1381-
auto output_span = server->getQueryOutputSpan();
1382-
if (!output_span.empty())
1382+
auto * query_output_vec = server->stealQueryOutputVector();
1383+
if (query_output_vec)
13831384
{
1384-
result->buf = output_span.data();
1385-
result->len = output_span.size();
1385+
result->_vec = query_output_vec;
1386+
result->len = query_output_vec->size();
1387+
result->buf = query_output_vec->data();
13861388
}
13871389
result->rows_read = server->getProcessedRows();
13881390
result->bytes_read = server->getProcessedBytes();
@@ -1404,9 +1406,10 @@ chdb_conn ** connect_chdb(int argc, char ** argv)
14041406

14051407
{
14061408
std::lock_guard<std::mutex> lock(queue->mutex);
1407-
queue->results.push(result.release());
1409+
queue->current_result = result.release();
1410+
queue->has_query = false;
14081411
}
1409-
queue->cv.notify_one();
1412+
queue->result_cv.notify_one();
14101413
}
14111414
}
14121415
catch (...)
@@ -1425,8 +1428,8 @@ chdb_conn ** connect_chdb(int argc, char ** argv)
14251428
// Wait for initialization to complete
14261429
{
14271430
std::unique_lock<std::mutex> init_lock(init_mutex);
1428-
init_cv.wait(init_lock, [&init_done] { return init_done; });
1429-
// If initialization failed, clean up and rethrow the exception
1431+
init_cv.wait(init_lock, [&init_done]() { return init_done; });
1432+
14301433
if (!init_success)
14311434
{
14321435
delete q_queue;
@@ -1442,11 +1445,11 @@ chdb_conn ** connect_chdb(int argc, char ** argv)
14421445

14431446
void close_conn(chdb_conn ** conn)
14441447
{
1448+
std::lock_guard<std::mutex> global_lock(global_connection_mutex);
1449+
14451450
if (!conn || !*conn)
14461451
return;
14471452

1448-
std::lock_guard<std::mutex> lock(global_connection_mutex);
1449-
14501453
if ((*conn)->connected)
14511454
{
14521455
if ((*conn)->queue)
@@ -1456,20 +1459,23 @@ void close_conn(chdb_conn ** conn)
14561459
{
14571460
std::unique_lock<std::mutex> queue_lock(queue->mutex);
14581461
queue->shutdown = true;
1459-
queue->cv.notify_all();
1462+
queue->query_cv.notify_all(); // Wake up query processing thread
1463+
queue->result_cv.notify_all(); // Wake up any waiting result threads
14601464

14611465
// Wait for server cleanup
1462-
queue->cv.wait(queue_lock, [queue] { return queue->cleanup_done; });
1466+
queue->query_cv.wait(queue_lock, [queue] { return queue->cleanup_done; });
14631467

1464-
// Clean up remaining results
1465-
while (!queue->results.empty())
1468+
// Clean up current result if any
1469+
if (queue->current_result)
14661470
{
1467-
auto * result = queue->results.front();
1468-
queue->results.pop();
1469-
free_result_v2(result);
1471+
free_result_v2(queue->current_result);
1472+
queue->current_result = nullptr;
14701473
}
14711474
}
14721475

1476+
// Mark as disconnected before deleting queue
1477+
(*conn)->connected = false;
1478+
14731479
delete queue;
14741480
(*conn)->queue = nullptr;
14751481
}
@@ -1487,29 +1493,50 @@ void close_conn(chdb_conn ** conn)
14871493

14881494
struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format)
14891495
{
1496+
std::lock_guard<std::mutex> global_lock(global_connection_mutex);
1497+
14901498
if (!conn || !conn->connected || !conn->queue)
14911499
return new local_result_v2{nullptr, 0, nullptr, 0, 0, 0, nullptr};
14921500

14931501
auto * queue = static_cast<query_queue *>(conn->queue);
14941502

14951503
{
1496-
std::lock_guard<std::mutex> lock(queue->mutex);
1497-
queue->queries.push({query, format});
1504+
std::unique_lock<std::mutex> lock(queue->mutex);
1505+
// Wait until any ongoing query completes
1506+
queue->query_cv.wait(lock, [queue]() { return !queue->has_query || queue->shutdown; });
1507+
1508+
if (queue->shutdown)
1509+
{
1510+
auto * result = new local_result_v2{};
1511+
const char * error = "Connection is shutting down";
1512+
result->error_message = new char[strlen(error) + 1];
1513+
std::strcpy(result->error_message, error);
1514+
return result;
1515+
}
1516+
1517+
queue->current_query = {query, format};
1518+
queue->has_query = true;
1519+
queue->current_result = nullptr;
14981520
}
1499-
queue->cv.notify_one();
1521+
queue->query_cv.notify_one();
15001522

15011523
local_result_v2 * result = nullptr;
15021524
{
15031525
std::unique_lock<std::mutex> lock(queue->mutex);
1504-
queue->cv.wait(lock, [queue]() { return !queue->results.empty() || queue->shutdown; });
1526+
queue->result_cv.wait(lock, [queue]() { return queue->current_result != nullptr || queue->shutdown; });
15051527

1506-
if (!queue->shutdown && !queue->results.empty())
1528+
if (!queue->shutdown && queue->current_result)
15071529
{
1508-
result = queue->results.front();
1509-
queue->results.pop();
1530+
result = queue->current_result;
1531+
if (result->len == 0)
1532+
{
1533+
LOG_WARNING(getLogger("CHDB"), "Empty result returned for query: {}", query);
1534+
}
1535+
queue->current_result = nullptr;
15101536
}
15111537
}
15121538

1539+
queue->query_cv.notify_one();
15131540
if (result == nullptr)
15141541
{
15151542
result = new local_result_v2{};

programs/local/chdb.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ struct query_request
6565
struct query_queue
6666
{
6767
std::mutex mutex;
68-
std::condition_variable cv; // Single condition variable for all synchronization
69-
std::queue<query_request> queries;
70-
std::queue<local_result_v2 *> results;
68+
std::condition_variable query_cv; // For query submission
69+
std::condition_variable result_cv;
70+
query_request current_query;
71+
local_result_v2 * current_result = nullptr;
72+
bool has_query = false;
7173
bool shutdown = false;
72-
bool cleanup_done = false; // Flag to indicate server cleanup is complete
74+
bool cleanup_done = false;
7375
};
7476
#endif
7577

0 commit comments

Comments
 (0)