diff --git a/.drone.yml b/.drone.yml index 1956af8..59ae393 100644 --- a/.drone.yml +++ b/.drone.yml @@ -3,7 +3,7 @@ env: - GOPATH=/var/cache/drone services: - redis - - apcera/gnatsd + - mikedewar/nsqd - dockerfile/rethinkdb script: - pip install fabric @@ -18,9 +18,9 @@ script: - "if [ \"$DRONE_BRANCH\" = \"develop\" ]; then fab -H bart.lavaboom.io:36467 deploy; fi" notify: slack: - - webhook_url: $$SLACK_URL - - channel: $$SLACK_CHANNEL - - username: lavadrone - - on_started: true - - on_success: true - - on_failure: true \ No newline at end of file + webhook_url: $$SLACK_URL + channel: $$SLACK_CHANNEL + username: lavadrone + on_started: true + on_success: true + on_failure: true \ No newline at end of file diff --git a/circle.yml b/circle.yml deleted file mode 100644 index acf9014..0000000 --- a/circle.yml +++ /dev/null @@ -1,40 +0,0 @@ -machine: - timezone: - Europe/Berlin - services: - - docker - -dependencies: - cache_directories: - - /home/ubuntu/api/redis-2.8.18 - pre: - - source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list - - wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add - - - sudo apt-get update - - sudo apt-get install rethinkdb - - if [[ ! -e redis-2.8.18/src/redis-server ]]; then wget http://download.redis.io/releases/redis-2.8.18.tar.gz && tar xvzf redis-2.8.18.tar.gz; fi - - if [[ ! -e redis-2.8.18/src/redis-server ]]; then cd redis-2.8.18 && make; fi - - go get github.com/apcera/gnatsd - - go get golang.org/x/tools/cmd/cover - - go get github.com/mattn/goveralls - post: - - rethinkdb --bind all: - background: true - - src/redis-server: - background: true - - gnatsd: - background: true - -test: - override: - - go test -v github.com/lavab/api/setup - - go test -v -covermode=count -coverprofile=coverage.out github.com/lavab/api/routes - - goveralls -coverprofile=coverage.out -service=circleci -repotoken $COVERALLS_TOKEN - -deployment: - hub: - branch: master - commands: - - docker login -e circleci@lavaboom.io -u $DOCKER_USER -p $DOCKER_PASS https://registry.lavaboom.io - - docker build -t registry.lavaboom.io/lavaboom/api . - - docker push registry.lavaboom.io/lavaboom/api \ No newline at end of file diff --git a/db/setup.go b/db/setup.go index 5aa049b..f67d9d9 100644 --- a/db/setup.go +++ b/db/setup.go @@ -14,13 +14,13 @@ var ( var tableIndexes = map[string][]string{ "accounts": []string{"name"}, "contacts": []string{"owner"}, - "emails": []string{"owner", "label_ids"}, + "emails": []string{"owner", "label_ids", "date_created", "thread"}, + "files": []string{"owner"}, "keys": []string{"owner", "key_id"}, "labels": []string{"owner"}, "reservations": []string{"email", "name"}, - "threads": []string{"owner"}, + "threads": []string{"owner", "subject_hash"}, "tokens": []string{"owner"}, - "attachments": []string{"owner"}, } // List of names of databases diff --git a/db/table_attachments.go b/db/table_attachments.go deleted file mode 100644 index 165af2a..0000000 --- a/db/table_attachments.go +++ /dev/null @@ -1,95 +0,0 @@ -package db - -import ( - "github.com/lavab/api/models" - - "github.com/dancannon/gorethink" -) - -type AttachmentsTable struct { - RethinkCRUD - Emails *EmailsTable -} - -func (a *AttachmentsTable) GetAttachment(id string) (*models.Attachment, error) { - var result models.Attachment - - if err := a.FindFetchOne(id, &result); err != nil { - return nil, err - } - - return &result, nil -} - -func (a *AttachmentsTable) GetOwnedBy(id string) ([]*models.Attachment, error) { - var result []*models.Attachment - - err := a.WhereAndFetch(map[string]interface{}{ - "owner": id, - }, &result) - if err != nil { - return nil, err - } - - return result, nil -} - -func (a *AttachmentsTable) DeleteOwnedBy(id string) error { - return a.Delete(map[string]interface{}{ - "owner": id, - }) -} - -func (a *AttachmentsTable) GetEmailAttachments(id string) ([]*models.Attachment, error) { - email, err := a.Emails.GetEmail(id) - if err != nil { - return nil, err - } - - query, err := a.Emails.GetTable().Filter(func(row gorethink.Term) gorethink.Term { - return gorethink.Expr(email.Attachments).Contains(row.Field("id")) - }).GetAll().Run(a.Emails.GetSession()) - if err != nil { - return nil, err - } - - var result []*models.Attachment - err = query.All(&result) - if err != nil { - return nil, err - } - - return result, nil -} - -func (a *AttachmentsTable) CountByEmail(id string) (int, error) { - query, err := a.GetTable().GetAllByIndex("owner", id).Count().Run(a.GetSession()) - if err != nil { - return 0, err - } - - var result int - err = query.One(&result) - if err != nil { - return 0, err - } - - return result, nil -} - -func (a *AttachmentsTable) CountByThread(id ...interface{}) (int, error) { - query, err := a.GetTable().Filter(func(row gorethink.Term) gorethink.Term { - return gorethink.Table("emails").GetAllByIndex("owner", id...).Field("attachments").Contains(row.Field("id")) - }).Count().Run(a.GetSession()) - if err != nil { - return 0, err - } - - var result int - err = query.One(&result) - if err != nil { - return 0, err - } - - return result, nil -} diff --git a/db/table_emails.go b/db/table_emails.go index e4229a4..6d388d4 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -65,7 +65,7 @@ func (e *EmailsTable) List( filter["thread"] = thread } - term := e.GetTable().Filter(filter) + term := e.GetTable().Filter(filter).Filter(gorethink.Not(gorethink.Row.Field("status").Eq(gorethink.Expr("queued")))) // If sort array has contents, parse them and add to the term if sort != nil && len(sort) > 0 { @@ -128,3 +128,24 @@ func (e *EmailsTable) DeleteByThread(id string) error { "thread": id, }) } + +func (e *EmailsTable) GetThreadManifest(thread string) (string, error) { + cursor, err := e.GetTable(). + GetAllByIndex("thread", thread). + OrderBy("date_created"). + Limit(1). + Pluck("manifest"). + Field("manifest"). + Run(e.GetSession()) + if err != nil { + return "", err + } + + var manifest string + err = cursor.One(&manifest) + if err != nil { + return "", err + } + + return manifest, nil +} diff --git a/db/table_files.go b/db/table_files.go new file mode 100644 index 0000000..07ea9f4 --- /dev/null +++ b/db/table_files.go @@ -0,0 +1,95 @@ +package db + +import ( + "github.com/lavab/api/models" + + "github.com/dancannon/gorethink" +) + +type FilesTable struct { + RethinkCRUD + Emails *EmailsTable +} + +func (f *FilesTable) GetFile(id string) (*models.File, error) { + var result models.File + + if err := f.FindFetchOne(id, &result); err != nil { + return nil, err + } + + return &result, nil +} + +func (f *FilesTable) GetOwnedBy(id string) ([]*models.File, error) { + var result []*models.File + + err := f.WhereAndFetch(map[string]interface{}{ + "owner": id, + }, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (f *FilesTable) DeleteOwnedBy(id string) error { + return f.Delete(map[string]interface{}{ + "owner": id, + }) +} + +func (f *FilesTable) GetEmailFiles(id string) ([]*models.File, error) { + email, err := f.Emails.GetEmail(id) + if err != nil { + return nil, err + } + + query, err := f.Emails.GetTable().Filter(func(row gorethink.Term) gorethink.Term { + return gorethink.Expr(email.Files).Contains(row.Field("id")) + }).GetAll().Run(f.Emails.GetSession()) + if err != nil { + return nil, err + } + + var result []*models.File + err = query.All(&result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (f *FilesTable) CountByEmail(id string) (int, error) { + query, err := f.GetTable().GetAllByIndex("owner", id).Count().Run(f.GetSession()) + if err != nil { + return 0, err + } + + var result int + err = query.One(&result) + if err != nil { + return 0, err + } + + return result, nil +} + +func (f *FilesTable) CountByThread(id ...interface{}) (int, error) { + query, err := f.GetTable().Filter(func(row gorethink.Term) gorethink.Term { + return gorethink.Table("emails").GetAllByIndex("thread", id...).Field("files").Contains(row.Field("id")) + }).Count().Run(f.GetSession()) + if err != nil { + return 0, err + } + + var result int + err = query.One(&result) + if err != nil { + return 0, err + } + + return result, nil +} diff --git a/db/table_threads.go b/db/table_threads.go index 9cfceb1..23a0995 100644 --- a/db/table_threads.go +++ b/db/table_threads.go @@ -60,18 +60,16 @@ func (t *ThreadsTable) List( row.Field("labels").Contains(label), ) }) - } - - if owner != "" && label == "" { + } else if owner != "" && label == "" { term = t.GetTable().Filter(map[string]interface{}{ "owner": owner, }) - } - - if owner == "" && label != "" { + } else if owner == "" && label != "" { term = t.GetTable().Filter(func(row gorethink.Term) gorethink.Term { return row.Field("labels").Contains(label) }) + } else { + term = t.GetTable() } // If sort array has contents, parse them and add to the term @@ -103,6 +101,15 @@ func (t *ThreadsTable) List( term = term.Slice(offset, offset+limit) } + // Add manifests + term = term.InnerJoin(gorethink.Db(t.GetDBName()).Table("emails").Pluck("thread", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { + return thread.Field("id").Eq(email.Field("thread")) + }).Without(map[string]interface{}{ + "right": map[string]interface{}{ + "thread": true, + }, + }).Zip() + // Run the query cursor, err := term.Run(t.GetSession()) if err != nil { diff --git a/env/config.go b/env/config.go index 2ca5ddd..e1f2a67 100644 --- a/env/config.go +++ b/env/config.go @@ -18,7 +18,8 @@ type Flags struct { RethinkDBKey string RethinkDBDatabase string - NATSAddress string + LookupdAddress string + NSQdAddress string YubiCloudID string YubiCloudKey string diff --git a/env/env.go b/env/env.go index da9ba66..00ab65c 100644 --- a/env/env.go +++ b/env/env.go @@ -2,7 +2,7 @@ package env import ( "github.com/Sirupsen/logrus" - "github.com/apcera/nats" + "github.com/bitly/go-nsq" "github.com/dancannon/gorethink" "github.com/lavab/api/cache" @@ -33,12 +33,12 @@ var ( Emails *db.EmailsTable // Labels is the global instance of LabelsTable Labels *db.LabelsTable - // Attachments is the global instance of AttachmentsTable - Attachments *db.AttachmentsTable + // Files is the global instance of FilesTable + Files *db.FilesTable // Threads is the global instance of ThreadsTable Threads *db.ThreadsTable // Factors contains all currently registered factors Factors map[string]factor.Factor - // NATS is the encoded connection to the NATS queue - NATS *nats.EncodedConn + // Producer is the nsq producer used to send messages to other components of the system + Producer *nsq.Producer ) diff --git a/main.go b/main.go index ead3bd2..cfec8b8 100644 --- a/main.go +++ b/main.go @@ -54,14 +54,21 @@ var ( } return database }(), "Database name on the RethinkDB server") - // NATS address - natsAddress = flag.String("nats_address", func() string { - address := os.Getenv("NATS_PORT_4222_TCP_ADDR") + // nsq and lookupd addresses + nsqdAddress = flag.String("nsqd_address", func() string { + address := os.Getenv("NSQD_PORT_4150_TCP_ADDR") if address == "" { address = "127.0.0.1" } - return "nats://" + address + ":4222" - }(), "Address of the NATS server") + return address + ":4150" + }(), "Address of the nsqd server") + lookupdAddress = flag.String("lookupd_address", func() string { + address := os.Getenv("NSQLOOKUPD_PORT_4160_TCP_ADDR") + if address == "" { + address = "127.0.0.1" + } + return address + ":4160" + }(), "Address of the lookupd server") // YubiCloud params yubiCloudID = flag.String("yubicloud_id", "", "YubiCloud API id") yubiCloudKey = flag.String("yubicloud_key", "", "YubiCloud API key") @@ -76,7 +83,7 @@ var ( // slack slackURL = flag.String("slack_url", "", "URL of the Slack Incoming webhook") slackLevels = flag.String("slack_level", "warning", "minimal level required to have messages sent to slack") - slackChannel = flag.String("slack_channel", "#api-logs", "channel to which Slack bot will send messages") + slackChannel = flag.String("slack_channel", "#notif-api-logs", "channel to which Slack bot will send messages") slackIcon = flag.String("slack_icon", ":ghost:", "emoji icon of the Slack bot") slackUsername = flag.String("slack_username", "API", "username of the Slack bot") ) @@ -103,7 +110,8 @@ func main() { RethinkDBKey: *rethinkdbKey, RethinkDBDatabase: *rethinkdbDatabase, - NATSAddress: *natsAddress, + NSQdAddress: *nsqdAddress, + LookupdAddress: *lookupdAddress, YubiCloudID: *yubiCloudID, YubiCloudKey: *yubiCloudKey, diff --git a/models/account.go b/models/account.go index 02b39b6..d82327b 100644 --- a/models/account.go +++ b/models/account.go @@ -4,6 +4,7 @@ import ( "github.com/gyepisam/mcf" _ "github.com/gyepisam/mcf/scrypt" // Required to have mcf hash the password into scrypt "github.com/lavab/api/factor" + "golang.org/x/crypto/openpgp" ) // Account stores essential data for a Lavaboom user, and is thus not encrypted. @@ -38,6 +39,8 @@ type Account struct { FactorValue []string `json:"-" gorethink:"factor_value"` Status string `json:"status" gorethink:"status"` + + Key *openpgp.Entity `json:"-" gorethink:"-"` } // SetPassword changes the account's password diff --git a/models/attachment.go b/models/attachment.go deleted file mode 100644 index b4c0e61..0000000 --- a/models/attachment.go +++ /dev/null @@ -1,14 +0,0 @@ -package models - -// Attachment is an encrypted file stored by Lavaboom -type Attachment struct { - Encrypted - Resource - - // Mime is the Internet media type of the attachment - // Format: "type/subtype" – more info: en.wikipedia.org/wiki/Internet_media_type - MIME string `json:"mime" gorethink:"mime"` - - // Size is the size of the file in bytes i.e. len(file.Data) - Size int `json:"size" gorethink:"size"` -} diff --git a/models/email.go b/models/email.go index ac6839c..298ca10 100644 --- a/models/email.go +++ b/models/email.go @@ -1,42 +1,42 @@ package models -// Email is the cornerstone of our application. -// TODO mime info +// Email is a message in a thread type Email struct { Resource - // Kind of the email. Value is either sent or received. + // Kind is the type of encryption used in the email: + // - raw - when sending raw emails before they get sent + // - manifest - Manifest field is not empty, + // - pgpmime - PGP/MIME format, aka everything is in body Kind string `json:"kind" gorethink:"kind"` - From []string `json:"from" gorethink:"from"` + // Unencrypted metadata information, available in both received in sent emails + From string `json:"from" gorethink:"from"` + To []string `json:"to" gorethink:"to"` + CC []string `json:"cc" gorethink:"cc"` - // Who is supposed to receive the email / what email received it. - To []string `json:"to" gorethink:"to"` + // BCC is only visible in sent emails + BCC []string `json:"bcc" gorethink:"bcc"` - CC []string `json:"cc" gorethink:"cc"` + // Fingerprints used for body and manifest + PGPFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` - BCC []string `json:"bcc" gorethink:"bcc"` + // Files contains IDs of other files + Files []string `json:"files" gorethink:"files"` - // AttachmentsIDs is a slice of the FileIDs associated with this email - // For uploading attachments see `POST /upload` - Attachments []string `json:"attachments" gorethink:"attachments"` + // Manifest is only available in emails that were encrypted using PGP manifests + Manifest string `json:"manifest" gorethink:"manifest"` // Body contains all the data needed to send this email - Body Encrypted `json:"body" gorethink:"body"` + Body string `json:"body" gorethink:"body"` - // Preview contains the encrypted preview information (needed to show a list of emails) - // Example: Headers []string, Body string, - // Headers []string - // Body string - // Snippet string - //Preview Encrypted `json:"preview" gorethink:"preview"` + // ContentType of the body in unencrypted emails + ContentType string `json:"content_type" gorethink:"content_type"` + ReplyTo string `json:"reply_to" gorethink:"reply_to"` - Headers []string `json:"headers" gorethink:"headers"` - - // ThreadID + // Contains ID of the thread Thread string `json:"thread" gorethink:"thread"` + // received or (queued|processed) Status string `json:"status" gorethink:"status"` - - IsRead string `json:"is_read" gorethink:"is_read"` } diff --git a/models/file.go b/models/file.go new file mode 100644 index 0000000..0414578 --- /dev/null +++ b/models/file.go @@ -0,0 +1,7 @@ +package models + +// File is an encrypted file stored by Lavaboom +type File struct { + Encrypted + Resource +} diff --git a/models/label.go b/models/label.go index 6daa02c..588044c 100644 --- a/models/label.go +++ b/models/label.go @@ -16,6 +16,6 @@ type Label struct { // Examples: inbox, trash, spam, drafts, starred, etc. Builtin bool `json:"builtin" gorethink:"builtin"` - EmailsUnread int `json:"emails_unread" gorethink:"-"` - EmailsTotal int `json:"emails_total" gorethink:"-"` + UnreadThreadsCount int `json:"unread_threads_count" gorethink:"-"` + TotalThreadsCount int `json:"total_threads_count" gorethink:"-"` } diff --git a/models/thread.go b/models/thread.go index f5f2392..2270965 100644 --- a/models/thread.go +++ b/models/thread.go @@ -17,5 +17,9 @@ type Thread struct { IsRead bool `json:"is_read" gorethink:"is_read"` LastRead string `json:"last_read" gorethink:"last_read"` - AttachmentsCount *int `json:"attachments_count,omitempty" gorethink:"attachments_count,omitempty"` + FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + Manifest string `json:"manifest,omitempty" gorethink:"manifest"` + + // SHA256 hash of the raw subject without prefixes + SubjectHash string `json:"subject_hash" gorethink:"subject_hash"` } diff --git a/routes/attachments.go b/routes/attachments.go deleted file mode 100644 index ca86e99..0000000 --- a/routes/attachments.go +++ /dev/null @@ -1,310 +0,0 @@ -package routes - -import ( - "net/http" - - "github.com/Sirupsen/logrus" - "github.com/zenazn/goji/web" - - "github.com/lavab/api/env" - "github.com/lavab/api/models" - "github.com/lavab/api/utils" -) - -type AttachmentsListResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Attachments *[]*models.Attachment `json:"attachments,omitempty"` -} - -func AttachmentsList(c web.C, w http.ResponseWriter, r *http.Request) { - session := c.Env["token"].(*models.Token) - - attachments, err := env.Attachments.GetOwnedBy(session.Owner) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Error("Unable to fetch attachments") - - utils.JSONResponse(w, 500, &AttachmentsListResponse{ - Success: false, - Message: "Internal error (code AT/LI/01)", - }) - return - } - - utils.JSONResponse(w, 200, &AttachmentsListResponse{ - Success: true, - Attachments: &attachments, - }) -} - -type AttachmentsCreateRequest struct { - Data string `json:"data" schema:"data"` - Name string `json:"name" schema:"name"` - Encoding string `json:"encoding" schema:"encoding"` - VersionMajor int `json:"version_major" schema:"version_major"` - VersionMinor int `json:"version_minor" schema:"version_minor"` - PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` -} - -type AttachmentsCreateResponse struct { - Success bool `json:"success"` - Message string `json:"message"` - Attachment *models.Attachment `json:"attachment,omitempty"` -} - -// AttachmentsCreate creates a new attachment -func AttachmentsCreate(c web.C, w http.ResponseWriter, r *http.Request) { - // Decode the request - var input AttachmentsCreateRequest - err := utils.ParseRequest(r, &input) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Warn("Unable to decode a request") - - utils.JSONResponse(w, 400, &AttachmentsCreateResponse{ - Success: false, - Message: "Invalid input format", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Ensure that the input data isn't empty - if input.Data == "" || input.Name == "" || input.Encoding == "" || - input.PGPFingerprints == nil || len(input.PGPFingerprints) == 0 { - utils.JSONResponse(w, 400, &AttachmentsCreateResponse{ - Success: false, - Message: "Invalid request", - }) - return - } - - // Create a new attachment struct - attachment := &models.Attachment{ - Encrypted: models.Encrypted{ - Encoding: input.Encoding, - Data: input.Data, - Schema: "attachment", - VersionMajor: input.VersionMajor, - VersionMinor: input.VersionMinor, - PGPFingerprints: input.PGPFingerprints, - }, - Resource: models.MakeResource(session.Owner, input.Name), - } - - // Insert the attachment into the database - if err := env.Attachments.Insert(attachment); err != nil { - utils.JSONResponse(w, 500, &AttachmentsCreateResponse{ - Success: false, - Message: "internal server error - AT/CR/01", - }) - - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Error("Could not insert a attachment into the database") - return - } - - utils.JSONResponse(w, 201, &AttachmentsCreateResponse{ - Success: true, - Message: "A new attachment was successfully created", - Attachment: attachment, - }) -} - -// AttachmentsGetResponse contains the result of the AttachmentsGet request. -type AttachmentsGetResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Attachment *models.Attachment `json:"attachment,omitempty"` -} - -// AttachmentsGet gets the requested attachment from the database -func AttachmentsGet(c web.C, w http.ResponseWriter, r *http.Request) { - // Get the attachment from the database - attachment, err := env.Attachments.GetAttachment(c.URLParams["id"]) - if err != nil { - utils.JSONResponse(w, 404, &AttachmentsGetResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Check for ownership - if attachment.Owner != session.Owner { - utils.JSONResponse(w, 404, &AttachmentsGetResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Write the attachment to the response - utils.JSONResponse(w, 200, &AttachmentsGetResponse{ - Success: true, - Attachment: attachment, - }) -} - -// AttachmentsUpdateRequest is the payload passed to PUT /contacts/:id -type AttachmentsUpdateRequest struct { - Data string `json:"data" schema:"data"` - Name string `json:"name" schema:"name"` - Encoding string `json:"encoding" schema:"encoding"` - VersionMajor *int `json:"version_major" schema:"version_major"` - VersionMinor *int `json:"version_minor" schema:"version_minor"` - PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` -} - -// AttachmentsUpdateResponse contains the result of the AttachmentsUpdate request. -type AttachmentsUpdateResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Attachment *models.Attachment `json:"attachment,omitempty"` -} - -// AttachmentsUpdate updates an existing attachment in the database -func AttachmentsUpdate(c web.C, w http.ResponseWriter, r *http.Request) { - // Decode the request - var input AttachmentsUpdateRequest - err := utils.ParseRequest(r, &input) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Warn("Unable to decode a request") - - utils.JSONResponse(w, 400, &AttachmentsUpdateResponse{ - Success: false, - Message: "Invalid input format", - }) - return - } - - // Get the attachment from the database - attachment, err := env.Attachments.GetAttachment(c.URLParams["id"]) - if err != nil { - utils.JSONResponse(w, 404, &AttachmentsUpdateResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Check for ownership - if attachment.Owner != session.Owner { - utils.JSONResponse(w, 404, &AttachmentsUpdateResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - if input.Data != "" { - attachment.Data = input.Data - } - - if input.Name != "" { - attachment.Name = input.Name - } - - if input.Encoding != "" { - attachment.Encoding = input.Encoding - } - - if input.VersionMajor != nil { - attachment.VersionMajor = *input.VersionMajor - } - - if input.VersionMinor != nil { - attachment.VersionMinor = *input.VersionMinor - } - - if input.PGPFingerprints != nil { - attachment.PGPFingerprints = input.PGPFingerprints - } - - // Perform the update - err = env.Attachments.UpdateID(c.URLParams["id"], attachment) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - "id": c.URLParams["id"], - }).Error("Unable to update a attachment") - - utils.JSONResponse(w, 500, &AttachmentsUpdateResponse{ - Success: false, - Message: "Internal error (code AT/UP/01)", - }) - return - } - - // Write the attachment to the response - utils.JSONResponse(w, 200, &AttachmentsUpdateResponse{ - Success: true, - Attachment: attachment, - }) -} - -// AttachmentsDeleteResponse contains the result of the Delete request. -type AttachmentsDeleteResponse struct { - Success bool `json:"success"` - Message string `json:"message"` -} - -// AttachmentsDelete removes a attachment from the database -func AttachmentsDelete(c web.C, w http.ResponseWriter, r *http.Request) { - // Get the attachment from the database - attachment, err := env.Attachments.GetAttachment(c.URLParams["id"]) - if err != nil { - utils.JSONResponse(w, 404, &AttachmentsDeleteResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Check for ownership - if attachment.Owner != session.Owner { - utils.JSONResponse(w, 404, &AttachmentsDeleteResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Perform the deletion - err = env.Attachments.DeleteID(c.URLParams["id"]) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - "id": c.URLParams["id"], - }).Error("Unable to delete a attachment") - - utils.JSONResponse(w, 500, &AttachmentsDeleteResponse{ - Success: false, - Message: "Internal error (code AT/DE/01)", - }) - return - } - - // Write the attachment to the response - utils.JSONResponse(w, 200, &AttachmentsDeleteResponse{ - Success: true, - Message: "Attachment successfully removed", - }) -} diff --git a/routes/emails.go b/routes/emails.go index def605d..302aa39 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -1,8 +1,10 @@ package routes import ( - "bytes" - "io" + //"bytes" + //"io" + "crypto/sha256" + "encoding/hex" "net/http" "regexp" "strconv" @@ -10,8 +12,8 @@ import ( "github.com/Sirupsen/logrus" "github.com/zenazn/goji/web" - "golang.org/x/crypto/openpgp" - "golang.org/x/crypto/openpgp/armor" + //"golang.org/x/crypto/openpgp" + //"golang.org/x/crypto/openpgp/armor" _ "golang.org/x/crypto/ripemd160" "github.com/lavab/api/env" @@ -125,21 +127,25 @@ func EmailsList(c web.C, w http.ResponseWriter, r *http.Request) { } type EmailsCreateRequest struct { - To []string `json:"to"` - CC []string `json:"cc"` - BCC []string `json:"bcc"` - ReplyTo string `json:"reply_to"` - Thread string `json:"thread"` - Subject string `json:"subject"` - Body string `json:"body"` - BodyVersionMajor int `json:"body_version_major"` - BodyVersionMinor int `json:"body_version_minor"` - Preview string `json:"preview"` - PreviewVersionMajor int `json:"preview_version_major"` - PreviewVersionMinor int `json:"preview_version_minor"` - Encoding string `json:"encoding"` - Attachments []string `json:"attachments"` - PGPFingerprints []string `json:"pgp_fingerprints"` + // Internal properties + Kind string `json:"kind"` + Thread string `json:"thread"` + + // Metadata that has to be leaked + To []string `json:"to"` + CC []string `json:"cc"` + BCC []string `json:"bcc"` + + // Encrypted parts + PGPFingerprints []string `json:"pgp_fingerprints"` + Manifest string `json:"manifest"` + Body string `json:"body"` + Files []string `json:"files"` + + // Temporary partials if you're sending unencrypted + Subject string `json:"subject"` + ContentType string `json:"content_type"` + ReplyTo string `json:"reply_to"` } // EmailsCreateResponse contains the result of the EmailsCreate request. @@ -169,15 +175,32 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Fetch the current session from the middleware session := c.Env["token"].(*models.Token) - // Ensure that the input data isn't empty - if len(input.To) == 0 || input.Subject == "" || input.Body == "" { + // Ensure that the kind is valid + if input.Kind != "raw" && input.Kind != "manifest" && input.Kind != "pgpmime" { utils.JSONResponse(w, 400, &EmailsCreateResponse{ Success: false, - Message: "Invalid request", + Message: "Invalid email encryption kind", }) return } + // Ensure that there's at least one recipient and that there's body + if len(input.To) == 0 || input.Body == "" { + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid email", + }) + return + } + + // Create an email resource + resource := models.MakeResource(session.Owner, input.Subject) + + // Generate metadata for manifests + if input.Kind == "manifest" { + resource.Name = "Encrypted message (" + resource.ID + ")" + } + // Fetch the user object from the database account, err := env.Accounts.GetTokenOwner(c.Env["token"].(*models.Token)) if err != nil { @@ -194,9 +217,6 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { return } - // Create an email resource - emailResource := models.MakeResource(session.Owner, input.Subject) - // Get the "Sent" label's ID var label *models.Label err = env.Labels.WhereAndFetchOne(map[string]interface{}{ @@ -234,12 +254,15 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { return } } else { + hash := sha256.Sum256([]byte(input.Subject)) + thread := &models.Thread{ - Resource: models.MakeResource(account.ID, input.Subject), - Emails: []string{emailResource.ID}, - Labels: []string{label.ID}, - Members: append(append(input.To, input.CC...), input.BCC...), - IsRead: true, + Resource: models.MakeResource(account.ID, "Encrypted thread"), + Emails: []string{resource.ID}, + Labels: []string{label.ID}, + Members: append(append(input.To, input.CC...), input.BCC...), + IsRead: true, + SubjectHash: hex.EncodeToString(hash[:]), } err := env.Threads.Insert(thread) @@ -260,22 +283,24 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Create a new email struct email := &models.Email{ - Kind: "sent", - From: []string{account.Name + "@" + env.Config.EmailDomain}, - To: input.To, - CC: input.CC, - BCC: input.BCC, - Resource: emailResource, - Attachments: input.Attachments, - Thread: input.Thread, - Body: models.Encrypted{ - Encoding: "json", - PGPFingerprints: input.PGPFingerprints, - Data: input.Body, - Schema: "email", - VersionMajor: input.BodyVersionMajor, - VersionMinor: input.BodyVersionMinor, - }, + Resource: resource, + + Kind: input.Kind, + Thread: input.Thread, + + From: account.Name + "@" + env.Config.EmailDomain, + To: input.To, + CC: input.CC, + BCC: input.BCC, + + PGPFingerprints: input.PGPFingerprints, + Manifest: input.Manifest, + Body: input.Body, + Files: input.Files, + + ContentType: input.ContentType, + ReplyTo: input.ReplyTo, + Status: "queued", } @@ -295,7 +320,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // I'm going to whine at this part, as we are doubling the email sending code // Check if To contains lavaboom emails - for _, address := range email.To { + /*for _, address := range email.To { parts := strings.SplitN(address, "@", 2) if parts[1] == env.Config.EmailDomain { go sendEmail(parts[0], email) @@ -316,10 +341,10 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { if parts[1] == env.Config.EmailDomain { go sendEmail(parts[0], email) } - } + }*/ // Add a send request to the queue - err = env.NATS.Publish("send", email.ID) + err = env.Producer.Publish("send_email", []byte(`"`+email.ID+`"`)) if err != nil { utils.JSONResponse(w, 500, &EmailsCreateResponse{ Success: false, @@ -428,7 +453,7 @@ func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { }) } -func sendEmail(account string, email *models.Email) { +/*func sendEmail(account string, email *models.Email) { // find recipient's account recipient, err := env.Accounts.FindAccountByName(account) if err != nil { @@ -630,7 +655,7 @@ func sendEmail(account string, email *models.Email) { } // Send notifications - err = env.NATS.Publish("delivery", map[string]interface{}{ + err = env.Producer.Publish("email_delivery", map[string]interface{}{ "id": email.ID, "owner": email.Owner, }) @@ -641,7 +666,7 @@ func sendEmail(account string, email *models.Email) { }).Error("Unable to publish a delivery message") } - err = env.NATS.Publish("receipt", map[string]interface{}{ + err = env.NATS.Publish("email_receipt", map[string]interface{}{ "id": newEmail.ID, "owner": newEmail.Owner, }) @@ -651,4 +676,4 @@ func sendEmail(account string, email *models.Email) { "error": err.Error(), }).Error("Unable to publish a receipt message") } -} +}*/ diff --git a/routes/emails_test.go b/routes/emails_test.go_ similarity index 100% rename from routes/emails_test.go rename to routes/emails_test.go_ diff --git a/routes/files.go b/routes/files.go new file mode 100644 index 0000000..8f9de53 --- /dev/null +++ b/routes/files.go @@ -0,0 +1,310 @@ +package routes + +import ( + "net/http" + + "github.com/Sirupsen/logrus" + "github.com/zenazn/goji/web" + + "github.com/lavab/api/env" + "github.com/lavab/api/models" + "github.com/lavab/api/utils" +) + +type FilesListResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Files *[]*models.File `json:"files,omitempty"` +} + +func FilesList(c web.C, w http.ResponseWriter, r *http.Request) { + session := c.Env["token"].(*models.Token) + + files, err := env.Files.GetOwnedBy(session.Owner) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Error("Unable to fetch files") + + utils.JSONResponse(w, 500, &FilesListResponse{ + Success: false, + Message: "Internal error (code FI/LI/01)", + }) + return + } + + utils.JSONResponse(w, 200, &FilesListResponse{ + Success: true, + Files: &files, + }) +} + +type FilesCreateRequest struct { + Data string `json:"data" schema:"data"` + Name string `json:"name" schema:"name"` + Encoding string `json:"encoding" schema:"encoding"` + VersionMajor int `json:"version_major" schema:"version_major"` + VersionMinor int `json:"version_minor" schema:"version_minor"` + PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` +} + +type FilesCreateResponse struct { + Success bool `json:"success"` + Message string `json:"message"` + File *models.File `json:"file,omitempty"` +} + +// FilesCreate creates a new file +func FilesCreate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input FilesCreateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &FilesCreateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Ensure that the input data isn't empty + if input.Data == "" || input.Name == "" || input.Encoding == "" || + input.PGPFingerprints == nil || len(input.PGPFingerprints) == 0 { + utils.JSONResponse(w, 400, &FilesCreateResponse{ + Success: false, + Message: "Invalid request", + }) + return + } + + // Create a new file struct + file := &models.File{ + Encrypted: models.Encrypted{ + Encoding: input.Encoding, + Data: input.Data, + Schema: "file", + VersionMajor: input.VersionMajor, + VersionMinor: input.VersionMinor, + PGPFingerprints: input.PGPFingerprints, + }, + Resource: models.MakeResource(session.Owner, input.Name), + } + + // Insert the file into the database + if err := env.Files.Insert(file); err != nil { + utils.JSONResponse(w, 500, &FilesCreateResponse{ + Success: false, + Message: "internal server error - FI/CR/01", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Error("Could not insert a file into the database") + return + } + + utils.JSONResponse(w, 201, &FilesCreateResponse{ + Success: true, + Message: "A new file was successfully created", + File: file, + }) +} + +// FilesGetResponse contains the result of the FilesGet request. +type FilesGetResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + File *models.File `json:"file,omitempty"` +} + +// FilesGet gets the requested file from the database +func FilesGet(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the file from the database + file, err := env.Files.GetFile(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &FilesGetResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if file.Owner != session.Owner { + utils.JSONResponse(w, 404, &FilesGetResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Write the file to the response + utils.JSONResponse(w, 200, &FilesGetResponse{ + Success: true, + File: file, + }) +} + +// FilesUpdateRequest is the payload passed to PUT /files/:id +type FilesUpdateRequest struct { + Data string `json:"data" schema:"data"` + Name string `json:"name" schema:"name"` + Encoding string `json:"encoding" schema:"encoding"` + VersionMajor *int `json:"version_major" schema:"version_major"` + VersionMinor *int `json:"version_minor" schema:"version_minor"` + PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` +} + +// FilesUpdateResponse contains the result of the FilesUpdate request. +type FilesUpdateResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + File *models.File `json:"file,omitempty"` +} + +// FilesUpdate updates an existing file in the database +func FilesUpdate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input FilesUpdateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &FilesUpdateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Get the file from the database + file, err := env.Files.GetFile(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &FilesUpdateResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if file.Owner != session.Owner { + utils.JSONResponse(w, 404, &FilesUpdateResponse{ + Success: false, + Message: "File not found", + }) + return + } + + if input.Data != "" { + file.Data = input.Data + } + + if input.Name != "" { + file.Name = input.Name + } + + if input.Encoding != "" { + file.Encoding = input.Encoding + } + + if input.VersionMajor != nil { + file.VersionMajor = *input.VersionMajor + } + + if input.VersionMinor != nil { + file.VersionMinor = *input.VersionMinor + } + + if input.PGPFingerprints != nil { + file.PGPFingerprints = input.PGPFingerprints + } + + // Perform the update + err = env.Files.UpdateID(c.URLParams["id"], file) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": c.URLParams["id"], + }).Error("Unable to update a file") + + utils.JSONResponse(w, 500, &FilesUpdateResponse{ + Success: false, + Message: "Internal error (code FI/UP/01)", + }) + return + } + + // Write the file to the response + utils.JSONResponse(w, 200, &FilesUpdateResponse{ + Success: true, + File: file, + }) +} + +// FilesDeleteResponse contains the result of the Delete request. +type FilesDeleteResponse struct { + Success bool `json:"success"` + Message string `json:"message"` +} + +// FilesDelete removes a file from the database +func FilesDelete(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the file from the database + file, err := env.Files.GetFile(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &FilesDeleteResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if file.Owner != session.Owner { + utils.JSONResponse(w, 404, &FilesDeleteResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Perform the deletion + err = env.Files.DeleteID(c.URLParams["id"]) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": c.URLParams["id"], + }).Error("Unable to delete a file") + + utils.JSONResponse(w, 500, &FilesDeleteResponse{ + Success: false, + Message: "Internal error (code FI/DE/01)", + }) + return + } + + // Write the file to the response + utils.JSONResponse(w, 200, &FilesDeleteResponse{ + Success: true, + Message: "File successfully removed", + }) +} diff --git a/routes/attachments_test.go b/routes/files_test.go similarity index 67% rename from routes/attachments_test.go rename to routes/files_test.go index d34db71..91897ac 100644 --- a/routes/attachments_test.go +++ b/routes/files_test.go @@ -12,8 +12,8 @@ import ( "github.com/lavab/api/routes" ) -func TestAttachmentsRoute(t *testing.T) { - Convey("When uploading a new attachment", t, func() { +func TestFilesRoute(t *testing.T) { + Convey("When uploading a new file", t, func() { account := &models.Account{ Resource: models.MakeResource("", "johnorange"), Status: "complete", @@ -47,7 +47,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Misformatted body should fail", func() { request := goreq.Request{ Method: "POST", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", ContentType: "application/json", Body: "!@#!@#", } @@ -55,7 +55,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsCreateResponse + var response routes.FilesCreateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -66,7 +66,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Invalid set of data should fail", func() { request := goreq.Request{ Method: "POST", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() @@ -80,10 +80,10 @@ func TestAttachmentsRoute(t *testing.T) { So(response.Message, ShouldEqual, "Invalid request") }) - Convey("Attachment upload should succeed", func() { + Convey("File upload should succeed", func() { request := goreq.Request{ Method: "POST", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", ContentType: "application/json", Body: `{ "data": "` + uniuri.NewLen(64) + `", @@ -98,52 +98,52 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsCreateResponse + var response routes.FilesCreateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "A new attachment was successfully created") + So(response.Message, ShouldEqual, "A new file was successfully created") So(response.Success, ShouldBeTrue) - So(response.Attachment.ID, ShouldNotBeEmpty) + So(response.File.ID, ShouldNotBeEmpty) - attachment := response.Attachment + file := response.File - Convey("Getting that attachment should succeed", func() { + Convey("Getting that file should succeed", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Attachment.ID, ShouldNotBeNil) + So(response.File.ID, ShouldNotBeNil) So(response.Success, ShouldBeTrue) }) - Convey("The attachment should be visible on the list", func() { + Convey("The file should be visible on the list", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsListResponse + var response routes.FilesListResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(len(*response.Attachments), ShouldBeGreaterThan, 0) + So(len(*response.Files), ShouldBeGreaterThan, 0) So(response.Success, ShouldBeTrue) found := false - for _, a := range *response.Attachments { - if a.ID == attachment.ID { + for _, a := range *response.Files { + if a.ID == file.ID { found = true break } @@ -155,7 +155,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Updating it should succeed", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, ContentType: "application/json", Body: `{ "data": "` + uniuri.NewLen(64) + `", @@ -170,53 +170,53 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsUpdateResponse + var response routes.FilesUpdateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) So(response.Success, ShouldBeTrue) - So(response.Attachment.ID, ShouldEqual, attachment.ID) - So(response.Attachment.Encoding, ShouldEqual, "xml") + So(response.File.ID, ShouldEqual, file.ID) + So(response.File.Encoding, ShouldEqual, "xml") }) Convey("Deleting it should succeed", func() { request := goreq.Request{ Method: "DELETE", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsDeleteResponse + var response routes.FilesDeleteResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment successfully removed") + So(response.Message, ShouldEqual, "File successfully removed") So(response.Success, ShouldBeTrue) }) }) - Convey("Getting a non-existing attachment should fail", func() { + Convey("Getting a non-existing file should fail", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments/doesntexist", + Uri: server.URL + "/files/doesntexist", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") So(response.Success, ShouldBeFalse) Convey("Updating it should fail too", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/doesntexist", + Uri: server.URL + "/files/doesntexist", ContentType: "application/json", Body: "{}", } @@ -224,21 +224,21 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") So(response.Success, ShouldBeFalse) }) }) - Convey("Getting a non-owned attachment should fail", func() { - attachment := &models.Attachment{ + Convey("Getting a non-owned file should fail", func() { + file := &models.File{ Encrypted: models.Encrypted{ Encoding: "json", Data: uniuri.NewLen(64), - Schema: "attachment", + Schema: "file", VersionMajor: 1, VersionMinor: 0, PGPFingerprints: []string{uniuri.New()}, @@ -246,29 +246,29 @@ func TestAttachmentsRoute(t *testing.T) { Resource: models.MakeResource("nonowned", "photo.jpg"), } - err := env.Attachments.Insert(attachment) + err := env.Files.Insert(file) So(err, ShouldBeNil) request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") So(response.Success, ShouldBeFalse) }) Convey("Updating using a misformatted body should fail", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/shizzle", + Uri: server.URL + "/files/shizzle", ContentType: "application/json", Body: "!@#!@#", } @@ -276,7 +276,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsUpdateResponse + var response routes.FilesUpdateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -284,10 +284,10 @@ func TestAttachmentsRoute(t *testing.T) { So(response.Message, ShouldEqual, "Invalid input format") }) - Convey("Updating a non-existing attachment should fail", func() { + Convey("Updating a non-existing file should fail", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/shizzle", + Uri: server.URL + "/files/shizzle", ContentType: "application/json", Body: "{}", } @@ -295,29 +295,29 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsUpdateResponse + var response routes.FilesUpdateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) So(response.Success, ShouldBeFalse) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") }) - Convey("Deleting a non-existing attachment should fail", func() { + Convey("Deleting a non-existing file should fail", func() { request := goreq.Request{ Method: "DELETE", - Uri: server.URL + "/attachments/shizzle", + Uri: server.URL + "/files/shizzle", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsDeleteResponse + var response routes.FilesDeleteResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) So(response.Success, ShouldBeFalse) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") }) }) } diff --git a/routes/init_test.go b/routes/init_test.go index 227cfbb..1dbb6ff 100644 --- a/routes/init_test.go +++ b/routes/init_test.go @@ -27,7 +27,8 @@ func init() { RedisAddress: "127.0.0.1:6379", - NATSAddress: "nats://127.0.0.1:4222", + NSQdAddress: "127.0.0.1:4150", + LookupdAddress: "127.0.0.1:4160", RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", diff --git a/routes/labels.go b/routes/labels.go index daed582..579cb77 100644 --- a/routes/labels.go +++ b/routes/labels.go @@ -34,6 +34,39 @@ func LabelsList(c web.C, w http.ResponseWriter, r *http.Request) { return } + for _, label := range labels { + totalCount, err := env.Threads.CountByLabel(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch total threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/LI/02)", + }) + return + } + + unreadCount, err := env.Threads.CountByLabelUnread(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch unread threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/LI/03)", + }) + return + } + + label.TotalThreadsCount = totalCount + label.UnreadThreadsCount = unreadCount + } + utils.JSONResponse(w, 200, &LabelsListResponse{ Success: true, Labels: &labels, @@ -144,6 +177,37 @@ func LabelsGet(c web.C, w http.ResponseWriter, r *http.Request) { return } + totalCount, err := env.Threads.CountByLabel(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch total threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/GE/01)", + }) + return + } + + unreadCount, err := env.Threads.CountByLabelUnread(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch unread threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/GE/01)", + }) + return + } + + label.TotalThreadsCount = totalCount + label.UnreadThreadsCount = unreadCount + // Write the label to the response utils.JSONResponse(w, 200, &LabelsGetResponse{ Success: true, diff --git a/routes/threads.go b/routes/threads.go index b80bc48..95b4484 100644 --- a/routes/threads.go +++ b/routes/threads.go @@ -27,15 +27,15 @@ func ThreadsList(c web.C, w http.ResponseWriter, r *http.Request) { session := c.Env["token"].(*models.Token) var ( - query = r.URL.Query() - sortRaw = query.Get("sort") - offsetRaw = query.Get("offset") - limitRaw = query.Get("limit") - attachmentsCount = query.Get("attachments_count") - label = query.Get("label") - sort []string - offset int - limit int + query = r.URL.Query() + sortRaw = query.Get("sort") + offsetRaw = query.Get("offset") + limitRaw = query.Get("limit") + countFiles = query.Get("count_files") + label = query.Get("label") + sort []string + offset int + limit int ) if offsetRaw != "" { @@ -105,14 +105,14 @@ func ThreadsList(c web.C, w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Total-Count", strconv.Itoa(count)) } - if attachmentsCount == "true" || attachmentsCount == "1" { + if countFiles == "true" || countFiles == "1" { for _, thread := range threads { - count, err := env.Attachments.CountByThread(thread.ID) + count, err := env.Files.CountByThread(thread.ID) if err != nil { env.Log.WithFields(logrus.Fields{ "error": err.Error(), "thread": thread.ID, - }).Error("Unable to count attachments per thread") + }).Error("Unable to count files per thread") utils.JSONResponse(w, 500, &ThreadsListResponse{ Success: false, @@ -121,7 +121,7 @@ func ThreadsList(c web.C, w http.ResponseWriter, r *http.Request) { return } - thread.AttachmentsCount = &count + thread.FilesCount = &count } } @@ -160,6 +160,16 @@ func ThreadsGet(c web.C, w http.ResponseWriter, r *http.Request) { return } + manifest, err := env.Emails.GetThreadManifest(thread.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": thread.ID, + }).Error("Unable to get a manifest") + } else { + thread.Manifest = manifest + } + var emails []*models.Email if ok := r.URL.Query().Get("list_emails"); ok == "true" || ok == "1" { emails, err = env.Emails.GetByThread(thread.ID) @@ -177,22 +187,22 @@ func ThreadsGet(c web.C, w http.ResponseWriter, r *http.Request) { } } - if ok := r.URL.Query().Get("attachments_count"); ok == "true" || ok == "1" { - count, err := env.Attachments.CountByThread(thread.ID) + if ok := r.URL.Query().Get("count_files"); ok == "true" || ok == "1" { + count, err := env.Files.CountByThread(thread.ID) if err != nil { env.Log.WithFields(logrus.Fields{ "error": err.Error(), "id": thread.ID, - }).Error("Unable to count attachments linked to a thread") + }).Error("Unable to count files linked to a thread") utils.JSONResponse(w, 500, &ThreadsGetResponse{ Success: false, - Message: "Unable to count attachments", + Message: "Unable to count files", }) return } - thread.AttachmentsCount = &count + thread.FilesCount = &count } utils.JSONResponse(w, 200, &ThreadsGetResponse{ @@ -252,7 +262,7 @@ func ThreadsUpdate(c web.C, w http.ResponseWriter, r *http.Request) { return } - if !reflect.DeepEqual(thread.Labels, input.Labels) { + if thread.Labels != nil && !reflect.DeepEqual(thread.Labels, input.Labels) { thread.Labels = input.Labels } diff --git a/setup/setup.go b/setup/setup.go index b62e5f2..19c320f 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -5,13 +5,14 @@ import ( "io" "net/http" "net/http/httptest" + "os" "regexp" "strings" "sync" "time" "github.com/Sirupsen/logrus" - "github.com/apcera/nats" + "github.com/bitly/go-nsq" "github.com/dancannon/gorethink" "github.com/johntdyer/slackrus" "github.com/pzduniak/glogrus" @@ -206,42 +207,63 @@ func PrepareMux(flags *env.Flags) *web.Mux { Emails: env.Emails, //Cache: redis, } - env.Attachments = &db.AttachmentsTable{ + env.Files = &db.FilesTable{ RethinkCRUD: db.NewCRUDTable( rethinkSession, rethinkOpts.Database, - "attachments", + "files", ), } - // NATS queue connection - nc, err := nats.Connect(flags.NATSAddress) + // Create a producer + producer, err := nsq.NewProducer(flags.NSQdAddress, nsq.NewConfig()) if err != nil { env.Log.WithFields(logrus.Fields{ - "error": err, - "address": flags.NATSAddress, - }).Fatal("Unable to connect to NATS") + "error": err.Error(), + }).Fatal("Unable to create a new nsq producer") } - c, err := nats.NewEncodedConn(nc, "json") + /*defer func(producer *nsq.Producer) { + producer.Stop() + }(producer)*/ + + env.Producer = producer + + // Get the hostname + hostname, err := os.Hostname() + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Fatal("Unable to get the hostname") + } + + // Create a delivery consumer + deliveryConsumer, err := nsq.NewConsumer("email_delivery", hostname, nsq.NewConfig()) if err != nil { env.Log.WithFields(logrus.Fields{ - "error": err, - "address": flags.NATSAddress, - }).Fatal("Unable to initialize a JSON NATS connection") + "error": err.Error(), + "topic": "email_delivery", + }).Fatal("Unable to create a new nsq consumer") } + //defer deliveryConsumer.Stop() + + deliveryConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error { + var msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + } + + if err := json.Unmarshal(m.Body, &msg); err != nil { + return err + } - c.Subscribe("delivery", func(msg *struct { - ID string `json:"id"` - Owner string `json:"owner"` - }) { // Check if we are handling owner's session if _, ok := sessions[msg.Owner]; !ok { - return + return nil } if len(sessions[msg.Owner]) == 0 { - return + return nil } // Resolve the email @@ -251,15 +273,28 @@ func PrepareMux(flags *env.Flags) *web.Mux { "error": err.Error(), "id": msg.ID, }).Error("Unable to resolve an email from queue") - return + return nil + } + + // Resolve the thread + thread, err := env.Threads.GetThread(email.Thread) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + "thread": email.Thread, + }).Error("Unable to resolve a thread from queue") + return nil } // Send notifications to subscribers for _, session := range sessions[msg.Owner] { result, _ := json.Marshal(map[string]interface{}{ - "type": "delivery", - "id": msg.ID, - "name": email.Name, + "type": "delivery", + "id": msg.ID, + "name": email.Name, + "thread": email.Thread, + "labels": thread.Labels, }) err = session.Send(string(result)) if err != nil { @@ -269,20 +304,43 @@ func PrepareMux(flags *env.Flags) *web.Mux { }).Warn("Error while writing to a WebSocket") } } - }) - c.Subscribe("receipt", func(msg *struct { - ID string `json:"id"` - Owner string `json:"owner"` - }) { - log.Print(msg) + return nil + }), 10) + + if err := deliveryConsumer.ConnectToNSQLookupd(flags.LookupdAddress); err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Fatal("Unable to connect to nsqlookupd") + } + + // Create a receipt consumer + receiptConsumer, err := nsq.NewConsumer("email_receipt", hostname, nsq.NewConfig()) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "topic": "email_receipt", + }).Fatal("Unable to create a new nsq consumer") + } + //defer receiptConsumer.Stop() + + receiptConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error { + var msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + } + + if err := json.Unmarshal(m.Body, &msg); err != nil { + return err + } + // Check if we are handling owner's session if _, ok := sessions[msg.Owner]; !ok { - return + return nil } if len(sessions[msg.Owner]) == 0 { - return + return nil } // Resolve the email @@ -292,15 +350,28 @@ func PrepareMux(flags *env.Flags) *web.Mux { "error": err.Error(), "id": msg.ID, }).Error("Unable to resolve an email from queue") - return + return nil + } + + // Resolve the thread + thread, err := env.Threads.GetThread(email.Thread) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + "thread": email.Thread, + }).Error("Unable to resolve a thread from queue") + return nil } // Send notifications to subscribers for _, session := range sessions[msg.Owner] { result, _ := json.Marshal(map[string]interface{}{ - "type": "receipt", - "id": msg.ID, - "name": email.Name, + "type": "receipt", + "id": msg.ID, + "name": email.Name, + "thread": email.Thread, + "labels": thread.Labels, }) err = session.Send(string(result)) if err != nil { @@ -310,9 +381,15 @@ func PrepareMux(flags *env.Flags) *web.Mux { }).Warn("Error while writing to a WebSocket") } } - }) - env.NATS = c + return nil + }), 10) + + if err := receiptConsumer.ConnectToNSQLookupd(flags.LookupdAddress); err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Fatal("Unable to connect to nsqlookupd") + } // Create a new goji mux mux := web.New() @@ -386,13 +463,6 @@ func PrepareMux(flags *env.Flags) *web.Mux { // Index route mux.Get("/", routes.Hello) - // Attachments - auth.Get("/attachments", routes.AttachmentsList) - auth.Post("/attachments", routes.AttachmentsCreate) - auth.Get("/attachments/:id", routes.AttachmentsGet) - auth.Put("/attachments/:id", routes.AttachmentsUpdate) - auth.Delete("/attachments/:id", routes.AttachmentsDelete) - // Accounts auth.Get("/accounts", routes.AccountsList) mux.Post("/accounts", routes.AccountsCreate) @@ -405,6 +475,13 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Get(regexp.MustCompile(`/avatars/(?P[\S\s]*?)\.(?Psvg|png)(?:[\S\s]*?)$`), routes.Avatars) //mux.Get("/avatars/:hash.:ext", routes.Avatars) + // Files + auth.Get("/files", routes.FilesList) + auth.Post("/files", routes.FilesCreate) + auth.Get("/files/:id", routes.FilesGet) + auth.Put("/files/:id", routes.FilesUpdate) + auth.Delete("/files/:id", routes.FilesDelete) + // Tokens auth.Get("/tokens", routes.TokensGet) auth.Get("/tokens/:id", routes.TokensGet) diff --git a/setup/setup_test.go_ b/setup/setup_test.go_ index c10f95d..e44739d 100644 --- a/setup/setup_test.go_ +++ b/setup/setup_test.go_ @@ -19,7 +19,8 @@ func TestSetup(t *testing.T) { RedisAddress: "127.0.0.1:6379", - NATSAddress: "nats://127.0.0.1:4222", + NSQAddress: "127.0.0.1:4150", + LookupdAddress: "127.0.0.1:4160", RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "",