diff --git a/README.md b/README.md index 923460f..fa5f6f7 100644 --- a/README.md +++ b/README.md @@ -56,5 +56,5 @@ version=v0 ## Build status: - - `master` - [![Build Status](https://magnum.travis-ci.com/lavab/api.svg?token=kJbppXeTxzqpCVvt4t5X&branch=master)](https://magnum.travis-ci.com/lavab/api) - - `develop` - [![Build Status](https://magnum.travis-ci.com/lavab/api.svg?token=kJbppXeTxzqpCVvt4t5X&branch=develop)](https://magnum.travis-ci.com/lavab/api) \ No newline at end of file + - `master` - [![Circle CI](https://circleci.com/gh/lavab/api/tree/master.svg?style=svg&circle-token=4a52d619a03d0249906195d6447ceb60a475c0c5)](https://circleci.com/gh/lavab/api/tree/master) + - `develop` - [![Circle CI](https://circleci.com/gh/lavab/api/tree/develop.svg?style=svg&circle-token=4a52d619a03d0249906195d6447ceb60a475c0c5)](https://circleci.com/gh/lavab/api/tree/develop) diff --git a/_research/ws_client/index.html b/_research/ws_client/index.html new file mode 100644 index 0000000..f0c88d4 --- /dev/null +++ b/_research/ws_client/index.html @@ -0,0 +1,40 @@ + + + + + lavab client + + + + +
+
+ + + + + + + diff --git a/_vagrant/Vagrantfile b/_vagrant/Vagrantfile index 53ea370..15c639f 100644 --- a/_vagrant/Vagrantfile +++ b/_vagrant/Vagrantfile @@ -5,21 +5,54 @@ VAGRANTFILE_API_VERSION = "2" Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| - config.vm.box = "ubuntu/trusty64" - # rethinkdb - config.vm.network "forwarded_port", guest: 8080, host: 8080 - config.vm.network "forwarded_port", guest: 28015, host: 28015 - config.vm.network "forwarded_port", guest: 29015, host: 29015 + #config.vm.define "redisthink" do |rethinkdb| + #rethinkdb.vm.box = "ubuntu/trusty64" - # redis - config.vm.network "forwarded_port", guest: 6379, host: 6379 + # rethinkdb + #rethinkdb.vm.network "forwarded_port", guest: 8080, host: 8080 + #rethinkdb.vm.network "forwarded_port", guest: 28015, host: 28015 + #rethinkdb.vm.network "forwarded_port", guest: 29015, host: 29015 - config.vm.provider "virtualbox" do |v| - v.memory = 2048 - v.cpus = 4 - end + # redis + #rethinkdb.vm.network "forwarded_port", guest: 6379, host: 6379 + + #rethinkdb.vm.provider "virtualbox" do |v| + #v.memory = 2048 + #v.cpus = 4 + #end + + # load ansible playbook + #rethinkdb.vm.provision "shell", path: "deploy.sh" + #end + + config.vm.define "docker" do |docker| + docker.vm.box = "ubuntu/trusty64" + + docker.vm.network "forwarded_port", guest: 4222, host: 4222 + docker.vm.network "forwarded_port", guest: 8333, host: 8333 + docker.vm.network "forwarded_port", guest: 6379, host: 6379 + docker.vm.network "forwarded_port", guest: 8080, host: 8080 + docker.vm.network "forwarded_port", guest: 28015, host: 28015 + docker.vm.network "forwarded_port", guest: 29015, host: 29015 - # load ansible playbook - config.vm.provision "shell", path: "deploy.sh" + docker.vm.provider "virtualbox" do |v| + v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"] + v.customize ["modifyvm", :id, "--natdnsproxy1", "on"] + end + + docker.vm.provision "docker" do |d| + d.pull_images "apcera/gnatsd" + d.run "apcera/gnatsd", + args: "--name gnatsd -p 4222:4222 -p 8333:8333" + + d.pull_images "dockerfile/rethinkdb" + d.run "dockerfile/rethinkdb", + args: "--name rethinkdb -p 8080:8080 -p 28015:28015 -p 29015:29015" + + d.pull_images "dockerfile/redis" + d.run "dockerfile/redis", + args: "--name redis -p 6379:6379" + end + end end diff --git a/circle.yml b/circle.yml index 3250787..e0897b4 100644 --- a/circle.yml +++ b/circle.yml @@ -11,11 +11,14 @@ dependencies: - wget http://download.redis.io/releases/redis-2.8.18.tar.gz - tar xvzf redis-2.8.18.tar.gz - cd redis-2.8.18 && make + - go get github.com/apcera/gnatsd post: - rethinkdb --bind all: background: true - src/redis-server: background: true + - gnatsd: + background: true test: override: diff --git a/db/table_emails.go b/db/table_emails.go new file mode 100644 index 0000000..4e0134e --- /dev/null +++ b/db/table_emails.go @@ -0,0 +1,104 @@ +package db + +import ( + "github.com/dancannon/gorethink" + + "github.com/lavab/api/models" +) + +// Emails implements the CRUD interface for tokens +type EmailsTable struct { + RethinkCRUD +} + +// GetEmail returns a token with specified name +func (e *EmailsTable) GetEmail(id string) (*models.Email, error) { + var result models.Email + + if err := e.FindFetchOne(id, &result); err != nil { + return nil, err + } + + return &result, nil +} + +// GetOwnedBy returns all emails owned by id +func (e *EmailsTable) GetOwnedBy(id string) ([]*models.Email, error) { + var result []*models.Email + + err := e.WhereAndFetch(map[string]interface{}{ + "owner": id, + }, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +// DeleteOwnedBy deletes all emails owned by id +func (e *EmailsTable) DeleteOwnedBy(id string) error { + return e.Delete(map[string]interface{}{ + "owner": id, + }) +} + +func (e *EmailsTable) CountOwnedBy(id string) (int, error) { + return e.FindByAndCount("owner", id) +} + +func (e *EmailsTable) List( + owner string, + sort []string, + offset int, + limit int, +) ([]*models.Email, error) { + // Filter by owner's ID + term := e.GetTable().Filter(map[string]interface{}{ + "owner": owner, + }) + + // If sort array has contents, parse them and add to the term + if sort != nil && len(sort) > 0 { + var conds []interface{} + for _, cond := range sort { + if cond[0] == '-' { + conds = append(conds, gorethink.Desc(cond[1:])) + } else if cond[0] == '+' || cond[0] == ' ' { + conds = append(conds, gorethink.Asc(cond[1:])) + } else { + conds = append(conds, gorethink.Asc(cond)) + } + } + + term = term.OrderBy(conds...) + } + + // Slice the result in 3 cases + if offset != 0 && limit == 0 { + term = term.Skip(offset) + } + + if offset == 0 && limit != 0 { + term = term.Limit(limit) + } + + if offset != 0 && limit != 0 { + term = term.Slice(offset, offset+limit) + } + + // Run the query + cursor, err := term.Run(e.GetSession()) + if err != nil { + return nil, err + } + + // Fetch the cursor + var resp []*models.Email + err = cursor.All(&resp) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/env/config.go b/env/config.go index c07ee6a..6b655df 100644 --- a/env/config.go +++ b/env/config.go @@ -19,6 +19,8 @@ type Flags struct { RethinkDBKey string RethinkDBDatabase string + NATSAddress string + YubiCloudID string YubiCloudKey string } diff --git a/env/env.go b/env/env.go index e7f5ec5..ec7444a 100644 --- a/env/env.go +++ b/env/env.go @@ -2,6 +2,7 @@ package env import ( "github.com/Sirupsen/logrus" + "github.com/apcera/nats" "github.com/dancannon/gorethink" "github.com/lavab/api/cache" @@ -28,6 +29,10 @@ var ( Contacts *db.ContactsTable // Reservations is the global instance of ReservationsTable Reservations *db.ReservationsTable + // Emails is the global instance of EmailsTable + Emails *db.EmailsTable // Factors contains all currently registered factors Factors map[string]factor.Factor + // NATS is the encoded connection to the NATS queue + NATS *nats.EncodedConn ) diff --git a/main.go b/main.go index 7eb6bbd..787ee23 100644 --- a/main.go +++ b/main.go @@ -55,6 +55,14 @@ var ( } return database }(), "Database name on the RethinkDB server") + // NATS address + natsAddress = flag.String("nats_address", func() string { + address := os.Getenv("NATS_PORT_4222_TCP_ADDR") + if address == "" { + address = "127.0.0.1" + } + return "nats://" + address + ":4222" + }(), "Address of the NATS server") // YubiCloud params yubiCloudID = flag.String("yubicloud_id", "", "YubiCloud API id") yubiCloudKey = flag.String("yubicloud_key", "", "YubiCloud API key") @@ -83,6 +91,8 @@ func main() { RethinkDBKey: *rethinkdbKey, RethinkDBDatabase: *rethinkdbDatabase, + NATSAddress: *natsAddress, + YubiCloudID: *yubiCloudID, YubiCloudKey: *yubiCloudKey, } diff --git a/models/base_encrypted.go b/models/base_encrypted.go index 44eec0b..0597cd1 100644 --- a/models/base_encrypted.go +++ b/models/base_encrypted.go @@ -5,8 +5,8 @@ type Encrypted struct { // Encoding tells the reader how to decode the data; can be "json", "protobuf", maybe more in the future Encoding string `json:"encoding" gorethink:"encoding"` - // PgpFingerprints contains the fingerprints of the PGP public keys used to encrypt the data. - PgpFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` + // PGPFingerprints contains the fingerprints of the PGP public keys used to encrypt the data. + PGPFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` // Data is the raw, PGP-encrypted data Data string `json:"raw" gorethink:"raw"` diff --git a/models/email.go b/models/email.go index 610570c..03bd9e3 100644 --- a/models/email.go +++ b/models/email.go @@ -5,6 +5,12 @@ package models type Email struct { Resource + // Kind of the email. Value is either sent or received. + Kind string `json:"kind" gorethink:"kind"` + + // Who is supposed to receive the email / what email received it. + To []string `json:"to" gorethink:"to"` + // AttachmentsIDs is a slice of the FileIDs associated with this email // For uploading attachments see `POST /upload` AttachmentIDs []string `json:"attachments" gorethink:"attachments"` @@ -23,4 +29,6 @@ type Email struct { // ThreadID ThreadID string `json:"thread_id" gorethink:"thread_id"` + + Status string `json:"status" gorethink:"status"` } diff --git a/routes/accounts_test.go b/routes/accounts_test.go index 6b2f61c..240d34a 100644 --- a/routes/accounts_test.go +++ b/routes/accounts_test.go @@ -149,6 +149,45 @@ func TestAccountsCreateInvitedExisting(t *testing.T) { require.Equal(t, "Username already exists", response.Message) } +func TestAccountsCreateInvitedWeakPassword(t *testing.T) { + const ( + username = "jeremy" + password = "c0067d4af4e87f00dbac63b6156828237059172d1bbeac67427345d6a9fda484" + ) + + // Prepare a token + inviteToken := models.Token{ + Resource: models.MakeResource("", "test invite token"), + Type: "invite", + } + inviteToken.ExpireSoon() + + err := env.Tokens.Insert(inviteToken) + require.Nil(t, err) + + // POST /accounts - invited + result, err := goreq.Request{ + Method: "POST", + Uri: server.URL + "/accounts", + ContentType: "application/json", + Body: routes.AccountsCreateRequest{ + Username: username, + Password: password, + Token: inviteToken.ID, + }, + }.Do() + require.Nil(t, err) + + // Unmarshal the response + var response routes.AccountsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + // Check the result's contents + require.False(t, response.Success) + require.Equal(t, "Weak password", response.Message) +} + func TestAccountsCreateInvitedExpired(t *testing.T) { const ( username = "jeremy2" diff --git a/routes/emails.go b/routes/emails.go index deb3dd2..612324b 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -2,26 +2,134 @@ package routes import ( "net/http" + "strconv" + "strings" + "github.com/Sirupsen/logrus" + "github.com/ugorji/go/codec" + "github.com/zenazn/goji/web" + + "github.com/lavab/api/env" "github.com/lavab/api/models" "github.com/lavab/api/utils" ) +var ( + msgpackCodec codec.MsgpackHandle +) + // EmailsListResponse contains the result of the EmailsList request. type EmailsListResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - ItemsCount int `json:"items_count,omitempty"` - Emails []*models.Email `json:"emails,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Emails *[]*models.Email `json:"emails,omitempty"` } // EmailsList sends a list of the emails in the inbox. -func EmailsList(w http.ResponseWriter, r *http.Request) { +func EmailsList(c web.C, w http.ResponseWriter, r *http.Request) { + // Fetch the current session from the database + session := c.Env["token"].(*models.Token) + + // Parse the query + var ( + query = r.URL.Query() + sortRaw = query.Get("sort") + offsetRaw = query.Get("offset") + limitRaw = query.Get("limit") + sort []string + offset int + limit int + ) + + if offsetRaw != "" { + o, err := strconv.Atoi(offsetRaw) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "offset": offset, + }).Error("Invalid offset") + + utils.JSONResponse(w, 400, &EmailsListResponse{ + Success: false, + Message: "Invalid offset", + }) + return + } + offset = o + } + + if limitRaw != "" { + l, err := strconv.Atoi(limitRaw) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "limit": limit, + }).Error("Invalid limit") + + utils.JSONResponse(w, 400, &EmailsListResponse{ + Success: false, + Message: "Invalid limit", + }) + return + } + limit = l + } + + if sortRaw != "" { + sort = strings.Split(sortRaw, ",") + } + + // Get contacts from the database + emails, err := env.Emails.List(session.Owner, sort, offset, limit) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Unable to fetch emails") + + utils.JSONResponse(w, 500, &EmailsListResponse{ + Success: false, + Message: "Internal error (code EM/LI/01)", + }) + return + } + + if offsetRaw != "" || limitRaw != "" { + count, err := env.Emails.CountOwnedBy(session.Owner) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Unable to count emails") + + utils.JSONResponse(w, 500, &EmailsListResponse{ + Success: false, + Message: "Internal error (code EM/LI/02)", + }) + return + } + w.Header().Set("X-Total-Count", strconv.Itoa(count)) + } + utils.JSONResponse(w, 200, &EmailsListResponse{ - Success: true, - ItemsCount: 1, - Emails: []*models.Email{}, + Success: true, + Emails: &emails, }) + + // GET parameters: + // sort - split by commas, prefixes: - is desc, + is asc + // offset, limit - for pagination + // Pagination ADDS X-Total-Count to the response! +} + +type EmailsCreateRequest struct { + To []string `json:"to"` + BCC []string `json:"bcc"` + ReplyTo string `json:"reply_to"` + ThreadID string `json:"thread_id"` + Subject string `json:"title"` + Body string `json:"body"` + Preview string `json:"preview"` + Attachments []string `json:"attachments"` + PGPFingerprints []string `json:"pgp_fingerprints"` } // EmailsCreateResponse contains the result of the EmailsCreate request. @@ -32,39 +140,128 @@ type EmailsCreateResponse struct { } // EmailsCreate sends a new email -func EmailsCreate(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 200, &EmailsCreateResponse{ +func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input EmailsCreateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Ensure that the input data isn't empty + if len(input.To) == 0 || input.Subject == "" || input.Body == "" { + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid request", + }) + return + } + + // Create a new email struct + email := &models.Email{ + Kind: "sent", + To: input.To, + Resource: models.MakeResource(session.Owner, input.Subject), + AttachmentIDs: input.Attachments, + Body: models.Encrypted{ + Encoding: "json", + PGPFingerprints: input.PGPFingerprints, + Data: input.Body, + Schema: "email_body", + VersionMajor: 1, + VersionMinor: 0, + }, + Preview: models.Encrypted{ + Encoding: "json", + PGPFingerprints: input.PGPFingerprints, + Data: input.Preview, + Schema: "email_preview", + VersionMajor: 1, + VersionMinor: 0, + }, + ThreadID: input.ThreadID, + Status: "queued", + } + + // Insert the email into the database + if err := env.Emails.Insert(email); err != nil { + utils.JSONResponse(w, 500, &EmailsCreateResponse{ + Success: false, + Message: "internal server error - EM/CR/01", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not insert an email into the database") + return + } + + // Add a send request to the queue + err = env.NATS.Publish("send", email.ID) + if err != nil { + utils.JSONResponse(w, 500, &EmailsCreateResponse{ + Success: false, + Message: "internal server error - EM/CR/03", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not publish an email send request") + return + } + + utils.JSONResponse(w, 201, &EmailsCreateResponse{ Success: true, - Created: []string{"123"}, + Created: []string{email.ID}, }) } // EmailsGetResponse contains the result of the EmailsGet request. type EmailsGetResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Status string `json:"status,omitempty"` + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Email *models.Email `json:"email,omitempty"` } // EmailsGet responds with a single email message -func EmailsGet(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 200, &EmailsGetResponse{ - Success: true, - Status: "sending", - }) -} +func EmailsGet(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the email from the database + email, err := env.Emails.GetEmail(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &EmailsGetResponse{ + Success: false, + Message: "Email not found", + }) + return + } -// EmailsUpdateResponse contains the result of the EmailsUpdate request. -type EmailsUpdateResponse struct { - Success bool `json:"success"` - Message string `json:"message"` -} + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if email.Owner != session.Owner { + utils.JSONResponse(w, 404, &EmailsGetResponse{ + Success: false, + Message: "Email not found", + }) + return + } -// EmailsUpdate does *something* - TODO -func EmailsUpdate(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 501, &EmailsUpdateResponse{ - Success: false, - Message: "Sorry, not implemented yet", + // Write the email to the response + utils.JSONResponse(w, 200, &EmailsGetResponse{ + Success: true, + Email: email, }) } @@ -75,9 +272,47 @@ type EmailsDeleteResponse struct { } // EmailsDelete remvoes an email from the system -func EmailsDelete(w http.ResponseWriter, r *http.Request) { - utils.JSONResponse(w, 501, &EmailsDeleteResponse{ - Success: false, - Message: "Sorry, not implemented yet", +func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the email from the database + email, err := env.Emails.GetEmail(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &EmailsDeleteResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if email.Owner != session.Owner { + utils.JSONResponse(w, 404, &EmailsDeleteResponse{ + Success: false, + Message: "Email not found", + }) + return + } + + // Perform the deletion + err = env.Emails.DeleteID(c.URLParams["id"]) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "id": c.URLParams["id"], + }).Error("Unable to delete a email") + + utils.JSONResponse(w, 500, &EmailsDeleteResponse{ + Success: false, + Message: "Internal error (code EM/DE/01)", + }) + return + } + + // Write the email to the response + utils.JSONResponse(w, 200, &EmailsDeleteResponse{ + Success: true, + Message: "Email successfully removed", }) } diff --git a/routes/emails_test.go b/routes/emails_test.go new file mode 100644 index 0000000..7d4310a --- /dev/null +++ b/routes/emails_test.go @@ -0,0 +1,245 @@ +package routes_test + +import ( + "testing" + + "github.com/franela/goreq" + "github.com/stretchr/testify/require" + + "github.com/lavab/api/env" + "github.com/lavab/api/models" + "github.com/lavab/api/routes" +) + +var ( + emailID string + notOwnedEmailID string +) + +func TestEmailsCreate(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: routes.EmailsCreateRequest{ + To: []string{"piotr@zduniak.net"}, + Subject: "hello world", + Body: "raw meaty email", + }, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, len(response.Created), 1) + require.True(t, response.Success) + + emailID = response.Created[0] +} + +func TestEmailsCreateInvalidBody(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: "!@#!@#!@#", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, response.Message, "Invalid input format") + require.False(t, response.Success) +} + +func TestEmailsCreateMissingFields(t *testing.T) { + request := goreq.Request{ + Method: "POST", + Uri: server.URL + "/emails", + ContentType: "application/json", + Body: routes.EmailsCreateRequest{ + To: []string{"piotr@zduniak.net"}, + Subject: "hello world", + }, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsCreateResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, response.Message, "Invalid request") + require.False(t, response.Success) +} + +func TestEmailsGet(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/" + emailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "hello world", response.Email.Name) +} + +func TestEmailsGetInvalidID(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/nonexisting", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, "Email not found", response.Message) + require.False(t, response.Success) +} + +func TestEmailsGetNotOwned(t *testing.T) { + email := &models.Email{ + Resource: models.MakeResource("not", "Carpeus Caesar"), + } + + err := env.Emails.Insert(email) + require.Nil(t, err) + + notOwnedEmailID = email.ID + + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails/" + email.ID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsGetResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.Equal(t, "Email not found", response.Message) + require.False(t, response.Success) +} + +func TestEmailsList(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=0&limit=1&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "hello world", (*response.Emails)[0].Name) +} + +func TestEmailsListInvalidOffset(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=pppp&limit=1&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Invalid offset", response.Message) +} + +func TestEmailsListInvalidLimit(t *testing.T) { + request := goreq.Request{ + Method: "GET", + Uri: server.URL + "/emails?offset=0&limit=pppp&sort=+date", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsListResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Invalid limit", response.Message) +} + +func TestEmailsDelete(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/" + emailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.True(t, response.Success) + require.Equal(t, "Email successfully removed", response.Message) +} + +func TestEmailsDeleteNotExisting(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/nonexisting", + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Email not found", response.Message) +} + +func TestEmailsDeleteNotOwned(t *testing.T) { + request := goreq.Request{ + Method: "DELETE", + Uri: server.URL + "/emails/" + notOwnedEmailID, + } + request.AddHeader("Authorization", "Bearer "+authToken) + result, err := request.Do() + require.Nil(t, err) + + var response routes.EmailsDeleteResponse + err = result.Body.FromJsonTo(&response) + require.Nil(t, err) + + require.False(t, response.Success) + require.Equal(t, "Email not found", response.Message) +} diff --git a/routes/init_test.go b/routes/init_test.go index a57c5a7..fb0e4e7 100644 --- a/routes/init_test.go +++ b/routes/init_test.go @@ -30,6 +30,8 @@ func init() { RedisAddress: "127.0.0.1:6379", + NATSAddress: "nats://127.0.0.1:4222", + RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", RethinkDBDatabase: "test", diff --git a/setup/setup.go b/setup/setup.go index eabe49d..5549c6c 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -1,12 +1,23 @@ package setup import ( + "bufio" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "sync" "time" "github.com/Sirupsen/logrus" + "github.com/apcera/nats" "github.com/dancannon/gorethink" + "github.com/googollee/go-socket.io" + "github.com/rs/cors" "github.com/zenazn/goji/web" "github.com/zenazn/goji/web/middleware" + "gopkg.in/igm/sockjs-go.v2/sockjs" "github.com/lavab/api/cache" "github.com/lavab/api/db" @@ -16,6 +27,12 @@ import ( "github.com/lavab/glogrus" ) +// sessions contains all "subscribing" WebSockets sessions +var ( + sessions = map[string][]sockjs.Session{} + sessionsLock sync.Mutex +) + // PrepareMux sets up the API func PrepareMux(flags *env.Flags) *web.Mux { // Set up a new logger @@ -111,6 +128,112 @@ func PrepareMux(flags *env.Flags) *web.Mux { "reservations", ), } + env.Emails = &db.EmailsTable{ + RethinkCRUD: db.NewCRUDTable( + rethinkSession, + rethinkOpts.Database, + "emails", + ), + } + + // NATS queue connection + nc, err := nats.Connect(flags.NATSAddress) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "address": flags.NATSAddress, + }).Fatal("Unable to connect to NATS") + } + + c, err := nats.NewEncodedConn(nc, "json") + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + "address": flags.NATSAddress, + }).Fatal("Unable to initialize a JSON NATS connection") + } + + c.Subscribe("delivery", func(msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + }) { + // Check if we are handling owner's session + if _, ok := sessions[msg.Owner]; !ok { + return + } + + if len(sessions[msg.Owner]) == 0 { + return + } + + // Resolve the email + email, err := env.Emails.GetEmail(msg.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + }).Error("Unable to resolve an email from queue") + return + } + + // Send notifications to subscribers + for _, session := range sessions[msg.Owner] { + result, _ := json.Marshal(map[string]interface{}{ + "type": "delivery", + "id": msg.ID, + "name": email.Name, + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + } + } + }) + + c.Subscribe("receipt", func(msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + }) { + // Check if we are handling owner's session + if _, ok := sessions[msg.Owner]; !ok { + return + } + + if len(sessions[msg.Owner]) == 0 { + return + } + + // Resolve the email + email, err := env.Emails.GetEmail(msg.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + }).Error("Unable to resolve an email from queue") + return + } + + // Send notifications to subscribers + for _, session := range sessions[msg.Owner] { + result, _ := json.Marshal(map[string]interface{}{ + "type": "receipt", + "id": msg.ID, + "name": email.Name, + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + } + } + }) + + env.NATS = c // Initialize factors env.Factors = make(map[string]factor.Factor) @@ -138,6 +261,9 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Use(middleware.RequestID) mux.Use(glogrus.NewGlogrus(log, "api")) mux.Use(middleware.Recoverer) + mux.Use(cors.New(cors.Options{ + AllowedOrigins: []string{"*"}, + }).Handler) mux.Use(middleware.AutomaticOptions) // Set up an auth'd mux @@ -170,7 +296,6 @@ func PrepareMux(flags *env.Flags) *web.Mux { auth.Get("/emails", routes.EmailsList) auth.Post("/emails", routes.EmailsCreate) auth.Get("/emails/:id", routes.EmailsGet) - auth.Put("/emails/:id", routes.EmailsUpdate) auth.Delete("/emails/:id", routes.EmailsDelete) // Labels @@ -193,6 +318,359 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Get("/keys/:id", routes.KeysGet) auth.Post("/keys/:id/vote", routes.KeysVote) + // WebSockets handler + ws, err := socketio.NewServer(nil) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("Unable to create a socket.io server") + } + ws.On("connection", func(so socketio.Socket) { + env.Log.WithFields(logrus.Fields{ + "id": so.Id(), + }).Info("New WebSockets connection") + + so.On("request", func(id string, method string, path string, data string, headers map[string]string) { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, "http://api.lavaboom.io"+path, strings.NewReader(data)) + if err != nil { + so.Emit("error", err.Error()) + return + } + + for key, value := range headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + resp, err := http.ReadResponse(bufio.NewReader(w.Body), r) + if err != nil { + so.Emit("error", err.Error()) + return + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + so.Emit("error", err.Error()) + return + } + + so.Emit("response", id, resp.StatusCode, resp.Header, body) + }) + }) + + mux.Handle("/ws/*", sockjs.NewHandler("/ws", sockjs.DefaultOptions, func(session sockjs.Session) { + var subscribed string + + // A new goroutine seems to be spawned for each new session + for { + // Read a message from the input + msg, err := session.Recv() + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while reading from a WebSocket") + break + } + + // Decode the message + var input struct { + Type string `json:"type"` + Token string `json:"token"` + ID string `json:"id"` + Method string `json:"method"` + Path string `json:"path"` + Body string `json:"body"` + Headers map[string]string `json:"headers"` + } + err = json.Unmarshal([]byte(msg), &input) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": err, + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Check message's type + if input.Type == "subscribe" { + // Listen to user's events + + // Check if token is empty + if input.Token == "" { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Invalid token", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Check the token in database + token, err := env.Tokens.GetToken(input.Token) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Invalid token", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + // Do the actual subscription + subscribed = token.Owner + sessionsLock.Lock() + + // Sessions map already contains this owner + if _, ok := sessions[token.Owner]; ok { + sessions[token.Owner] = append(sessions[token.Owner], session) + } else { + // We have to allocate a new slice + sessions[token.Owner] = []sockjs.Session{session} + } + + // Unlock the map write + sessionsLock.Unlock() + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "subscribed", + }) + err = session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } else if input.Type == "unsubscribe" { + if subscribed == "" { + resp, _ := json.Marshal(map[string]interface{}{ + "type": "error", + "error": "Not subscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } + + sessionsLock.Lock() + + if _, ok := sessions[subscribed]; !ok { + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + if len(sessions[subscribed]) == 1 { + delete(sessions, subscribed) + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + // Find the session + index := -1 + for i, session2 := range sessions[subscribed] { + if session == session2 { + index = i + break + } + } + + // We didn't find anything + if index == -1 { + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + continue + } + + // We found it, so we are supposed to slice it + sessions[subscribed][index] = sessions[subscribed][len(sessions[subscribed])-1] + sessions[subscribed][len(sessions[subscribed])-1] = nil + sessions[subscribed] = sessions[subscribed][:len(sessions[subscribed])-1] + + // Return a response + resp, _ := json.Marshal(map[string]interface{}{ + "type": "unsubscribed", + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + sessionsLock.Unlock() + subscribed = "" + break + } + sessionsLock.Unlock() + subscribed = "" + } else if input.Type == "request" { + // Perform the request + w := httptest.NewRecorder() + r, err := http.NewRequest(input.Method, "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body)) + if err != nil { + // Return an error response + resp, _ := json.Marshal(map[string]interface{}{ + "error": err.Error(), + }) + err := session.Send(string(resp)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + continue + } + + r.RequestURI = input.Path + + for key, value := range input.Headers { + r.Header.Set(key, value) + } + + mux.ServeHTTP(w, r) + + // Return the final response + result, _ := json.Marshal(map[string]interface{}{ + "type": "response", + "id": input.ID, + "status": w.Code, + "header": w.HeaderMap, + "body": w.Body.String(), + }) + err = session.Send(string(result)) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "id": session.ID(), + "error": err.Error(), + }).Warn("Error while writing to a WebSocket") + break + } + } + } + + // We have to clear the subscription here too. TODO: make the code shorter + if subscribed == "" { + return + } + + sessionsLock.Lock() + + if _, ok := sessions[subscribed]; !ok { + sessionsLock.Unlock() + return + } + + if len(sessions[subscribed]) == 1 { + delete(sessions, subscribed) + sessionsLock.Unlock() + return + } + + // Find the session + index := -1 + for i, session2 := range sessions[subscribed] { + if session == session2 { + index = i + break + } + } + + // We didn't find anything + if index == -1 { + sessionsLock.Unlock() + return + } + + // We found it, so we are supposed to slice it + sessions[subscribed][index] = sessions[subscribed][len(sessions[subscribed])-1] + sessions[subscribed][len(sessions[subscribed])-1] = nil + sessions[subscribed] = sessions[subscribed][:len(sessions[subscribed])-1] + + // Unlock the mutex + sessionsLock.Unlock() + })) + // Merge the muxes mux.Handle("/*", auth) diff --git a/setup/setup_test.go b/setup/setup_test.go index 406ecbe..c455a0c 100644 --- a/setup/setup_test.go +++ b/setup/setup_test.go @@ -20,6 +20,8 @@ func TestSetup(t *testing.T) { RedisAddress: "127.0.0.1:6379", + NATSAddress: "nats://127.0.0.1:4222", + RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", RethinkDBDatabase: "test",