Adds an initial version of querying adapter with Oxigraph and Apache Jena and initial Stream Broker Adapter for RSP. #1

Conversation
- Supports querying the data stores via SPARQL from the TypeScript library.
- Supports both HTTP-based and WASM-based querying.
Run Jena/Oxigraph adapter tests as integration suites (skipped if the server is unavailable). Update adapters to match real endpoints and responses:
- Jena: add a basePath getter, use GET for SPARQL, change the auth encoding, unify dataset paths
- Oxigraph: handle CONSTRUCT N-Triples parsing, export the format param, convert N3 terms

Also update examples to use RdfTermType, add selective run flags, add a Fuseki config and npm run scripts, and adjust WASM/test mocks.
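The switch to GET for SPARQL queries follows the SPARQL 1.1 Protocol, which lets read queries be passed in the `query` URL parameter. A minimal sketch of building such a request URL (the helper name and the Fuseki-style endpoint path are illustrative, not the adapter's actual API):

```typescript
// Sketch: building a SPARQL query URL for an HTTP GET request.
// `buildQueryUrl` is hypothetical; the PR's JenaAdapter may structure
// this differently.
function buildQueryUrl(endpoint: string, sparql: string): string {
  const params = new URLSearchParams({ query: sparql });
  return `${endpoint}?${params.toString()}`; // query is percent-encoded
}

const url = buildQueryUrl(
  "http://localhost:3030/ds/sparql",
  "SELECT * WHERE { ?s ?p ?o } LIMIT 10"
);
```

GET keeps read queries cacheable and idempotent; updates still require POST under the protocol.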
Introduce StreamBrokerAdapter (Kafka and MQTT) and RSPEngineAdapter to process RDF streams and publish annotated result events. Add RDF utilities (turtleStringToStore, stringToStore, storeToString, hashStringMD5) and a placeholder JanusQLParser. Update package.json to include kafkajs, mqtt, rsp-js, rdf-parse, rdf-store-stream, streamify-string and related @types.
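As a rough illustration of the hashing utility listed above, an MD5 helper over `node:crypto` might look like the following (a sketch; the actual `hashStringMD5` in the PR may differ):

```typescript
import { createHash } from "node:crypto";

// Hex-encoded MD5 digest of a UTF-8 string, e.g. for deduplicating or
// keying RDF event payloads. Illustrative only.
function hashStringMD5(input: string): string {
  return createHash("md5").update(input, "utf8").digest("hex");
}
```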
Clean up the README by removing the installation and deployment sections; add license and contact information.
Pull Request Overview
This PR adds an initial version of querying adapters with Oxigraph and Apache Jena, along with an initial Stream Broker Adapter for RDF Stream Processing (RSP). The implementation includes:
- HTTP-based adapters for Oxigraph and Apache Jena Fuseki with comprehensive SPARQL query support
- WASM adapter for high-performance in-memory RDF processing via Rust/Oxigraph
- Stream broker adapter supporting Kafka and MQTT protocols
- RSP engine adapter for processing RDF streams
- Complete utility layer including logging, error handling, and validation
- Rust implementation providing WASM bindings for RDF operations
- Comprehensive test suites and CI/CD pipeline configuration
Reviewed Changes
Copilot reviewed 35 out of 39 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/adapters/OxigraphAdapter.ts | HTTP adapter implementation for Oxigraph store with full SPARQL support |
| src/adapters/JenaAdapter.ts | HTTP adapter implementation for Apache Jena Fuseki with dataset management |
| src/adapters/WasmAdapter.ts | WebAssembly adapter providing direct Rust/Oxigraph integration |
| src/adapters/InMemoryAdapter.ts | Simple in-memory adapter for testing and development |
| src/rsp/RSPEngineAdapter.ts | RDF Stream Processing engine adapter with broker integration |
| src/StreamBrokerAdapter.ts | Message broker adapter supporting Kafka and MQTT |
| src/utils/validators.ts | Validation utilities for RDF terms, IRIs, and SPARQL queries |
| src/utils/logger.ts | Structured logging implementation with performance tracking |
| src/utils/errors.ts | Comprehensive error handling with typed error classes |
| src/core/types.ts | Core type definitions for RDF operations and queries |
| rust/src/*.rs | Rust implementation providing WASM bindings for RDF operations |
| package.json | Project dependencies and build scripts |
| README.md | Updated project documentation |
| ARCHITECTURE.md | Comprehensive architecture documentation |
rust/src/query.rs (outdated)
```rust
let executor = QueryExecutor::new().unwrap();
let result = executor.validate_query("SELECT * WHERE { ?s ?p ?o }");
assert!(result.is_ok());
assert_eq!(result.unwrap(), "SELECT");
```
Copilot AI, Oct 28, 2025:
The test validates the query but expects "SELECT" as the return value, while the function returns "VALID". This assertion will fail. Update the assertion to expect "VALID" instead.
Suggested change:

```diff
-assert_eq!(result.unwrap(), "SELECT");
+assert_eq!(result.unwrap(), "VALID");
```
rust/src/parser.rs (outdated)
```rust
let parser = RdfParser::new(RdfFormat::Turtle);
assert_eq!(parser.format, RdfFormat::Turtle);
```
Copilot AI, Oct 28, 2025:
The function signature expects a string parameter (&str), but the test passes an enum value. This will not compile. Pass "turtle" as a string instead.
rust/src/parser.rs (outdated)
```rust
#[test]
fn test_serializer_creation() {
    let serializer = RdfSerializer::new(RdfFormat::Turtle);
    assert_eq!(serializer.format, RdfFormat::Turtle);
```
Copilot AI, Oct 28, 2025:
Similar to the parser test, this function expects a string parameter but receives an enum. Pass "turtle" as a string instead.
Suggested change:

```diff
-assert_eq!(serializer.format, RdfFormat::Turtle);
+assert_eq!(serializer.format, "turtle");
```
rust/src/parser.rs (outdated)
```rust
ex:Alice ex:knows ex:Bob .
"#;

let result = RdfSerializer::convert(turtle_data, RdfFormat::Turtle, RdfFormat::NTriples);
```
Copilot AI, Oct 28, 2025:
The convert function expects string parameters for formats, but enum values are passed. Change to RdfSerializer::convert(turtle_data, "turtle", "ntriples").
Suggested change:

```diff
-let result = RdfSerializer::convert(turtle_data, RdfFormat::Turtle, RdfFormat::NTriples);
+let result = RdfSerializer::convert(turtle_data, "turtle", "ntriples");
```
src/rsp/RSPEngineAdapter.ts (outdated)
```typescript
try {
  const event_store = await turtleStringToStore(message.toString());
  const event_timestamp = event_store.getQuads(null, DataFactory.namedNode("https://saref.etsi.org/core/hasTimestamp"), null, null)[0]?.object.value;
```
Copilot AI, Oct 28, 2025:
This line is overly complex and difficult to read. Consider breaking it into separate steps: first get the quads, then access the timestamp value. This would improve readability and make debugging easier.
Suggested change:

```diff
-const event_timestamp = event_store.getQuads(null, DataFactory.namedNode("https://saref.etsi.org/core/hasTimestamp"), null, null)[0]?.object.value;
+const quads = event_store.getQuads(
+    null,
+    DataFactory.namedNode("https://saref.etsi.org/core/hasTimestamp"),
+    null,
+    null
+);
+const event_timestamp = quads.length > 0 ? quads[0].object.value : undefined;
```
support as currently no plan to support it.
Pull Request Overview
Copilot reviewed 33 out of 36 changed files in this pull request and generated 6 comments.
```sparql
FROM NAMED WINDOW :liveWindow ON STREAM :accelStream [RANGE 1000 STEP 10]
FROM NAMED WINDOW :historicalSliding ON STREAM :historicalAccl [OFFSET 2000 RANGE 1000 STEP 100]
FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1745857573 END 1738485859]
```
Copilot AI, Oct 28, 2025:
The START timestamp (1745857573) is greater than the END timestamp (1738485859). This creates an invalid time range where the start is after the end. The timestamps should be corrected to have START < END.
Suggested change:

```diff
-FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1745857573 END 1738485859]
+FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1738485859 END 1745857573]
```
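Beyond fixing the example, a query parser could reject such ranges up front. A hedged sketch of what that guard could look like (the function and its placement are hypothetical, not part of the JanusQLParser in this PR):

```typescript
// Hypothetical guard for fixed historical windows:
// START must be strictly before END.
function validateFixedWindow(start: number, end: number): void {
  if (!Number.isInteger(start) || !Number.isInteger(end)) {
    throw new Error("Window bounds must be integer timestamps");
  }
  if (start >= end) {
    throw new Error(`Invalid window: START ${start} must be < END ${end}`);
  }
}
```

With such a check, the inverted range above would fail at parse time instead of producing an empty (or undefined) window at query time.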
```typescript
import { StreamBrokerAdapter } from '@/StreamBrokerAdapter';
```
Copilot AI, Oct 28, 2025:
The import uses a path alias '@/' that points to the project root, but the referenced file is actually at 'src/StreamBrokerAdapter.ts'. Consider using a relative import '../../../StreamBrokerAdapter' or ensuring the path alias is properly configured in tsconfig.json to avoid import resolution issues.
Suggested change:

```diff
-import { StreamBrokerAdapter } from '@/StreamBrokerAdapter';
+import { StreamBrokerAdapter } from '../../StreamBrokerAdapter';
```
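If the `@/` alias is kept instead of switching to relative imports, it must be declared in tsconfig.json (and mirrored in the test runner's module resolution, e.g. Jest's `moduleNameMapper`, if Jest is used). A sketch assuming the alias should resolve to `src/`:

```json
{
  "compilerOptions": {
    "baseUrl": ".",
    "paths": {
      "@/*": ["src/*"]
    }
  }
}
```

Note that `paths` only affects type checking and editor resolution; the emitted JavaScript still needs the runtime (or a bundler/loader) to resolve the alias.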
```typescript
import { DataFactory, Quad } from 'n3';
import { randomUUID } from 'node:crypto';
import { EventEmitter } from 'node:stream';
import { turtleStringToStore } from '@/utils/Util';
```
Copilot AI, Oct 28, 2025:
The import uses a path alias '@/utils/Util' but should use a relative path '../../utils/Util' or ensure the path alias is correctly configured. Additionally, consider using consistent import patterns throughout the codebase.
Suggested change:

```diff
-import { turtleStringToStore } from '@/utils/Util';
+import { turtleStringToStore } from '../../utils/Util';
```
```typescript
if (term.language && term.datatype) {
  return false; // Cannot have both language and datatype
}
if (term.language && !/^[a-z]{2,3}(-[A-Z]{2})?$/.test(term.language)) {
```
Copilot AI, Oct 28, 2025:
The language tag validation regex is too restrictive and doesn't comply with BCP 47 (RFC 5646) language tag format. Valid language tags can include subtags like 'en-US-x-private', 'zh-Hans-CN', or 'sr-Latn-RS'. Consider using a more comprehensive regex or a dedicated language tag validation library.
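A broader pattern, still short of full RFC 5646 conformance but covering script, region, variant, and private-use subtags, could look like this (a sketch, not the validator shipped in the PR):

```typescript
// Accepts common BCP 47 shapes: "en", "en-US", "zh-Hans-CN",
// "sr-Latn-RS", "en-US-x-private". NOT a complete RFC 5646 validator:
// grandfathered tags and extension singletons are not handled.
const LANG_TAG =
  /^[a-zA-Z]{2,3}(-[a-zA-Z]{4})?(-(?:[a-zA-Z]{2}|\d{3}))?(-(?:[a-zA-Z0-9]{5,8}|\d[a-zA-Z0-9]{3}))*(-x(-[a-zA-Z0-9]{1,8})+)?$/;

function isPlausibleLanguageTag(tag: string): boolean {
  return LANG_TAG.test(tag);
}
```

For strict conformance, a dedicated library (or the full ABNF from RFC 5646) remains the safer choice, as the review comment suggests.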
```sparql
FROM NAMED WINDOW :liveWindow ON STREAM :accelStream [RANGE 1000 STEP 10]
FROM NAMED WINDOW :historicalSliding ON STREAM :historicalAccl [OFFSET 2000 RANGE 1000 STEP 100]
FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1745857573 END 1738485859]
```
Copilot AI, Oct 28, 2025:
The START timestamp (1745857573) is greater than the END timestamp (1738485859). This creates an invalid time range. The timestamps should be swapped or corrected to ensure START < END.
Suggested change:

```diff
-FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1745857573 END 1738485859]
+FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1738485859 END 1745857573]
```
```typescript
async publish(topic: string, payload: any, callback: (error: any) => void): Promise<void> {
  try {
    if (this.type === 'kafka' && this.kafkaProducer) {
      await this.kafkaProducer.send({
        topic,
        messages: [{ value: JSON.stringify(payload) }],
      });
      callback(null);
    } else if (this.type === 'mqtt' && this.mqttClient) {
      this.mqttClient.publish(topic, JSON.stringify(payload), { qos: 0 }, (err?: Error) => {
        callback(err);
      });
    } else {
      callback(new Error('Unsupported broker type or not connected'));
    }
  } catch (error) {
    callback(error);
```
Copilot AI, Oct 28, 2025:
Using callback-based error handling in an async function is inconsistent. Consider either using the Promise rejection mechanism (throwing errors) or using only callbacks, but not mixing both patterns. This would make error handling more predictable for API consumers.
Suggested change:

```diff
-async publish(topic: string, payload: any, callback: (error: any) => void): Promise<void> {
-  try {
-    if (this.type === 'kafka' && this.kafkaProducer) {
-      await this.kafkaProducer.send({
-        topic,
-        messages: [{ value: JSON.stringify(payload) }],
-      });
-      callback(null);
-    } else if (this.type === 'mqtt' && this.mqttClient) {
-      this.mqttClient.publish(topic, JSON.stringify(payload), { qos: 0 }, (err?: Error) => {
-        callback(err);
-      });
-    } else {
-      callback(new Error('Unsupported broker type or not connected'));
-    }
-  } catch (error) {
-    callback(error);
+async publish(topic: string, payload: any): Promise<void> {
+  if (this.type === 'kafka' && this.kafkaProducer) {
+    await this.kafkaProducer.send({
+      topic,
+      messages: [{ value: JSON.stringify(payload) }],
+    });
+  } else if (this.type === 'mqtt' && this.mqttClient) {
+    await new Promise<void>((resolve, reject) => {
+      this.mqttClient!.publish(topic, JSON.stringify(payload), { qos: 0 }, (err?: Error) => {
+        if (err) {
+          reject(err);
+        } else {
+          resolve();
+        }
+      });
+    });
+  } else {
+    throw new Error('Unsupported broker type or not connected');
```
Pull Request Overview
Copilot reviewed 34 out of 37 changed files in this pull request and generated 3 comments.
Query a specific time range in historical data using absolute timestamps.

```sparql
FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1745857573 END 1738485859]
```
Copilot AI, Oct 28, 2025:
The START timestamp (1745857573) is greater than the END timestamp (1738485859), creating an invalid time range. This should be corrected to show a valid example where START < END.
Suggested change:

```diff
-FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1745857573 END 1738485859]
+FROM NAMED WINDOW :fixedHistorical ON STREAM :historicalAccl [START 1738485859 END 1745857573]
```
```typescript
  }
}

async publish(topic: string, payload: any, callback: (error: any) => void): Promise<void> {
```
Copilot AI, Oct 28, 2025:
Replace the `any` types for the payload and error parameters with more specific ones. Consider `string | Buffer` for the payload and `Error | null` for the error callback parameter to improve type safety.
```typescript
});
callback(null);
} else if (this.type === 'mqtt' && this.mqttClient) {
  this.mqttClient.publish(topic, JSON.stringify(payload), { qos: 0 }, (err?: Error) => {
```
Copilot AI, Oct 28, 2025:
The payload is being JSON.stringified in the MQTT path but not in the Kafka path (line 48). This inconsistency could lead to different data formats being published. Consider applying consistent serialization across both broker types or documenting this intentional difference.
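One way to make the two branches consistent is to serialize in a single helper that both call, so Kafka and MQTT always publish identical bytes. A hedged sketch (the helper name and types are illustrative, not the adapter's actual code):

```typescript
// Single serialization point shared by both broker branches. Strings
// and Buffers pass through untouched; objects are JSON-encoded exactly
// once, here, rather than separately in each branch.
type PublishPayload = string | Buffer | Record<string, unknown>;

function serializePayload(payload: PublishPayload): string | Buffer {
  if (typeof payload === "string" || Buffer.isBuffer(payload)) {
    return payload;
  }
  return JSON.stringify(payload);
}
```

Both the Kafka `messages: [{ value: ... }]` field and `mqttClient.publish` accept a string or Buffer, so a helper like this would slot into either branch.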