Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This workflow will do a clean install of node dependencies, build the source code and run tests across different versions of node
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-nodejs-with-github-actions

name: Node.js CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:

runs-on: ubuntu-latest

strategy:
matrix:
node-version: [8.x, 10.x, 12.x, 14.x]

steps:
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- run: npm install
- run: npm test
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ var MSGMAX = svmq.MSGMAX; // max message data size (hardcoded)
var id = msg.get(31337, 950);

// Push a string to the queue
msg.snd(id, new Buffer('TestString1234'), 1, (err) => {
msg.snd(id, new Buffer('TestString1234'), 1, 0, (err) => {
if (err) throw err;
});

Expand Down
29 changes: 26 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,37 @@
{
"name": "svmq",
"version": "1.0.2",
"description": "Native System V (svipc) message queues in Node.js",
"version": "1.0.7",
"description": "Native System V message queues in Node.js",
"main": "index.js",
"scripts": {
"test": "jasmine spec/**/*.js",
"install": "node-gyp rebuild"
},
"author": "",
"repository": {
"type": "git",
"url": "git+https://github.com/jhead/node-svmq.git"
},
"keywords": [
"system",
"v",
"sysv",
"mq",
"message",
"queue",
"svmq",
"ipc"
],
"author": "Justin Head <jhead@jxh.io>",
"license": "MIT",
"gypfile": true,
"bugs": {
"url": "https://github.com/jhead/node-svmq/issues"
},
"homepage": "https://github.com/jhead/node-svmq#readme",
"dependencies": {
"nan": "^2.2.0"
},
"devDependencies": {
"jasmine": "^3.6.1"
}
}
32 changes: 32 additions & 0 deletions spec/queueSpec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const MessageQueue = require('../');

const inputs = [
Buffer.alloc(0),
Buffer.from(""),
Buffer.from("test"),
Buffer.from("test 1234 foo bar 567890"),
Buffer.from([1, 2, 3, 4]),
Buffer.from(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8])),
Buffer.alloc(1024)
];

describe("MessageQueue", function () {

// Test a batch of diverse inputs that should be supported
inputs.forEach((input, index) =>{
const msgType = index + 1;
const selectedInput = input;

it("push and pop a message: " + selectedInput, function (done) {
const queue = new MessageQueue(31337);

queue.push(selectedInput, { type: msgType })
queue.pop({ type: msgType }, function(err, msg) {
expect(err).toBeNull();
expect(msg).toEqual(selectedInput);
done();
})
});
})

});
11 changes: 11 additions & 0 deletions spec/support/jasmine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"spec_dir": "spec",
"spec_files": [
"**/*[sS]pec.js"
],
"helpers": [
"helpers/**/*.js"
],
"stopSpecOnExpectationFailure": false,
"random": true
}
47 changes: 27 additions & 20 deletions src/functions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ using Nan::Callback;
using Nan::New;
using Nan::Null;

#ifndef __USE_GNU

#ifndef MSGMAX
#define MSGMAX 4056
#endif

#ifndef linux
struct msgbuf {
long mtype;
char mtext[MSGMAX - 4];
char mtext[1];
};
#endif

const size_t bsize = sizeof(struct msgbuf);

const char *ConcatString(std::string one, const char *two) {
return one.append(two).c_str();
}
Expand All @@ -44,13 +45,16 @@ class SendMessageWorker : public AsyncWorker {
~SendMessageWorker() { }

void Execute() {
msgbuf *message = new msgbuf;
message->mtype = type;
struct msgbuf* message =
(struct msgbuf*)malloc(bsize + dataLength);

message->mtype = type;
memcpy(message->mtext, data, dataLength);

ret = msgsnd(id, message, dataLength, flags);
error = errno;

free(message);
}

void HandleOKCallback () {
Expand All @@ -66,7 +70,7 @@ class SendMessageWorker : public AsyncWorker {
private:
int id;
char *data;
int dataLength;
size_t dataLength;
long type;
int flags;
int ret;
Expand All @@ -82,7 +86,8 @@ class ReceiveMessageWorker : public AsyncWorker {
~ReceiveMessageWorker() { }

void Execute() {
message = new msgbuf;
message =
(struct msgbuf*) malloc(bsize + bufferLength);

bufferLength = msgrcv(id, message, bufferLength, type, flags);
error = errno;
Expand All @@ -100,21 +105,23 @@ class ReceiveMessageWorker : public AsyncWorker {
argv[1] = Nan::CopyBuffer(message->mtext, bufferLength).ToLocalChecked();
}

free(message);

callback->Call(2, argv);
}

private:
int id;
msgbuf *message;
struct msgbuf* message;
size_t bufferLength;
long type;
int flags;
int error;
};

NAN_METHOD(GetMessageQueue) {
key_t key = (key_t) info[0]->Int32Value();
int flags = info[1]->Int32Value();
key_t key = (key_t) Nan::To<int32_t>(info[0]).FromJust();
int flags = Nan::To<int32_t>(info[1]).FromJust();

int queue = msgget(key, flags);

Expand All @@ -126,8 +133,8 @@ NAN_METHOD(GetMessageQueue) {
}

NAN_METHOD(ControlMessageQueue) {
int id = info[0]->Int32Value();
int cmd = info[1]->Int32Value();
int id = Nan::To<int32_t>(info[0]).FromJust();
int cmd = Nan::To<int32_t>(info[1]).FromJust();

msqid_ds *buf;

Expand Down Expand Up @@ -157,7 +164,7 @@ NAN_METHOD(ControlMessageQueue) {
}

NAN_METHOD(CloseMessageQueue) {
int id = info[0]->Int32Value();
int id = Nan::To<int32_t>(info[0]).FromJust();

int ret = msgctl(id, IPC_RMID, nullptr);

Expand All @@ -169,21 +176,21 @@ NAN_METHOD(CloseMessageQueue) {
}

NAN_METHOD(SendMessage) {
int id = info[0]->Int32Value();
int id = Nan::To<int32_t>(info[0]).FromJust();
char* bufferData = node::Buffer::Data(info[1]);
size_t bufferLength = (size_t) node::Buffer::Length(info[1]);
long type = info[2]->Int32Value();
int flags = info[3]->Int32Value();
long type = (long) Nan::To<int32_t>(info[2]).FromJust();
int flags = Nan::To<int32_t>(info[3]).FromJust();
Callback *callback = new Callback(info[4].As<Function>());

AsyncQueueWorker(new SendMessageWorker(callback, id, bufferData, bufferLength, type, flags));
}

NAN_METHOD(ReceiveMessage) {
int id = info[0]->Int32Value();
size_t bufferLength = (size_t) info[1]->Int32Value();
long type = info[2]->Int32Value();
int flags = info[3]->Int32Value();
int id = Nan::To<int32_t>(info[0]).FromJust();
size_t bufferLength = (size_t) Nan::To<int32_t>(info[1]).FromJust();
long type = (long) Nan::To<int32_t>(info[2]).FromJust();
int flags = Nan::To<int32_t>(info[3]).FromJust();
Callback *callback = new Callback(info[4].As<Function>());

AsyncQueueWorker(new ReceiveMessageWorker(callback, id, bufferLength, type, flags));
Expand Down