Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions contrib/plugins/helloworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,15 @@ def init(options, configuration, plugin):
plugin.log("Plugin helloworld.py initialized")


@plugin.subscribe("connect")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an awesome API!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Flask was a huge inspiration.

def on_connect(id, address, plugin):
plugin.log("Received connect event for peer {}".format(id))


@plugin.subscribe("disconnect")
def on_disconnect(id, plugin):
plugin.log("Received disconnect event for peer {}".format(id))


plugin.add_option('greeting', 'Hello', 'The greeting I should use.')
plugin.run()
106 changes: 80 additions & 26 deletions contrib/pylightning/lightning/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import json
import inspect
import re
import traceback


Expand All @@ -18,6 +19,9 @@ def __init__(self, stdout=None, stdin=None, autopatch=True):
self.methods = {}
self.options = {}

# A dict from topics to handler functions
self.subscriptions = {}

if not stdout:
self.stdout = sys.stdout
if not stdin:
Expand Down Expand Up @@ -60,6 +64,33 @@ def add_method(self, name, func):
# Register the function with the name
self.methods[name] = func

def add_subscription(self, topic, func):
"""Add a subscription to our list of subscriptions.

A subscription is an association between a topic and a handler
function. Adding a subscription means that we will
automatically subscribe to events from that topic with
`lightningd` and, upon receiving a matching notification, we
will call the associated handler. Notice that in order for the
automatic subscriptions to work, the handlers need to be
registered before we send our manifest, hence before
`Plugin.run` is called.

"""
if topic in self.subscriptions:
raise ValueError(
"Topic {} already has a handler".format(topic)
)
self.subscriptions[topic] = func

def subscribe(self, topic):
"""Function decorator to register a notification handler.
"""
def decorator(f):
self.add_subscription(topic, f)
return f
return decorator

def add_option(self, name, default, description):
"""Add an option that we'd like to register with lightningd.

Expand Down Expand Up @@ -98,17 +129,11 @@ def decorator(f):
return f
return decorator

def _dispatch(self, request):
name = request['method']
def _exec_func(self, func, request):
params = request['params']

if name not in self.methods:
raise ValueError("No method {} found.".format(name))

args = params.copy() if isinstance(params, list) else []
kwargs = params.copy() if isinstance(params, dict) else {}

func = self.methods[name]
sig = inspect.signature(func)

if 'plugin' in sig.parameters:
Expand All @@ -121,6 +146,43 @@ def _dispatch(self, request):
ba.apply_defaults()
return func(*ba.args, **ba.kwargs)

def _dispatch_request(self, request):
name = request['method']

if name not in self.methods:
raise ValueError("No method {} found.".format(name))
func = self.methods[name]

try:
result = {
'jsonrpc': '2.0',
'id': request['id'],
'result': self._exec_func(func, request)
}
except Exception as e:
result = {
'jsonrpc': '2.0',
'id': request['id'],
"error": "Error while processing {}: {}".format(
request['method'], repr(e)
),
}
self.log(traceback.format_exc())
json.dump(result, fp=self.stdout)
self.stdout.write('\n\n')
self.stdout.flush()

def _dispatch_notification(self, request):
name = request['method']
if name not in self.subscriptions:
raise ValueError("No subscription for {} found.".format(name))
func = self.subscriptions[name]

try:
self._exec_func(func, request)
except Exception as _:
self.log(traceback.format_exc())

def notify(self, method, params):
payload = {
'jsonrpc': '2.0',
Expand All @@ -145,24 +207,14 @@ def _multi_dispatch(self, msgs):
for payload in msgs[:-1]:
request = json.loads(payload)

try:
result = {
"jsonrpc": "2.0",
"result": self._dispatch(request),
"id": request['id']
}
except Exception as e:
result = {
"jsonrpc": "2.0",
"error": "Error while processing {}".format(
request['method']
),
"id": request['id']
}
self.log(traceback.format_exc())
json.dump(result, fp=self.stdout)
self.stdout.write('\n\n')
self.stdout.flush()
# If this has an 'id'-field, it's a request and returns a
# result. Otherwise it's a notification and it doesn't
# return anything.
if 'id' in request:
self._dispatch_request(request)
else:
self._dispatch_notification(request)

return msgs[-1]

def run(self):
Expand Down Expand Up @@ -190,6 +242,7 @@ def _getmanifest(self):
continue

doc = inspect.getdoc(func)
doc = re.sub('\n+', ' ', doc)
if not doc:
self.log(
'RPC method \'{}\' does not have a docstring.'.format(name)
Expand All @@ -204,6 +257,7 @@ def _getmanifest(self):
return {
'options': list(self.options.values()),
'rpcmethods': methods,
'subscriptions': list(self.subscriptions.keys()),
}

def _init(self, options, configuration, request):
Expand All @@ -217,7 +271,7 @@ def _init(self, options, configuration, request):
if self.init:
self.methods['init'] = self.init
self.init = None
return self._dispatch(request)
return self._exec_func(self.methods['init'], request)
return None


Expand Down
102 changes: 99 additions & 3 deletions doc/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ this example:
"description": "Returns the current time in {timezone}",
"long_description": "Returns the current time in the timezone that is given as the only parameter.\nThis description may be quite long and is allowed to span multiple lines."
}
],
"subscriptions": [
"connect",
"disconnect"
]
}
```
Expand Down Expand Up @@ -109,8 +113,12 @@ simple JSON object containing the options:

```json
{
"objects": {
"options": {
"greeting": "World"
},
"configuration": {
"lightning-dir": "/home/user/.lightning",
"rpc-file": "lightning-rpc"
}
}
```
Expand All @@ -120,10 +128,98 @@ arbitrary and will currently be discarded by `lightningd`. JSON-RPC
commands were chosen over notifications in order not to force plugins
to implement notifications which are not that well supported.

## Event stream subscriptions
## JSON-RPC passthrough

*TBD*
Plugins may register their own JSON-RPC methods that are exposed
through the JSON-RPC provided by `lightningd`. This provides users
with a single interface to interact with, while allowing the addition
of custom methods without having to modify the daemon itself.

JSON-RPC methods are registered as part of the `getmanifest`
result. Each registered method must provide a `name` and a
`description`. An optional `long_description` may also be
provided. This information is then added to the internal dispatch
table, and used to return the help text when using `lightning-cli
help`, and the methods can be called using the `name`.

For example the above `getmanifest` result will register two methods,
called `hello` and `gettime`:

```json
...
"rpcmethods": [
{
"name": "hello",
"description": "Returns a personalized greeting for {greeting} (set via options)."
},
{
"name": "gettime",
"description": "Returns the current time in {timezone}",
"long_description": "Returns the current time in the timezone that is given as the only parameter.\nThis description may be quite long and is allowed to span multiple lines."
}
],
...
```

The RPC call will be passed through unmodified, with the exception of
the JSON-RPC call `id`, which is internally remapped to a unique
integer instead, in order to avoid collisions. When passing the result
back the `id` field is restored to its original value.

## Event notifications

Event notifications allow a plugin to subscribe to events in
`lightningd`. `lightningd` will then send a push notification if an
event matching the subscription occurred. A notification is defined in
the JSON-RPC [specification][jsonrpc-spec] as an RPC call that does
not include an `id` parameter:

> A Notification is a Request object without an "id" member. A Request
> object that is a Notification signifies the Client's lack of
> interest in the corresponding Response object, and as such no
> Response object needs to be returned to the client. The Server MUST
> NOT reply to a Notification, including those that are within a batch
> request.
>
> Notifications are not confirmable by definition, since they do not
> have a Response object to be returned. As such, the Client would not
> be aware of any errors (like e.g. "Invalid params","Internal
> error").

Plugins subscribe by returning an array of subscriptions as part of
the `getmanifest` response. The result for the `getmanifest` call
above for example subscribes to the two topics `connect` and
`disconnect`. The topics that are currently defined and the
corresponding payloads are listed below.

### Notification Types

#### `connect`

A notification for topic `connect` is sent every time a new connection
to a peer is established.

```json
{
"id": "02f6725f9c1c40333b67faea92fd211c183050f28df32cac3f9d69685fe9665432",
"address": "1.2.3.4"
}
```

#### `disconnect`

A notification for topic `disconnect` is sent every time a connection
to a peer was lost.

```json
{
"id": "02f6725f9c1c40333b67faea92fd211c183050f28df32cac3f9d69685fe9665432"
}
```
## Hooks

*TBD*


[jsonrpc-spec]: https://www.jsonrpc.org/specification
[jsonrpc-notification-spec]: https://www.jsonrpc.org/specification#notification
1 change: 1 addition & 0 deletions lightningd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ LIGHTNINGD_SRC := \
lightningd/log.c \
lightningd/log_status.c \
lightningd/memdump.c \
lightningd/notification.c \
lightningd/onchain_control.c \
lightningd/opening_control.c \
lightningd/options.c \
Expand Down
11 changes: 11 additions & 0 deletions lightningd/json_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ struct json_stream *new_json_stream(const tal_t *ctx, struct command *writer)
return js;
}

struct json_stream *json_stream_dup(const tal_t *ctx, struct json_stream *original)
{
size_t num_elems = membuf_num_elems(&original->outbuf);
char *elems = membuf_elems(&original->outbuf);
struct json_stream *js = tal_dup(ctx, struct json_stream, original);
membuf_init(&js->outbuf, tal_dup_arr(js, char, elems, num_elems, 0),
num_elems, membuf_tal_realloc);
membuf_added(&js->outbuf, num_elems);
return js;
}

bool json_stream_still_writing(const struct json_stream *js)
{
return js->writer != NULL;
Expand Down
13 changes: 13 additions & 0 deletions lightningd/json_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ struct json_stream;
*/
struct json_stream *new_json_stream(const tal_t *ctx, struct command *writer);

/**
* Duplicate an existing stream.
*
* Mostly useful when we want to send copies of a given stream to
* multiple recipients, that might read at different speeds from the
* stream. For example this is used when construcing a single
* notification and then duplicating it for the fanout.
*
* @ctx: tal context for allocation.
* @original: the stream to duplicate.
*/
struct json_stream *json_stream_dup(const tal_t *ctx, struct json_stream *original);

/**
* json_stream_close - finished writing to a JSON stream.
* @js: the json_stream.
Expand Down
20 changes: 20 additions & 0 deletions lightningd/jsonrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,26 @@ static struct command_result *param_command(struct command *cmd,
tok->end - tok->start, buffer + tok->start);
}

struct jsonrpc_notification *jsonrpc_notification_start(const tal_t *ctx, const char *method)
{
struct jsonrpc_notification *n = tal(ctx, struct jsonrpc_notification);
n->method = tal_strdup(n, method);
n->stream = new_json_stream(n, NULL);
json_object_start(n->stream, NULL);
json_add_string(n->stream, "jsonrpc", "2.0");
json_add_string(n->stream, "method", method);
json_object_start(n->stream, "params");

return n;
}

void jsonrpc_notification_end(struct jsonrpc_notification *n)
{
json_object_end(n->stream); /* closes '.params' */
json_object_end(n->stream); /* closes '.' */
json_stream_append(n->stream, "\n\n");
}

/* We add this destructor as a canary to detect cmd failing. */
static void destroy_command_canary(struct command *cmd, bool *failed)
{
Expand Down
Loading