GDB-11761: Add mongo connector batch processing #24

Open · wants to merge 2 commits into master
8 changes: 7 additions & 1 deletion pom.xml
@@ -12,7 +12,7 @@
<graphdb.version>11.0.0-TR18</graphdb.version>
<dependency.check.version>12.1.0</dependency.check.version>

<java.level>1.8</java.level>
<java.level>21</java.level>

<internal.repo>https://maven.ontotext.com/content/repositories/owlim-releases</internal.repo>
<snapshots.repo>https://maven.ontotext.com/content/repositories/owlim-snapshots</snapshots.repo>
@@ -127,6 +127,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.eclipse.collections</groupId>
<artifactId>eclipse-collections-api</artifactId>
<version>11.1.0</version>
</dependency>

<dependency>
<groupId>com.ontotext.graphdb</groupId>
<artifactId>graphdb-runtime</artifactId>
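Note: only eclipse-collections-api is declared, while the LongSets.mutable factory used by the new BatchDocumentStore appears to locate its implementation reflectively at runtime; this presumably relies on the eclipse-collections implementation jar reaching the classpath some other way (e.g. via graphdb-runtime), which is worth verifying.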
40 changes: 40 additions & 0 deletions BatchDocumentStore.java (new file)
@@ -0,0 +1,40 @@
package com.ontotext.trree.plugin.mongodb;

import org.eclipse.collections.api.iterator.LongIterator;
import org.eclipse.collections.api.set.primitive.MutableLongSet;
import org.eclipse.collections.api.factory.primitive.LongSets;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.impl.LinkedHashModel;

/**
* Collects the document data during batch processing.
*
* @author <a href="mailto:borislav.bonev@ontotext.com">Borislav Bonev</a>
* @since 26/03/2025
*/
public class BatchDocumentStore {
private final MutableLongSet documentIds = LongSets.mutable.empty();
private final Model data = new LinkedHashModel();

public void addDocument(long id, Model model) {
documentIds.add(id);
data.addAll(model);
}

public Model getData() {
return data;
}

public void clear() {
documentIds.clear();
data.clear();
}

public int size() {
return documentIds.size();
}

LongIterator getIterator() {
return documentIds.longIterator();
}
}
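For orientation, a minimal usage sketch of the new class (illustrative only and not part of the change: the id and the empty model are made up, and same-package access is assumed for the package-private getIterator()):

package com.ontotext.trree.plugin.mongodb;

import org.eclipse.collections.api.iterator.LongIterator;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.impl.LinkedHashModel;

class BatchDocumentStoreSketch {
    static void demo() {
        BatchDocumentStore store = new BatchDocumentStore();
        // one call per Mongo document: the entity id plus the RDF statements parsed from it
        store.addDocument(42L, new LinkedHashModel());
        // every collected statement, merged into a single model
        Model merged = store.getData();
        // replay the collected entity ids; during iteration these become the entity bindings
        LongIterator ids = store.getIterator();
        while (ids.hasNext()) {
            long id = ids.next();
        }
        // release the batch between evaluations
        store.clear();
    }
}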
MongoDBPlugin.java
@@ -45,8 +45,30 @@ public class MongoDBPlugin extends PluginBase implements Preprocessor, PatternIn
public static final IRI ENTITY = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "entity");
public static final IRI GRAPH = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "graph");
public static final IRI COLLATION = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "collation");
public static final IRI BATCH = SimpleValueFactory.getInstance().createIRI(NAMESPACE + "batchSize");

protected static final String MONGODB_PROPERTIES = "mongodb.properties";

public final int MAX_BATCH_SIZE;

// instance initializer: resolves the batch size cap once per plugin instance
{
int maxBatch;
try {
maxBatch = Integer.parseInt(System.getProperty("graphdb.mongodb.maxBatchSize", "1000"));
} catch (NumberFormatException e) {
getLogger().error("Invalid graphdb.mongodb.maxBatchSize: {}", System.getProperty(
"graphdb.mongodb.maxBatchSize"));
maxBatch = 1000;
}
if (maxBatch > 10000) {
getLogger().warn("graphdb.mongodb.maxBatchSize size is too large. Max allowed is 10000");
maxBatch = 10000;
}
if (maxBatch == 0) {
getLogger().info("MongoDB batch loading is disabled");
}
MAX_BATCH_SIZE = maxBatch;
}
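// Hedged note, not part of the change: the cap is resolved once per plugin instance by the
// initializer above, so it has to be set before MongoDBPlugin is constructed, either at JVM
// startup (-Dgraphdb.mongodb.maxBatchSize=5000) or programmatically, e.g. in a test harness:
// System.setProperty("graphdb.mongodb.maxBatchSize", "5000");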

protected ValueFactory vf = SimpleValueFactory.getInstance();

@@ -61,6 +83,7 @@ public class MongoDBPlugin extends PluginBase implements Preprocessor, PatternIn
long graphId = 0;
long rdf_type = 0;
long collationId = 0;
long batchSize = 0;

protected long[] predicateSet;

@@ -158,10 +181,11 @@ public void initialize(InitReason initReason, PluginConnection pluginConnection)
graphId = entities.put(GRAPH, Scope.SYSTEM);
rdf_type = entities.resolve(RDF.TYPE);
collationId = entities.put(COLLATION, Scope.SYSTEM);
batchSize = entities.put(BATCH, Scope.SYSTEM);


predicateSet = new long[] {serviceId, databaseId, collectionId, userId, passwordId, authDbId, dropId, queryId,
projectionId, aggregationId, hintId, entityId, graphId, collationId, rdf_type};
projectionId, aggregationId, hintId, entityId, graphId, collationId, batchSize, rdf_type};
Arrays.sort(predicateSet);
}

@@ -182,6 +206,9 @@ public double estimate(long subject, long predicate, long object, long context,
if (predicate == graphId) {
return 0.35;
}
if (predicate == batchSize) {
return 0.37;
}
if (predicate == aggregationId) {
return 0.39;
}
@@ -402,6 +429,20 @@ public StatementIterator interpret(long subject, long predicate, long object, lo
iter.setCollation(collationString);
return iter.singletonIterator(collationId, object);
}
if (predicate == batchSize) {
if (ctx.iters == null) {
getLogger().error("iter not created yet");
return StatementIterator.EMPTY;
}
Integer batchSizeCfg = readBatchSize(object, entities);
if (batchSizeCfg == null) {
return StatementIterator.EMPTY;
}

MongoResultIterator iter = getIterator(subject, context, ctx);
iter.setDocumentsLimit(batchSizeCfg);
return iter.singletonIterator(batchSize, object);
}
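// Note (inferred from the guard above, not stated in the change): the batchSize pattern only
// takes effect once the iterator for the search subject exists, i.e. the batchSize statement
// is expected to come after the index/query bindings in the SPARQL group.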
if (predicate == hintId) {
String hintString = Utils.getString(entities, object);

@@ -460,6 +501,26 @@ public StatementIterator interpret(long subject, long predicate, long object, lo
return null;
}

private Integer readBatchSize(long object, Entities entities) {
Integer batchSizeCfg = Utils.getInteger(entities, object);
if (batchSizeCfg == null || batchSizeCfg < 0) {
getLogger().error("Invalid batch size configuration: {}",
Utils.getString(entities, object));
return null;
}
if (batchSizeCfg >= MAX_BATCH_SIZE) {
if (MAX_BATCH_SIZE == 0) {
getLogger().warn("Batch document functionality is disabled. Ignoring {} configuration.",
BATCH);
} else {
getLogger().warn("Batch size {} exceeds maximum {}. Using default size.",
Utils.getString(entities, object), MAX_BATCH_SIZE);
}
batchSizeCfg = MAX_BATCH_SIZE;
}
return batchSizeCfg;
}
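// Illustrative mapping derived from the code above (not from test output), assuming the
// default MAX_BATCH_SIZE of 1000:
//   negative or non-integer value -> null (logged as an error, the statement is skipped)
//   0    -> 0    (batching stays off for this query)
//   250  -> 250  (taken as-is)
//   5000 -> 1000 (clamped to the cap, with a warning)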

private Configuration resolveConfiguration(String suffix) {
return configMap.computeIfAbsent(suffix, indexName -> {
File indexInst = new File(getDataDir(), indexName);
@@ -919,6 +980,26 @@ private boolean isAlreadyDelegateToSomeoneElse(ContextImpl ctx, MongoResultItera
super.setIndexId(indexId);
}

@Override public void setDocumentsLimit(int documentsLimit) {
MongoResultIterator delegate = getDelegate();
if (delegate != null) {
delegate.setDocumentsLimit(documentsLimit);
}
super.setDocumentsLimit(documentsLimit);
}

@Override public int getDocumentsLimit() {
int limit = super.getDocumentsLimit();
if (limit != 0) {
return limit;
}
MongoResultIterator delegate = getDelegate();
if (delegate == null) {
return 0;
}
return delegate.getDocumentsLimit();
}
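// The pair of overrides above keeps the limit consistent across delegating iterators:
// the setter pushes the value down to the delegate, while the getter falls back to the
// delegate when the local value is unset (0).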

@Override public long getIndexId() {
long indexId = super.getIndexId();
if (indexId != 0) {
MongoResultIterator.java
@@ -22,6 +22,7 @@
import org.bson.codecs.DocumentCodec;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.eclipse.collections.api.iterator.LongIterator;
import org.eclipse.rdf4j.model.*;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.rio.*;
@@ -36,6 +37,7 @@
import java.io.StringReader;
import java.net.URI;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class MongoResultIterator extends StatementIterator {
@@ -45,20 +47,20 @@ public class MongoResultIterator extends StatementIterator {
private String query, projection, hint, database, collection;
private Collation collation;
private List<Document> aggregation = null;
private long searchSubject;
protected long searchSubject;
// custom graph id; if not present, it should be equal to indexId
private long graphId;
// the id of the index, could be shared among multiple iterators
private long indexId;
private boolean initialized = false;
private boolean initializedByEntityIterator = false;
protected boolean initialized = false;
protected boolean initializedByEntityIterator = false;
private boolean searchDone = false;
private MongoClient client;
private MongoDatabase db;
private MongoCollection<Document> coll;
private MongoCursor<Document> iter;
private Model currentRDF;
private Entities entities;
protected MongoCursor<Document> iter;
protected Model currentRDF;
protected Entities entities;
private MongoDBPlugin plugin;
private RequestCache cache;

@@ -69,7 +71,7 @@ public class MongoResultIterator extends StatementIterator {
private boolean cloned = false;
private boolean entityIteratorCreated = false;
private boolean modelIteratorCreated = false;
private boolean interrupted = false;
protected boolean interrupted = false;
private boolean closed = false;
// if some of the query components are constructed with a function and set using bind,
// they will be null the first time they are visited. If we have a setter called with null
@@ -78,6 +80,12 @@
// set components are null (query, hint, projection, collation, aggregation)
private boolean closeable = true;

private boolean batched = false;
private boolean batchedLoading = false;
private int documentsLimit;
private BatchDocumentStore batchDocumentStore;
private LongIterator storeIterator;

public MongoResultIterator(MongoDBPlugin plugin, MongoClient client, String database, String collection, RequestCache cache, long searchsubject) {
this.cache = cache;
this.plugin = plugin;
@@ -156,9 +164,33 @@ protected boolean initialize() {
plugin.getLogger().error("Could not connect to mongo", ex);
throw new PluginException("Could not connect to MongoDB. Please make sure you are using correct authentication. " + ex.getMessage());
}
if (batched) {
if (iter != null && iter.hasNext()) {
batchDocumentStore = new BatchDocumentStore();
loadBatchedData();
storeIterator = batchDocumentStore.getIterator();
this.currentRDF = batchDocumentStore.getData();
}
// guard: the store is only created when the cursor had at least one document
return batchDocumentStore != null && batchDocumentStore.size() > 0;
}
return iter != null && iter.hasNext();
}

private void loadBatchedData() {
// single-element holder so the consumer lambda below can write into an effectively final local
Model[] data = new Model[1];
batchedLoading = true;
try {
// strict '<' so the batch never exceeds the configured documents limit
while (hasSolution() && batchDocumentStore.size() < getDocumentsLimit()) {
long docId = readNextDocument(current -> data[0] = current);
if (docId != 0) {
batchDocumentStore.addDocument(docId, data[0]);
}
}
} finally {
batchedLoading = false;
}
}
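// Note: the batch is materialized eagerly, up to the documents limit, before any solution
// is produced, so memory use grows with the configured batch size; once loading finishes,
// hasSolution() switches over to the in-memory store iterator.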

@Override
public void close() {
if (!closeable) {
@@ -184,6 +216,11 @@ public void close() {
initializedByEntityIterator = false;

IOUtils.closeQuietly((Closeable) jsonLdParserConfig.get(JSONLDSettings.DOCUMENT_LOADER));

if (batchDocumentStore != null) {
batchDocumentStore.clear();
batchDocumentStore = null;
}
}

public void setQuery(String query) {
@@ -228,12 +265,24 @@ public void close() {
};
}

private void advance() {
Document doc = iter.next();
protected void advance() {
if (batched) {
this.object = storeIterator.next();
} else {
long id = readNextDocument(doc -> currentRDF = doc);
if (id > 0) {
object = id;
}
}
}

if (interrupted)
return;
protected long readNextDocument(Consumer<Model> dataAccumulator) {
Document doc = iter.next();

if (interrupted) {
return 0;
}

String entity = null;
if (doc.containsKey(GRAPH)) {
Object item = doc.get(GRAPH);
@@ -326,17 +375,18 @@ private void advance() {
if (id == 0) {
id = entities.put(v, Scope.REQUEST);
}
this.object = id;

Object customNode = doc.get(CUSTOM_NODE);
if (customNode instanceof Document) {
for (Map.Entry<String, Object> val : ((Document) customNode).entrySet()) {
currentRDF.add(v, plugin.vf.createIRI(MongoDBPlugin.NAMESPACE_INST, val.getKey()), plugin.vf.createLiteral(val.getValue().toString()));
}
}
dataAccumulator.accept(currentRDF);
return id;
} catch (RDFParseException | UnsupportedRDFormatException | IOException e) {
iter.close();
plugin.getLogger().error("Could not parse mongo document", e);
return 0;
}
}

@@ -408,6 +458,9 @@ private String resolveDocumentBase(Object context, boolean allowRemoteContext) {
}

protected boolean hasSolution() {
if (batched && !batchedLoading) {
return !interrupted && storeIterator != null && storeIterator.hasNext();
}
return !interrupted && iter != null && iter.hasNext();
}

@@ -517,6 +570,17 @@ public void setIndexId(long indexId) {
this.indexId = indexId;
}

public void setDocumentsLimit(int documentsLimit) {
if (documentsLimit > 0) {
batched = true;
}
this.documentsLimit = documentsLimit;
}

public int getDocumentsLimit() {
return documentsLimit;
}
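// A positive limit flips this iterator into batched mode; 0 (the default) keeps the original
// streaming behavior, which is also why the delegating override earlier treats 0 as "unset".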

public long getSearchSubject() {
return searchSubject;
}
@@ -642,6 +706,9 @@ protected void reset() {
hint = null;
modelIteratorCreated = false;
entityIteratorCreated = false;
if (batched && batchDocumentStore != null) {
batchDocumentStore.clear();
}
}

public boolean isCloned() {