diff --git a/.gitignore b/.gitignore index 9bb4877..4140b23 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ *~ *.exe +_vagrant/.vagrant +.config.conf diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 1c03fcf..0000000 --- a/.travis.yml +++ /dev/null @@ -1,11 +0,0 @@ -language: go - -go: - - 1.3.1 - -before_script: - - source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list - - wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add - - - sudo apt-get update - - sudo apt-get install rethinkdb - - rethinkdb --bind all & diff --git a/README.md b/README.md index 923460f..fa5f6f7 100644 --- a/README.md +++ b/README.md @@ -56,5 +56,5 @@ version=v0 ## Build status: - - `master` - [![Build Status](https://magnum.travis-ci.com/lavab/api.svg?token=kJbppXeTxzqpCVvt4t5X&branch=master)](https://magnum.travis-ci.com/lavab/api) - - `develop` - [![Build Status](https://magnum.travis-ci.com/lavab/api.svg?token=kJbppXeTxzqpCVvt4t5X&branch=develop)](https://magnum.travis-ci.com/lavab/api) \ No newline at end of file + - `master` - [![Circle CI](https://circleci.com/gh/lavab/api/tree/master.svg?style=svg&circle-token=4a52d619a03d0249906195d6447ceb60a475c0c5)](https://circleci.com/gh/lavab/api/tree/master) + - `develop` - [![Circle CI](https://circleci.com/gh/lavab/api/tree/develop.svg?style=svg&circle-token=4a52d619a03d0249906195d6447ceb60a475c0c5)](https://circleci.com/gh/lavab/api/tree/develop) diff --git a/_research/vote_storage/benchmark_test.go b/_research/vote_storage/benchmark_test.go new file mode 100644 index 0000000..adab145 --- /dev/null +++ b/_research/vote_storage/benchmark_test.go @@ -0,0 +1,115 @@ +package vote_storage_test + +import ( + "math/rand" + "testing" + + "github.com/dancannon/gorethink" + "github.com/dchest/uniuri" +) + +var ( + session *gorethink.Session + key2find string + table2search string +) + +func init() { + var err error + session, err = gorethink.Connect(gorethink.ConnectOpts{ + Address: "127.0.0.1:28015", + }) + if err != nil { + panic(err) + } + + key2find = uniuri.New() + + // Create a new table + gorethink.Db("test").TableDrop("benchmark_keys_list").Run(session) + gorethink.Db("test").TableCreate("benchmark_keys_list").Run(session) + + var klist []*KeysList + + // Populate with sample data + for n := 0; n < 300; n++ { + keys := rndStringSlice(999) + keys = randomlyInsert(keys, key2find) + + y := uniuri.New() + if n == 153 { + table2search = y + } + + klist = append(klist, &KeysList{ + ID: y, + Voted: keys, + }) + } + + gorethink.Db("test").Table("benchmark_keys_list").Insert(klist).Run(session) +} + +func rndStringSlice(count int) []string { + var r []string + for i := 0; i < count; i++ { + r = append(r, uniuri.New()) + } + return r +} + +func randomlyInsert(s []string, x string) []string { + i := rand.Intn(len(s) - 1) + + s = append(s[:i], append([]string{x}, s[i:]...)...) + + return s +} + +type KeysList struct { + ID string `gorethink:"id"` + Voted []string `gorethink:"voted"` +} + +func BenchmarkContains(b *testing.B) { + for n := 0; n < b.N; n++ { + contains, err := gorethink.Db("test").Table("benchmark_keys_list").Get(table2search).Field("voted").Contains(key2find).Run(session) + if err != nil { + b.Log(err) + b.Fail() + } + + var res bool + err = contains.One(&res) + if err != nil { + b.Log(err) + b.Fail() + } + if !res { + b.Log("invalid response") + b.Fail() + } + } +} + +func BenchmarkAppend(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := gorethink.Db("test").Table("benchmark_keys_list").Get(table2search).Field("voted").Append(uniuri.New()).Run(session) + if err != nil { + b.Log(err) + b.Fail() + } + } +} + +func BenchmarkDelete(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := gorethink.Db("test").Table("benchmark_keys_list").Get(table2search).Field("voted").DeleteAt( + gorethink.Expr(gorethink.Db("test").Table("benchmark_keys_list").Get(table2search).Field("voted").IndexesOf(key2find).AtIndex(0)), + ).Run(session) + if err != nil { + b.Log(err) + b.Fail() + } + } +} diff --git a/_research/ws_client/index.html b/_research/ws_client/index.html new file mode 100644 index 0000000..f0c88d4 --- /dev/null +++ b/_research/ws_client/index.html @@ -0,0 +1,40 @@ + + + + + lavab client + + + + +
+
+ + + + + + + diff --git a/_vagrant/Vagrantfile b/_vagrant/Vagrantfile new file mode 100644 index 0000000..15c639f --- /dev/null +++ b/_vagrant/Vagrantfile @@ -0,0 +1,58 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! +VAGRANTFILE_API_VERSION = "2" + +Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| + + #config.vm.define "redisthink" do |rethinkdb| + #rethinkdb.vm.box = "ubuntu/trusty64" + + # rethinkdb + #rethinkdb.vm.network "forwarded_port", guest: 8080, host: 8080 + #rethinkdb.vm.network "forwarded_port", guest: 28015, host: 28015 + #rethinkdb.vm.network "forwarded_port", guest: 29015, host: 29015 + + # redis + #rethinkdb.vm.network "forwarded_port", guest: 6379, host: 6379 + + #rethinkdb.vm.provider "virtualbox" do |v| + #v.memory = 2048 + #v.cpus = 4 + #end + + # load ansible playbook + #rethinkdb.vm.provision "shell", path: "deploy.sh" + #end + + config.vm.define "docker" do |docker| + docker.vm.box = "ubuntu/trusty64" + + docker.vm.network "forwarded_port", guest: 4222, host: 4222 + docker.vm.network "forwarded_port", guest: 8333, host: 8333 + docker.vm.network "forwarded_port", guest: 6379, host: 6379 + docker.vm.network "forwarded_port", guest: 8080, host: 8080 + docker.vm.network "forwarded_port", guest: 28015, host: 28015 + docker.vm.network "forwarded_port", guest: 29015, host: 29015 + + docker.vm.provider "virtualbox" do |v| + v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"] + v.customize ["modifyvm", :id, "--natdnsproxy1", "on"] + end + + docker.vm.provision "docker" do |d| + d.pull_images "apcera/gnatsd" + d.run "apcera/gnatsd", + args: "--name gnatsd -p 4222:4222 -p 8333:8333" + + d.pull_images "dockerfile/rethinkdb" + d.run "dockerfile/rethinkdb", + args: "--name rethinkdb -p 8080:8080 -p 28015:28015 -p 29015:29015" + + d.pull_images "dockerfile/redis" + d.run "dockerfile/redis", + args: "--name redis -p 6379:6379" + end + end +end diff --git a/_vagrant/deploy.sh b/_vagrant/deploy.sh new file mode 100644 index 0000000..95804d3 --- /dev/null +++ b/_vagrant/deploy.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +export DEBIAN_FRONTEND=noninteractive + +apt-get update -qq +apt-get install -y python2.7 python-pip + +pip install ansible + +mkdir /etc/ansible +echo "localhost" > /etc/ansible/hosts + +cd /vagrant +ansible-playbook -vvvv playbook.yml diff --git a/_vagrant/playbook.yml b/_vagrant/playbook.yml new file mode 100644 index 0000000..55e9e82 --- /dev/null +++ b/_vagrant/playbook.yml @@ -0,0 +1,46 @@ +--- +- hosts: 127.0.0.1 + connection: local + sudo: true + tasks: + # install rethinkdb + - name: add rethinkdb sources + shell: . /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | tee /etc/apt/sources.list.d/rethinkdb.list + - name: add rethinkdb key + shell: wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | apt-key add - + - name: update apt cache + apt: update_cache=yes + - name: install rethinkdb + apt: pkg=rethinkdb state=present + # configure rethinkdb + - name: ensure group is present + group: name=rethinkdb state=present + - name: ensure user is present + user: name=rethinkdb state=present + - name: copy master config file + copy: src=./rethinkdb.conf dest=/etc/rethinkdb/instances.d/rethinkdb.conf owner=rethinkdb group=rethinkdb mode=664 + - name: start rethinkdb + service: name=rethinkdb state=restarted + # install redis + - name: prepare a directory + shell: mkdir /tmp/redis-stable + - name: download redis + get_url: url=http://download.redis.io/redis-stable.tar.gz dest=/tmp/redis-stable.tar.gz + - name: unpack redis + unarchive: src=/tmp/redis-stable.tar.gz dest=/tmp/redis-stable copy=no + - name: make redis + shell: 'cd /tmp/redis-stable/redis-stable && make && make install' + - name: ensure group is present + group: name=redis state=present + - name: ensure user is present + user: name=redis state=present + - name: prepare config and storage directories for redis + shell: mkdir /etc/redis; mkdir /var/redis; sudo mkdir /var/redis/6379 + - name: prepare the config and an init script for redis + shell: sudo cp /tmp/redis-stable/redis-stable/utils/redis_init_script /etc/init.d/redis_6379 + - name: upload the config to the vm + copy: src=./redis.conf dest=/etc/redis/6379.conf owner=redis group=redis mode=664 + - name: put the prepared config on the server + shell: sudo update-rc.d redis_6379 defaults + - name: start redis_6379 + service: name=redis_6379 state=restarted diff --git a/_vagrant/redis.conf b/_vagrant/redis.conf new file mode 100644 index 0000000..d176fe0 --- /dev/null +++ b/_vagrant/redis.conf @@ -0,0 +1,827 @@ +# Redis configuration file example + +# Note on units: when memory size is needed, it is possible to specify +# it in the usual form of 1k 5GB 4M and so forth: +# +# 1k => 1000 bytes +# 1kb => 1024 bytes +# 1m => 1000000 bytes +# 1mb => 1024*1024 bytes +# 1g => 1000000000 bytes +# 1gb => 1024*1024*1024 bytes +# +# units are case insensitive so 1GB 1Gb 1gB are all the same. + +################################## INCLUDES ################################### + +# Include one or more other config files here. This is useful if you +# have a standard template that goes to all Redis servers but also need +# to customize a few per-server settings. Include files can include +# other files, so use this wisely. +# +# Notice option "include" won't be rewritten by command "CONFIG REWRITE" +# from admin or Redis Sentinel. Since Redis always uses the last processed +# line as value of a configuration directive, you'd better put includes +# at the beginning of this file to avoid overwriting config change at runtime. +# +# If instead you are interested in using includes to override configuration +# options, it is better to use include as the last line. +# +# include /path/to/local.conf +# include /path/to/other.conf + +################################ GENERAL ##################################### + +# By default Redis does not run as a daemon. Use 'yes' if you need it. +# Note that Redis will write a pid file in /var/run/redis.pid when daemonized. +daemonize yes + +# When running daemonized, Redis writes a pid file in /var/run/redis.pid by +# default. You can specify a custom pid file location here. +pidfile /var/run/redis_6379.pid + +# Accept connections on the specified port, default is 6379. +# If port 0 is specified Redis will not listen on a TCP socket. +port 6379 + +# TCP listen() backlog. +# +# In high requests-per-second environments you need an high backlog in order +# to avoid slow clients connections issues. Note that the Linux kernel +# will silently truncate it to the value of /proc/sys/net/core/somaxconn so +# make sure to raise both the value of somaxconn and tcp_max_syn_backlog +# in order to get the desired effect. +tcp-backlog 511 + +# By default Redis listens for connections from all the network interfaces +# available on the server. It is possible to listen to just one or multiple +# interfaces using the "bind" configuration directive, followed by one or +# more IP addresses. +# +# Examples: +# +# bind 192.168.1.100 10.0.0.1 +# bind 127.0.0.1 + +# Specify the path for the Unix socket that will be used to listen for +# incoming connections. There is no default, so Redis will not listen +# on a unix socket when not specified. +# +# unixsocket /tmp/redis.sock +# unixsocketperm 700 + +# Close the connection after a client is idle for N seconds (0 to disable) +timeout 0 + +# TCP keepalive. +# +# If non-zero, use SO_KEEPALIVE to send TCP ACKs to clients in absence +# of communication. This is useful for two reasons: +# +# 1) Detect dead peers. +# 2) Take the connection alive from the point of view of network +# equipment in the middle. +# +# On Linux, the specified value (in seconds) is the period used to send ACKs. +# Note that to close the connection the double of the time is needed. +# On other kernels the period depends on the kernel configuration. +# +# A reasonable value for this option is 60 seconds. +tcp-keepalive 0 + +# Specify the server verbosity level. +# This can be one of: +# debug (a lot of information, useful for development/testing) +# verbose (many rarely useful info, but not a mess like the debug level) +# notice (moderately verbose, what you want in production probably) +# warning (only very important / critical messages are logged) +loglevel notice + +# Specify the log file name. Also the empty string can be used to force +# Redis to log on the standard output. Note that if you use standard +# output for logging but daemonize, logs will be sent to /dev/null +logfile /var/log/redis_6379.log + +# To enable logging to the system logger, just set 'syslog-enabled' to yes, +# and optionally update the other syslog parameters to suit your needs. +# syslog-enabled no + +# Specify the syslog identity. +# syslog-ident redis + +# Specify the syslog facility. Must be USER or between LOCAL0-LOCAL7. +# syslog-facility local0 + +# Set the number of databases. The default database is DB 0, you can select +# a different one on a per-connection basis using SELECT where +# dbid is a number between 0 and 'databases'-1 +databases 16 + +################################ SNAPSHOTTING ################################ +# +# Save the DB on disk: +# +# save +# +# Will save the DB if both the given number of seconds and the given +# number of write operations against the DB occurred. +# +# In the example below the behaviour will be to save: +# after 900 sec (15 min) if at least 1 key changed +# after 300 sec (5 min) if at least 10 keys changed +# after 60 sec if at least 10000 keys changed +# +# Note: you can disable saving completely by commenting out all "save" lines. +# +# It is also possible to remove all the previously configured save +# points by adding a save directive with a single empty string argument +# like in the following example: +# +# save "" + +save 900 1 +save 300 10 +save 60 10000 + +# By default Redis will stop accepting writes if RDB snapshots are enabled +# (at least one save point) and the latest background save failed. +# This will make the user aware (in a hard way) that data is not persisting +# on disk properly, otherwise chances are that no one will notice and some +# disaster will happen. +# +# If the background saving process will start working again Redis will +# automatically allow writes again. +# +# However if you have setup your proper monitoring of the Redis server +# and persistence, you may want to disable this feature so that Redis will +# continue to work as usual even if there are problems with disk, +# permissions, and so forth. +stop-writes-on-bgsave-error yes + +# Compress string objects using LZF when dump .rdb databases? +# For default that's set to 'yes' as it's almost always a win. +# If you want to save some CPU in the saving child set it to 'no' but +# the dataset will likely be bigger if you have compressible values or keys. +rdbcompression yes + +# Since version 5 of RDB a CRC64 checksum is placed at the end of the file. +# This makes the format more resistant to corruption but there is a performance +# hit to pay (around 10%) when saving and loading RDB files, so you can disable it +# for maximum performances. +# +# RDB files created with checksum disabled have a checksum of zero that will +# tell the loading code to skip the check. +rdbchecksum yes + +# The filename where to dump the DB +dbfilename dump.rdb + +# The working directory. +# +# The DB will be written inside this directory, with the filename specified +# above using the 'dbfilename' configuration directive. +# +# The Append Only File will also be created inside this directory. +# +# Note that you must specify a directory here, not a file name. +dir /var/redis/6379 + +################################# REPLICATION ################################# + +# Master-Slave replication. Use slaveof to make a Redis instance a copy of +# another Redis server. A few things to understand ASAP about Redis replication. +# +# 1) Redis replication is asynchronous, but you can configure a master to +# stop accepting writes if it appears to be not connected with at least +# a given number of slaves. +# 2) Redis slaves are able to perform a partial resynchronization with the +# master if the replication link is lost for a relatively small amount of +# time. You may want to configure the replication backlog size (see the next +# sections of this file) with a sensible value depending on your needs. +# 3) Replication is automatic and does not need user intervention. After a +# network partition slaves automatically try to reconnect to masters +# and resynchronize with them. +# +# slaveof + +# If the master is password protected (using the "requirepass" configuration +# directive below) it is possible to tell the slave to authenticate before +# starting the replication synchronization process, otherwise the master will +# refuse the slave request. +# +# masterauth + +# When a slave loses its connection with the master, or when the replication +# is still in progress, the slave can act in two different ways: +# +# 1) if slave-serve-stale-data is set to 'yes' (the default) the slave will +# still reply to client requests, possibly with out of date data, or the +# data set may just be empty if this is the first synchronization. +# +# 2) if slave-serve-stale-data is set to 'no' the slave will reply with +# an error "SYNC with master in progress" to all the kind of commands +# but to INFO and SLAVEOF. +# +slave-serve-stale-data yes + +# You can configure a slave instance to accept writes or not. Writing against +# a slave instance may be useful to store some ephemeral data (because data +# written on a slave will be easily deleted after resync with the master) but +# may also cause problems if clients are writing to it because of a +# misconfiguration. +# +# Since Redis 2.6 by default slaves are read-only. +# +# Note: read only slaves are not designed to be exposed to untrusted clients +# on the internet. It's just a protection layer against misuse of the instance. +# Still a read only slave exports by default all the administrative commands +# such as CONFIG, DEBUG, and so forth. To a limited extent you can improve +# security of read only slaves using 'rename-command' to shadow all the +# administrative / dangerous commands. +slave-read-only yes + +# Replication SYNC strategy: disk or socket. +# +# ------------------------------------------------------- +# WARNING: DISKLESS REPLICATION IS EXPERIMENTAL CURRENTLY +# ------------------------------------------------------- +# +# New slaves and reconnecting slaves that are not able to continue the replication +# process just receiving differences, need to do what is called a "full +# synchronization". An RDB file is transmitted from the master to the slaves. +# The transmission can happen in two different ways: +# +# 1) Disk-backed: The Redis master creates a new process that writes the RDB +# file on disk. Later the file is transferred by the parent +# process to the slaves incrementally. +# 2) Diskless: The Redis master creates a new process that directly writes the +# RDB file to slave sockets, without touching the disk at all. +# +# With disk-backed replication, while the RDB file is generated, more slaves +# can be queued and served with the RDB file as soon as the current child producing +# the RDB file finishes its work. With diskless replication instead once +# the transfer starts, new slaves arriving will be queued and a new transfer +# will start when the current one terminates. +# +# When diskless replication is used, the master waits a configurable amount of +# time (in seconds) before starting the transfer in the hope that multiple slaves +# will arrive and the transfer can be parallelized. +# +# With slow disks and fast (large bandwidth) networks, diskless replication +# works better. +repl-diskless-sync no + +# When diskless replication is enabled, it is possible to configure the delay +# the server waits in order to spawn the child that trnasfers the RDB via socket +# to the slaves. +# +# This is important since once the transfer starts, it is not possible to serve +# new slaves arriving, that will be queued for the next RDB transfer, so the server +# waits a delay in order to let more slaves arrive. +# +# The delay is specified in seconds, and by default is 5 seconds. To disable +# it entirely just set it to 0 seconds and the transfer will start ASAP. +repl-diskless-sync-delay 5 + +# Slaves send PINGs to server in a predefined interval. It's possible to change +# this interval with the repl_ping_slave_period option. The default value is 10 +# seconds. +# +# repl-ping-slave-period 10 + +# The following option sets the replication timeout for: +# +# 1) Bulk transfer I/O during SYNC, from the point of view of slave. +# 2) Master timeout from the point of view of slaves (data, pings). +# 3) Slave timeout from the point of view of masters (REPLCONF ACK pings). +# +# It is important to make sure that this value is greater than the value +# specified for repl-ping-slave-period otherwise a timeout will be detected +# every time there is low traffic between the master and the slave. +# +# repl-timeout 60 + +# Disable TCP_NODELAY on the slave socket after SYNC? +# +# If you select "yes" Redis will use a smaller number of TCP packets and +# less bandwidth to send data to slaves. But this can add a delay for +# the data to appear on the slave side, up to 40 milliseconds with +# Linux kernels using a default configuration. +# +# If you select "no" the delay for data to appear on the slave side will +# be reduced but more bandwidth will be used for replication. +# +# By default we optimize for low latency, but in very high traffic conditions +# or when the master and slaves are many hops away, turning this to "yes" may +# be a good idea. +repl-disable-tcp-nodelay no + +# Set the replication backlog size. The backlog is a buffer that accumulates +# slave data when slaves are disconnected for some time, so that when a slave +# wants to reconnect again, often a full resync is not needed, but a partial +# resync is enough, just passing the portion of data the slave missed while +# disconnected. +# +# The bigger the replication backlog, the longer the time the slave can be +# disconnected and later be able to perform a partial resynchronization. +# +# The backlog is only allocated once there is at least a slave connected. +# +# repl-backlog-size 1mb + +# After a master has no longer connected slaves for some time, the backlog +# will be freed. The following option configures the amount of seconds that +# need to elapse, starting from the time the last slave disconnected, for +# the backlog buffer to be freed. +# +# A value of 0 means to never release the backlog. +# +# repl-backlog-ttl 3600 + +# The slave priority is an integer number published by Redis in the INFO output. +# It is used by Redis Sentinel in order to select a slave to promote into a +# master if the master is no longer working correctly. +# +# A slave with a low priority number is considered better for promotion, so +# for instance if there are three slaves with priority 10, 100, 25 Sentinel will +# pick the one with priority 10, that is the lowest. +# +# However a special priority of 0 marks the slave as not able to perform the +# role of master, so a slave with priority of 0 will never be selected by +# Redis Sentinel for promotion. +# +# By default the priority is 100. +slave-priority 100 + +# It is possible for a master to stop accepting writes if there are less than +# N slaves connected, having a lag less or equal than M seconds. +# +# The N slaves need to be in "online" state. +# +# The lag in seconds, that must be <= the specified value, is calculated from +# the last ping received from the slave, that is usually sent every second. +# +# This option does not GUARANTEE that N replicas will accept the write, but +# will limit the window of exposure for lost writes in case not enough slaves +# are available, to the specified number of seconds. +# +# For example to require at least 3 slaves with a lag <= 10 seconds use: +# +# min-slaves-to-write 3 +# min-slaves-max-lag 10 +# +# Setting one or the other to 0 disables the feature. +# +# By default min-slaves-to-write is set to 0 (feature disabled) and +# min-slaves-max-lag is set to 10. + +################################## SECURITY ################################### + +# Require clients to issue AUTH before processing any other +# commands. This might be useful in environments in which you do not trust +# others with access to the host running redis-server. +# +# This should stay commented out for backward compatibility and because most +# people do not need auth (e.g. they run their own servers). +# +# Warning: since Redis is pretty fast an outside user can try up to +# 150k passwords per second against a good box. This means that you should +# use a very strong password otherwise it will be very easy to break. +# +# requirepass foobared + +# Command renaming. +# +# It is possible to change the name of dangerous commands in a shared +# environment. For instance the CONFIG command may be renamed into something +# hard to guess so that it will still be available for internal-use tools +# but not available for general clients. +# +# Example: +# +# rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52 +# +# It is also possible to completely kill a command by renaming it into +# an empty string: +# +# rename-command CONFIG "" +# +# Please note that changing the name of commands that are logged into the +# AOF file or transmitted to slaves may cause problems. + +################################### LIMITS #################################### + +# Set the max number of connected clients at the same time. By default +# this limit is set to 10000 clients, however if the Redis server is not +# able to configure the process file limit to allow for the specified limit +# the max number of allowed clients is set to the current file limit +# minus 32 (as Redis reserves a few file descriptors for internal uses). +# +# Once the limit is reached Redis will close all the new connections sending +# an error 'max number of clients reached'. +# +# maxclients 10000 + +# Don't use more memory than the specified amount of bytes. +# When the memory limit is reached Redis will try to remove keys +# according to the eviction policy selected (see maxmemory-policy). +# +# If Redis can't remove keys according to the policy, or if the policy is +# set to 'noeviction', Redis will start to reply with errors to commands +# that would use more memory, like SET, LPUSH, and so on, and will continue +# to reply to read-only commands like GET. +# +# This option is usually useful when using Redis as an LRU cache, or to set +# a hard memory limit for an instance (using the 'noeviction' policy). +# +# WARNING: If you have slaves attached to an instance with maxmemory on, +# the size of the output buffers needed to feed the slaves are subtracted +# from the used memory count, so that network problems / resyncs will +# not trigger a loop where keys are evicted, and in turn the output +# buffer of slaves is full with DELs of keys evicted triggering the deletion +# of more keys, and so forth until the database is completely emptied. +# +# In short... if you have slaves attached it is suggested that you set a lower +# limit for maxmemory so that there is some free RAM on the system for slave +# output buffers (but this is not needed if the policy is 'noeviction'). +# +# maxmemory + +# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory +# is reached. You can select among five behaviors: +# +# volatile-lru -> remove the key with an expire set using an LRU algorithm +# allkeys-lru -> remove any key according to the LRU algorithm +# volatile-random -> remove a random key with an expire set +# allkeys-random -> remove a random key, any key +# volatile-ttl -> remove the key with the nearest expire time (minor TTL) +# noeviction -> don't expire at all, just return an error on write operations +# +# Note: with any of the above policies, Redis will return an error on write +# operations, when there are no suitable keys for eviction. +# +# At the date of writing these commands are: set setnx setex append +# incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd +# sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby +# zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby +# getset mset msetnx exec sort +# +# The default is: +# +# maxmemory-policy volatile-lru + +# LRU and minimal TTL algorithms are not precise algorithms but approximated +# algorithms (in order to save memory), so you can select as well the sample +# size to check. For instance for default Redis will check three keys and +# pick the one that was used less recently, you can change the sample size +# using the following configuration directive. +# +# maxmemory-samples 3 + +############################## APPEND ONLY MODE ############################### + +# By default Redis asynchronously dumps the dataset on disk. This mode is +# good enough in many applications, but an issue with the Redis process or +# a power outage may result into a few minutes of writes lost (depending on +# the configured save points). +# +# The Append Only File is an alternative persistence mode that provides +# much better durability. For instance using the default data fsync policy +# (see later in the config file) Redis can lose just one second of writes in a +# dramatic event like a server power outage, or a single write if something +# wrong with the Redis process itself happens, but the operating system is +# still running correctly. +# +# AOF and RDB persistence can be enabled at the same time without problems. +# If the AOF is enabled on startup Redis will load the AOF, that is the file +# with the better durability guarantees. +# +# Please check http://redis.io/topics/persistence for more information. + +appendonly no + +# The name of the append only file (default: "appendonly.aof") + +appendfilename "appendonly.aof" + +# The fsync() call tells the Operating System to actually write data on disk +# instead of waiting for more data in the output buffer. Some OS will really flush +# data on disk, some other OS will just try to do it ASAP. +# +# Redis supports three different modes: +# +# no: don't fsync, just let the OS flush the data when it wants. Faster. +# always: fsync after every write to the append only log. Slow, Safest. +# everysec: fsync only one time every second. Compromise. +# +# The default is "everysec", as that's usually the right compromise between +# speed and data safety. It's up to you to understand if you can relax this to +# "no" that will let the operating system flush the output buffer when +# it wants, for better performances (but if you can live with the idea of +# some data loss consider the default persistence mode that's snapshotting), +# or on the contrary, use "always" that's very slow but a bit safer than +# everysec. +# +# More details please check the following article: +# http://antirez.com/post/redis-persistence-demystified.html +# +# If unsure, use "everysec". + +# appendfsync always +appendfsync everysec +# appendfsync no + +# When the AOF fsync policy is set to always or everysec, and a background +# saving process (a background save or AOF log background rewriting) is +# performing a lot of I/O against the disk, in some Linux configurations +# Redis may block too long on the fsync() call. Note that there is no fix for +# this currently, as even performing fsync in a different thread will block +# our synchronous write(2) call. +# +# In order to mitigate this problem it's possible to use the following option +# that will prevent fsync() from being called in the main process while a +# BGSAVE or BGREWRITEAOF is in progress. +# +# This means that while another child is saving, the durability of Redis is +# the same as "appendfsync none". In practical terms, this means that it is +# possible to lose up to 30 seconds of log in the worst scenario (with the +# default Linux settings). +# +# If you have latency problems turn this to "yes". Otherwise leave it as +# "no" that is the safest pick from the point of view of durability. + +no-appendfsync-on-rewrite no + +# Automatic rewrite of the append only file. +# Redis is able to automatically rewrite the log file implicitly calling +# BGREWRITEAOF when the AOF log size grows by the specified percentage. +# +# This is how it works: Redis remembers the size of the AOF file after the +# latest rewrite (if no rewrite has happened since the restart, the size of +# the AOF at startup is used). +# +# This base size is compared to the current size. If the current size is +# bigger than the specified percentage, the rewrite is triggered. Also +# you need to specify a minimal size for the AOF file to be rewritten, this +# is useful to avoid rewriting the AOF file even if the percentage increase +# is reached but it is still pretty small. +# +# Specify a percentage of zero in order to disable the automatic AOF +# rewrite feature. + +auto-aof-rewrite-percentage 100 +auto-aof-rewrite-min-size 64mb + +# An AOF file may be found to be truncated at the end during the Redis +# startup process, when the AOF data gets loaded back into memory. +# This may happen when the system where Redis is running +# crashes, especially when an ext4 filesystem is mounted without the +# data=ordered option (however this can't happen when Redis itself +# crashes or aborts but the operating system still works correctly). +# +# Redis can either exit with an error when this happens, or load as much +# data as possible (the default now) and start if the AOF file is found +# to be truncated at the end. The following option controls this behavior. +# +# If aof-load-truncated is set to yes, a truncated AOF file is loaded and +# the Redis server starts emitting a log to inform the user of the event. +# Otherwise if the option is set to no, the server aborts with an error +# and refuses to start. When the option is set to no, the user requires +# to fix the AOF file using the "redis-check-aof" utility before to restart +# the server. +# +# Note that if the AOF file will be found to be corrupted in the middle +# the server will still exit with an error. This option only applies when +# Redis will try to read more data from the AOF file but not enough bytes +# will be found. +aof-load-truncated yes + +################################ LUA SCRIPTING ############################### + +# Max execution time of a Lua script in milliseconds. +# +# If the maximum execution time is reached Redis will log that a script is +# still in execution after the maximum allowed time and will start to +# reply to queries with an error. +# +# When a long running script exceeds the maximum execution time only the +# SCRIPT KILL and SHUTDOWN NOSAVE commands are available. The first can be +# used to stop a script that did not yet called write commands. The second +# is the only way to shut down the server in the case a write command was +# already issued by the script but the user doesn't want to wait for the natural +# termination of the script. +# +# Set it to 0 or a negative value for unlimited execution without warnings. +lua-time-limit 5000 + +################################## SLOW LOG ################################### + +# The Redis Slow Log is a system to log queries that exceeded a specified +# execution time. The execution time does not include the I/O operations +# like talking with the client, sending the reply and so forth, +# but just the time needed to actually execute the command (this is the only +# stage of command execution where the thread is blocked and can not serve +# other requests in the meantime). +# +# You can configure the slow log with two parameters: one tells Redis +# what is the execution time, in microseconds, to exceed in order for the +# command to get logged, and the other parameter is the length of the +# slow log. When a new command is logged the oldest one is removed from the +# queue of logged commands. + +# The following time is expressed in microseconds, so 1000000 is equivalent +# to one second. Note that a negative number disables the slow log, while +# a value of zero forces the logging of every command. +slowlog-log-slower-than 10000 + +# There is no limit to this length. Just be aware that it will consume memory. +# You can reclaim memory used by the slow log with SLOWLOG RESET. +slowlog-max-len 128 + +################################ LATENCY MONITOR ############################## + +# The Redis latency monitoring subsystem samples different operations +# at runtime in order to collect data related to possible sources of +# latency of a Redis instance. +# +# Via the LATENCY command this information is available to the user that can +# print graphs and obtain reports. +# +# The system only logs operations that were performed in a time equal or +# greater than the amount of milliseconds specified via the +# latency-monitor-threshold configuration directive. When its value is set +# to zero, the latency monitor is turned off. +# +# By default latency monitoring is disabled since it is mostly not needed +# if you don't have latency issues, and collecting data has a performance +# impact, that while very small, can be measured under big load. Latency +# monitoring can easily be enalbed at runtime using the command +# "CONFIG SET latency-monitor-threshold " if needed. +latency-monitor-threshold 0 + +############################# Event notification ############################## + +# Redis can notify Pub/Sub clients about events happening in the key space. +# This feature is documented at http://redis.io/topics/notifications +# +# For instance if keyspace events notification is enabled, and a client +# performs a DEL operation on key "foo" stored in the Database 0, two +# messages will be published via Pub/Sub: +# +# PUBLISH __keyspace@0__:foo del +# PUBLISH __keyevent@0__:del foo +# +# It is possible to select the events that Redis will notify among a set +# of classes. Every class is identified by a single character: +# +# K Keyspace events, published with __keyspace@__ prefix. +# E Keyevent events, published with __keyevent@__ prefix. +# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... +# $ String commands +# l List commands +# s Set commands +# h Hash commands +# z Sorted set commands +# x Expired events (events generated every time a key expires) +# e Evicted events (events generated when a key is evicted for maxmemory) +# A Alias for g$lshzxe, so that the "AKE" string means all the events. +# +# The "notify-keyspace-events" takes as argument a string that is composed +# of zero or multiple characters. The empty string means that notifications +# are disabled. +# +# Example: to enable list and generic events, from the point of view of the +# event name, use: +# +# notify-keyspace-events Elg +# +# Example 2: to get the stream of the expired keys subscribing to channel +# name __keyevent@0__:expired use: +# +# notify-keyspace-events Ex +# +# By default all notifications are disabled because most users don't need +# this feature and the feature has some overhead. Note that if you don't +# specify at least one of K or E, no events will be delivered. +notify-keyspace-events "" + +############################### ADVANCED CONFIG ############################### + +# Hashes are encoded using a memory efficient data structure when they have a +# small number of entries, and the biggest entry does not exceed a given +# threshold. These thresholds can be configured using the following directives. +hash-max-ziplist-entries 512 +hash-max-ziplist-value 64 + +# Similarly to hashes, small lists are also encoded in a special way in order +# to save a lot of space. The special representation is only used when +# you are under the following limits: +list-max-ziplist-entries 512 +list-max-ziplist-value 64 + +# Sets have a special encoding in just one case: when a set is composed +# of just strings that happen to be integers in radix 10 in the range +# of 64 bit signed integers. +# The following configuration setting sets the limit in the size of the +# set in order to use this special memory saving encoding. +set-max-intset-entries 512 + +# Similarly to hashes and lists, sorted sets are also specially encoded in +# order to save a lot of space. This encoding is only used when the length and +# elements of a sorted set are below the following limits: +zset-max-ziplist-entries 128 +zset-max-ziplist-value 64 + +# HyperLogLog sparse representation bytes limit. The limit includes the +# 16 bytes header. When an HyperLogLog using the sparse representation crosses +# this limit, it is converted into the dense representation. +# +# A value greater than 16000 is totally useless, since at that point the +# dense representation is more memory efficient. +# +# The suggested value is ~ 3000 in order to have the benefits of +# the space efficient encoding without slowing down too much PFADD, +# which is O(N) with the sparse encoding. The value can be raised to +# ~ 10000 when CPU is not a concern, but space is, and the data set is +# composed of many HyperLogLogs with cardinality in the 0 - 15000 range. +hll-sparse-max-bytes 3000 + +# Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in +# order to help rehashing the main Redis hash table (the one mapping top-level +# keys to values). The hash table implementation Redis uses (see dict.c) +# performs a lazy rehashing: the more operation you run into a hash table +# that is rehashing, the more rehashing "steps" are performed, so if the +# server is idle the rehashing is never complete and some more memory is used +# by the hash table. +# +# The default is to use this millisecond 10 times every second in order to +# actively rehash the main dictionaries, freeing memory when possible. +# +# If unsure: +# use "activerehashing no" if you have hard latency requirements and it is +# not a good thing in your environment that Redis can reply from time to time +# to queries with 2 milliseconds delay. +# +# use "activerehashing yes" if you don't have such hard requirements but +# want to free memory asap when possible. +activerehashing yes + +# The client output buffer limits can be used to force disconnection of clients +# that are not reading data from the server fast enough for some reason (a +# common reason is that a Pub/Sub client can't consume messages as fast as the +# publisher can produce them). +# +# The limit can be set differently for the three different classes of clients: +# +# normal -> normal clients including MONITOR clients +# slave -> slave clients +# pubsub -> clients subscribed to at least one pubsub channel or pattern +# +# The syntax of every client-output-buffer-limit directive is the following: +# +# client-output-buffer-limit +# +# A client is immediately disconnected once the hard limit is reached, or if +# the soft limit is reached and remains reached for the specified number of +# seconds (continuously). +# So for instance if the hard limit is 32 megabytes and the soft limit is +# 16 megabytes / 10 seconds, the client will get disconnected immediately +# if the size of the output buffers reach 32 megabytes, but will also get +# disconnected if the client reaches 16 megabytes and continuously overcomes +# the limit for 10 seconds. +# +# By default normal clients are not limited because they don't receive data +# without asking (in a push way), but just after a request, so only +# asynchronous clients may create a scenario where data is requested faster +# than it can read. +# +# Instead there is a default limit for pubsub and slave clients, since +# subscribers and slaves receive data in a push fashion. +# +# Both the hard or the soft limit can be disabled by setting them to zero. +client-output-buffer-limit normal 0 0 0 +client-output-buffer-limit slave 256mb 64mb 60 +client-output-buffer-limit pubsub 32mb 8mb 60 + +# Redis calls an internal function to perform many background tasks, like +# closing connections of clients in timeout, purging expired keys that are +# never requested, and so forth. +# +# Not all tasks are performed with the same frequency, but Redis checks for +# tasks to perform according to the specified "hz" value. +# +# By default "hz" is set to 10. Raising the value will use more CPU when +# Redis is idle, but at the same time will make Redis more responsive when +# there are many keys expiring at the same time, and timeouts may be +# handled with more precision. +# +# The range is between 1 and 500, however a value over 100 is usually not +# a good idea. Most users should use the default of 10 and raise this up to +# 100 only in environments where very low latency is required. +hz 10 + +# When a child rewrites the AOF file, if the following option is enabled +# the file will be fsync-ed every 32 MB of data generated. This is useful +# in order to commit the file to the disk more incrementally and avoid +# big latency spikes. +aof-rewrite-incremental-fsync yes diff --git a/_vagrant/rethinkdb.conf b/_vagrant/rethinkdb.conf new file mode 100644 index 0000000..4e6cce2 --- /dev/null +++ b/_vagrant/rethinkdb.conf @@ -0,0 +1,68 @@ +# +# RethinkDB instance configuration sample +# +# - Give this file the extension .conf and put it in /etc/rethinkdb/instances.d in order to enable it. +# - See http://www.rethinkdb.com/docs/guides/startup/ for the complete documentation +# - Uncomment an option to change its value. +# + +############################### +## RethinkDB configuration +############################### + +### Process options + +## User and group used to run rethinkdb +## Command line default: do not change user or group +## Init script default: rethinkdb user and group +# runuser=rethinkdb +# rungroup=rethinkdb + +## Stash the pid in this file when the process is running +## Command line default: none +## Init script default: /var/run/rethinkdb//pid_file (where is the name of this config file without the extension) +# pid-file=/var/run/rethinkdb/rethinkdb.pid + +### File path options + +## Directory to store data and metadata +## Command line default: ./rethinkdb_data +## Init script default: /var/lib/rethinkdb// (where is the name of this file without the extension) +# directory=/var/lib/rethinkdb/default + +### Network options + +## Address of local interfaces to listen on when accepting connections +## May be 'all' or an IP address, loopback addresses are enabled by default +## Default: all local addresses +bind=all + +## The port for rethinkdb protocol for client drivers +## Default: 28015 + port-offset +# driver-port=28015 + +## The port for receiving connections from other nodes +## Default: 29015 + port-offset +# cluster-port=29015 + +## The host:port of a node that rethinkdb will connect to +## This option can be specified multiple times. +## Default: none +# join=example.com:29015 + +## All ports used locally will have this value added +## Default: 0 +# port-offset=0 + +### Web options + +## Port for the http admin console +## Default: 8080 + port-offset +# http-port=8080 + +### CPU options + +## The number of cores to use +## Default: total number of cores of the CPU +# cores=2 + diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..a95ad01 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,13 @@ +package cache + +import "time" + +// Cache is the basic interface for cache implementations +type Cache interface { + Get(key string, pointer interface{}) error + Set(key string, value interface{}, expires time.Duration) error + Delete(key string) error + DeleteMask(mask string) error + DeleteMulti(keys ...interface{}) error + Exists(key string) (bool, error) +} diff --git a/cache/redis_cache.go b/cache/redis_cache.go new file mode 100644 index 0000000..d83e968 --- /dev/null +++ b/cache/redis_cache.go @@ -0,0 +1,160 @@ +package cache + +import ( + "bytes" + "encoding/gob" + "time" + + "github.com/garyburd/redigo/redis" +) + +// Scripts! +var ( + scriptDeleteMask = redis.NewScript(0, ` + for _, k in ipairs( redis.call( 'keys', ARGV[1] ) ) do + redis.call( 'del', k ) + end + `) +) + +// RedisCache is an implementation of Cache that uses Redis as a backend +type RedisCache struct { + pool *redis.Pool +} + +// RedisCacheOpts is used to pass options to NewRedisCache +type RedisCacheOpts struct { + Address string + Database int + Password string + MaxIdle int + IdleTimeout time.Duration +} + +// NewRedisCache creates a new cache with a redis backend +func NewRedisCache(options *RedisCacheOpts) (*RedisCache, error) { + // Default values + if options.MaxIdle == 0 { + options.MaxIdle = 3 + } + if options.IdleTimeout == 0 { + options.IdleTimeout = 240 * time.Second + } + + // Create a new redis pool + pool := &redis.Pool{ + MaxIdle: options.MaxIdle, + IdleTimeout: options.IdleTimeout, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", options.Address) + if err != nil { + return nil, err + } + + if options.Password != "" { + if _, err := c.Do("AUTH", options.Password); err != nil { + c.Close() + return nil, err + } + } + + if options.Database != 0 { + if _, err := c.Do("SELECT", options.Database); err != nil { + c.Close() + return nil, err + } + } + + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + + // Test the pool + conn := pool.Get() + defer conn.Close() + if err := pool.TestOnBorrow(conn, time.Now()); err != nil { + return nil, err + } + + // Return a new cache struct + return &RedisCache{ + pool: pool, + }, nil +} + +// Get retrieves data from the database and then decodes it into the passed pointer. +func (r *RedisCache) Get(key string, pointer interface{}) error { + conn := r.pool.Get() + defer conn.Close() + + // Perform the get + data, err := redis.Bytes(conn.Do("GET", key)) + if err != nil { + return err + } + + // Initialize a new decoder + dec := gob.NewDecoder(bytes.NewReader(data)) + + // Decode it into pointer + return dec.Decode(pointer) +} + +// Set encodes passed value and sends it to redis +func (r *RedisCache) Set(key string, value interface{}, expires time.Duration) error { + conn := r.pool.Get() + defer conn.Close() + + // Initialize a new encoder + var buffer bytes.Buffer + enc := gob.NewEncoder(&buffer) + + // Encode the value + if err := enc.Encode(value); err != nil { + return err + } + + // Save it into redis + if expires == 0 { + _, err := conn.Do("SET", key, buffer.Bytes()) + return err + } + + _, err := conn.Do("SETEX", key, expires.Seconds(), buffer.Bytes()) + return err +} + +// Delete removes data in redis by key +func (r *RedisCache) Delete(key string) error { + conn := r.pool.Get() + defer conn.Close() + _, err := redis.Int(conn.Do("DEL", key)) + return err +} + +// DeleteMask removes data using KEYS masks +func (r *RedisCache) DeleteMask(mask string) error { + conn := r.pool.Get() + defer conn.Close() + _, err := scriptDeleteMask.Do(conn, mask) + return err +} + +// DeleteMulti removes multiple keys +func (r *RedisCache) DeleteMulti(keys ...interface{}) error { + conn := r.pool.Get() + defer conn.Close() + _, err := redis.Int(conn.Do("DEL", keys...)) + return err +} + +// Exists performs a check whether a key exists +func (r *RedisCache) Exists(key string) (bool, error) { + conn := r.pool.Get() + defer conn.Close() + return redis.Bool(conn.Do("EXISTS", key)) +} diff --git a/circle.yml b/circle.yml new file mode 100644 index 0000000..e0897b4 --- /dev/null +++ b/circle.yml @@ -0,0 +1,25 @@ +machine: + timezone: + Europe/Berlin + +dependencies: + pre: + - source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list + - wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add - + - sudo apt-get update + - sudo apt-get install rethinkdb + - wget http://download.redis.io/releases/redis-2.8.18.tar.gz + - tar xvzf redis-2.8.18.tar.gz + - cd redis-2.8.18 && make + - go get github.com/apcera/gnatsd + post: + - rethinkdb --bind all: + background: true + - src/redis-server: + background: true + - gnatsd: + background: true + +test: + override: + - go test -v ./... diff --git a/db/default_crud.go b/db/default_crud.go index c3976c5..8cdfef5 100644 --- a/db/default_crud.go +++ b/db/default_crud.go @@ -35,6 +35,11 @@ func (d *Default) GetTable() gorethink.Term { return gorethink.Table(d.table) } +// GetSession returns the current session +func (d *Default) GetSession() *gorethink.Session { + return d.session +} + // Insert inserts a document into the database func (d *Default) Insert(data interface{}) error { _, err := d.GetTable().Insert(data).RunWrite(d.session) diff --git a/db/rethink_crud.go b/db/rethink_crud.go index 8c669ef..adaeaed 100644 --- a/db/rethink_crud.go +++ b/db/rethink_crud.go @@ -1,13 +1,15 @@ package db import ( - rethink "github.com/dancannon/gorethink" + "github.com/dancannon/gorethink" ) // RethinkTable contains the most basic table functions type RethinkTable interface { GetTableName() string GetDBName() string + GetTable() gorethink.Term + GetSession() *gorethink.Session } // RethinkCreator contains a function to create new instances in the table @@ -17,19 +19,19 @@ type RethinkCreator interface { // RethinkReader allows fetching resources from the database type RethinkReader interface { - Find(id string) (*rethink.Cursor, error) + Find(id string) (*gorethink.Cursor, error) FindFetchOne(id string, value interface{}) error - FindBy(key string, value interface{}) (*rethink.Cursor, error) + FindBy(key string, value interface{}) (*gorethink.Cursor, error) FindByAndFetch(key string, value interface{}, results interface{}) error FindByAndFetchOne(key string, value interface{}, result interface{}) error FindByAndCount(key string, value interface{}) (int, error) - Where(filter map[string]interface{}) (*rethink.Cursor, error) + Where(filter map[string]interface{}) (*gorethink.Cursor, error) WhereAndFetch(filter map[string]interface{}, results interface{}) error WhereAndFetchOne(filter map[string]interface{}, result interface{}) error - FindByIndex(index string, values ...interface{}) (*rethink.Cursor, error) + FindByIndex(index string, values ...interface{}) (*gorethink.Cursor, error) FindByIndexFetch(results interface{}, index string, values ...interface{}) error FindByIndexFetchOne(result interface{}, index string, values ...interface{}) error } diff --git a/db/table_emails.go b/db/table_emails.go new file mode 100644 index 0000000..4e0134e --- /dev/null +++ b/db/table_emails.go @@ -0,0 +1,104 @@ +package db + +import ( + "github.com/dancannon/gorethink" + + "github.com/lavab/api/models" +) + +// Emails implements the CRUD interface for tokens +type EmailsTable struct { + RethinkCRUD +} + +// GetEmail returns a token with specified name +func (e *EmailsTable) GetEmail(id string) (*models.Email, error) { + var result models.Email + + if err := e.FindFetchOne(id, &result); err != nil { + return nil, err + } + + return &result, nil +} + +// GetOwnedBy returns all emails owned by id +func (e *EmailsTable) GetOwnedBy(id string) ([]*models.Email, error) { + var result []*models.Email + + err := e.WhereAndFetch(map[string]interface{}{ + "owner": id, + }, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +// DeleteOwnedBy deletes all emails owned by id +func (e *EmailsTable) DeleteOwnedBy(id string) error { + return e.Delete(map[string]interface{}{ + "owner": id, + }) +} + +func (e *EmailsTable) CountOwnedBy(id string) (int, error) { + return e.FindByAndCount("owner", id) +} + +func (e *EmailsTable) List( + owner string, + sort []string, + offset int, + limit int, +) ([]*models.Email, error) { + // Filter by owner's ID + term := e.GetTable().Filter(map[string]interface{}{ + "owner": owner, + }) + + // If sort array has contents, parse them and add to the term + if sort != nil && len(sort) > 0 { + var conds []interface{} + for _, cond := range sort { + if cond[0] == '-' { + conds = append(conds, gorethink.Desc(cond[1:])) + } else if cond[0] == '+' || cond[0] == ' ' { + conds = append(conds, gorethink.Asc(cond[1:])) + } else { + conds = append(conds, gorethink.Asc(cond)) + } + } + + term = term.OrderBy(conds...) + } + + // Slice the result in 3 cases + if offset != 0 && limit == 0 { + term = term.Skip(offset) + } + + if offset == 0 && limit != 0 { + term = term.Limit(limit) + } + + if offset != 0 && limit != 0 { + term = term.Slice(offset, offset+limit) + } + + // Run the query + cursor, err := term.Run(e.GetSession()) + if err != nil { + return nil, err + } + + // Fetch the cursor + var resp []*models.Email + err = cursor.All(&resp) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/db/table_keys.go b/db/table_keys.go index 225a830..8679e87 100644 --- a/db/table_keys.go +++ b/db/table_keys.go @@ -8,10 +8,10 @@ type KeysTable struct { RethinkCRUD } -func (k *KeysTable) FindByName(name string) ([]*models.Key, error) { +func (k *KeysTable) FindByOwner(id string) ([]*models.Key, error) { var results []*models.Key - if err := k.FindByAndFetch("owner_name", name, &results); err != nil { + if err := k.FindByAndFetch("owner", id, &results); err != nil { return nil, err } diff --git a/db/table_tokens.go b/db/table_tokens.go index 8de409e..5703067 100644 --- a/db/table_tokens.go +++ b/db/table_tokens.go @@ -1,12 +1,96 @@ package db import ( + "time" + + "github.com/dancannon/gorethink" + + "github.com/lavab/api/cache" "github.com/lavab/api/models" ) // TokensTable implements the CRUD interface for tokens type TokensTable struct { RethinkCRUD + Cache cache.Cache + Expires time.Duration +} + +// Insert monkey-patches the DefaultCRUD method and introduces caching +func (t *TokensTable) Insert(data interface{}) error { + if err := t.RethinkCRUD.Insert(data); err != nil { + return err + } + + token, ok := data.(*models.Token) + if !ok { + return nil + } + + return t.Cache.Set(t.RethinkCRUD.GetTableName()+":"+token.ID, token, t.Expires) +} + +// Update clears all updated keys +func (t *TokensTable) Update(data interface{}) error { + if err := t.RethinkCRUD.Update(data); err != nil { + return err + } + + return t.Cache.DeleteMask(t.RethinkCRUD.GetTableName() + ":*") +} + +// UpdateID updates the specified token and updates cache +func (t *TokensTable) UpdateID(id string, data interface{}) error { + if err := t.RethinkCRUD.UpdateID(id, data); err != nil { + return err + } + + token, err := t.GetToken(id) + if err != nil { + return err + } + + return t.Cache.Set(t.RethinkCRUD.GetTableName()+":"+id, token, t.Expires) +} + +// Delete removes from db and cache using filter +func (t *TokensTable) Delete(cond interface{}) error { + result, err := t.GetTable().Filter(cond).Delete(gorethink.DeleteOpts{ + ReturnChanges: true, + }).RunWrite(t.GetSession()) + if err != nil { + return err + } + + var ids []interface{} + for _, change := range result.Changes { + ids = append(ids, t.RethinkCRUD.GetTableName()+":"+change.OldValue.(map[string]interface{})["id"].(string)) + } + + return t.Cache.DeleteMulti(ids...) +} + +// DeleteID removes from db and cache using id query +func (t *TokensTable) DeleteID(id string) error { + if err := t.RethinkCRUD.DeleteID(id); err != nil { + return err + } + + return t.Cache.Delete(t.RethinkCRUD.GetTableName() + ":" + id) +} + +// FindFetchOne tries cache and then tries using DefaultCRUD's fetch operation +func (t *TokensTable) FindFetchOne(id string, value interface{}) error { + if err := t.Cache.Get(id, value); err == nil { + return nil + } + + err := t.RethinkCRUD.FindFetchOne(id, value) + if err != nil { + return err + } + + return t.Cache.Set(t.RethinkCRUD.GetTableName()+":"+id, value, t.Expires) } // GetToken returns a token with specified name diff --git a/env/config.go b/env/config.go index 93ea52a..6b655df 100644 --- a/env/config.go +++ b/env/config.go @@ -11,7 +11,16 @@ type Flags struct { ClassicRegistration bool UsernameReservation bool - RethinkDBURL string + RedisAddress string + RedisDatabase int + RedisPassword string + + RethinkDBAddress string RethinkDBKey string RethinkDBDatabase string + + NATSAddress string + + YubiCloudID string + YubiCloudKey string } diff --git a/env/env.go b/env/env.go index 0a32a7c..ec7444a 100644 --- a/env/env.go +++ b/env/env.go @@ -2,9 +2,12 @@ package env import ( "github.com/Sirupsen/logrus" + "github.com/apcera/nats" "github.com/dancannon/gorethink" + "github.com/lavab/api/cache" "github.com/lavab/api/db" + "github.com/lavab/api/factor" ) var ( @@ -14,6 +17,8 @@ var ( Log *logrus.Logger // Rethink contains the RethinkDB session used in the API Rethink *gorethink.Session + // Cache is the global instance of the cache interface + Cache cache.Cache // Accounts is the global instance of AccountsTable Accounts *db.AccountsTable // Tokens is the global instance of TokensTable @@ -24,4 +29,10 @@ var ( Contacts *db.ContactsTable // Reservations is the global instance of ReservationsTable Reservations *db.ReservationsTable + // Emails is the global instance of EmailsTable + Emails *db.EmailsTable + // Factors contains all currently registered factors + Factors map[string]factor.Factor + // NATS is the encoded connection to the NATS queue + NATS *nats.EncodedConn ) diff --git a/factor/authenticator.go b/factor/authenticator.go new file mode 100644 index 0000000..ba747b3 --- /dev/null +++ b/factor/authenticator.go @@ -0,0 +1,38 @@ +package factor + +import ( + "github.com/gokyle/hotp" +) + +type Authenticator struct { + length int +} + +func NewAuthenticator(length int) *Authenticator { + return &Authenticator{ + length: length, + } +} + +func (a *Authenticator) Type() string { + return "authenticator" +} + +func (a *Authenticator) Request(data string) (string, error) { + otp, err := hotp.GenerateHOTP(a.length, false) + if err != nil { + return "", err + } + + return otp.URL(data), nil +} + +func (a *Authenticator) Verify(data []string, input string) (bool, error) { + // obviously broken + hotp, err := hotp.Unmarshal([]byte(data[0])) + if err != nil { + return false, err + } + + return hotp.Check(input), nil +} diff --git a/factor/method.go b/factor/method.go new file mode 100644 index 0000000..df869ed --- /dev/null +++ b/factor/method.go @@ -0,0 +1,7 @@ +package factor + +type Factor interface { + Type() string + Request(data string) (string, error) + Verify(data []string, input string) (bool, error) +} diff --git a/factor/yubicloud.go b/factor/yubicloud.go new file mode 100644 index 0000000..8bd0927 --- /dev/null +++ b/factor/yubicloud.go @@ -0,0 +1,58 @@ +package factor + +import "github.com/GeertJohan/yubigo" + +// YubiCloud is an implementation of Factor to authenticate with YubiCloud +type YubiCloud struct { + client *yubigo.YubiAuth +} + +// NewYubiCloud set ups a new Factor that supports authing using YubiCloud +func NewYubiCloud(id string, key string) (*YubiCloud, error) { + client, err := yubigo.NewYubiAuth(id, key) + if err != nil { + return nil, err + } + + return &YubiCloud{ + client: client, + }, nil +} + +// Type returns factor's type +func (y *YubiCloud) Type() string { + return "yubicloud" +} + +// Request does nothing in this driver +func (y *YubiCloud) Request(data string) (string, error) { + return "", nil +} + +// Verify checks if the token is valid +func (y *YubiCloud) Verify(data []string, input string) (bool, error) { + publicKey := input[:12] + + found := false + for _, prefix := range data { + if publicKey == prefix { + found = true + break + } + } + + if !found { + return false, nil + } + + _, ok, err := y.client.Verify(input) + if err != nil { + return false, err + } + + if !ok { + return false, nil + } + + return true, nil +} diff --git a/main.go b/main.go index db34c48..787ee23 100644 --- a/main.go +++ b/main.go @@ -18,17 +18,29 @@ import ( // https://github.com/unrolled/secure var ( + // Enable namsral/flag functionality + configFlag = flag.String("config", "", "config file to load") // General flags bindAddress = flag.String("bind", ":5000", "Network address used to bind") - apiVersion = flag.String("version", "v0", "Shown API version") + apiVersion = flag.String("api_version", "v0", "Shown API version") logFormatterType = flag.String("log", "text", "Log formatter type. Either \"json\" or \"text\"") forceColors = flag.Bool("force_colors", false, "Force colored prompt?") // Registration settings sessionDuration = flag.Int("session_duration", 72, "Session duration expressed in hours") classicRegistration = flag.Bool("classic_registration", false, "Classic registration enabled?") usernameReservation = flag.Bool("username_reservation", false, "Username reservation enabled?") + // Cache-related flags + redisAddress = flag.String("redis_address", func() string { + address := os.Getenv("REDIS_PORT_6379_TCP_ADDR") + if address == "" { + address = "127.0.0.1" + } + return address + ":6379" + }(), "Address of the redis server") + redisDatabase = flag.Int("redis_db", 0, "Index of redis database to use") + redisPassword = flag.String("redis_password", "", "Password of the redis server") // Database-related flags - rethinkdbURL = flag.String("rethinkdb_url", func() string { + rethinkdbAddress = flag.String("rethinkdb_address", func() string { address := os.Getenv("RETHINKDB_PORT_28015_TCP_ADDR") if address == "" { address = "127.0.0.1" @@ -43,6 +55,17 @@ var ( } return database }(), "Database name on the RethinkDB server") + // NATS address + natsAddress = flag.String("nats_address", func() string { + address := os.Getenv("NATS_PORT_4222_TCP_ADDR") + if address == "" { + address = "127.0.0.1" + } + return "nats://" + address + ":4222" + }(), "Address of the NATS server") + // YubiCloud params + yubiCloudID = flag.String("yubicloud_id", "", "YubiCloud API id") + yubiCloudKey = flag.String("yubicloud_key", "", "YubiCloud API key") ) func main() { @@ -60,9 +83,18 @@ func main() { ClassicRegistration: *classicRegistration, UsernameReservation: *usernameReservation, - RethinkDBURL: *rethinkdbURL, + RedisAddress: *redisAddress, + RedisDatabase: *redisDatabase, + RedisPassword: *redisPassword, + + RethinkDBAddress: *rethinkdbAddress, RethinkDBKey: *rethinkdbKey, RethinkDBDatabase: *rethinkdbDatabase, + + NATSAddress: *natsAddress, + + YubiCloudID: *yubiCloudID, + YubiCloudKey: *yubiCloudKey, } // Generate a mux diff --git a/models/account.go b/models/account.go index aa834f1..6e5f6f9 100644 --- a/models/account.go +++ b/models/account.go @@ -14,17 +14,11 @@ type Account struct { Billing BillingData `json:"billing" gorethink:"billing"` // Password is the password used to login to the account. - // It's hashed and salted using a cryptographically strong method (bcrypt|scrypt). + // It's hashed and salted using scrypt. Password string `json:"-" gorethink:"password"` - // PgpExpDate is an RFC3339-encoded string containing the expiry date of the user's public key - PgpExpDate string `json:"pgp_exp_date" gorethink:"pgp_exp_date"` - - // PgpFingerprint is a SHA-512 hash of the user's public key - PgpFingerprint string `json:"pgp_fingerprint" gorethink:"pgp_fingerprint"` - - // PgpPublicKey is a copy of the user's current public key. It can also be found in the 'keys' db. - PgpPublicKey string `json:"pgp_public_key" gorethink:"pgp_public_key"` + // PGPKey is the fingerprint of account's default key + PGPKey string `json:"pgp_key" gorethink:"pgp_key"` // Settings contains data needed to customize the user experience. // TODO Work in progress @@ -40,6 +34,9 @@ type Account struct { AltEmail string `json:"alt_email" gorethink:"alt_email"` + FactorType string `json:"-" gorethink:"factor_type"` + FactorValue []string `json:"-" gorethink:"factor_value"` + Status string `json:"status" gorethink:"status"` } diff --git a/models/base_encrypted.go b/models/base_encrypted.go index 44eec0b..0597cd1 100644 --- a/models/base_encrypted.go +++ b/models/base_encrypted.go @@ -5,8 +5,8 @@ type Encrypted struct { // Encoding tells the reader how to decode the data; can be "json", "protobuf", maybe more in the future Encoding string `json:"encoding" gorethink:"encoding"` - // PgpFingerprints contains the fingerprints of the PGP public keys used to encrypt the data. - PgpFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` + // PGPFingerprints contains the fingerprints of the PGP public keys used to encrypt the data. + PGPFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` // Data is the raw, PGP-encrypted data Data string `json:"raw" gorethink:"raw"` diff --git a/models/email.go b/models/email.go index 610570c..03bd9e3 100644 --- a/models/email.go +++ b/models/email.go @@ -5,6 +5,12 @@ package models type Email struct { Resource + // Kind of the email. Value is either sent or received. + Kind string `json:"kind" gorethink:"kind"` + + // Who is supposed to receive the email / what email received it. + To []string `json:"to" gorethink:"to"` + // AttachmentsIDs is a slice of the FileIDs associated with this email // For uploading attachments see `POST /upload` AttachmentIDs []string `json:"attachments" gorethink:"attachments"` @@ -23,4 +29,6 @@ type Email struct { // ThreadID ThreadID string `json:"thread_id" gorethink:"thread_id"` + + Status string `json:"status" gorethink:"status"` } diff --git a/models/key.go b/models/key.go index f0c6d86..b114ab8 100644 --- a/models/key.go +++ b/models/key.go @@ -1,19 +1,18 @@ package models type Key struct { - Resource - Expiring + Resource // ID is the fingerprint, Name is empty + Expiring // ExpiryDate is either empty or expiring, user can set it - // ID is the fingerprint + //Body []byte `json:"body" gorethink:"body"` // Raw key contents - Image []byte `json:"image" gorethink:"image"` - - // the actual key - Key string `json:"key" gorethink:"key"` - - OwnerName string `json:"owner_name" gorethink:"owner_name"` - - // the actual id - KeyID string `json:"key_id" gorethink:"key_id"` - KeyIDShort string `json:"key_id_short" gorethink:"key_id_short"` + Headers map[string]string `json:"headers" gorethink:"headers"` // Headers passed with the key + Email string `json:"email" gorethink:"email"` // Address associated with the key + Algorithm string `json:"algorithm" gorethink:"algorithm"` // Algorithm of the key + Length uint16 `json:"length" gorethink:"length"` // Length of the key + Key string `json:"key" gorethink:"key"` // Armor-encoded key + KeyID string `json:"key_id" gorethink:"key_id"` // PGP key ID + KeyIDShort string `json:"key_id_short" gorethink:"key_id_short"` // Shorter version of above + Reliability int `json:"reliability" gorethink:"reliability"` // Reliability algorithm cached result + MasterKey string `json:"master_key" gorethink:"mater_key"` // MasterKey's ID - no idea how it works } diff --git a/routes/accounts_test.go b/routes/accounts_test.go index 6b2f61c..240d34a 100644 --- a/routes/accounts_test.go +++ b/routes/accounts_test.go @@ -149,6 +149,45 @@ func TestAccountsCreateInvitedExisting(t *testing.T) { require.Equal(t, "Username already exists", response.Message) } +func TestAccountsCreateInvitedWeakPassword(t *testing.T) { + const ( + username = "jeremy" + password = "c0067d4af4e87f00dbac63b6156828237059172d1bbeac67427345d6a9fda484" + ) + + // Prepare a token + inviteToken := models.Token{ + Resource: models.MakeResource("", "test invite token"), + Type: "invite", + } + inviteToken.ExpireSoon() + + err := env.Tokens.Insert(inviteToken) + require.Nil(t, err) + + // POST /accounts - invited + result, err := goreq.Request{ + Method: "POST", + Uri: server.URL + "/accounts", + ContentType: "application/json", + Body: routes.AccountsCreateRequest{ + Username: username, + Password: password, + Token: inviteToken.ID, + }, + }.Do() + require.Nil(t, err) + + // Unmarshal the response + var response routes.AccountsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + // Check the result's contents + require.False(t, response.Success) + require.Equal(t, "Weak password", response.Message) +} + func TestAccountsCreateInvitedExpired(t *testing.T) { const ( username = "jeremy2" diff --git a/routes/emails.go b/routes/emails.go index deb3dd2..612324b 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -2,26 +2,134 @@ package routes import ( "net/http" + "strconv" + "strings" + "github.com/Sirupsen/logrus" + "github.com/ugorji/go/codec" + "github.com/zenazn/goji/web" + + "github.com/lavab/api/env" "github.com/lavab/api/models" "github.com/lavab/api/utils" ) +var ( + msgpackCodec codec.MsgpackHandle +) + // EmailsListResponse contains the result of the EmailsList request. type EmailsListResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - ItemsCount int `json:"items_count,omitempty"` - Emails []*models.Email `json:"emails,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Emails *[]*models.Email `json:"emails,omitempty"` } // EmailsList sends a list of the emails in the inbox. -func EmailsList(w http.ResponseWriter, r *http.Request) { +func EmailsList(c web.C, w http.ResponseWriter, r *http.Request) { + // Fetch the current session from the database + session := c.Env["token"].(*models.Token) + + // Parse the query + var ( + query = r.URL.Query() + sortRaw = query.Get("sort") + offsetRaw = query.Get("offset") + limitRaw = query.Get("limit") + sort []string + offset int + limit int + ) + + if offsetRaw != "" { + o, err := strconv.Atoi(offsetRaw) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "offset": offset, + }).Error("Invalid offset") + + utils.JSONResponse(w, 400, &EmailsListResponse{ + Success: false, + Message: "Invalid offset", + }) + return + } + offset = o + } + + if limitRaw != "" { + l, err := strconv.Atoi(limitRaw) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "limit": limit, + }).Error("Invalid limit") + + utils.JSONResponse(w, 400, &EmailsListResponse{ + Success: false, + Message: "Invalid limit", + }) + return + } + limit = l + } + + if sortRaw != "" { + sort = strings.Split(sortRaw, ",") + } + + // Get contacts from the database + emails, err := env.Emails.List(session.Owner, sort, offset, limit) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Unable to fetch emails") + + utils.JSONResponse(w, 500, &EmailsListResponse{ + Success: false, + Message: "Internal error (code EM/LI/01)", + }) + return + } + + if offsetRaw != "" || limitRaw != "" { + count, err := env.Emails.CountOwnedBy(session.Owner) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Unable to count emails") + + utils.JSONResponse(w, 500, &EmailsListResponse{ + Success: false, + Message: "Internal error (code EM/LI/02)", + }) + return + } + w.Header().Set("X-Total-Count", strconv.Itoa(count)) + } + utils.JSONResponse(w, 200, &EmailsListResponse{ - Success: true, - ItemsCount: 1, - Emails: []*models.Email{}, + Success: true, + Emails: &emails, }) + + // GET parameters: + // sort - split by commas, prefixes: - is desc, + is asc + // offset, limit - for pagination + // Pagination ADDS X-Total-Count to the response! +} + +type EmailsCreateRequest struct { + To []string `json:"to"` + BCC []string `json:"bcc"` + ReplyTo string `json:"reply_to"` + ThreadID string `json:"thread_id"` + Subject string `json:"title"` + Body string `json:"body"` + Preview string `json:"preview"` + Attachments []string `json:"attachments"` + PGPFingerprints []string `json:"pgp_fingerprints"` } // EmailsCreateResponse contains the result of the EmailsCreate request. @@ -32,39 +140,128 @@ type EmailsCreateResponse struct { } // EmailsCreate sends a new email -func EmailsCreate(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 200, &EmailsCreateResponse{ +func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input EmailsCreateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Ensure that the input data isn't empty + if len(input.To) == 0 || input.Subject == "" || input.Body == "" { + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid request", + }) + return + } + + // Create a new email struct + email := &models.Email{ + Kind: "sent", + To: input.To, + Resource: models.MakeResource(session.Owner, input.Subject), + AttachmentIDs: input.Attachments, + Body: models.Encrypted{ + Encoding: "json", + PGPFingerprints: input.PGPFingerprints, + Data: input.Body, + Schema: "email_body", + VersionMajor: 1, + VersionMinor: 0, + }, + Preview: models.Encrypted{ + Encoding: "json", + PGPFingerprints: input.PGPFingerprints, + Data: input.Preview, + Schema: "email_preview", + VersionMajor: 1, + VersionMinor: 0, + }, + ThreadID: input.ThreadID, + Status: "queued", + } + + // Insert the email into the database + if err := env.Emails.Insert(email); err != nil { + utils.JSONResponse(w, 500, &EmailsCreateResponse{ + Success: false, + Message: "internal server error - EM/CR/01", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not insert an email into the database") + return + } + + // Add a send request to the queue + err = env.NATS.Publish("send", email.ID) + if err != nil { + utils.JSONResponse(w, 500, &EmailsCreateResponse{ + Success: false, + Message: "internal server error - EM/CR/03", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not publish an email send request") + return + } + + utils.JSONResponse(w, 201, &EmailsCreateResponse{ Success: true, - Created: []string{"123"}, + Created: []string{email.ID}, }) } // EmailsGetResponse contains the result of the EmailsGet request. type EmailsGetResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Status string `json:"status,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Email *models.Email `json:"email,omitempty"` } // EmailsGet responds with a single email message -func EmailsGet(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 200, &EmailsGetResponse{ - Success: true, - Status: "sending", - }) -} +func EmailsGet(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the email from the database + email, err := env.Emails.GetEmail(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &EmailsGetResponse{ + Success: false, + Message: "Email not found", + }) + return + } -// EmailsUpdateResponse contains the result of the EmailsUpdate request. -type EmailsUpdateResponse struct { - Success bool `json:"success"` - Message string `json:"message"` -} + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if email.Owner != session.Owner { + utils.JSONResponse(w, 404, &EmailsGetResponse{ + Success: false, + Message: "Email not found", + }) + return + } -// EmailsUpdate does *something* - TODO -func EmailsUpdate(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 501, &EmailsUpdateResponse{ - Success: false, - Message: "Sorry, not implemented yet", + // Write the email to the response + utils.JSONResponse(w, 200, &EmailsGetResponse{ + Success: true, + Email: email, }) } @@ -75,9 +272,47 @@ type EmailsDeleteResponse struct { } // EmailsDelete remvoes an email from the system -func EmailsDelete(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 501, &EmailsDeleteResponse{ - Success: false, - Message: "Sorry, not implemented yet", +func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the email from the database + email, err := env.Emails.GetEmail(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &EmailsDeleteResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if email.Owner != session.Owner { + utils.JSONResponse(w, 404, &EmailsDeleteResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Perform the deletion + err = env.Emails.DeleteID(c.URLParams["id"]) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "id": c.URLParams["id"], + }).Error("Unable to delete a email") + + utils.JSONResponse(w, 500, &EmailsDeleteResponse{ + Success: false, + Message: "Internal error (code EM/DE/01)", + }) + return + } + + // Write the email to the response + utils.JSONResponse(w, 200, &EmailsDeleteResponse{ + Success: true, + Message: "Email successfully removed", }) } diff --git a/routes/emails_test.go b/routes/emails_test.go new file mode 100644 index 0000000..7d4310a --- /dev/null +++ b/routes/emails_test.go @@ -0,0 +1,245 @@ +package routes_test + +import ( + "testing" + + "github.com/franela/goreq" + "github.com/stretchr/testify/require" + + "github.com/lavab/api/env" + "github.com/lavab/api/models" + "github.com/lavab/api/routes" +) + +var ( + emailID string + notOwnedEmailID string +) + +func TestEmailsCreate(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: routes.EmailsCreateRequest{ + To: []string{"piotr@zduniak.net"}, + Subject: "hello world", + Body: "raw meaty email", + }, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, len(response.Created), 1) + require.True(t, response.Success) + + emailID = response.Created[0] +} + +func TestEmailsCreateInvalidBody(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: "!@#!@#!@#", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, response.Message, "Invalid input format") + require.False(t, response.Success) +} + +func TestEmailsCreateMissingFields(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: routes.EmailsCreateRequest{ + To: []string{"piotr@zduniak.net"}, + Subject: "hello world", + }, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, response.Message, "Invalid request") + require.False(t, response.Success) +} + +func TestEmailsGet(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/" + emailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "hello world", response.Email.Name) +} + +func TestEmailsGetInvalidID(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/nonexisting", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, "Email not found", response.Message) + require.False(t, response.Success) +} + +func TestEmailsGetNotOwned(t *testing.T) { + email := &models.Email{ + Resource: models.MakeResource("not", "Carpeus Caesar"), + } + + err := env.Emails.Insert(email) + require.Nil(t, err) + + notOwnedEmailID = email.ID + + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/" + email.ID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, "Email not found", response.Message) + require.False(t, response.Success) +} + +func TestEmailsList(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=0&limit=1&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "hello world", (*response.Emails)[0].Name) +} + +func TestEmailsListInvalidOffset(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=pppp&limit=1&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Invalid offset", response.Message) +} + +func TestEmailsListInvalidLimit(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=0&limit=pppp&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Invalid limit", response.Message) +} + +func TestEmailsDelete(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/" + emailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "Email successfully removed", response.Message) +} + +func TestEmailsDeleteNotExisting(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/nonexisting", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Email not found", response.Message) +} + +func TestEmailsDeleteNotOwned(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/" + notOwnedEmailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Email not found", response.Message) +} diff --git a/routes/init_test.go b/routes/init_test.go index 6d1d576..fb0e4e7 100644 --- a/routes/init_test.go +++ b/routes/init_test.go @@ -1,6 +1,7 @@ package routes_test import ( + "fmt" "net/http/httptest" "time" @@ -27,14 +28,18 @@ func init() { ClassicRegistration: true, UsernameReservation: true, - RethinkDBURL: "127.0.0.1:28015", + RedisAddress: "127.0.0.1:6379", + + NATSAddress: "nats://127.0.0.1:4222", + + RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", RethinkDBDatabase: "test", } // Connect to the RethinkDB server rdbSession, err := gorethink.Connect(gorethink.ConnectOpts{ - Address: env.Config.RethinkDBURL, + Address: env.Config.RethinkDBAddress, AuthKey: env.Config.RethinkDBKey, MaxIdle: 10, IdleTimeout: time.Second * 10, @@ -46,7 +51,7 @@ func init() { // Clear the test database err = gorethink.DbDrop("test").Exec(rdbSession) if err != nil { - panic("removing the test database should not return an error, got " + err.Error()) + fmt.Println("removing the test database should not return an error, got " + err.Error()) } // Disconnect diff --git a/routes/keys.go b/routes/keys.go index 980f705..4328583 100644 --- a/routes/keys.go +++ b/routes/keys.go @@ -9,6 +9,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/zenazn/goji/web" "golang.org/x/crypto/openpgp" + "golang.org/x/crypto/openpgp/armor" "github.com/lavab/api/env" "github.com/lavab/api/models" @@ -34,8 +35,17 @@ func KeysList(w http.ResponseWriter, r *http.Request) { return } + account, err := env.Accounts.FindAccountByName(user) + if err != nil { + utils.JSONResponse(w, 409, &KeysListResponse{ + Success: false, + Message: "Invalid username", + }) + return + } + // Find all keys owner by user - keys, err := env.Keys.FindByName(user) + keys, err := env.Keys.FindByOwner(account.ID) if err != nil { utils.JSONResponse(w, 500, &KeysListResponse{ Success: false, @@ -59,8 +69,7 @@ func KeysList(w http.ResponseWriter, r *http.Request) { // KeysCreateRequest contains the data passed to the KeysCreate endpoint. type KeysCreateRequest struct { - Key string `json:"key" schema:"key"` // gpg armored key - Image string `json:"image" schema:"image"` // todo + Key string `json:"key" schema:"key"` // gpg armored key } // KeysCreateResponse contains the result of the KeysCreate request. @@ -105,6 +114,21 @@ func KeysCreate(c web.C, w http.ResponseWriter, r *http.Request) { return } + // Parse using armor pkg + block, err := armor.Decode(strings.NewReader(input.Key)) + if err != nil { + utils.JSONResponse(w, 409, &KeysCreateResponse{ + Success: false, + Message: "Invalid key format", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + "list": entityList, + }).Warn("Cannot parse an armored key #2") + return + } + // Get the account from db account, err := env.Accounts.GetAccount(session.Owner) if err != nil { @@ -132,17 +156,21 @@ func KeysCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Allocate a new key key := &models.Key{ Resource: models.MakeResource( - session.Owner, + account.ID, fmt.Sprintf( - "%d/%s public key", + "%s/%d/%s", + utils.GetAlgorithmName(publicKey.PrimaryKey.PubKeyAlgo), bitLength, publicKey.PrimaryKey.KeyIdString(), ), ), - OwnerName: account.Name, - Key: input.Key, - KeyID: publicKey.PrimaryKey.KeyIdString(), - KeyIDShort: publicKey.PrimaryKey.KeyIdShortString(), + Headers: block.Header, + Algorithm: utils.GetAlgorithmName(publicKey.PrimaryKey.PubKeyAlgo), + Length: bitLength, + Key: input.Key, + KeyID: publicKey.PrimaryKey.KeyIdString(), + KeyIDShort: publicKey.PrimaryKey.KeyIdShortString(), + Reliability: 0, } // Update id as we can't do it directly during allocation diff --git a/routes/tokens.go b/routes/tokens.go index 3b11567..87d24b8 100644 --- a/routes/tokens.go +++ b/routes/tokens.go @@ -38,13 +38,16 @@ type TokensCreateRequest struct { Username string `json:"username" schema:"username"` Password string `json:"password" schema:"password"` Type string `json:"type" schema:"type"` + Token string `json:"token" schema:"token"` } // TokensCreateResponse contains the result of the TokensCreate request. type TokensCreateResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Token *models.Token `json:"token,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Token *models.Token `json:"token,omitempty"` + FactorType string `json:"factor_type,omitempty"` + FactorRequest string `json:"factor_request,omitempty"` } // TokensCreate allows logging in to an account. @@ -104,12 +107,41 @@ func TokensCreate(w http.ResponseWriter, r *http.Request) { } } + // Check for 2nd factor + if user.FactorType != "" { + factor, ok := env.Factors[user.FactorType] + if ok { + if input.Token == "" { + req, err := factor.Request(user.ID) + if err == nil { + utils.JSONResponse(w, 403, &TokensCreateResponse{ + Success: false, + Message: "Factor token was not passed", + FactorType: user.FactorType, + FactorRequest: req, + }) + return + } + } else { + ok, err := factor.Verify(user.FactorValue, input.Token) + if !ok || err != nil { + utils.JSONResponse(w, 403, &TokensCreateResponse{ + Success: false, + Message: "Invalid token passed", + FactorType: user.FactorType, + }) + return + } + } + } + } + // Calculate the expiry date expDate := time.Now().Add(time.Hour * time.Duration(env.Config.SessionDuration)) // Create a new token token := &models.Token{ - Expiring: models.Expiring{expDate}, + Expiring: models.Expiring{ExpiryDate: expDate}, Resource: models.MakeResource(user.ID, "Auth token expiring on "+expDate.Format(time.RFC3339)), Type: input.Type, } diff --git a/setup/setup.go b/setup/setup.go index ab93a54..5549c6c 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -1,19 +1,38 @@ package setup import ( + "bufio" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "sync" "time" "github.com/Sirupsen/logrus" + "github.com/apcera/nats" "github.com/dancannon/gorethink" + "github.com/googollee/go-socket.io" + "github.com/rs/cors" "github.com/zenazn/goji/web" "github.com/zenazn/goji/web/middleware" + "gopkg.in/igm/sockjs-go.v2/sockjs" + "github.com/lavab/api/cache" "github.com/lavab/api/db" "github.com/lavab/api/env" + "github.com/lavab/api/factor" "github.com/lavab/api/routes" "github.com/lavab/glogrus" ) +// sessions contains all "subscribing" WebSockets sessions +var ( + sessions = map[string][]sockjs.Session{} + sessionsLock sync.Mutex +) + // PrepareMux sets up the API func PrepareMux(flags *env.Flags) *web.Mux { // Set up a new logger @@ -31,14 +50,28 @@ func PrepareMux(flags *env.Flags) *web.Mux { // Pass it to the environment package env.Log = log + // Initialize the cache + redis, err := cache.NewRedisCache(&cache.RedisCacheOpts{ + Address: flags.RedisAddress, + Database: flags.RedisDatabase, + Password: flags.RedisPassword, + }) + if err != nil { + log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to connect to the redis server") + } + + env.Cache = redis + // Set up the database rethinkOpts := gorethink.ConnectOpts{ - Address: flags.RethinkDBURL, + Address: flags.RethinkDBAddress, AuthKey: flags.RethinkDBKey, MaxIdle: 10, IdleTimeout: time.Second * 10, } - err := db.Setup(rethinkOpts) + err = db.Setup(rethinkOpts) if err != nil { log.WithFields(logrus.Fields{ "error": err, @@ -64,6 +97,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { rethinkOpts.Database, "tokens", ), + Cache: redis, } env.Accounts = &db.AccountsTable{ RethinkCRUD: db.NewCRUDTable( @@ -94,6 +128,127 @@ func PrepareMux(flags *env.Flags) *web.Mux { "reservations", ), } + env.Emails = &db.EmailsTable{ + RethinkCRUD: db.NewCRUDTable( + rethinkSession, + rethinkOpts.Database, + "emails", + ), + } + + // NATS queue connection + nc, err := nats.Connect(flags.NATSAddress) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "address": flags.NATSAddress, + }).Fatal("Unable to connect to NATS") + } + + c, err := nats.NewEncodedConn(nc, "json") + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "address": flags.NATSAddress, + }).Fatal("Unable to initialize a JSON NATS connection") + } + + c.Subscribe("delivery", func(msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + }) { + // Check if we are handling owner's session + if _, ok := sessions[msg.Owner]; !ok { + return + } + + if len(sessions[msg.Owner]) == 0 { + return + } + + // Resolve the email + email, err := env.Emails.GetEmail(msg.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + }).Error("Unable to resolve an email from queue") + return + } + + // Send notifications to subscribers + for _, session := range sessions[msg.Owner] { + result, _ := json.Marshal(map[string]interface{}{ + "type": "delivery", + "id": msg.ID, + "name": email.Name, + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + } + } + }) + + c.Subscribe("receipt", func(msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + }) { + // Check if we are handling owner's session + if _, ok := sessions[msg.Owner]; !ok { + return + } + + if len(sessions[msg.Owner]) == 0 { + return + } + + // Resolve the email + email, err := env.Emails.GetEmail(msg.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + }).Error("Unable to resolve an email from queue") + return + } + + // Send notifications to subscribers + for _, session := range sessions[msg.Owner] { + result, _ := json.Marshal(map[string]interface{}{ + "type": "receipt", + "id": msg.ID, + "name": email.Name, + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + } + } + }) + + env.NATS = c + + // Initialize factors + env.Factors = make(map[string]factor.Factor) + if flags.YubiCloudID != "" { + yubicloud, err := factor.NewYubiCloud(flags.YubiCloudID, flags.YubiCloudKey) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to initiate YubiCloud") + } + env.Factors[yubicloud.Type()] = yubicloud + } + + authenticator := factor.NewAuthenticator(6) + env.Factors[authenticator.Type()] = authenticator // Create a new goji mux mux := web.New() @@ -106,6 +261,9 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Use(middleware.RequestID) mux.Use(glogrus.NewGlogrus(log, "api")) mux.Use(middleware.Recoverer) + mux.Use(cors.New(cors.Options{ + AllowedOrigins: []string{"*"}, + }).Handler) mux.Use(middleware.AutomaticOptions) // Set up an auth'd mux @@ -138,7 +296,6 @@ func PrepareMux(flags *env.Flags) *web.Mux { auth.Get("/emails", routes.EmailsList) auth.Post("/emails", routes.EmailsCreate) auth.Get("/emails/:id", routes.EmailsGet) - auth.Put("/emails/:id", routes.EmailsUpdate) auth.Delete("/emails/:id", routes.EmailsDelete) // Labels @@ -161,6 +318,359 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Get("/keys/:id", routes.KeysGet) auth.Post("/keys/:id/vote", routes.KeysVote) + // WebSockets handler + ws, err := socketio.NewServer(nil) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to create a socket.io server") + } + ws.On("connection", func(so socketio.Socket) { + env.Log.WithFields(logrus.Fields{ + "id": so.Id(), + }).Info("New WebSockets connection") + + so.On("request", func(id string, method string, path string, data string, headers map[string]string) { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, "http://api.lavaboom.io"+path, strings.NewReader(data)) + if err != nil { + so.Emit("error", err.Error()) + return + } + + for key, value := range headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + resp, err := http.ReadResponse(bufio.NewReader(w.Body), r) + if err != nil { + so.Emit("error", err.Error()) + return + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + so.Emit("error", err.Error()) + return + } + + so.Emit("response", id, resp.StatusCode, resp.Header, body) + }) + }) + + mux.Handle("/ws/*", sockjs.NewHandler("/ws", sockjs.DefaultOptions, func(session sockjs.Session) { + var subscribed string + + // A new goroutine seems to be spawned for each new session + for { + // Read a message from the input + msg, err := session.Recv() + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while reading from a WebSocket") + break + } + + // Decode the message + var input struct { + Type string `json:"type"` + Token string `json:"token"` + ID string `json:"id"` + Method string `json:"method"` + Path string `json:"path"` + Body string `json:"body"` + Headers map[string]string `json:"headers"` + } + err = json.Unmarshal([]byte(msg), &input) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": err, + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Check message's type + if input.Type == "subscribe" { + // Listen to user's events + + // Check if token is empty + if input.Token == "" { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Invalid token", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Check the token in database + token, err := env.Tokens.GetToken(input.Token) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Invalid token", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Do the actual subscription + subscribed = token.Owner + sessionsLock.Lock() + + // Sessions map already contains this owner + if _, ok := sessions[token.Owner]; ok { + sessions[token.Owner] = append(sessions[token.Owner], session) + } else { + // We have to allocate a new slice + sessions[token.Owner] = []sockjs.Session{session} + } + + // Unlock the map write + sessionsLock.Unlock() + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "subscribed", + }) + err = session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } else if input.Type == "unsubscribe" { + if subscribed == "" { + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Not subscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } + + sessionsLock.Lock() + + if _, ok := sessions[subscribed]; !ok { + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + if len(sessions[subscribed]) == 1 { + delete(sessions, subscribed) + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + // Find the session + index := -1 + for i, session2 := range sessions[subscribed] { + if session == session2 { + index = i + break + } + } + + // We didn't find anything + if index == -1 { + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + // We found it, so we are supposed to slice it + sessions[subscribed][index] = sessions[subscribed][len(sessions[subscribed])-1] + sessions[subscribed][len(sessions[subscribed])-1] = nil + sessions[subscribed] = sessions[subscribed][:len(sessions[subscribed])-1] + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + } else if input.Type == "request" { + // Perform the request + w := httptest.NewRecorder() + r, err := http.NewRequest(input.Method, "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body)) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "error": err.Error(), + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + r.RequestURI = input.Path + + for key, value := range input.Headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + // Return the final response + result, _ := json.Marshal(map[string]interface{}{ + "type": "response", + "id": input.ID, + "status": w.Code, + "header": w.HeaderMap, + "body": w.Body.String(), + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } + } + + // We have to clear the subscription here too. TODO: make the code shorter + if subscribed == "" { + return + } + + sessionsLock.Lock() + + if _, ok := sessions[subscribed]; !ok { + sessionsLock.Unlock() + return + } + + if len(sessions[subscribed]) == 1 { + delete(sessions, subscribed) + sessionsLock.Unlock() + return + } + + // Find the session + index := -1 + for i, session2 := range sessions[subscribed] { + if session == session2 { + index = i + break + } + } + + // We didn't find anything + if index == -1 { + sessionsLock.Unlock() + return + } + + // We found it, so we are supposed to slice it + sessions[subscribed][index] = sessions[subscribed][len(sessions[subscribed])-1] + sessions[subscribed][len(sessions[subscribed])-1] = nil + sessions[subscribed] = sessions[subscribed][:len(sessions[subscribed])-1] + + // Unlock the mutex + sessionsLock.Unlock() + })) + // Merge the muxes mux.Handle("/*", auth) diff --git a/setup/setup_test.go b/setup/setup_test.go index ebbe49b..c455a0c 100644 --- a/setup/setup_test.go +++ b/setup/setup_test.go @@ -18,7 +18,11 @@ func TestSetup(t *testing.T) { SessionDuration: 72, ClassicRegistration: true, - RethinkDBURL: "127.0.0.1:28015", + RedisAddress: "127.0.0.1:6379", + + NATSAddress: "nats://127.0.0.1:4222", + + RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", RethinkDBDatabase: "test", } diff --git a/utils/pgp.go b/utils/pgp.go new file mode 100644 index 0000000..b806532 --- /dev/null +++ b/utils/pgp.go @@ -0,0 +1,21 @@ +package utils + +import "golang.org/x/crypto/openpgp/packet" + +// GetAlgorithmName returns algorithm's name depending on its ID +func GetAlgorithmName(id packet.PublicKeyAlgorithm) string { + switch id { + case 1, 2, 3: + return "RSA" + case 16: + return "ElGamal" + case 17: + return "DSA" + case 18: + return "ECDH" + case 19: + return "ECDSA" + default: + return "unknown" + } +}