From 01504ce59f021327b03e627eda397bd9dd360e67 Mon Sep 17 00:00:00 2001
From: Borislav-Bonev-Ontotext
<111446237+Borislav-Bonev-Ontotext@users.noreply.github.com>
Date: Wed, 26 Mar 2025 22:45:24 +0200
Subject: [PATCH 1/2] Change plugin Java compatibility to Java 21 to match GDB
11
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index b6c5d8c..eedf523 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
11.0.0-TR18
12.1.0
- 1.8
+ 21
https://maven.ontotext.com/content/repositories/owlim-releases
https://maven.ontotext.com/content/repositories/owlim-snapshots
From db2d5fb349212f40c694323b97028caa0d24fb8f Mon Sep 17 00:00:00 2001
From: Borislav-Bonev-Ontotext
<111446237+Borislav-Bonev-Ontotext@users.noreply.github.com>
Date: Wed, 26 Mar 2025 22:20:03 +0200
Subject: [PATCH 2/2] Add simple batch document processing.
Added functionality where all documents matching the input query, up to a configured maximum, are loaded into memory and returned as a named graph collection instead of being processed one by one.
When the new functionality is used, the plugin no longer streams results; instead it requires a buffer to store the documents until the query completes.
Added a system property, graphdb.mongodb.maxBatchSize, that controls the maximum allowed batch size. This prevents out-of-memory (OOM) problems.
---
pom.xml | 6 +
.../plugin/mongodb/BatchDocumentStore.java | 40 +++++++
.../trree/plugin/mongodb/MongoDBPlugin.java | 83 ++++++++++++-
.../plugin/mongodb/MongoResultIterator.java | 93 +++++++++++++--
.../mongodb/AbstractMongoBasicTest.java | 3 +
.../TestPluginMongoBatchedQueries.java | 112 ++++++++++++++++++
.../file1.jsonld | 22 ++++
.../file2.jsonld | 27 +++++
.../file3.jsonld | 27 +++++
...shouldWorkWithUNION_customGraphs_example_2 | 2 +
.../testBadConfig | 0
.../testResultsAreJoined | 2 +
.../testResultsAreJoined_inverseDef | 2 +
13 files changed, 405 insertions(+), 14 deletions(-)
create mode 100644 src/main/java/com/ontotext/trree/plugin/mongodb/BatchDocumentStore.java
create mode 100644 src/test/java/com/ontotext/trree/plugin/mongodb/TestPluginMongoBatchedQueries.java
create mode 100644 src/test/resources/mongodb/input/TestPluginMongoBatchedQueries/file1.jsonld
create mode 100644 src/test/resources/mongodb/input/TestPluginMongoBatchedQueries/file2.jsonld
create mode 100644 src/test/resources/mongodb/input/TestPluginMongoBatchedQueries/file3.jsonld
create mode 100644 src/test/resources/mongodb/results/TestPluginMongoBasicQueries/shouldWorkWithUNION_customGraphs_example_2
create mode 100644 src/test/resources/mongodb/results/TestPluginMongoBatchedQueries/testBadConfig
create mode 100644 src/test/resources/mongodb/results/TestPluginMongoBatchedQueries/testResultsAreJoined
create mode 100644 src/test/resources/mongodb/results/TestPluginMongoBatchedQueries/testResultsAreJoined_inverseDef
diff --git a/pom.xml b/pom.xml
index eedf523..4b27e99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,12 @@
provided
+
+ org.eclipse.collections
+ eclipse-collections-api
+ 11.1.0
+
+
com.ontotext.graphdb
graphdb-runtime
diff --git a/src/main/java/com/ontotext/trree/plugin/mongodb/BatchDocumentStore.java b/src/main/java/com/ontotext/trree/plugin/mongodb/BatchDocumentStore.java
new file mode 100644
index 0000000..772c9e5
--- /dev/null
+++ b/src/main/java/com/ontotext/trree/plugin/mongodb/BatchDocumentStore.java
@@ -0,0 +1,40 @@
+package com.ontotext.trree.plugin.mongodb;
+
+import org.eclipse.collections.api.iterator.LongIterator;
+import org.eclipse.collections.api.set.primitive.MutableLongSet;
+import org.eclipse.collections.api.factory.primitive.LongSets;
+import org.eclipse.rdf4j.model.Model;
+import org.eclipse.rdf4j.model.impl.LinkedHashModel;
+
+/**
+ * Collects the document data during batch processing.
+ *
+ * @author Borislav Bonev
+ * @since 26/03/2025
+ */
+public class BatchDocumentStore {
+ private final MutableLongSet documentIds = LongSets.mutable.empty();
+ private final Model data = new LinkedHashModel();
+
+ public void addDocument(long id, Model model) {
+ documentIds.add(id);
+ data.addAll(model);
+ }
+
+ public Model getData() {
+ return data;
+ }
+
+ public void clear() {
+ documentIds.clear();
+ data.clear();
+ }
+
+ public int size() {
+ return documentIds.size();
+ }
+
+ LongIterator getIterator() {
+ return documentIds.longIterator();
+ }
+}
diff --git a/src/main/java/com/ontotext/trree/plugin/mongodb/MongoDBPlugin.java b/src/main/java/com/ontotext/trree/plugin/mongodb/MongoDBPlugin.java
index d84e990..4208ae7 100644
--- a/src/main/java/com/ontotext/trree/plugin/mongodb/MongoDBPlugin.java
+++ b/src/main/java/com/ontotext/trree/plugin/mongodb/MongoDBPlugin.java
@@ -45,8 +45,30 @@ public class MongoDBPlugin extends PluginBase implements Preprocessor, PatternIn
public static final IRI ENTITY = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "entity");
public static final IRI GRAPH = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "graph");
public static final IRI COLLATION = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "collation");
+ public static final IRI BATCH = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "batchSize");
protected static final String MONGODB_PROPERTIES = "mongodb.properties";
+
+ public final int MAX_BATCH_SIZE;
+
+ {
+ int maxBatch;
+ try {
+ maxBatch = Integer.parseInt(System.getProperty("graphdb.mongodb.maxBatchSize", "1000"));
+ } catch (NumberFormatException e) {
+ getLogger().error("Invalid graphdb.mongodb.maxBatchSize: {}", System.getProperty(
+ "graphdb.mongodb.maxBatchSize"));
+ maxBatch = 1000;
+ }
+ if (maxBatch > 10000) {
+ getLogger().warn("graphdb.mongodb.maxBatchSize size is too large. Max allowed is 10000");
+ maxBatch = 10000;
+ }
+ if (maxBatch == 0) {
+ getLogger().info("MongoDB batch loading is disabled");
+ }
+ MAX_BATCH_SIZE = maxBatch;
+ }
protected ValueFactory vf = SimpleValueFactory.getInstance();
@@ -61,6 +83,7 @@ public class MongoDBPlugin extends PluginBase implements Preprocessor, PatternIn
long graphId = 0;
long rdf_type = 0;
long collationId = 0;
+ long batchSize = 0;
protected long[] predicateSet;
@@ -158,10 +181,11 @@ public void initialize(InitReason initReason, PluginConnection pluginConnection)
graphId = entities.put(GRAPH, Scope.SYSTEM);
rdf_type = entities.resolve(RDF.TYPE);
collationId = entities.put(COLLATION, Scope.SYSTEM);
+ batchSize = entities.put(BATCH, Scope.SYSTEM);
predicateSet = new long[] {serviceId, databaseId, collectionId, userId, passwordId, authDbId, dropId, queryId,
- projectionId, aggregationId, hintId, entityId, graphId, collationId, rdf_type};
+ projectionId, aggregationId, hintId, entityId, graphId, collationId, batchSize, rdf_type};
Arrays.sort(predicateSet);
}
@@ -182,6 +206,9 @@ public double estimate(long subject, long predicate, long object, long context,
if (predicate == graphId) {
return 0.35;
}
+ if (predicate == batchSize) {
+ return 0.37;
+ }
if (predicate == aggregationId) {
return 0.39;
}
@@ -402,6 +429,20 @@ public StatementIterator interpret(long subject, long predicate, long object, lo
iter.setCollation(collationString);
return iter.singletonIterator(collationId, object);
}
+ if (predicate == batchSize) {
+ if (ctx.iters == null) {
+ getLogger().error("iter not created yet");
+ return StatementIterator.EMPTY;
+ }
+ Integer batchSizeCfg = readBatchSize(object, entities);
+ if (batchSizeCfg == null) {
+ return StatementIterator.EMPTY;
+ }
+
+ MongoResultIterator iter = getIterator(subject, context, ctx);
+ iter.setDocumentsLimit(batchSizeCfg);
+ return iter.singletonIterator(batchSize, object);
+ }
if (predicate == hintId) {
String hintString = Utils.getString(entities, object);
@@ -460,6 +501,26 @@ public StatementIterator interpret(long subject, long predicate, long object, lo
return null;
}
+ private Integer readBatchSize(long object, Entities entities) {
+ Integer batchSizeCfg = Utils.getInteger(entities, object);
+ if (batchSizeCfg == null || batchSizeCfg < 0) {
+ getLogger().error("Invalid batch size configuration: {}",
+ Utils.getString(entities, object));
+ return null;
+ }
+ if (batchSizeCfg >= MAX_BATCH_SIZE) {
+ if (MAX_BATCH_SIZE == 0) {
+ getLogger().warn("Batch document functionality is disabled. Ignoring {} configuration.",
+ BATCH);
+ } else {
+ getLogger().warn("Batch size {} exceeds maximum {}. Using default size.",
+ Utils.getString(entities, object), MAX_BATCH_SIZE);
+ }
+ batchSizeCfg = MAX_BATCH_SIZE;
+ }
+ return batchSizeCfg;
+ }
+
private Configuration resolveConfiguration(String suffix) {
return configMap.computeIfAbsent(suffix, indexName -> {
File indexInst = new File(getDataDir(), indexName);
@@ -919,6 +980,26 @@ private boolean isAlreadyDelegateToSomeoneElse(ContextImpl ctx, MongoResultItera
super.setIndexId(indexId);
}
+ @Override public void setDocumentsLimit(int documentsLimit) {
+ MongoResultIterator delegate = getDelegate();
+ if (delegate != null) {
+ getDelegate().setDocumentsLimit(documentsLimit);
+ }
+ super.setDocumentsLimit(documentsLimit);
+ }
+
+ @Override public int getDocumentsLimit() {
+ int limit = super.getDocumentsLimit();
+ if (limit != 0) {
+ return limit;
+ }
+ MongoResultIterator delegate = getDelegate();
+ if (delegate == null) {
+ return 0;
+ }
+ return delegate.getDocumentsLimit();
+ }
+
@Override public long getIndexId() {
long indexId = super.getIndexId();
if (indexId != 0) {
diff --git a/src/main/java/com/ontotext/trree/plugin/mongodb/MongoResultIterator.java b/src/main/java/com/ontotext/trree/plugin/mongodb/MongoResultIterator.java
index f7d4227..9477eec 100644
--- a/src/main/java/com/ontotext/trree/plugin/mongodb/MongoResultIterator.java
+++ b/src/main/java/com/ontotext/trree/plugin/mongodb/MongoResultIterator.java
@@ -22,6 +22,7 @@
import org.bson.codecs.DocumentCodec;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
+import org.eclipse.collections.api.iterator.LongIterator;
import org.eclipse.rdf4j.model.*;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.rio.*;
@@ -36,6 +37,7 @@
import java.io.StringReader;
import java.net.URI;
import java.util.*;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
public class MongoResultIterator extends StatementIterator {
@@ -45,20 +47,20 @@ public class MongoResultIterator extends StatementIterator {
private String query, projection, hint, database, collection;
private Collation collation;
private List aggregation = null;
- private long searchSubject;
+ protected long searchSubject;
// custom graph id, if not present should equal to indexId
private long graphId;
// the id of the index, could be shared among multiple iterators
private long indexId;
- private boolean initialized = false;
- private boolean initializedByEntityIterator = false;
+ protected boolean initialized = false;
+ protected boolean initializedByEntityIterator = false;
private boolean searchDone = false;
private MongoClient client;
private MongoDatabase db;
private MongoCollection coll;
- private MongoCursor iter;
- private Model currentRDF;
- private Entities entities;
+ protected MongoCursor iter;
+ protected Model currentRDF;
+ protected Entities entities;
private MongoDBPlugin plugin;
private RequestCache cache;
@@ -69,7 +71,7 @@ public class MongoResultIterator extends StatementIterator {
private boolean cloned = false;
private boolean entityIteratorCreated = false;
private boolean modelIteratorCreated = false;
- private boolean interrupted = false;
+ protected boolean interrupted = false;
private boolean closed = false;
// if some of the query components are constructed with a function
// and set using bind the first time they are visited will be null. If we have setter with null
@@ -78,6 +80,12 @@ public class MongoResultIterator extends StatementIterator {
// set components are null (query, hint, projection, collation, aggregation)
private boolean closeable = true;
+ private boolean batched = false;
+ private boolean batchedLoading = false;
+ private int documentsLimit;
+ private BatchDocumentStore batchDocumentStore;
+ private LongIterator storeIterator;
+
public MongoResultIterator(MongoDBPlugin plugin, MongoClient client, String database, String collection, RequestCache cache, long searchsubject) {
this.cache = cache;
this.plugin = plugin;
@@ -156,9 +164,33 @@ protected boolean initialize() {
plugin.getLogger().error("Could not connect to mongo", ex);
throw new PluginException("Could not connect to MongoDB. Please make sure you are using correct authentication. " + ex.getMessage());
}
+ if (batched) {
+ if (iter != null && iter.hasNext()) {
+ batchDocumentStore = new BatchDocumentStore();
+ loadBatchedData();
+ storeIterator = batchDocumentStore.getIterator();
+ this.currentRDF = batchDocumentStore.getData();
+ }
+ return batchDocumentStore.size() > 0;
+ }
return iter != null && iter.hasNext();
}
+ private void loadBatchedData() {
+ Model[] data = new Model[1];
+ batchedLoading = true;
+ try {
+ while (hasSolution() && batchDocumentStore.size() <= getDocumentsLimit()) {
+ long docId = readNextDocument(current -> data[0] = current);
+ if (docId != 0) {
+ batchDocumentStore.addDocument(docId, data[0]);
+ }
+ }
+ } finally {
+ batchedLoading = false;
+ }
+ }
+
@Override
public void close() {
if (!closeable) {
@@ -184,6 +216,11 @@ public void close() {
initializedByEntityIterator = false;
IOUtils.closeQuietly((Closeable) jsonLdParserConfig.get(JSONLDSettings.DOCUMENT_LOADER));
+
+ if (batchDocumentStore != null) {
+ batchDocumentStore.clear();
+ batchDocumentStore = null;
+ }
}
public void setQuery(String query) {
@@ -228,12 +265,24 @@ public void close() {
};
}
- private void advance() {
- Document doc = iter.next();
+ protected void advance() {
+ if (batched) {
+ this.object = storeIterator.next();
+ } else {
+ long id = readNextDocument(doc -> currentRDF = doc);
+ if (id > 0) {
+ object = id;
+ }
+ }
+ }
- if (interrupted)
- return;
+ protected long readNextDocument(Consumer dataAccumulator) {
+ Document doc = iter.next();
+ if (interrupted) {
+ return 0;
+ }
+
String entity = null;
if (doc.containsKey(GRAPH)) {
Object item = doc.get(GRAPH);
@@ -326,17 +375,18 @@ private void advance() {
if (id == 0) {
id = entities.put(v, Scope.REQUEST);
}
- this.object = id;
-
Object customNode = doc.get(CUSTOM_NODE);
if (customNode instanceof Document) {
for (Map.Entry val : ((Document) customNode).entrySet()) {
currentRDF.add(v, plugin.vf.createIRI(MongoDBPlugin.NAMESPACE_INST, val.getKey()), plugin.vf.createLiteral(val.getValue().toString()));
}
}
+ dataAccumulator.accept(currentRDF);
+ return id;
} catch (RDFParseException | UnsupportedRDFormatException | IOException e) {
iter.close();
plugin.getLogger().error("Could not parse mongo document", e);
+ return 0;
}
}
@@ -408,6 +458,9 @@ private String resolveDocumentBase(Object context, boolean allowRemoteContext) {
}
protected boolean hasSolution() {
+ if (batched && !batchedLoading) {
+ return !interrupted && storeIterator != null && storeIterator.hasNext();
+ }
return !interrupted && iter != null && iter.hasNext();
}
@@ -517,6 +570,17 @@ public void setIndexId(long indexId) {
this.indexId = indexId;
}
+ public void setDocumentsLimit(int documentsLimit) {
+ if (documentsLimit > 0) {
+ batched = true;
+ }
+ this.documentsLimit = documentsLimit;
+ }
+
+ public int getDocumentsLimit() {
+ return documentsLimit;
+ }
+
public long getSearchSubject() {
return searchSubject;
}
@@ -642,6 +706,9 @@ protected void reset() {
hint = null;
modelIteratorCreated = false;
entityIteratorCreated = false;
+ if (batched) {
+ batchDocumentStore.clear();
+ }
}
public boolean isCloned() {
diff --git a/src/test/java/com/ontotext/trree/plugin/mongodb/AbstractMongoBasicTest.java b/src/test/java/com/ontotext/trree/plugin/mongodb/AbstractMongoBasicTest.java
index f6211fa..022949e 100644
--- a/src/test/java/com/ontotext/trree/plugin/mongodb/AbstractMongoBasicTest.java
+++ b/src/test/java/com/ontotext/trree/plugin/mongodb/AbstractMongoBasicTest.java
@@ -277,6 +277,9 @@ protected void addMongoDates() {
for (Document doc : documents) {
ObjectId id = doc.getObjectId("_id");
List