From 0596b0099e72f62f23ed5e56bbe310b90094686a Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 20 Dec 2014 00:10:07 +0100 Subject: [PATCH 01/12] queue basis --- _vagrant/Vagrantfile | 66 +++++++++++++++++++++++++++++++++++--------- env/config.go | 2 ++ env/env.go | 7 +++++ main.go | 10 +++++++ setup/setup.go | 38 +++++++++++++++++++++++++ 5 files changed, 110 insertions(+), 13 deletions(-) diff --git a/_vagrant/Vagrantfile b/_vagrant/Vagrantfile index 53ea370..e14720f 100644 --- a/_vagrant/Vagrantfile +++ b/_vagrant/Vagrantfile @@ -5,21 +5,61 @@ VAGRANTFILE_API_VERSION = "2" Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| - config.vm.box = "ubuntu/trusty64" - # rethinkdb - config.vm.network "forwarded_port", guest: 8080, host: 8080 - config.vm.network "forwarded_port", guest: 28015, host: 28015 - config.vm.network "forwarded_port", guest: 29015, host: 29015 + #config.vm.define "redisthink" do |rethinkdb| + #rethinkdb.vm.box = "ubuntu/trusty64" - # redis - config.vm.network "forwarded_port", guest: 6379, host: 6379 + # rethinkdb + #rethinkdb.vm.network "forwarded_port", guest: 8080, host: 8080 + #rethinkdb.vm.network "forwarded_port", guest: 28015, host: 28015 + #rethinkdb.vm.network "forwarded_port", guest: 29015, host: 29015 - config.vm.provider "virtualbox" do |v| - v.memory = 2048 - v.cpus = 4 - end + # redis + #rethinkdb.vm.network "forwarded_port", guest: 6379, host: 6379 + + #rethinkdb.vm.provider "virtualbox" do |v| + #v.memory = 2048 + #v.cpus = 4 + #end + + # load ansible playbook + #rethinkdb.vm.provision "shell", path: "deploy.sh" + #end + + config.vm.define "docker" do |docker| + docker.vm.box = "ubuntu/trusty64" + + docker.vm.network "forwarded_port", guest: 4160, host: 4160 + docker.vm.network "forwarded_port", guest: 4161, host: 4161 + docker.vm.network "forwarded_port", guest: 4150, host: 4150 + docker.vm.network "forwarded_port", guest: 4151, host: 4151 + docker.vm.network "forwarded_port", guest: 6379, host: 6379 + docker.vm.network "forwarded_port", guest: 8080, host: 8080 + docker.vm.network "forwarded_port", guest: 28015, host: 28015 + docker.vm.network "forwarded_port", guest: 29015, host: 29015 - # load ansible playbook - config.vm.provision "shell", path: "deploy.sh" + docker.vm.provider "virtualbox" do |v| + v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"] + v.customize ["modifyvm", :id, "--natdnsproxy1", "on"] + end + + docker.vm.provision "docker" do |d| + d.pull_images "nsqio/nsqlookupd" + d.run "nsqio/nsqlookupd", + args: "--name lookupd -p 4160:4160 -p 4161:4161" + + d.pull_images "nsqio/nsqd" + d.run "nsqio/nsqd", + args: "--name nsqd -p 4150:4150 -p 4151:4151", + cmd: "--broadcast-address=172.17.42.1 --lookupd-tcp-address=172.17.42.1:4160" + + d.pull_images "dockerfile/rethinkdb" + d.run "dockerfile/rethinkdb", + args: "--name rethinkdb -p 8080:8080 -p 28015:28015 -p 29015:29015" + + d.pull_images "dockerfile/redis" + d.run "dockerfile/redis", + args: "--name redis -p 6379:6379" + end + end end diff --git a/env/config.go b/env/config.go index c07ee6a..22daddf 100644 --- a/env/config.go +++ b/env/config.go @@ -19,6 +19,8 @@ type Flags struct { RethinkDBKey string RethinkDBDatabase string + NSQAddress string + YubiCloudID string YubiCloudKey string } diff --git a/env/env.go b/env/env.go index e7f5ec5..b2e3e62 100644 --- a/env/env.go +++ b/env/env.go @@ -2,6 +2,7 @@ package env import ( "github.com/Sirupsen/logrus" + "github.com/bitly/go-nsq" "github.com/dancannon/gorethink" "github.com/lavab/api/cache" @@ -30,4 +31,10 @@ var ( Reservations *db.ReservationsTable // Factors contains all currently registered factors Factors map[string]factor.Factor + // NSQProducer is used for email sending + NSQProducer *nsq.Producer + // DeliveryConsumer is the NSQ consumer used for email delivery confirmations + DeliveryConsumer *nsq.Consumer + // ReceiptConsumer is the NSQ consumer for new email receipt handling + ReceiptConsumer *nsq.Consumer ) diff --git a/main.go b/main.go index 7eb6bbd..122ada8 100644 --- a/main.go +++ b/main.go @@ -55,6 +55,14 @@ var ( } return database }(), "Database name on the RethinkDB server") + // NSQ address + nsqAddress = flag.String("nsq_address", func() string { + address := os.Getenv("NSQLOOKUPD_PORT_4160_TCP_ADDR") + if address == "" { + address = "127.0.0.1" + } + return address + ":4160" + }(), "Address of the NSQ server") // YubiCloud params yubiCloudID = flag.String("yubicloud_id", "", "YubiCloud API id") yubiCloudKey = flag.String("yubicloud_key", "", "YubiCloud API key") @@ -83,6 +91,8 @@ func main() { RethinkDBKey: *rethinkdbKey, RethinkDBDatabase: *rethinkdbDatabase, + NSQAddress: *nsqAddress, + YubiCloudID: *yubiCloudID, YubiCloudKey: *yubiCloudKey, } diff --git a/setup/setup.go b/setup/setup.go index eabe49d..b09b094 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -4,6 +4,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/bitly/go-nsq" "github.com/dancannon/gorethink" "github.com/zenazn/goji/web" "github.com/zenazn/goji/web/middleware" @@ -112,6 +113,43 @@ func PrepareMux(flags *env.Flags) *web.Mux { ), } + // Initialize the NSQ connections + nsqProducer, err := nsq.NewProducer(flags.NSQAddress, nsq.NewConfig()) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to create a NSQProducer") + } + env.NSQProducer = nsqProducer + + deliveryConsumer, err := nsq.NewConsumer("delivery", "confirmation", nsq.NewConfig()) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to create a DeliveryConsumer") + } + err = deliveryConsumer.ConnectToNSQLookupd(flags.NSQAddress) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to connect to nsqlookupd") + } + env.DeliveryConsumer = deliveryConsumer + + receiptConsumer, err := nsq.NewConsumer("receipt", "notification", nsq.NewConfig()) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to create a DeliveryConsumer") + } + err = receiptConsumer.ConnectToNSQLookupd(flags.NSQAddress) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to connect to nsqlookupd") + } + env.ReceiptConsumer = receiptConsumer + // Initialize factors env.Factors = make(map[string]factor.Factor) if flags.YubiCloudID != "" { From 943d36df4ca7e05f3de777243ff626f4d884ba75 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 21 Dec 2014 01:02:14 +0100 Subject: [PATCH 02/12] A large chunk of base emails code --- _vagrant/Vagrantfile | 17 ++----- db/table_emails.go | 42 ++++++++++++++++ env/config.go | 2 +- env/env.go | 12 ++--- main.go | 12 ++--- models/base_encrypted.go | 4 +- models/email.go | 2 + routes/emails.go | 105 +++++++++++++++++++++++++++++++++++++-- setup/setup.go | 56 ++++++++++----------- 9 files changed, 192 insertions(+), 60 deletions(-) create mode 100644 db/table_emails.go diff --git a/_vagrant/Vagrantfile b/_vagrant/Vagrantfile index e14720f..36d0121 100644 --- a/_vagrant/Vagrantfile +++ b/_vagrant/Vagrantfile @@ -29,10 +29,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.define "docker" do |docker| docker.vm.box = "ubuntu/trusty64" - docker.vm.network "forwarded_port", guest: 4160, host: 4160 - docker.vm.network "forwarded_port", guest: 4161, host: 4161 - docker.vm.network "forwarded_port", guest: 4150, host: 4150 - docker.vm.network "forwarded_port", guest: 4151, host: 4151 + docker.vm.network "forwarded_port", guest: 4222, host: 4222 + docker.vm.network "forwarded_port", guest: 8333, host: 8333 docker.vm.network "forwarded_port", guest: 6379, host: 6379 docker.vm.network "forwarded_port", guest: 8080, host: 8080 docker.vm.network "forwarded_port", guest: 28015, host: 28015 @@ -44,14 +42,9 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| end docker.vm.provision "docker" do |d| - d.pull_images "nsqio/nsqlookupd" - d.run "nsqio/nsqlookupd", - args: "--name lookupd -p 4160:4160 -p 4161:4161" - - d.pull_images "nsqio/nsqd" - d.run "nsqio/nsqd", - args: "--name nsqd -p 4150:4150 -p 4151:4151", - cmd: "--broadcast-address=172.17.42.1 --lookupd-tcp-address=172.17.42.1:4160" + d.pull_images "apcera/gnatsd" + d.run "apcera/gnatsd", + args: "--name lookupd -p 4222:4222 -p 8333:8333" d.pull_images "dockerfile/rethinkdb" d.run "dockerfile/rethinkdb", diff --git a/db/table_emails.go b/db/table_emails.go new file mode 100644 index 0000000..0a14eb9 --- /dev/null +++ b/db/table_emails.go @@ -0,0 +1,42 @@ +package db + +import ( + "github.com/lavab/api/models" +) + +// Emails implements the CRUD interface for tokens +type EmailsTable struct { + RethinkCRUD +} + +// GetEmail returns a token with specified name +func (c *EmailsTable) GetEmail(id string) (*models.Email, error) { + var result models.Email + + if err := c.FindFetchOne(id, &result); err != nil { + return nil, err + } + + return &result, nil +} + +// GetOwnedBy returns all contacts owned by id +func (c *EmailsTable) GetOwnedBy(id string) ([]*models.Email, error) { + var result []*models.Email + + err := c.WhereAndFetch(map[string]interface{}{ + "owner": id, + }, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +// DeleteOwnedBy deletes all contacts owned by id +func (c *EmailsTable) DeleteOwnedBy(id string) error { + return c.Delete(map[string]interface{}{ + "owner": id, + }) +} diff --git a/env/config.go b/env/config.go index 22daddf..6b655df 100644 --- a/env/config.go +++ b/env/config.go @@ -19,7 +19,7 @@ type Flags struct { RethinkDBKey string RethinkDBDatabase string - NSQAddress string + NATSAddress string YubiCloudID string YubiCloudKey string diff --git a/env/env.go b/env/env.go index b2e3e62..ec7444a 100644 --- a/env/env.go +++ b/env/env.go @@ -2,7 +2,7 @@ package env import ( "github.com/Sirupsen/logrus" - "github.com/bitly/go-nsq" + "github.com/apcera/nats" "github.com/dancannon/gorethink" "github.com/lavab/api/cache" @@ -29,12 +29,10 @@ var ( Contacts *db.ContactsTable // Reservations is the global instance of ReservationsTable Reservations *db.ReservationsTable + // Emails is the global instance of EmailsTable + Emails *db.EmailsTable // Factors contains all currently registered factors Factors map[string]factor.Factor - // NSQProducer is used for email sending - NSQProducer *nsq.Producer - // DeliveryConsumer is the NSQ consumer used for email delivery confirmations - DeliveryConsumer *nsq.Consumer - // ReceiptConsumer is the NSQ consumer for new email receipt handling - ReceiptConsumer *nsq.Consumer + // NATS is the encoded connection to the NATS queue + NATS *nats.EncodedConn ) diff --git a/main.go b/main.go index 122ada8..787ee23 100644 --- a/main.go +++ b/main.go @@ -55,14 +55,14 @@ var ( } return database }(), "Database name on the RethinkDB server") - // NSQ address - nsqAddress = flag.String("nsq_address", func() string { - address := os.Getenv("NSQLOOKUPD_PORT_4160_TCP_ADDR") + // NATS address + natsAddress = flag.String("nats_address", func() string { + address := os.Getenv("NATS_PORT_4222_TCP_ADDR") if address == "" { address = "127.0.0.1" } - return address + ":4160" - }(), "Address of the NSQ server") + return "nats://" + address + ":4222" + }(), "Address of the NATS server") // YubiCloud params yubiCloudID = flag.String("yubicloud_id", "", "YubiCloud API id") yubiCloudKey = flag.String("yubicloud_key", "", "YubiCloud API key") @@ -91,7 +91,7 @@ func main() { RethinkDBKey: *rethinkdbKey, RethinkDBDatabase: *rethinkdbDatabase, - NSQAddress: *nsqAddress, + NATSAddress: *natsAddress, YubiCloudID: *yubiCloudID, YubiCloudKey: *yubiCloudKey, diff --git a/models/base_encrypted.go b/models/base_encrypted.go index 44eec0b..0597cd1 100644 --- a/models/base_encrypted.go +++ b/models/base_encrypted.go @@ -5,8 +5,8 @@ type Encrypted struct { // Encoding tells the reader how to decode the data; can be "json", "protobuf", maybe more in the future Encoding string `json:"encoding" gorethink:"encoding"` - // PgpFingerprints contains the fingerprints of the PGP public keys used to encrypt the data. - PgpFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` + // PGPFingerprints contains the fingerprints of the PGP public keys used to encrypt the data. + PGPFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` // Data is the raw, PGP-encrypted data Data string `json:"raw" gorethink:"raw"` diff --git a/models/email.go b/models/email.go index 610570c..10aaa5e 100644 --- a/models/email.go +++ b/models/email.go @@ -23,4 +23,6 @@ type Email struct { // ThreadID ThreadID string `json:"thread_id" gorethink:"thread_id"` + + Status string `json:"status" gorethink:"status"` } diff --git a/routes/emails.go b/routes/emails.go index deb3dd2..3cce604 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -3,10 +3,19 @@ package routes import ( "net/http" + "github.com/Sirupsen/logrus" + "github.com/ugorji/go/codec" + "github.com/zenazn/goji/web" + + "github.com/lavab/api/env" "github.com/lavab/api/models" "github.com/lavab/api/utils" ) +var ( + msgpackCodec codec.MsgpackHandle +) + // EmailsListResponse contains the result of the EmailsList request. type EmailsListResponse struct { Success bool `json:"success"` @@ -24,6 +33,18 @@ func EmailsList(w http.ResponseWriter, r *http.Request) { }) } +type EmailsCreateRequest struct { + To []string `json:"to"` + BCC []string `json:"bcc"` + ReplyTo string `json:"reply_to"` + ThreadID string `json:"thread_id"` + Title string `json:"title"` + Body string `json:"body"` + Preview string `json:"preview"` + Attachments []string `json:"attachments"` + PGPFingerprints []string `json:"pgp_fingerprints"` +} + // EmailsCreateResponse contains the result of the EmailsCreate request. type EmailsCreateResponse struct { Success bool `json:"success"` @@ -32,10 +53,88 @@ type EmailsCreateResponse struct { } // EmailsCreate sends a new email -func EmailsCreate(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 200, &EmailsCreateResponse{ +func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input EmailsCreateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Ensure that the input data isn't empty + if len(input.To) == 0 || input.Title == "" || input.Body == "" { + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid request", + }) + return + } + + // Create a new email struct + email := &models.Email{ + Resource: models.MakeResource(session.Owner, input.Title), + AttachmentIDs: input.Attachments, + Body: models.Encrypted{ + Encoding: "json", + PGPFingerprints: input.PGPFingerprints, + Data: input.Body, + Schema: "email_body", + VersionMajor: 1, + VersionMinor: 0, + }, + Preview: models.Encrypted{ + Encoding: "json", + PGPFingerprints: input.PGPFingerprints, + Data: input.Preview, + Schema: "email_preview", + VersionMajor: 1, + VersionMinor: 0, + }, + ThreadID: input.ThreadID, + Status: "queued", + } + + // Insert the email into the database + if err := env.Emails.Insert(email); err != nil { + utils.JSONResponse(w, 500, &EmailsCreateResponse{ + Success: false, + Message: "internal server error - EM/CR/01", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not insert an email into the database") + return + } + + // Add a send request to the queue + err = env.NATS.Publish("send", email.ID) + if err != nil { + utils.JSONResponse(w, 500, &EmailsCreateResponse{ + Success: false, + Message: "internal server error - EM/CR/03", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not publish an email send request") + return + } + + utils.JSONResponse(w, 201, &EmailsCreateResponse{ Success: true, - Created: []string{"123"}, + Created: []string{email.ID}, }) } diff --git a/setup/setup.go b/setup/setup.go index b09b094..f2215d3 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -1,10 +1,11 @@ package setup import ( + "fmt" "time" "github.com/Sirupsen/logrus" - "github.com/bitly/go-nsq" + "github.com/apcera/nats" "github.com/dancannon/gorethink" "github.com/zenazn/goji/web" "github.com/zenazn/goji/web/middleware" @@ -112,43 +113,40 @@ func PrepareMux(flags *env.Flags) *web.Mux { "reservations", ), } - - // Initialize the NSQ connections - nsqProducer, err := nsq.NewProducer(flags.NSQAddress, nsq.NewConfig()) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err, - }).Fatal("Unable to create a NSQProducer") + env.Emails = &db.EmailsTable{ + RethinkCRUD: db.NewCRUDTable( + rethinkSession, + rethinkOpts.Database, + "emails", + ), } - env.NSQProducer = nsqProducer - deliveryConsumer, err := nsq.NewConsumer("delivery", "confirmation", nsq.NewConfig()) + // NATS queue connection + nc, err := nats.Connect(flags.NATSAddress) if err != nil { env.Log.WithFields(logrus.Fields{ - "error": err, - }).Fatal("Unable to create a DeliveryConsumer") + "error": err, + "address": flags.NATSAddress, + }).Fatal("Unable to connect to NATS") } - err = deliveryConsumer.ConnectToNSQLookupd(flags.NSQAddress) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err, - }).Fatal("Unable to connect to nsqlookupd") - } - env.DeliveryConsumer = deliveryConsumer - receiptConsumer, err := nsq.NewConsumer("receipt", "notification", nsq.NewConfig()) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err, - }).Fatal("Unable to create a DeliveryConsumer") - } - err = receiptConsumer.ConnectToNSQLookupd(flags.NSQAddress) + c, err := nats.NewEncodedConn(nc, "json") if err != nil { env.Log.WithFields(logrus.Fields{ - "error": err, - }).Fatal("Unable to connect to nsqlookupd") + "error": err, + "address": flags.NATSAddress, + }).Fatal("Unable to initialize a JSON NATS connection") } - env.ReceiptConsumer = receiptConsumer + + c.Subscribe("delivery", func(s string) { + fmt.Printf("Received a message: %s\n", s) + }) + + c.Subscribe("receipt", func(s string) { + fmt.Printf("Received a message: %s\n", s) + }) + + env.NATS = c // Initialize factors env.Factors = make(map[string]factor.Factor) From 91106bd703e6a7d9c46a8657c33bc6ec805efaea Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Mon, 22 Dec 2014 00:07:06 +0100 Subject: [PATCH 03/12] websockets --- _research/ws_client/index.html | 44 +++++++++++++++++++++++++++ _vagrant/Vagrantfile | 2 +- setup/setup.go | 55 ++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 _research/ws_client/index.html diff --git a/_research/ws_client/index.html b/_research/ws_client/index.html new file mode 100644 index 0000000..e11663a --- /dev/null +++ b/_research/ws_client/index.html @@ -0,0 +1,44 @@ + + + + + lavab client + + + + +
+
+ + + + + + + diff --git a/_vagrant/Vagrantfile b/_vagrant/Vagrantfile index 36d0121..15c639f 100644 --- a/_vagrant/Vagrantfile +++ b/_vagrant/Vagrantfile @@ -44,7 +44,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| docker.vm.provision "docker" do |d| d.pull_images "apcera/gnatsd" d.run "apcera/gnatsd", - args: "--name lookupd -p 4222:4222 -p 8333:8333" + args: "--name gnatsd -p 4222:4222 -p 8333:8333" d.pull_images "dockerfile/rethinkdb" d.run "dockerfile/rethinkdb", diff --git a/setup/setup.go b/setup/setup.go index f2215d3..d1768c6 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -1,12 +1,19 @@ package setup import ( + "bufio" "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" "time" "github.com/Sirupsen/logrus" "github.com/apcera/nats" "github.com/dancannon/gorethink" + "github.com/googollee/go-socket.io" + "github.com/rs/cors" "github.com/zenazn/goji/web" "github.com/zenazn/goji/web/middleware" @@ -174,6 +181,10 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Use(middleware.RequestID) mux.Use(glogrus.NewGlogrus(log, "api")) mux.Use(middleware.Recoverer) + mux.Use(cors.New(cors.Options{ + AllowCredentials: true, + AllowedOrigins: []string{"*"}, + }).Handler) mux.Use(middleware.AutomaticOptions) // Set up an auth'd mux @@ -229,6 +240,50 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Get("/keys/:id", routes.KeysGet) auth.Post("/keys/:id/vote", routes.KeysVote) + // WebSockets handler + ws, err := socketio.NewServer(nil) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to create a socket.io server") + } + ws.On("connection", func(so socketio.Socket) { + env.Log.WithFields(logrus.Fields{ + "id": so.Id(), + }).Info("New WebSockets connection") + + so.On("request", func(id string, method string, path string, data string, headers map[string]string) { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, "http://api.lavaboom.io"+path, strings.NewReader(data)) + if err != nil { + so.Emit("error", err.Error()) + return + } + + for key, value := range headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + resp, err := http.ReadResponse(bufio.NewReader(w.Body), r) + if err != nil { + so.Emit("error", err.Error()) + return + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + so.Emit("error", err.Error()) + return + } + + so.Emit("response", id, resp.StatusCode, resp.Header, body) + }) + }) + + mux.Handle("/socket.io/", ws) + // Merge the muxes mux.Handle("/*", auth) From 76c1a2d7490c5a888723d42347f528c5615a0dc8 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 26 Dec 2014 19:15:47 +0100 Subject: [PATCH 04/12] SockJS support --- _research/ws_client/index.html | 28 +++++------ setup/setup.go | 88 ++++++++++++++++++++++++++++++++-- 2 files changed, 97 insertions(+), 19 deletions(-) diff --git a/_research/ws_client/index.html b/_research/ws_client/index.html index e11663a..f0c88d4 100644 --- a/_research/ws_client/index.html +++ b/_research/ws_client/index.html @@ -17,27 +17,23 @@ - + diff --git a/setup/setup.go b/setup/setup.go index d1768c6..698c422 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -2,6 +2,7 @@ package setup import ( "bufio" + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -16,6 +17,7 @@ import ( "github.com/rs/cors" "github.com/zenazn/goji/web" "github.com/zenazn/goji/web/middleware" + "gopkg.in/igm/sockjs-go.v2/sockjs" "github.com/lavab/api/cache" "github.com/lavab/api/db" @@ -182,8 +184,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Use(glogrus.NewGlogrus(log, "api")) mux.Use(middleware.Recoverer) mux.Use(cors.New(cors.Options{ - AllowCredentials: true, - AllowedOrigins: []string{"*"}, + AllowedOrigins: []string{"*"}, }).Handler) mux.Use(middleware.AutomaticOptions) @@ -282,7 +283,88 @@ func PrepareMux(flags *env.Flags) *web.Mux { }) }) - mux.Handle("/socket.io/", ws) + mux.Handle("/ws/*", sockjs.NewHandler("/ws", sockjs.DefaultOptions, func(session sockjs.Session) { + // A new goroutine seems to be spawned for each new session + for { + // Read a message from the input + msg, err := session.Recv() + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while reading from a WebSocket") + break + } + + // Decode the message + var input struct { + ID string `json:"id"` + Method string `json:"method"` + Path string `json:"path"` + Body string `json:"body"` + Headers map[string]string `json:"headers"` + } + err = json.Unmarshal([]byte(msg), &input) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "error": err, + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Perform the request + w := httptest.NewRecorder() + r, err := http.NewRequest(input.Method, "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body)) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "error": err.Error(), + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + r.RequestURI = input.Path + + for key, value := range input.Headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + // Return the final response + result, _ := json.Marshal(map[string]interface{}{ + "id": input.ID, + "status": w.Code, + "header": w.HeaderMap, + "body": w.Body.String(), + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } + })) // Merge the muxes mux.Handle("/*", auth) From 4a55c84b496fdcec56d48120e535c2da33279fc1 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 27 Dec 2014 12:47:08 +0100 Subject: [PATCH 05/12] Added message type to data sent back to WS client --- setup/setup.go | 1 + 1 file changed, 1 insertion(+) diff --git a/setup/setup.go b/setup/setup.go index 698c422..c169fa5 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -350,6 +350,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { // Return the final response result, _ := json.Marshal(map[string]interface{}{ + "type": "response", "id": input.ID, "status": w.Code, "header": w.HeaderMap, From d8d0ecec2749596086e8a6991ac8e01be81b053c Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 27 Dec 2014 12:50:17 +0100 Subject: [PATCH 06/12] Fixed tests and changed status badges in README.md --- README.md | 4 ++-- circle.yml | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 923460f..fa5f6f7 100644 --- a/README.md +++ b/README.md @@ -56,5 +56,5 @@ version=v0 ## Build status: - - `master` - [![Build Status](https://magnum.travis-ci.com/lavab/api.svg?token=kJbppXeTxzqpCVvt4t5X&branch=master)](https://magnum.travis-ci.com/lavab/api) - - `develop` - [![Build Status](https://magnum.travis-ci.com/lavab/api.svg?token=kJbppXeTxzqpCVvt4t5X&branch=develop)](https://magnum.travis-ci.com/lavab/api) \ No newline at end of file + - `master` - [![Circle CI](https://circleci.com/gh/lavab/api/tree/master.svg?style=svg&circle-token=4a52d619a03d0249906195d6447ceb60a475c0c5)](https://circleci.com/gh/lavab/api/tree/master) + - `develop` - [![Circle CI](https://circleci.com/gh/lavab/api/tree/develop.svg?style=svg&circle-token=4a52d619a03d0249906195d6447ceb60a475c0c5)](https://circleci.com/gh/lavab/api/tree/develop) diff --git a/circle.yml b/circle.yml index 3250787..e0897b4 100644 --- a/circle.yml +++ b/circle.yml @@ -11,11 +11,14 @@ dependencies: - wget http://download.redis.io/releases/redis-2.8.18.tar.gz - tar xvzf redis-2.8.18.tar.gz - cd redis-2.8.18 && make + - go get github.com/apcera/gnatsd post: - rethinkdb --bind all: background: true - src/redis-server: background: true + - gnatsd: + background: true test: override: From 5ea87c34188c886eae248317a2a838ad68259c0a Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 27 Dec 2014 12:56:41 +0100 Subject: [PATCH 07/12] Fixing configuration for automatic tests --- routes/init_test.go | 2 ++ setup/setup_test.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/routes/init_test.go b/routes/init_test.go index a57c5a7..fb0e4e7 100644 --- a/routes/init_test.go +++ b/routes/init_test.go @@ -30,6 +30,8 @@ func init() { RedisAddress: "127.0.0.1:6379", + NATSAddress: "nats://127.0.0.1:4222", + RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", RethinkDBDatabase: "test", diff --git a/setup/setup_test.go b/setup/setup_test.go index 406ecbe..c455a0c 100644 --- a/setup/setup_test.go +++ b/setup/setup_test.go @@ -20,6 +20,8 @@ func TestSetup(t *testing.T) { RedisAddress: "127.0.0.1:6379", + NATSAddress: "nats://127.0.0.1:4222", + RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", RethinkDBDatabase: "test", From ff64a723e71a2e3aa0a609d3c0abfe6e4c6f0a41 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 28 Dec 2014 01:12:11 +0100 Subject: [PATCH 08/12] GET /emails --- db/table_emails.go | 78 ++++++++++++++++++++++++++++++---- routes/emails.go | 103 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 165 insertions(+), 16 deletions(-) diff --git a/db/table_emails.go b/db/table_emails.go index 0a14eb9..4e0134e 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -1,6 +1,8 @@ package db import ( + "github.com/dancannon/gorethink" + "github.com/lavab/api/models" ) @@ -10,21 +12,21 @@ type EmailsTable struct { } // GetEmail returns a token with specified name -func (c *EmailsTable) GetEmail(id string) (*models.Email, error) { +func (e *EmailsTable) GetEmail(id string) (*models.Email, error) { var result models.Email - if err := c.FindFetchOne(id, &result); err != nil { + if err := e.FindFetchOne(id, &result); err != nil { return nil, err } return &result, nil } -// GetOwnedBy returns all contacts owned by id -func (c *EmailsTable) GetOwnedBy(id string) ([]*models.Email, error) { +// GetOwnedBy returns all emails owned by id +func (e *EmailsTable) GetOwnedBy(id string) ([]*models.Email, error) { var result []*models.Email - err := c.WhereAndFetch(map[string]interface{}{ + err := e.WhereAndFetch(map[string]interface{}{ "owner": id, }, &result) if err != nil { @@ -34,9 +36,69 @@ func (c *EmailsTable) GetOwnedBy(id string) ([]*models.Email, error) { return result, nil } -// DeleteOwnedBy deletes all contacts owned by id -func (c *EmailsTable) DeleteOwnedBy(id string) error { - return c.Delete(map[string]interface{}{ +// DeleteOwnedBy deletes all emails owned by id +func (e *EmailsTable) DeleteOwnedBy(id string) error { + return e.Delete(map[string]interface{}{ "owner": id, }) } + +func (e *EmailsTable) CountOwnedBy(id string) (int, error) { + return e.FindByAndCount("owner", id) +} + +func (e *EmailsTable) List( + owner string, + sort []string, + offset int, + limit int, +) ([]*models.Email, error) { + // Filter by owner's ID + term := e.GetTable().Filter(map[string]interface{}{ + "owner": owner, + }) + + // If sort array has contents, parse them and add to the term + if sort != nil && len(sort) > 0 { + var conds []interface{} + for _, cond := range sort { + if cond[0] == '-' { + conds = append(conds, gorethink.Desc(cond[1:])) + } else if cond[0] == '+' || cond[0] == ' ' { + conds = append(conds, gorethink.Asc(cond[1:])) + } else { + conds = append(conds, gorethink.Asc(cond)) + } + } + + term = term.OrderBy(conds...) + } + + // Slice the result in 3 cases + if offset != 0 && limit == 0 { + term = term.Skip(offset) + } + + if offset == 0 && limit != 0 { + term = term.Limit(limit) + } + + if offset != 0 && limit != 0 { + term = term.Slice(offset, offset+limit) + } + + // Run the query + cursor, err := term.Run(e.GetSession()) + if err != nil { + return nil, err + } + + // Fetch the cursor + var resp []*models.Email + err = cursor.All(&resp) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/routes/emails.go b/routes/emails.go index 3cce604..2d9a367 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -2,6 +2,8 @@ package routes import ( "net/http" + "strconv" + "strings" "github.com/Sirupsen/logrus" "github.com/ugorji/go/codec" @@ -18,19 +20,104 @@ var ( // EmailsListResponse contains the result of the EmailsList request. type EmailsListResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - ItemsCount int `json:"items_count,omitempty"` - Emails []*models.Email `json:"emails,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Emails *[]*models.Email `json:"emails,omitempty"` } // EmailsList sends a list of the emails in the inbox. -func EmailsList(w http.ResponseWriter, r *http.Request) { +func EmailsList(c web.C, w http.ResponseWriter, r *http.Request) { + // Fetch the current session from the database + session := c.Env["token"].(*models.Token) + + // Parse the query + var ( + query = r.URL.Query() + sortRaw = query.Get("sort") + offsetRaw = query.Get("offset") + limitRaw = query.Get("limit") + sort []string + offset int + limit int + ) + + if offsetRaw != "" { + o, err := strconv.Atoi(offsetRaw) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "offset": offset, + }).Error("Invalid offset") + + utils.JSONResponse(w, 400, &EmailsListResponse{ + Success: false, + Message: "Invalid offset", + }) + return + } + offset = o + } + + if limitRaw != "" { + l, err := strconv.Atoi(limitRaw) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "limit": limit, + }).Error("Invalid limit") + + utils.JSONResponse(w, 400, &EmailsListResponse{ + Success: false, + Message: "Invalid limit", + }) + return + } + limit = l + } + + if sortRaw != "" { + sort = strings.Split(sortRaw, ",") + } + + // Get contacts from the database + emails, err := env.Emails.List(session.Owner, sort, offset, limit) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Unable to fetch emails") + + utils.JSONResponse(w, 500, &EmailsListResponse{ + Success: false, + Message: "Internal error (code EM/LI/01)", + }) + return + } + + if offsetRaw != "" || limitRaw != "" { + count, err := env.Emails.CountOwnedBy(session.Owner) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Unable to count emails") + + utils.JSONResponse(w, 500, &EmailsListResponse{ + Success: false, + Message: "Internal error (code EM/LI/02)", + }) + return + } + w.Header().Set("X-Total-Count", strconv.Itoa(count)) + } + utils.JSONResponse(w, 200, &EmailsListResponse{ - Success: true, - ItemsCount: 1, - Emails: []*models.Email{}, + Success: true, + Emails: &emails, }) + + // GET parameters: + // sort - split by commas, prefixes: - is desc, + is asc + // offset, limit - for pagination + // Pagination ADDS X-Total-Count to the response! } type EmailsCreateRequest struct { From b93f0837b1aab31c833b0b714a07d93979975206 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 28 Dec 2014 01:29:17 +0100 Subject: [PATCH 09/12] Removed PUT /emails/:id, implemented DELETE /emails/:id and GET /emails/:id --- routes/emails.go | 93 ++++++++++++++++++++++++++++++++++++------------ setup/setup.go | 1 - 2 files changed, 70 insertions(+), 24 deletions(-) diff --git a/routes/emails.go b/routes/emails.go index 2d9a367..4e7e521 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -227,30 +227,39 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // EmailsGetResponse contains the result of the EmailsGet request. type EmailsGetResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Status string `json:"status,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Email *models.Email `json:"email,omitempty"` } // EmailsGet responds with a single email message -func EmailsGet(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 200, &EmailsGetResponse{ - Success: true, - Status: "sending", - }) -} +func EmailsGet(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the email from the database + email, err := env.Emails.GetEmail(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &EmailsGetResponse{ + Success: false, + Message: "Email not found", + }) + return + } -// EmailsUpdateResponse contains the result of the EmailsUpdate request. -type EmailsUpdateResponse struct { - Success bool `json:"success"` - Message string `json:"message"` -} + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) -// EmailsUpdate does *something* - TODO -func EmailsUpdate(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 501, &EmailsUpdateResponse{ - Success: false, - Message: "Sorry, not implemented yet", + // Check for ownership + if email.Owner != session.Owner { + utils.JSONResponse(w, 404, &EmailsGetResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Write the email to the response + utils.JSONResponse(w, 200, &EmailsGetResponse{ + Success: true, + Email: email, }) } @@ -261,9 +270,47 @@ type EmailsDeleteResponse struct { } // EmailsDelete remvoes an email from the system -func EmailsDelete(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 501, &EmailsDeleteResponse{ - Success: false, - Message: "Sorry, not implemented yet", +func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the email from the database + email, err := env.Emails.GetEmail(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &EmailsDeleteResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if email.Owner != session.Owner { + utils.JSONResponse(w, 404, &EmailsDeleteResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Perform the deletion + err = env.Emails.DeleteID(c.URLParams["id"]) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "id": c.URLParams["id"], + }).Error("Unable to delete a email") + + utils.JSONResponse(w, 500, &EmailsDeleteResponse{ + Success: false, + Message: "Internal error (code EM/DE/01)", + }) + return + } + + // Write the email to the response + utils.JSONResponse(w, 200, &EmailsDeleteResponse{ + Success: true, + Message: "Email successfully removed", }) } diff --git a/setup/setup.go b/setup/setup.go index c169fa5..3d31cde 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -218,7 +218,6 @@ func PrepareMux(flags *env.Flags) *web.Mux { auth.Get("/emails", routes.EmailsList) auth.Post("/emails", routes.EmailsCreate) auth.Get("/emails/:id", routes.EmailsGet) - auth.Put("/emails/:id", routes.EmailsUpdate) auth.Delete("/emails/:id", routes.EmailsDelete) // Labels From c2fff71700f7e2deee450b6af77640099bd61384 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 28 Dec 2014 15:09:06 +0100 Subject: [PATCH 10/12] setup.go, the 700-line wonder --- setup/setup.go | 367 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 336 insertions(+), 31 deletions(-) diff --git a/setup/setup.go b/setup/setup.go index 3d31cde..5549c6c 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -3,11 +3,11 @@ package setup import ( "bufio" "encoding/json" - "fmt" "io/ioutil" "net/http" "net/http/httptest" "strings" + "sync" "time" "github.com/Sirupsen/logrus" @@ -27,6 +27,12 @@ import ( "github.com/lavab/glogrus" ) +// sessions contains all "subscribing" WebSockets sessions +var ( + sessions = map[string][]sockjs.Session{} + sessionsLock sync.Mutex +) + // PrepareMux sets up the API func PrepareMux(flags *env.Flags) *web.Mux { // Set up a new logger @@ -147,12 +153,84 @@ func PrepareMux(flags *env.Flags) *web.Mux { }).Fatal("Unable to initialize a JSON NATS connection") } - c.Subscribe("delivery", func(s string) { - fmt.Printf("Received a message: %s\n", s) + c.Subscribe("delivery", func(msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + }) { + // Check if we are handling owner's session + if _, ok := sessions[msg.Owner]; !ok { + return + } + + if len(sessions[msg.Owner]) == 0 { + return + } + + // Resolve the email + email, err := env.Emails.GetEmail(msg.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + }).Error("Unable to resolve an email from queue") + return + } + + // Send notifications to subscribers + for _, session := range sessions[msg.Owner] { + result, _ := json.Marshal(map[string]interface{}{ + "type": "delivery", + "id": msg.ID, + "name": email.Name, + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + } + } }) - c.Subscribe("receipt", func(s string) { - fmt.Printf("Received a message: %s\n", s) + c.Subscribe("receipt", func(msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + }) { + // Check if we are handling owner's session + if _, ok := sessions[msg.Owner]; !ok { + return + } + + if len(sessions[msg.Owner]) == 0 { + return + } + + // Resolve the email + email, err := env.Emails.GetEmail(msg.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + }).Error("Unable to resolve an email from queue") + return + } + + // Send notifications to subscribers + for _, session := range sessions[msg.Owner] { + result, _ := json.Marshal(map[string]interface{}{ + "type": "receipt", + "id": msg.ID, + "name": email.Name, + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + } + } }) env.NATS = c @@ -283,6 +361,8 @@ func PrepareMux(flags *env.Flags) *web.Mux { }) mux.Handle("/ws/*", sockjs.NewHandler("/ws", sockjs.DefaultOptions, func(session sockjs.Session) { + var subscribed string + // A new goroutine seems to be spawned for each new session for { // Read a message from the input @@ -297,6 +377,8 @@ func PrepareMux(flags *env.Flags) *web.Mux { // Decode the message var input struct { + Type string `json:"type"` + Token string `json:"token"` ID string `json:"id"` Method string `json:"method"` Path string `json:"path"` @@ -307,6 +389,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { if err != nil { // Return an error response resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", "error": err, }) err := session.Send(string(resp)) @@ -320,13 +403,172 @@ func PrepareMux(flags *env.Flags) *web.Mux { continue } - // Perform the request - w := httptest.NewRecorder() - r, err := http.NewRequest(input.Method, "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body)) - if err != nil { - // Return an error response + // Check message's type + if input.Type == "subscribe" { + // Listen to user's events + + // Check if token is empty + if input.Token == "" { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Invalid token", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Check the token in database + token, err := env.Tokens.GetToken(input.Token) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Invalid token", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Do the actual subscription + subscribed = token.Owner + sessionsLock.Lock() + + // Sessions map already contains this owner + if _, ok := sessions[token.Owner]; ok { + sessions[token.Owner] = append(sessions[token.Owner], session) + } else { + // We have to allocate a new slice + sessions[token.Owner] = []sockjs.Session{session} + } + + // Unlock the map write + sessionsLock.Unlock() + + // Return a response resp, _ := json.Marshal(map[string]interface{}{ - "error": err.Error(), + "type": "subscribed", + }) + err = session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } else if input.Type == "unsubscribe" { + if subscribed == "" { + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Not subscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } + + sessionsLock.Lock() + + if _, ok := sessions[subscribed]; !ok { + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + if len(sessions[subscribed]) == 1 { + delete(sessions, subscribed) + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + // Find the session + index := -1 + for i, session2 := range sessions[subscribed] { + if session == session2 { + index = i + break + } + } + + // We didn't find anything + if index == -1 { + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + // We found it, so we are supposed to slice it + sessions[subscribed][index] = sessions[subscribed][len(sessions[subscribed])-1] + sessions[subscribed][len(sessions[subscribed])-1] = nil + sessions[subscribed] = sessions[subscribed][:len(sessions[subscribed])-1] + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", }) err := session.Send(string(resp)) if err != nil { @@ -334,36 +576,99 @@ func PrepareMux(flags *env.Flags) *web.Mux { "id": session.ID(), "error": err.Error(), }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" break } - continue - } + sessionsLock.Unlock() + subscribed = "" + } else if input.Type == "request" { + // Perform the request + w := httptest.NewRecorder() + r, err := http.NewRequest(input.Method, "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body)) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "error": err.Error(), + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } - r.RequestURI = input.Path + r.RequestURI = input.Path - for key, value := range input.Headers { - r.Header.Set(key, value) + for key, value := range input.Headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + // Return the final response + result, _ := json.Marshal(map[string]interface{}{ + "type": "response", + "id": input.ID, + "status": w.Code, + "header": w.HeaderMap, + "body": w.Body.String(), + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } } + } - mux.ServeHTTP(w, r) + // We have to clear the subscription here too. TODO: make the code shorter + if subscribed == "" { + return + } - // Return the final response - result, _ := json.Marshal(map[string]interface{}{ - "type": "response", - "id": input.ID, - "status": w.Code, - "header": w.HeaderMap, - "body": w.Body.String(), - }) - err = session.Send(string(result)) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "id": session.ID(), - "error": err.Error(), - }).Warn("Error while writing to a WebSocket") + sessionsLock.Lock() + + if _, ok := sessions[subscribed]; !ok { + sessionsLock.Unlock() + return + } + + if len(sessions[subscribed]) == 1 { + delete(sessions, subscribed) + sessionsLock.Unlock() + return + } + + // Find the session + index := -1 + for i, session2 := range sessions[subscribed] { + if session == session2 { + index = i break } } + + // We didn't find anything + if index == -1 { + sessionsLock.Unlock() + return + } + + // We found it, so we are supposed to slice it + sessions[subscribed][index] = sessions[subscribed][len(sessions[subscribed])-1] + sessions[subscribed][len(sessions[subscribed])-1] = nil + sessions[subscribed] = sessions[subscribed][:len(sessions[subscribed])-1] + + // Unlock the mutex + sessionsLock.Unlock() })) // Merge the muxes From 9406d944400bd8334dbc85eee63229a2ada2fc48 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 28 Dec 2014 16:19:38 +0100 Subject: [PATCH 11/12] Weak password test --- routes/accounts_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/routes/accounts_test.go b/routes/accounts_test.go index 6b2f61c..240d34a 100644 --- a/routes/accounts_test.go +++ b/routes/accounts_test.go @@ -149,6 +149,45 @@ func TestAccountsCreateInvitedExisting(t *testing.T) { require.Equal(t, "Username already exists", response.Message) } +func TestAccountsCreateInvitedWeakPassword(t *testing.T) { + const ( + username = "jeremy" + password = "c0067d4af4e87f00dbac63b6156828237059172d1bbeac67427345d6a9fda484" + ) + + // Prepare a token + inviteToken := models.Token{ + Resource: models.MakeResource("", "test invite token"), + Type: "invite", + } + inviteToken.ExpireSoon() + + err := env.Tokens.Insert(inviteToken) + require.Nil(t, err) + + // POST /accounts - invited + result, err := goreq.Request{ + Method: "POST", + Uri: server.URL + "/accounts", + ContentType: "application/json", + Body: routes.AccountsCreateRequest{ + Username: username, + Password: password, + Token: inviteToken.ID, + }, + }.Do() + require.Nil(t, err) + + // Unmarshal the response + var response routes.AccountsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + // Check the result's contents + require.False(t, response.Success) + require.Equal(t, "Weak password", response.Message) +} + func TestAccountsCreateInvitedExpired(t *testing.T) { const ( username = "jeremy2" From 0953b23d7114c123417d15f1e4505bb29d8410e4 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 28 Dec 2014 17:09:52 +0100 Subject: [PATCH 12/12] Added email tests --- models/email.go | 6 ++ routes/emails.go | 8 +- routes/emails_test.go | 245 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 3 deletions(-) create mode 100644 routes/emails_test.go diff --git a/models/email.go b/models/email.go index 10aaa5e..03bd9e3 100644 --- a/models/email.go +++ b/models/email.go @@ -5,6 +5,12 @@ package models type Email struct { Resource + // Kind of the email. Value is either sent or received. + Kind string `json:"kind" gorethink:"kind"` + + // Who is supposed to receive the email / what email received it. + To []string `json:"to" gorethink:"to"` + // AttachmentsIDs is a slice of the FileIDs associated with this email // For uploading attachments see `POST /upload` AttachmentIDs []string `json:"attachments" gorethink:"attachments"` diff --git a/routes/emails.go b/routes/emails.go index 4e7e521..612324b 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -125,7 +125,7 @@ type EmailsCreateRequest struct { BCC []string `json:"bcc"` ReplyTo string `json:"reply_to"` ThreadID string `json:"thread_id"` - Title string `json:"title"` + Subject string `json:"title"` Body string `json:"body"` Preview string `json:"preview"` Attachments []string `json:"attachments"` @@ -160,7 +160,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { session := c.Env["token"].(*models.Token) // Ensure that the input data isn't empty - if len(input.To) == 0 || input.Title == "" || input.Body == "" { + if len(input.To) == 0 || input.Subject == "" || input.Body == "" { utils.JSONResponse(w, 400, &EmailsCreateResponse{ Success: false, Message: "Invalid request", @@ -170,7 +170,9 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Create a new email struct email := &models.Email{ - Resource: models.MakeResource(session.Owner, input.Title), + Kind: "sent", + To: input.To, + Resource: models.MakeResource(session.Owner, input.Subject), AttachmentIDs: input.Attachments, Body: models.Encrypted{ Encoding: "json", diff --git a/routes/emails_test.go b/routes/emails_test.go new file mode 100644 index 0000000..7d4310a --- /dev/null +++ b/routes/emails_test.go @@ -0,0 +1,245 @@ +package routes_test + +import ( + "testing" + + "github.com/franela/goreq" + "github.com/stretchr/testify/require" + + "github.com/lavab/api/env" + "github.com/lavab/api/models" + "github.com/lavab/api/routes" +) + +var ( + emailID string + notOwnedEmailID string +) + +func TestEmailsCreate(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: routes.EmailsCreateRequest{ + To: []string{"piotr@zduniak.net"}, + Subject: "hello world", + Body: "raw meaty email", + }, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, len(response.Created), 1) + require.True(t, response.Success) + + emailID = response.Created[0] +} + +func TestEmailsCreateInvalidBody(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: "!@#!@#!@#", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, response.Message, "Invalid input format") + require.False(t, response.Success) +} + +func TestEmailsCreateMissingFields(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: routes.EmailsCreateRequest{ + To: []string{"piotr@zduniak.net"}, + Subject: "hello world", + }, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, response.Message, "Invalid request") + require.False(t, response.Success) +} + +func TestEmailsGet(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/" + emailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "hello world", response.Email.Name) +} + +func TestEmailsGetInvalidID(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/nonexisting", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, "Email not found", response.Message) + require.False(t, response.Success) +} + +func TestEmailsGetNotOwned(t *testing.T) { + email := &models.Email{ + Resource: models.MakeResource("not", "Carpeus Caesar"), + } + + err := env.Emails.Insert(email) + require.Nil(t, err) + + notOwnedEmailID = email.ID + + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/" + email.ID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, "Email not found", response.Message) + require.False(t, response.Success) +} + +func TestEmailsList(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=0&limit=1&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "hello world", (*response.Emails)[0].Name) +} + +func TestEmailsListInvalidOffset(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=pppp&limit=1&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Invalid offset", response.Message) +} + +func TestEmailsListInvalidLimit(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=0&limit=pppp&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Invalid limit", response.Message) +} + +func TestEmailsDelete(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/" + emailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "Email successfully removed", response.Message) +} + +func TestEmailsDeleteNotExisting(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/nonexisting", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Email not found", response.Message) +} + +func TestEmailsDeleteNotOwned(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/" + notOwnedEmailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Email not found", response.Message) +}