From 9ce4ba8e36d67a8fd97b709461e9f52f8004c70d Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 8 Feb 2015 13:52:15 +0100 Subject: [PATCH 01/33] Renamed attachments to files --- db/table_attachments.go | 95 ------ db/table_files.go | 95 ++++++ env/env.go | 4 +- models/attachment.go | 14 - models/email.go | 2 +- models/file.go | 10 + models/thread.go | 2 +- routes/attachments.go | 310 ------------------ routes/emails.go | 18 +- routes/files.go | 310 ++++++++++++++++++ routes/{attachments_test.go => files_test.go} | 58 ++-- routes/threads.go | 36 +- setup/setup.go | 18 +- 13 files changed, 484 insertions(+), 488 deletions(-) delete mode 100644 db/table_attachments.go create mode 100644 db/table_files.go delete mode 100644 models/attachment.go create mode 100644 models/file.go delete mode 100644 routes/attachments.go create mode 100644 routes/files.go rename routes/{attachments_test.go => files_test.go} (81%) diff --git a/db/table_attachments.go b/db/table_attachments.go deleted file mode 100644 index 165af2a..0000000 --- a/db/table_attachments.go +++ /dev/null @@ -1,95 +0,0 @@ -package db - -import ( - "github.com/lavab/api/models" - - "github.com/dancannon/gorethink" -) - -type AttachmentsTable struct { - RethinkCRUD - Emails *EmailsTable -} - -func (a *AttachmentsTable) GetAttachment(id string) (*models.Attachment, error) { - var result models.Attachment - - if err := a.FindFetchOne(id, &result); err != nil { - return nil, err - } - - return &result, nil -} - -func (a *AttachmentsTable) GetOwnedBy(id string) ([]*models.Attachment, error) { - var result []*models.Attachment - - err := a.WhereAndFetch(map[string]interface{}{ - "owner": id, - }, &result) - if err != nil { - return nil, err - } - - return result, nil -} - -func (a *AttachmentsTable) DeleteOwnedBy(id string) error { - return a.Delete(map[string]interface{}{ - "owner": id, - }) -} - -func (a *AttachmentsTable) GetEmailAttachments(id string) ([]*models.Attachment, error) { - email, err := a.Emails.GetEmail(id) - if err != nil { - return nil, err - } - - query, err := a.Emails.GetTable().Filter(func(row gorethink.Term) gorethink.Term { - return gorethink.Expr(email.Attachments).Contains(row.Field("id")) - }).GetAll().Run(a.Emails.GetSession()) - if err != nil { - return nil, err - } - - var result []*models.Attachment - err = query.All(&result) - if err != nil { - return nil, err - } - - return result, nil -} - -func (a *AttachmentsTable) CountByEmail(id string) (int, error) { - query, err := a.GetTable().GetAllByIndex("owner", id).Count().Run(a.GetSession()) - if err != nil { - return 0, err - } - - var result int - err = query.One(&result) - if err != nil { - return 0, err - } - - return result, nil -} - -func (a *AttachmentsTable) CountByThread(id ...interface{}) (int, error) { - query, err := a.GetTable().Filter(func(row gorethink.Term) gorethink.Term { - return gorethink.Table("emails").GetAllByIndex("owner", id...).Field("attachments").Contains(row.Field("id")) - }).Count().Run(a.GetSession()) - if err != nil { - return 0, err - } - - var result int - err = query.One(&result) - if err != nil { - return 0, err - } - - return result, nil -} diff --git a/db/table_files.go b/db/table_files.go new file mode 100644 index 0000000..4dc2c3d --- /dev/null +++ b/db/table_files.go @@ -0,0 +1,95 @@ +package db + +import ( + "github.com/lavab/api/models" + + "github.com/dancannon/gorethink" +) + +type FilesTable struct { + RethinkCRUD + Emails *EmailsTable +} + +func (f *FilesTable) GetFile(id string) (*models.File, error) { + var result models.File + + if err := f.FindFetchOne(id, &result); err != nil { + return nil, err + } + + return &result, nil +} + +func (f *FilesTable) GetOwnedBy(id string) ([]*models.File, error) { + var result []*models.File + + err := f.WhereAndFetch(map[string]interface{}{ + "owner": id, + }, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (f *FilesTable) DeleteOwnedBy(id string) error { + return f.Delete(map[string]interface{}{ + "owner": id, + }) +} + +func (f *FilesTable) GetEmailFiles(id string) ([]*models.File, error) { + email, err := f.Emails.GetEmail(id) + if err != nil { + return nil, err + } + + query, err := f.Emails.GetTable().Filter(func(row gorethink.Term) gorethink.Term { + return gorethink.Expr(email.Files).Contains(row.Field("id")) + }).GetAll().Run(f.Emails.GetSession()) + if err != nil { + return nil, err + } + + var result []*models.File + err = query.All(&result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (f *FilesTable) CountByEmail(id string) (int, error) { + query, err := f.GetTable().GetAllByIndex("owner", id).Count().Run(f.GetSession()) + if err != nil { + return 0, err + } + + var result int + err = query.One(&result) + if err != nil { + return 0, err + } + + return result, nil +} + +func (f *FilesTable) CountByThread(id ...interface{}) (int, error) { + query, err := f.GetTable().Filter(func(row gorethink.Term) gorethink.Term { + return gorethink.Table("emails").GetAllByIndex("owner", id...).Field("files").Contains(row.Field("id")) + }).Count().Run(f.GetSession()) + if err != nil { + return 0, err + } + + var result int + err = query.One(&result) + if err != nil { + return 0, err + } + + return result, nil +} diff --git a/env/env.go b/env/env.go index da9ba66..9a40254 100644 --- a/env/env.go +++ b/env/env.go @@ -33,8 +33,8 @@ var ( Emails *db.EmailsTable // Labels is the global instance of LabelsTable Labels *db.LabelsTable - // Attachments is the global instance of AttachmentsTable - Attachments *db.AttachmentsTable + // Files is the global instance of FilesTable + Files *db.FilesTable // Threads is the global instance of ThreadsTable Threads *db.ThreadsTable // Factors contains all currently registered factors diff --git a/models/attachment.go b/models/attachment.go deleted file mode 100644 index b4c0e61..0000000 --- a/models/attachment.go +++ /dev/null @@ -1,14 +0,0 @@ -package models - -// Attachment is an encrypted file stored by Lavaboom -type Attachment struct { - Encrypted - Resource - - // Mime is the Internet media type of the attachment - // Format: "type/subtype" – more info: en.wikipedia.org/wiki/Internet_media_type - MIME string `json:"mime" gorethink:"mime"` - - // Size is the size of the file in bytes i.e. len(file.Data) - Size int `json:"size" gorethink:"size"` -} diff --git a/models/email.go b/models/email.go index ac6839c..5610f2e 100644 --- a/models/email.go +++ b/models/email.go @@ -19,7 +19,7 @@ type Email struct { // AttachmentsIDs is a slice of the FileIDs associated with this email // For uploading attachments see `POST /upload` - Attachments []string `json:"attachments" gorethink:"attachments"` + Files []string `json:"files" gorethink:"files"` // Body contains all the data needed to send this email Body Encrypted `json:"body" gorethink:"body"` diff --git a/models/file.go b/models/file.go new file mode 100644 index 0000000..ad1ec67 --- /dev/null +++ b/models/file.go @@ -0,0 +1,10 @@ +package models + +// File is an encrypted file stored by Lavaboom +type File struct { + Encrypted + Resource + + Metadata Encrypted `json:"encrypted" gorethink:"encrypted"` + Body Encrypted `json:"body" gorethink:"body"` +} diff --git a/models/thread.go b/models/thread.go index f5f2392..de9d2c5 100644 --- a/models/thread.go +++ b/models/thread.go @@ -17,5 +17,5 @@ type Thread struct { IsRead bool `json:"is_read" gorethink:"is_read"` LastRead string `json:"last_read" gorethink:"last_read"` - AttachmentsCount *int `json:"attachments_count,omitempty" gorethink:"attachments_count,omitempty"` + FilesCount *int `json:"files_count,omitempty" gorethink:"-"` } diff --git a/routes/attachments.go b/routes/attachments.go deleted file mode 100644 index ca86e99..0000000 --- a/routes/attachments.go +++ /dev/null @@ -1,310 +0,0 @@ -package routes - -import ( - "net/http" - - "github.com/Sirupsen/logrus" - "github.com/zenazn/goji/web" - - "github.com/lavab/api/env" - "github.com/lavab/api/models" - "github.com/lavab/api/utils" -) - -type AttachmentsListResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Attachments *[]*models.Attachment `json:"attachments,omitempty"` -} - -func AttachmentsList(c web.C, w http.ResponseWriter, r *http.Request) { - session := c.Env["token"].(*models.Token) - - attachments, err := env.Attachments.GetOwnedBy(session.Owner) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Error("Unable to fetch attachments") - - utils.JSONResponse(w, 500, &AttachmentsListResponse{ - Success: false, - Message: "Internal error (code AT/LI/01)", - }) - return - } - - utils.JSONResponse(w, 200, &AttachmentsListResponse{ - Success: true, - Attachments: &attachments, - }) -} - -type AttachmentsCreateRequest struct { - Data string `json:"data" schema:"data"` - Name string `json:"name" schema:"name"` - Encoding string `json:"encoding" schema:"encoding"` - VersionMajor int `json:"version_major" schema:"version_major"` - VersionMinor int `json:"version_minor" schema:"version_minor"` - PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` -} - -type AttachmentsCreateResponse struct { - Success bool `json:"success"` - Message string `json:"message"` - Attachment *models.Attachment `json:"attachment,omitempty"` -} - -// AttachmentsCreate creates a new attachment -func AttachmentsCreate(c web.C, w http.ResponseWriter, r *http.Request) { - // Decode the request - var input AttachmentsCreateRequest - err := utils.ParseRequest(r, &input) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Warn("Unable to decode a request") - - utils.JSONResponse(w, 400, &AttachmentsCreateResponse{ - Success: false, - Message: "Invalid input format", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Ensure that the input data isn't empty - if input.Data == "" || input.Name == "" || input.Encoding == "" || - input.PGPFingerprints == nil || len(input.PGPFingerprints) == 0 { - utils.JSONResponse(w, 400, &AttachmentsCreateResponse{ - Success: false, - Message: "Invalid request", - }) - return - } - - // Create a new attachment struct - attachment := &models.Attachment{ - Encrypted: models.Encrypted{ - Encoding: input.Encoding, - Data: input.Data, - Schema: "attachment", - VersionMajor: input.VersionMajor, - VersionMinor: input.VersionMinor, - PGPFingerprints: input.PGPFingerprints, - }, - Resource: models.MakeResource(session.Owner, input.Name), - } - - // Insert the attachment into the database - if err := env.Attachments.Insert(attachment); err != nil { - utils.JSONResponse(w, 500, &AttachmentsCreateResponse{ - Success: false, - Message: "internal server error - AT/CR/01", - }) - - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Error("Could not insert a attachment into the database") - return - } - - utils.JSONResponse(w, 201, &AttachmentsCreateResponse{ - Success: true, - Message: "A new attachment was successfully created", - Attachment: attachment, - }) -} - -// AttachmentsGetResponse contains the result of the AttachmentsGet request. -type AttachmentsGetResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Attachment *models.Attachment `json:"attachment,omitempty"` -} - -// AttachmentsGet gets the requested attachment from the database -func AttachmentsGet(c web.C, w http.ResponseWriter, r *http.Request) { - // Get the attachment from the database - attachment, err := env.Attachments.GetAttachment(c.URLParams["id"]) - if err != nil { - utils.JSONResponse(w, 404, &AttachmentsGetResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Check for ownership - if attachment.Owner != session.Owner { - utils.JSONResponse(w, 404, &AttachmentsGetResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Write the attachment to the response - utils.JSONResponse(w, 200, &AttachmentsGetResponse{ - Success: true, - Attachment: attachment, - }) -} - -// AttachmentsUpdateRequest is the payload passed to PUT /contacts/:id -type AttachmentsUpdateRequest struct { - Data string `json:"data" schema:"data"` - Name string `json:"name" schema:"name"` - Encoding string `json:"encoding" schema:"encoding"` - VersionMajor *int `json:"version_major" schema:"version_major"` - VersionMinor *int `json:"version_minor" schema:"version_minor"` - PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` -} - -// AttachmentsUpdateResponse contains the result of the AttachmentsUpdate request. -type AttachmentsUpdateResponse struct { - Success bool `json:"success"` - Message string `json:"message,omitempty"` - Attachment *models.Attachment `json:"attachment,omitempty"` -} - -// AttachmentsUpdate updates an existing attachment in the database -func AttachmentsUpdate(c web.C, w http.ResponseWriter, r *http.Request) { - // Decode the request - var input AttachmentsUpdateRequest - err := utils.ParseRequest(r, &input) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - }).Warn("Unable to decode a request") - - utils.JSONResponse(w, 400, &AttachmentsUpdateResponse{ - Success: false, - Message: "Invalid input format", - }) - return - } - - // Get the attachment from the database - attachment, err := env.Attachments.GetAttachment(c.URLParams["id"]) - if err != nil { - utils.JSONResponse(w, 404, &AttachmentsUpdateResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Check for ownership - if attachment.Owner != session.Owner { - utils.JSONResponse(w, 404, &AttachmentsUpdateResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - if input.Data != "" { - attachment.Data = input.Data - } - - if input.Name != "" { - attachment.Name = input.Name - } - - if input.Encoding != "" { - attachment.Encoding = input.Encoding - } - - if input.VersionMajor != nil { - attachment.VersionMajor = *input.VersionMajor - } - - if input.VersionMinor != nil { - attachment.VersionMinor = *input.VersionMinor - } - - if input.PGPFingerprints != nil { - attachment.PGPFingerprints = input.PGPFingerprints - } - - // Perform the update - err = env.Attachments.UpdateID(c.URLParams["id"], attachment) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - "id": c.URLParams["id"], - }).Error("Unable to update a attachment") - - utils.JSONResponse(w, 500, &AttachmentsUpdateResponse{ - Success: false, - Message: "Internal error (code AT/UP/01)", - }) - return - } - - // Write the attachment to the response - utils.JSONResponse(w, 200, &AttachmentsUpdateResponse{ - Success: true, - Attachment: attachment, - }) -} - -// AttachmentsDeleteResponse contains the result of the Delete request. -type AttachmentsDeleteResponse struct { - Success bool `json:"success"` - Message string `json:"message"` -} - -// AttachmentsDelete removes a attachment from the database -func AttachmentsDelete(c web.C, w http.ResponseWriter, r *http.Request) { - // Get the attachment from the database - attachment, err := env.Attachments.GetAttachment(c.URLParams["id"]) - if err != nil { - utils.JSONResponse(w, 404, &AttachmentsDeleteResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Fetch the current session from the middleware - session := c.Env["token"].(*models.Token) - - // Check for ownership - if attachment.Owner != session.Owner { - utils.JSONResponse(w, 404, &AttachmentsDeleteResponse{ - Success: false, - Message: "Attachment not found", - }) - return - } - - // Perform the deletion - err = env.Attachments.DeleteID(c.URLParams["id"]) - if err != nil { - env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - "id": c.URLParams["id"], - }).Error("Unable to delete a attachment") - - utils.JSONResponse(w, 500, &AttachmentsDeleteResponse{ - Success: false, - Message: "Internal error (code AT/DE/01)", - }) - return - } - - // Write the attachment to the response - utils.JSONResponse(w, 200, &AttachmentsDeleteResponse{ - Success: true, - Message: "Attachment successfully removed", - }) -} diff --git a/routes/emails.go b/routes/emails.go index def605d..42f27cf 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -138,7 +138,7 @@ type EmailsCreateRequest struct { PreviewVersionMajor int `json:"preview_version_major"` PreviewVersionMinor int `json:"preview_version_minor"` Encoding string `json:"encoding"` - Attachments []string `json:"attachments"` + Files []string `json:"files"` PGPFingerprints []string `json:"pgp_fingerprints"` } @@ -260,14 +260,14 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Create a new email struct email := &models.Email{ - Kind: "sent", - From: []string{account.Name + "@" + env.Config.EmailDomain}, - To: input.To, - CC: input.CC, - BCC: input.BCC, - Resource: emailResource, - Attachments: input.Attachments, - Thread: input.Thread, + Kind: "sent", + From: []string{account.Name + "@" + env.Config.EmailDomain}, + To: input.To, + CC: input.CC, + BCC: input.BCC, + Resource: emailResource, + Files: input.Files, + Thread: input.Thread, Body: models.Encrypted{ Encoding: "json", PGPFingerprints: input.PGPFingerprints, diff --git a/routes/files.go b/routes/files.go new file mode 100644 index 0000000..8f9de53 --- /dev/null +++ b/routes/files.go @@ -0,0 +1,310 @@ +package routes + +import ( + "net/http" + + "github.com/Sirupsen/logrus" + "github.com/zenazn/goji/web" + + "github.com/lavab/api/env" + "github.com/lavab/api/models" + "github.com/lavab/api/utils" +) + +type FilesListResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + Files *[]*models.File `json:"files,omitempty"` +} + +func FilesList(c web.C, w http.ResponseWriter, r *http.Request) { + session := c.Env["token"].(*models.Token) + + files, err := env.Files.GetOwnedBy(session.Owner) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Error("Unable to fetch files") + + utils.JSONResponse(w, 500, &FilesListResponse{ + Success: false, + Message: "Internal error (code FI/LI/01)", + }) + return + } + + utils.JSONResponse(w, 200, &FilesListResponse{ + Success: true, + Files: &files, + }) +} + +type FilesCreateRequest struct { + Data string `json:"data" schema:"data"` + Name string `json:"name" schema:"name"` + Encoding string `json:"encoding" schema:"encoding"` + VersionMajor int `json:"version_major" schema:"version_major"` + VersionMinor int `json:"version_minor" schema:"version_minor"` + PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` +} + +type FilesCreateResponse struct { + Success bool `json:"success"` + Message string `json:"message"` + File *models.File `json:"file,omitempty"` +} + +// FilesCreate creates a new file +func FilesCreate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input FilesCreateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &FilesCreateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Ensure that the input data isn't empty + if input.Data == "" || input.Name == "" || input.Encoding == "" || + input.PGPFingerprints == nil || len(input.PGPFingerprints) == 0 { + utils.JSONResponse(w, 400, &FilesCreateResponse{ + Success: false, + Message: "Invalid request", + }) + return + } + + // Create a new file struct + file := &models.File{ + Encrypted: models.Encrypted{ + Encoding: input.Encoding, + Data: input.Data, + Schema: "file", + VersionMajor: input.VersionMajor, + VersionMinor: input.VersionMinor, + PGPFingerprints: input.PGPFingerprints, + }, + Resource: models.MakeResource(session.Owner, input.Name), + } + + // Insert the file into the database + if err := env.Files.Insert(file); err != nil { + utils.JSONResponse(w, 500, &FilesCreateResponse{ + Success: false, + Message: "internal server error - FI/CR/01", + }) + + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Error("Could not insert a file into the database") + return + } + + utils.JSONResponse(w, 201, &FilesCreateResponse{ + Success: true, + Message: "A new file was successfully created", + File: file, + }) +} + +// FilesGetResponse contains the result of the FilesGet request. +type FilesGetResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + File *models.File `json:"file,omitempty"` +} + +// FilesGet gets the requested file from the database +func FilesGet(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the file from the database + file, err := env.Files.GetFile(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &FilesGetResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if file.Owner != session.Owner { + utils.JSONResponse(w, 404, &FilesGetResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Write the file to the response + utils.JSONResponse(w, 200, &FilesGetResponse{ + Success: true, + File: file, + }) +} + +// FilesUpdateRequest is the payload passed to PUT /files/:id +type FilesUpdateRequest struct { + Data string `json:"data" schema:"data"` + Name string `json:"name" schema:"name"` + Encoding string `json:"encoding" schema:"encoding"` + VersionMajor *int `json:"version_major" schema:"version_major"` + VersionMinor *int `json:"version_minor" schema:"version_minor"` + PGPFingerprints []string `json:"pgp_fingerprints" schema:"pgp_fingerprints"` +} + +// FilesUpdateResponse contains the result of the FilesUpdate request. +type FilesUpdateResponse struct { + Success bool `json:"success"` + Message string `json:"message,omitempty"` + File *models.File `json:"file,omitempty"` +} + +// FilesUpdate updates an existing file in the database +func FilesUpdate(c web.C, w http.ResponseWriter, r *http.Request) { + // Decode the request + var input FilesUpdateRequest + err := utils.ParseRequest(r, &input) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Warn("Unable to decode a request") + + utils.JSONResponse(w, 400, &FilesUpdateResponse{ + Success: false, + Message: "Invalid input format", + }) + return + } + + // Get the file from the database + file, err := env.Files.GetFile(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &FilesUpdateResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if file.Owner != session.Owner { + utils.JSONResponse(w, 404, &FilesUpdateResponse{ + Success: false, + Message: "File not found", + }) + return + } + + if input.Data != "" { + file.Data = input.Data + } + + if input.Name != "" { + file.Name = input.Name + } + + if input.Encoding != "" { + file.Encoding = input.Encoding + } + + if input.VersionMajor != nil { + file.VersionMajor = *input.VersionMajor + } + + if input.VersionMinor != nil { + file.VersionMinor = *input.VersionMinor + } + + if input.PGPFingerprints != nil { + file.PGPFingerprints = input.PGPFingerprints + } + + // Perform the update + err = env.Files.UpdateID(c.URLParams["id"], file) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": c.URLParams["id"], + }).Error("Unable to update a file") + + utils.JSONResponse(w, 500, &FilesUpdateResponse{ + Success: false, + Message: "Internal error (code FI/UP/01)", + }) + return + } + + // Write the file to the response + utils.JSONResponse(w, 200, &FilesUpdateResponse{ + Success: true, + File: file, + }) +} + +// FilesDeleteResponse contains the result of the Delete request. +type FilesDeleteResponse struct { + Success bool `json:"success"` + Message string `json:"message"` +} + +// FilesDelete removes a file from the database +func FilesDelete(c web.C, w http.ResponseWriter, r *http.Request) { + // Get the file from the database + file, err := env.Files.GetFile(c.URLParams["id"]) + if err != nil { + utils.JSONResponse(w, 404, &FilesDeleteResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Fetch the current session from the middleware + session := c.Env["token"].(*models.Token) + + // Check for ownership + if file.Owner != session.Owner { + utils.JSONResponse(w, 404, &FilesDeleteResponse{ + Success: false, + Message: "File not found", + }) + return + } + + // Perform the deletion + err = env.Files.DeleteID(c.URLParams["id"]) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": c.URLParams["id"], + }).Error("Unable to delete a file") + + utils.JSONResponse(w, 500, &FilesDeleteResponse{ + Success: false, + Message: "Internal error (code FI/DE/01)", + }) + return + } + + // Write the file to the response + utils.JSONResponse(w, 200, &FilesDeleteResponse{ + Success: true, + Message: "File successfully removed", + }) +} diff --git a/routes/attachments_test.go b/routes/files_test.go similarity index 81% rename from routes/attachments_test.go rename to routes/files_test.go index d34db71..323b8f2 100644 --- a/routes/attachments_test.go +++ b/routes/files_test.go @@ -12,7 +12,7 @@ import ( "github.com/lavab/api/routes" ) -func TestAttachmentsRoute(t *testing.T) { +func TestFilesRoute(t *testing.T) { Convey("When uploading a new attachment", t, func() { account := &models.Account{ Resource: models.MakeResource("", "johnorange"), @@ -47,7 +47,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Misformatted body should fail", func() { request := goreq.Request{ Method: "POST", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", ContentType: "application/json", Body: "!@#!@#", } @@ -55,7 +55,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsCreateResponse + var response routes.FilesCreateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -66,7 +66,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Invalid set of data should fail", func() { request := goreq.Request{ Method: "POST", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() @@ -83,7 +83,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Attachment upload should succeed", func() { request := goreq.Request{ Method: "POST", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", ContentType: "application/json", Body: `{ "data": "` + uniuri.NewLen(64) + `", @@ -98,7 +98,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsCreateResponse + var response routes.FilesCreateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -111,13 +111,13 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Getting that attachment should succeed", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + attachment.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -128,21 +128,21 @@ func TestAttachmentsRoute(t *testing.T) { Convey("The attachment should be visible on the list", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments", + Uri: server.URL + "/files", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsListResponse + var response routes.FilesListResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(len(*response.Attachments), ShouldBeGreaterThan, 0) + So(len(*response.Files), ShouldBeGreaterThan, 0) So(response.Success, ShouldBeTrue) found := false - for _, a := range *response.Attachments { + for _, a := range *response.Files { if a.ID == attachment.ID { found = true break @@ -155,7 +155,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Updating it should succeed", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + attachment.ID, ContentType: "application/json", Body: `{ "data": "` + uniuri.NewLen(64) + `", @@ -170,7 +170,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsUpdateResponse + var response routes.FilesUpdateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -182,13 +182,13 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Deleting it should succeed", func() { request := goreq.Request{ Method: "DELETE", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + attachment.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsDeleteResponse + var response routes.FilesDeleteResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -200,13 +200,13 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Getting a non-existing attachment should fail", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments/doesntexist", + Uri: server.URL + "/files/doesntexist", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -216,7 +216,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Updating it should fail too", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/doesntexist", + Uri: server.URL + "/files/doesntexist", ContentType: "application/json", Body: "{}", } @@ -224,7 +224,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -246,18 +246,18 @@ func TestAttachmentsRoute(t *testing.T) { Resource: models.MakeResource("nonowned", "photo.jpg"), } - err := env.Attachments.Insert(attachment) + err := env.Files.Insert(attachment) So(err, ShouldBeNil) request := goreq.Request{ Method: "GET", - Uri: server.URL + "/attachments/" + attachment.ID, + Uri: server.URL + "/files/" + attachment.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsGetResponse + var response routes.FilesGetResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -268,7 +268,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Updating using a misformatted body should fail", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/shizzle", + Uri: server.URL + "/files/shizzle", ContentType: "application/json", Body: "!@#!@#", } @@ -276,7 +276,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsUpdateResponse + var response routes.FilesUpdateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -287,7 +287,7 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Updating a non-existing attachment should fail", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/attachments/shizzle", + Uri: server.URL + "/files/shizzle", ContentType: "application/json", Body: "{}", } @@ -295,7 +295,7 @@ func TestAttachmentsRoute(t *testing.T) { result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsUpdateResponse + var response routes.FilesUpdateResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) @@ -306,13 +306,13 @@ func TestAttachmentsRoute(t *testing.T) { Convey("Deleting a non-existing attachment should fail", func() { request := goreq.Request{ Method: "DELETE", - Uri: server.URL + "/attachments/shizzle", + Uri: server.URL + "/files/shizzle", } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() So(err, ShouldBeNil) - var response routes.AttachmentsDeleteResponse + var response routes.FilesDeleteResponse err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) diff --git a/routes/threads.go b/routes/threads.go index b80bc48..44e1134 100644 --- a/routes/threads.go +++ b/routes/threads.go @@ -27,15 +27,15 @@ func ThreadsList(c web.C, w http.ResponseWriter, r *http.Request) { session := c.Env["token"].(*models.Token) var ( - query = r.URL.Query() - sortRaw = query.Get("sort") - offsetRaw = query.Get("offset") - limitRaw = query.Get("limit") - attachmentsCount = query.Get("attachments_count") - label = query.Get("label") - sort []string - offset int - limit int + query = r.URL.Query() + sortRaw = query.Get("sort") + offsetRaw = query.Get("offset") + limitRaw = query.Get("limit") + countFiles = query.Get("count_files") + label = query.Get("label") + sort []string + offset int + limit int ) if offsetRaw != "" { @@ -105,14 +105,14 @@ func ThreadsList(c web.C, w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Total-Count", strconv.Itoa(count)) } - if attachmentsCount == "true" || attachmentsCount == "1" { + if countFiles == "true" || countFiles == "1" { for _, thread := range threads { - count, err := env.Attachments.CountByThread(thread.ID) + count, err := env.Files.CountByThread(thread.ID) if err != nil { env.Log.WithFields(logrus.Fields{ "error": err.Error(), "thread": thread.ID, - }).Error("Unable to count attachments per thread") + }).Error("Unable to count files per thread") utils.JSONResponse(w, 500, &ThreadsListResponse{ Success: false, @@ -121,7 +121,7 @@ func ThreadsList(c web.C, w http.ResponseWriter, r *http.Request) { return } - thread.AttachmentsCount = &count + thread.FilesCount = &count } } @@ -177,22 +177,22 @@ func ThreadsGet(c web.C, w http.ResponseWriter, r *http.Request) { } } - if ok := r.URL.Query().Get("attachments_count"); ok == "true" || ok == "1" { - count, err := env.Attachments.CountByThread(thread.ID) + if ok := r.URL.Query().Get("count_files"); ok == "true" || ok == "1" { + count, err := env.Files.CountByThread(thread.ID) if err != nil { env.Log.WithFields(logrus.Fields{ "error": err.Error(), "id": thread.ID, - }).Error("Unable to count attachments linked to a thread") + }).Error("Unable to count files linked to a thread") utils.JSONResponse(w, 500, &ThreadsGetResponse{ Success: false, - Message: "Unable to count attachments", + Message: "Unable to count files", }) return } - thread.AttachmentsCount = &count + thread.FilesCount = &count } utils.JSONResponse(w, 200, &ThreadsGetResponse{ diff --git a/setup/setup.go b/setup/setup.go index b62e5f2..7e60e33 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -206,11 +206,11 @@ func PrepareMux(flags *env.Flags) *web.Mux { Emails: env.Emails, //Cache: redis, } - env.Attachments = &db.AttachmentsTable{ + env.Files = &db.FilesTable{ RethinkCRUD: db.NewCRUDTable( rethinkSession, rethinkOpts.Database, - "attachments", + "files", ), } @@ -386,13 +386,6 @@ func PrepareMux(flags *env.Flags) *web.Mux { // Index route mux.Get("/", routes.Hello) - // Attachments - auth.Get("/attachments", routes.AttachmentsList) - auth.Post("/attachments", routes.AttachmentsCreate) - auth.Get("/attachments/:id", routes.AttachmentsGet) - auth.Put("/attachments/:id", routes.AttachmentsUpdate) - auth.Delete("/attachments/:id", routes.AttachmentsDelete) - // Accounts auth.Get("/accounts", routes.AccountsList) mux.Post("/accounts", routes.AccountsCreate) @@ -405,6 +398,13 @@ func PrepareMux(flags *env.Flags) *web.Mux { mux.Get(regexp.MustCompile(`/avatars/(?P[\S\s]*?)\.(?Psvg|png)(?:[\S\s]*?)$`), routes.Avatars) //mux.Get("/avatars/:hash.:ext", routes.Avatars) + // Files + auth.Get("/files", routes.FilesList) + auth.Post("/files", routes.FilesCreate) + auth.Get("/files/:id", routes.FilesGet) + auth.Put("/files/:id", routes.FilesUpdate) + auth.Delete("/files/:id", routes.FilesDelete) + // Tokens auth.Get("/tokens", routes.TokensGet) auth.Get("/tokens/:id", routes.TokensGet) From 239d6c5a4bc0c5fb64514ce328a45a26c051ae43 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Mon, 9 Feb 2015 13:10:30 +0100 Subject: [PATCH 02/33] EVERYONE LOVES YAML --- .drone.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.drone.yml b/.drone.yml index 1956af8..fc923c0 100644 --- a/.drone.yml +++ b/.drone.yml @@ -18,9 +18,9 @@ script: - "if [ \"$DRONE_BRANCH\" = \"develop\" ]; then fab -H bart.lavaboom.io:36467 deploy; fi" notify: slack: - - webhook_url: $$SLACK_URL - - channel: $$SLACK_CHANNEL - - username: lavadrone - - on_started: true - - on_success: true - - on_failure: true \ No newline at end of file + webhook_url: $$SLACK_URL + channel: $$SLACK_CHANNEL + username: lavadrone + on_started: true + on_success: true + on_failure: true \ No newline at end of file From 9f98d51f26317e0da39312ff8d5b990571d7bd22 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Mon, 9 Feb 2015 13:13:31 +0100 Subject: [PATCH 03/33] Fixed attachment tests --- routes/files_test.go | 56 ++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/routes/files_test.go b/routes/files_test.go index 323b8f2..91897ac 100644 --- a/routes/files_test.go +++ b/routes/files_test.go @@ -13,7 +13,7 @@ import ( ) func TestFilesRoute(t *testing.T) { - Convey("When uploading a new attachment", t, func() { + Convey("When uploading a new file", t, func() { account := &models.Account{ Resource: models.MakeResource("", "johnorange"), Status: "complete", @@ -80,7 +80,7 @@ func TestFilesRoute(t *testing.T) { So(response.Message, ShouldEqual, "Invalid request") }) - Convey("Attachment upload should succeed", func() { + Convey("File upload should succeed", func() { request := goreq.Request{ Method: "POST", Uri: server.URL + "/files", @@ -102,16 +102,16 @@ func TestFilesRoute(t *testing.T) { err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "A new attachment was successfully created") + So(response.Message, ShouldEqual, "A new file was successfully created") So(response.Success, ShouldBeTrue) - So(response.Attachment.ID, ShouldNotBeEmpty) + So(response.File.ID, ShouldNotBeEmpty) - attachment := response.Attachment + file := response.File - Convey("Getting that attachment should succeed", func() { + Convey("Getting that file should succeed", func() { request := goreq.Request{ Method: "GET", - Uri: server.URL + "/files/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() @@ -121,11 +121,11 @@ func TestFilesRoute(t *testing.T) { err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Attachment.ID, ShouldNotBeNil) + So(response.File.ID, ShouldNotBeNil) So(response.Success, ShouldBeTrue) }) - Convey("The attachment should be visible on the list", func() { + Convey("The file should be visible on the list", func() { request := goreq.Request{ Method: "GET", Uri: server.URL + "/files", @@ -143,7 +143,7 @@ func TestFilesRoute(t *testing.T) { found := false for _, a := range *response.Files { - if a.ID == attachment.ID { + if a.ID == file.ID { found = true break } @@ -155,7 +155,7 @@ func TestFilesRoute(t *testing.T) { Convey("Updating it should succeed", func() { request := goreq.Request{ Method: "PUT", - Uri: server.URL + "/files/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, ContentType: "application/json", Body: `{ "data": "` + uniuri.NewLen(64) + `", @@ -175,14 +175,14 @@ func TestFilesRoute(t *testing.T) { So(err, ShouldBeNil) So(response.Success, ShouldBeTrue) - So(response.Attachment.ID, ShouldEqual, attachment.ID) - So(response.Attachment.Encoding, ShouldEqual, "xml") + So(response.File.ID, ShouldEqual, file.ID) + So(response.File.Encoding, ShouldEqual, "xml") }) Convey("Deleting it should succeed", func() { request := goreq.Request{ Method: "DELETE", - Uri: server.URL + "/files/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() @@ -192,12 +192,12 @@ func TestFilesRoute(t *testing.T) { err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment successfully removed") + So(response.Message, ShouldEqual, "File successfully removed") So(response.Success, ShouldBeTrue) }) }) - Convey("Getting a non-existing attachment should fail", func() { + Convey("Getting a non-existing file should fail", func() { request := goreq.Request{ Method: "GET", Uri: server.URL + "/files/doesntexist", @@ -210,7 +210,7 @@ func TestFilesRoute(t *testing.T) { err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") So(response.Success, ShouldBeFalse) Convey("Updating it should fail too", func() { @@ -228,17 +228,17 @@ func TestFilesRoute(t *testing.T) { err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") So(response.Success, ShouldBeFalse) }) }) - Convey("Getting a non-owned attachment should fail", func() { - attachment := &models.Attachment{ + Convey("Getting a non-owned file should fail", func() { + file := &models.File{ Encrypted: models.Encrypted{ Encoding: "json", Data: uniuri.NewLen(64), - Schema: "attachment", + Schema: "file", VersionMajor: 1, VersionMinor: 0, PGPFingerprints: []string{uniuri.New()}, @@ -246,12 +246,12 @@ func TestFilesRoute(t *testing.T) { Resource: models.MakeResource("nonowned", "photo.jpg"), } - err := env.Files.Insert(attachment) + err := env.Files.Insert(file) So(err, ShouldBeNil) request := goreq.Request{ Method: "GET", - Uri: server.URL + "/files/" + attachment.ID, + Uri: server.URL + "/files/" + file.ID, } request.AddHeader("Authorization", "Bearer "+authToken.ID) result, err := request.Do() @@ -261,7 +261,7 @@ func TestFilesRoute(t *testing.T) { err = result.Body.FromJsonTo(&response) So(err, ShouldBeNil) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") So(response.Success, ShouldBeFalse) }) @@ -284,7 +284,7 @@ func TestFilesRoute(t *testing.T) { So(response.Message, ShouldEqual, "Invalid input format") }) - Convey("Updating a non-existing attachment should fail", func() { + Convey("Updating a non-existing file should fail", func() { request := goreq.Request{ Method: "PUT", Uri: server.URL + "/files/shizzle", @@ -300,10 +300,10 @@ func TestFilesRoute(t *testing.T) { So(err, ShouldBeNil) So(response.Success, ShouldBeFalse) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") }) - Convey("Deleting a non-existing attachment should fail", func() { + Convey("Deleting a non-existing file should fail", func() { request := goreq.Request{ Method: "DELETE", Uri: server.URL + "/files/shizzle", @@ -317,7 +317,7 @@ func TestFilesRoute(t *testing.T) { So(err, ShouldBeNil) So(response.Success, ShouldBeFalse) - So(response.Message, ShouldEqual, "Attachment not found") + So(response.Message, ShouldEqual, "File not found") }) }) } From 24e123db261c1e7ca25aabafa38de687b31fc1cf Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Mon, 9 Feb 2015 13:23:07 +0100 Subject: [PATCH 04/33] Database setup fix --- db/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/setup.go b/db/setup.go index 5aa049b..03f3b58 100644 --- a/db/setup.go +++ b/db/setup.go @@ -20,7 +20,7 @@ var tableIndexes = map[string][]string{ "reservations": []string{"email", "name"}, "threads": []string{"owner"}, "tokens": []string{"owner"}, - "attachments": []string{"owner"}, + "files": []string{"owner"}, } // List of names of databases From fb5cd2a73f3e176538cf0bdf43899f29ae6b1b9b Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Mon, 9 Feb 2015 22:42:20 +0100 Subject: [PATCH 05/33] Thread count per label --- models/label.go | 4 +-- routes/labels.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/models/label.go b/models/label.go index 6daa02c..588044c 100644 --- a/models/label.go +++ b/models/label.go @@ -16,6 +16,6 @@ type Label struct { // Examples: inbox, trash, spam, drafts, starred, etc. Builtin bool `json:"builtin" gorethink:"builtin"` - EmailsUnread int `json:"emails_unread" gorethink:"-"` - EmailsTotal int `json:"emails_total" gorethink:"-"` + UnreadThreadsCount int `json:"unread_threads_count" gorethink:"-"` + TotalThreadsCount int `json:"total_threads_count" gorethink:"-"` } diff --git a/routes/labels.go b/routes/labels.go index daed582..579cb77 100644 --- a/routes/labels.go +++ b/routes/labels.go @@ -34,6 +34,39 @@ func LabelsList(c web.C, w http.ResponseWriter, r *http.Request) { return } + for _, label := range labels { + totalCount, err := env.Threads.CountByLabel(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch total threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/LI/02)", + }) + return + } + + unreadCount, err := env.Threads.CountByLabelUnread(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch unread threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/LI/03)", + }) + return + } + + label.TotalThreadsCount = totalCount + label.UnreadThreadsCount = unreadCount + } + utils.JSONResponse(w, 200, &LabelsListResponse{ Success: true, Labels: &labels, @@ -144,6 +177,37 @@ func LabelsGet(c web.C, w http.ResponseWriter, r *http.Request) { return } + totalCount, err := env.Threads.CountByLabel(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch total threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/GE/01)", + }) + return + } + + unreadCount, err := env.Threads.CountByLabelUnread(label.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "label": label.ID, + }).Error("Unable to fetch unread threads count") + + utils.JSONResponse(w, 500, &LabelsListResponse{ + Success: false, + Message: "Internal error (code LA/GE/01)", + }) + return + } + + label.TotalThreadsCount = totalCount + label.UnreadThreadsCount = unreadCount + // Write the label to the response utils.JSONResponse(w, 200, &LabelsGetResponse{ Success: true, From 072718e7604b5558a3e5f3b12fca87da325d9d8a Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Tue, 10 Feb 2015 18:57:49 +0100 Subject: [PATCH 06/33] New information added to the events --- db/setup.go | 2 +- setup/setup.go | 39 ++++++++++++++++++++++++++++++++------- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/db/setup.go b/db/setup.go index 03f3b58..1c65e57 100644 --- a/db/setup.go +++ b/db/setup.go @@ -15,12 +15,12 @@ var tableIndexes = map[string][]string{ "accounts": []string{"name"}, "contacts": []string{"owner"}, "emails": []string{"owner", "label_ids"}, + "files": []string{"owner"}, "keys": []string{"owner", "key_id"}, "labels": []string{"owner"}, "reservations": []string{"email", "name"}, "threads": []string{"owner"}, "tokens": []string{"owner"}, - "files": []string{"owner"}, } // List of names of databases diff --git a/setup/setup.go b/setup/setup.go index 7e60e33..7e9b3bf 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -254,12 +254,25 @@ func PrepareMux(flags *env.Flags) *web.Mux { return } + // Resolve the thread + thread, err := env.Threads.GetThread(email.Thread) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + "thread": email.Thread, + }).Error("Unable to resolve a thread from queue") + return + } + // Send notifications to subscribers for _, session := range sessions[msg.Owner] { result, _ := json.Marshal(map[string]interface{}{ - "type": "delivery", - "id": msg.ID, - "name": email.Name, + "type": "delivery", + "id": msg.ID, + "name": email.Name, + "thread": email.Thread, + "labels": thread.Labels, }) err = session.Send(string(result)) if err != nil { @@ -275,7 +288,6 @@ func PrepareMux(flags *env.Flags) *web.Mux { ID string `json:"id"` Owner string `json:"owner"` }) { - log.Print(msg) // Check if we are handling owner's session if _, ok := sessions[msg.Owner]; !ok { return @@ -295,12 +307,25 @@ func PrepareMux(flags *env.Flags) *web.Mux { return } + // Resolve the thread + thread, err := env.Threads.GetThread(email.Thread) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": msg.ID, + "thread": email.Thread, + }).Error("Unable to resolve a thread from queue") + return + } + // Send notifications to subscribers for _, session := range sessions[msg.Owner] { result, _ := json.Marshal(map[string]interface{}{ - "type": "receipt", - "id": msg.ID, - "name": email.Name, + "type": "receipt", + "id": msg.ID, + "name": email.Name, + "thread": email.Thread, + "labels": thread.Labels, }) err = session.Send(string(result)) if err != nil { From 6e43dde2eb565b6bd5d599aac298a0d9521562da Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Thu, 12 Feb 2015 21:15:02 +0100 Subject: [PATCH 07/33] Prevent thread updates from dropping labels --- routes/threads.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routes/threads.go b/routes/threads.go index 44e1134..9cb0a17 100644 --- a/routes/threads.go +++ b/routes/threads.go @@ -252,7 +252,7 @@ func ThreadsUpdate(c web.C, w http.ResponseWriter, r *http.Request) { return } - if !reflect.DeepEqual(thread.Labels, input.Labels) { + if thread.Labels != nil && !reflect.DeepEqual(thread.Labels, input.Labels) { thread.Labels = input.Labels } From a666f03e926ed486d742dd7e521d20e3d1aeba60 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 14 Feb 2015 15:16:36 +0100 Subject: [PATCH 08/33] New emails model; disabled internal mailing --- models/email.go | 47 ++++----- models/file.go | 3 - routes/emails.go | 113 ++++++++++++--------- routes/{emails_test.go => emails_test.go_} | 0 4 files changed, 91 insertions(+), 72 deletions(-) rename routes/{emails_test.go => emails_test.go_} (100%) diff --git a/models/email.go b/models/email.go index 5610f2e..f6a9c1b 100644 --- a/models/email.go +++ b/models/email.go @@ -1,42 +1,43 @@ package models -// Email is the cornerstone of our application. -// TODO mime info +// Email is a message in a thread type Email struct { Resource - // Kind of the email. Value is either sent or received. + // Kind is the type of encryption used in the email: + // - raw - when sending raw emails before they get sent + // - manifest - Manifest field is not empty, + // - pgpmime - PGP/MIME format, aka everything is in body Kind string `json:"kind" gorethink:"kind"` - From []string `json:"from" gorethink:"from"` - - // Who is supposed to receive the email / what email received it. - To []string `json:"to" gorethink:"to"` - - CC []string `json:"cc" gorethink:"cc"` + // Unencrypted metadata information, available in both received in sent emails + From string `json:"from" gorethink:"from"` + To []string `json:"to" gorethink:"to"` + CC []string `json:"cc" gorethink:"cc"` + // BCC is only visible in sent emails BCC []string `json:"bcc" gorethink:"bcc"` - // AttachmentsIDs is a slice of the FileIDs associated with this email - // For uploading attachments see `POST /upload` + // Fingerprints used for body and manifest + PGPFingerprints []string `json:"pgp_fingerprints" gorethink:"pgp_fingerprints"` + + // Files contains IDs of other files Files []string `json:"files" gorethink:"files"` - // Body contains all the data needed to send this email - Body Encrypted `json:"body" gorethink:"body"` + // Manifest is only available in emails that were encrypted using PGP manifests + Manifest string `json:"manifest" gorethink:"manifest"` - // Preview contains the encrypted preview information (needed to show a list of emails) - // Example: Headers []string, Body string, - // Headers []string - // Body string - // Snippet string - //Preview Encrypted `json:"preview" gorethink:"preview"` + // Body contains all the data needed to send this email + Body string `json:"body" gorethink:"body"` - Headers []string `json:"headers" gorethink:"headers"` + // ContentType of the body in unencrypted emails + ContentType string `json:"content_type" gorethink:"content_type"` + Subject string `json:"subject" gorethink:"subject"` + ReplyTo string `json:"reply_to" gorethink:"reply_to"` - // ThreadID + // Contains ID of the thread Thread string `json:"thread" gorethink:"thread"` + // received or (queued|processed) Status string `json:"status" gorethink:"status"` - - IsRead string `json:"is_read" gorethink:"is_read"` } diff --git a/models/file.go b/models/file.go index ad1ec67..0414578 100644 --- a/models/file.go +++ b/models/file.go @@ -4,7 +4,4 @@ package models type File struct { Encrypted Resource - - Metadata Encrypted `json:"encrypted" gorethink:"encrypted"` - Body Encrypted `json:"body" gorethink:"body"` } diff --git a/routes/emails.go b/routes/emails.go index 42f27cf..5dc47b4 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -1,8 +1,8 @@ package routes import ( - "bytes" - "io" + //"bytes" + //"io" "net/http" "regexp" "strconv" @@ -10,8 +10,8 @@ import ( "github.com/Sirupsen/logrus" "github.com/zenazn/goji/web" - "golang.org/x/crypto/openpgp" - "golang.org/x/crypto/openpgp/armor" + //"golang.org/x/crypto/openpgp" + //"golang.org/x/crypto/openpgp/armor" _ "golang.org/x/crypto/ripemd160" "github.com/lavab/api/env" @@ -125,21 +125,25 @@ func EmailsList(c web.C, w http.ResponseWriter, r *http.Request) { } type EmailsCreateRequest struct { - To []string `json:"to"` - CC []string `json:"cc"` - BCC []string `json:"bcc"` - ReplyTo string `json:"reply_to"` - Thread string `json:"thread"` - Subject string `json:"subject"` - Body string `json:"body"` - BodyVersionMajor int `json:"body_version_major"` - BodyVersionMinor int `json:"body_version_minor"` - Preview string `json:"preview"` - PreviewVersionMajor int `json:"preview_version_major"` - PreviewVersionMinor int `json:"preview_version_minor"` - Encoding string `json:"encoding"` - Files []string `json:"files"` - PGPFingerprints []string `json:"pgp_fingerprints"` + // Internal properties + Kind string `json:"kind"` + Thread string `json:"thread"` + + // Metadata that has to be leaked + To []string `json:"to"` + CC []string `json:"cc"` + BCC []string `json:"bcc"` + + // Encrypted parts + PGPFingerprints []string `json:"pgp_fingerprints"` + Manifest string `json:"manifest"` + Body string `json:"body"` + Files []string `json:"files"` + + // Temporary partials if you're sending unencrypted + Subject string `json:"subject"` + ContentType string `json:"content_type"` + ReplyTo string `json:"reply_to"` } // EmailsCreateResponse contains the result of the EmailsCreate request. @@ -169,15 +173,32 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Fetch the current session from the middleware session := c.Env["token"].(*models.Token) - // Ensure that the input data isn't empty - if len(input.To) == 0 || input.Subject == "" || input.Body == "" { + // Ensure that the kind is valid + if input.Kind != "raw" && input.Kind != "manifest" && input.Kind != "pgpmime" { utils.JSONResponse(w, 400, &EmailsCreateResponse{ Success: false, - Message: "Invalid request", + Message: "Invalid email encryption kind", }) return } + // Ensure that there's at least one recipient and that there's body + if len(input.To) == 0 || input.Body == "" { + utils.JSONResponse(w, 400, &EmailsCreateResponse{ + Success: false, + Message: "Invalid email", + }) + return + } + + // Create an email resource + resource := models.MakeResource(session.Owner, input.Subject) + + // Generate metadata for manifests + if input.Kind == "manifest" { + resource.Name = "Encrypted message (" + resource.ID + ")" + } + // Fetch the user object from the database account, err := env.Accounts.GetTokenOwner(c.Env["token"].(*models.Token)) if err != nil { @@ -194,9 +215,6 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { return } - // Create an email resource - emailResource := models.MakeResource(session.Owner, input.Subject) - // Get the "Sent" label's ID var label *models.Label err = env.Labels.WhereAndFetchOne(map[string]interface{}{ @@ -236,7 +254,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { } else { thread := &models.Thread{ Resource: models.MakeResource(account.ID, input.Subject), - Emails: []string{emailResource.ID}, + Emails: []string{resource.ID}, Labels: []string{label.ID}, Members: append(append(input.To, input.CC...), input.BCC...), IsRead: true, @@ -260,22 +278,25 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // Create a new email struct email := &models.Email{ - Kind: "sent", - From: []string{account.Name + "@" + env.Config.EmailDomain}, - To: input.To, - CC: input.CC, - BCC: input.BCC, - Resource: emailResource, - Files: input.Files, - Thread: input.Thread, - Body: models.Encrypted{ - Encoding: "json", - PGPFingerprints: input.PGPFingerprints, - Data: input.Body, - Schema: "email", - VersionMajor: input.BodyVersionMajor, - VersionMinor: input.BodyVersionMinor, - }, + Resource: resource, + + Kind: input.Kind, + Thread: input.Thread, + + From: account.Name + "@" + env.Config.EmailDomain, + To: input.To, + CC: input.CC, + BCC: input.BCC, + + PGPFingerprints: input.PGPFingerprints, + Manifest: input.Manifest, + Body: input.Body, + Files: input.Files, + + Subject: input.Subject, + ContentType: input.ContentType, + ReplyTo: input.ReplyTo, + Status: "queued", } @@ -295,7 +316,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { // I'm going to whine at this part, as we are doubling the email sending code // Check if To contains lavaboom emails - for _, address := range email.To { + /*for _, address := range email.To { parts := strings.SplitN(address, "@", 2) if parts[1] == env.Config.EmailDomain { go sendEmail(parts[0], email) @@ -330,7 +351,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { "error": err.Error(), }).Error("Could not publish an email send request") return - } + }*/ utils.JSONResponse(w, 201, &EmailsCreateResponse{ Success: true, @@ -428,7 +449,7 @@ func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { }) } -func sendEmail(account string, email *models.Email) { +/*func sendEmail(account string, email *models.Email) { // find recipient's account recipient, err := env.Accounts.FindAccountByName(account) if err != nil { @@ -651,4 +672,4 @@ func sendEmail(account string, email *models.Email) { "error": err.Error(), }).Error("Unable to publish a receipt message") } -} +}*/ diff --git a/routes/emails_test.go b/routes/emails_test.go_ similarity index 100% rename from routes/emails_test.go rename to routes/emails_test.go_ From 278e47a8c835492cfb3808b0368ccf986c1ed68b Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 14 Feb 2015 21:53:17 +0100 Subject: [PATCH 09/33] Fixed slack integration --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index ead3bd2..2466249 100644 --- a/main.go +++ b/main.go @@ -76,7 +76,7 @@ var ( // slack slackURL = flag.String("slack_url", "", "URL of the Slack Incoming webhook") slackLevels = flag.String("slack_level", "warning", "minimal level required to have messages sent to slack") - slackChannel = flag.String("slack_channel", "#api-logs", "channel to which Slack bot will send messages") + slackChannel = flag.String("slack_channel", "#notif-api-logs", "channel to which Slack bot will send messages") slackIcon = flag.String("slack_icon", ":ghost:", "emoji icon of the Slack bot") slackUsername = flag.String("slack_username", "API", "username of the Slack bot") ) From 945df5561c1626fabc50c6db8c5fce751e73ace8 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 15 Feb 2015 12:50:22 +0100 Subject: [PATCH 10/33] Uncomment email queueing --- routes/emails.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/routes/emails.go b/routes/emails.go index 5dc47b4..6608234 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -337,7 +337,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { if parts[1] == env.Config.EmailDomain { go sendEmail(parts[0], email) } - } + }*/ // Add a send request to the queue err = env.NATS.Publish("send", email.ID) @@ -351,7 +351,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { "error": err.Error(), }).Error("Could not publish an email send request") return - }*/ + } utils.JSONResponse(w, 201, &EmailsCreateResponse{ Success: true, From dbbddc83bd170695cedb370b678db22e7594d951 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 15 Feb 2015 14:58:41 +0100 Subject: [PATCH 11/33] Subject to name --- routes/emails.go | 1 - 1 file changed, 1 deletion(-) diff --git a/routes/emails.go b/routes/emails.go index 6608234..7345dcd 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -293,7 +293,6 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { Body: input.Body, Files: input.Files, - Subject: input.Subject, ContentType: input.ContentType, ReplyTo: input.ReplyTo, From 5f9bffd13297e54bfb81cbd5b7a52b664e480155 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 15 Feb 2015 14:59:21 +0100 Subject: [PATCH 12/33] Removed subject from the model --- models/email.go | 1 - 1 file changed, 1 deletion(-) diff --git a/models/email.go b/models/email.go index f6a9c1b..298ca10 100644 --- a/models/email.go +++ b/models/email.go @@ -32,7 +32,6 @@ type Email struct { // ContentType of the body in unencrypted emails ContentType string `json:"content_type" gorethink:"content_type"` - Subject string `json:"subject" gorethink:"subject"` ReplyTo string `json:"reply_to" gorethink:"reply_to"` // Contains ID of the thread From 01f5e864a7f9387464dae6dc8ef256309c93fb0e Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Wed, 18 Feb 2015 20:14:43 +0100 Subject: [PATCH 13/33] Subject hashes --- models/thread.go | 3 +++ routes/emails.go | 14 +++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/models/thread.go b/models/thread.go index de9d2c5..df3ad02 100644 --- a/models/thread.go +++ b/models/thread.go @@ -18,4 +18,7 @@ type Thread struct { LastRead string `json:"last_read" gorethink:"last_read"` FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + + // SHA256 hash of the raw subject without prefixes + SubjectHash string `json:"subject_hash" gorethink:"subject_hash"` } diff --git a/routes/emails.go b/routes/emails.go index 7345dcd..0dc1ca4 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -3,6 +3,7 @@ package routes import ( //"bytes" //"io" + "crypto/sha256" "net/http" "regexp" "strconv" @@ -252,12 +253,15 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { return } } else { + hash := sha256.Sum256([]byte(input.Subject)) + thread := &models.Thread{ - Resource: models.MakeResource(account.ID, input.Subject), - Emails: []string{resource.ID}, - Labels: []string{label.ID}, - Members: append(append(input.To, input.CC...), input.BCC...), - IsRead: true, + Resource: models.MakeResource(account.ID, "Encrypted thread"), + Emails: []string{resource.ID}, + Labels: []string{label.ID}, + Members: append(append(input.To, input.CC...), input.BCC...), + IsRead: true, + SubjectHash: string(hash[:]), } err := env.Threads.Insert(thread) From fed49439c7c7b921924cee794f0d591850c321b4 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Thu, 19 Feb 2015 23:42:34 +0100 Subject: [PATCH 14/33] Subject hash index --- db/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/setup.go b/db/setup.go index 1c65e57..bf5bd96 100644 --- a/db/setup.go +++ b/db/setup.go @@ -19,7 +19,7 @@ var tableIndexes = map[string][]string{ "keys": []string{"owner", "key_id"}, "labels": []string{"owner"}, "reservations": []string{"email", "name"}, - "threads": []string{"owner"}, + "threads": []string{"owner", "subject_hash"}, "tokens": []string{"owner"}, } From 05b904cc1525f017a17eabf191298bd4cd5280f3 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 10:47:58 +0100 Subject: [PATCH 15/33] Subject hashes --- models/thread.go | 3 ++- routes/emails.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/models/thread.go b/models/thread.go index df3ad02..73da028 100644 --- a/models/thread.go +++ b/models/thread.go @@ -17,7 +17,8 @@ type Thread struct { IsRead bool `json:"is_read" gorethink:"is_read"` LastRead string `json:"last_read" gorethink:"last_read"` - FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + FirstManifest string `json:"first_manifest,omitempty" gorethink:"-"` // SHA256 hash of the raw subject without prefixes SubjectHash string `json:"subject_hash" gorethink:"subject_hash"` diff --git a/routes/emails.go b/routes/emails.go index 0dc1ca4..c98f9a4 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -4,6 +4,7 @@ import ( //"bytes" //"io" "crypto/sha256" + "encoding/hex" "net/http" "regexp" "strconv" @@ -261,7 +262,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { Labels: []string{label.ID}, Members: append(append(input.To, input.CC...), input.BCC...), IsRead: true, - SubjectHash: string(hash[:]), + SubjectHash: hex.EncodeToString(hash[:]), } err := env.Threads.Insert(thread) From bb46f03e0aadc7822787f8219c82084f98d4d946 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 11:15:44 +0100 Subject: [PATCH 16/33] Added manifests to emails.List --- db/table_emails.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/db/table_emails.go b/db/table_emails.go index e4229a4..57a8e4c 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -96,6 +96,15 @@ func (e *EmailsTable) List( term = term.Slice(offset, offset+limit) } + // Add manifests + term = term.InnerJoin(gorethink.Table("emails").Pluck("id", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { + return thread.Field("emails").Contains(email.Field("id")) + }).Without(map[string]interface{}{ + "right": map[string]interface{}{ + "id": true, + }, + }).Zip() + // Run the query cursor, err := term.Run(e.GetSession()) if err != nil { From 2c7ee870e49e41bfdd4df324223f18a1f72a1b78 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 18:48:08 +0100 Subject: [PATCH 17/33] List with manifests query fix --- db/table_emails.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/table_emails.go b/db/table_emails.go index 57a8e4c..80cf079 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -97,11 +97,11 @@ func (e *EmailsTable) List( } // Add manifests - term = term.InnerJoin(gorethink.Table("emails").Pluck("id", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { - return thread.Field("emails").Contains(email.Field("id")) + term = term.InnerJoin(gorethink.Table("emails").Pluck("thread", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { + return thread.Field("id").Eq(email.Field("thread")) }).Without(map[string]interface{}{ "right": map[string]interface{}{ - "id": true, + "thread": true, }, }).Zip() From 1f383ea2b547862e5e05995f1fd3f04fa2e0c9f9 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 19:00:47 +0100 Subject: [PATCH 18/33] oops --- db/table_emails.go | 9 --------- db/table_threads.go | 9 +++++++++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/db/table_emails.go b/db/table_emails.go index 80cf079..e4229a4 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -96,15 +96,6 @@ func (e *EmailsTable) List( term = term.Slice(offset, offset+limit) } - // Add manifests - term = term.InnerJoin(gorethink.Table("emails").Pluck("thread", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { - return thread.Field("id").Eq(email.Field("thread")) - }).Without(map[string]interface{}{ - "right": map[string]interface{}{ - "thread": true, - }, - }).Zip() - // Run the query cursor, err := term.Run(e.GetSession()) if err != nil { diff --git a/db/table_threads.go b/db/table_threads.go index 9cfceb1..0ca0960 100644 --- a/db/table_threads.go +++ b/db/table_threads.go @@ -103,6 +103,15 @@ func (t *ThreadsTable) List( term = term.Slice(offset, offset+limit) } + // Add manifests + term = term.InnerJoin(gorethink.Table("emails").Pluck("thread", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { + return thread.Field("id").Eq(email.Field("thread")) + }).Without(map[string]interface{}{ + "right": map[string]interface{}{ + "thread": true, + }, + }).Zip() + // Run the query cursor, err := term.Run(t.GetSession()) if err != nil { From 2948cbb51f861a6aef1732528eb048c3d6dfd086 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 19:14:00 +0100 Subject: [PATCH 19/33] Fixed the thread query --- db/table_threads.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/db/table_threads.go b/db/table_threads.go index 0ca0960..9a1e76a 100644 --- a/db/table_threads.go +++ b/db/table_threads.go @@ -60,18 +60,16 @@ func (t *ThreadsTable) List( row.Field("labels").Contains(label), ) }) - } - - if owner != "" && label == "" { + } else if owner != "" && label == "" { term = t.GetTable().Filter(map[string]interface{}{ "owner": owner, }) - } - - if owner == "" && label != "" { + } else if owner == "" && label != "" { term = t.GetTable().Filter(func(row gorethink.Term) gorethink.Term { return row.Field("labels").Contains(label) }) + } else { + term = t.GetTable() } // If sort array has contents, parse them and add to the term From e03a489f2a092d40e90e2bde66ab1cdf41e0c0f3 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 19:45:48 +0100 Subject: [PATCH 20/33] pls work --- db/table_threads.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/table_threads.go b/db/table_threads.go index 9a1e76a..23a0995 100644 --- a/db/table_threads.go +++ b/db/table_threads.go @@ -102,7 +102,7 @@ func (t *ThreadsTable) List( } // Add manifests - term = term.InnerJoin(gorethink.Table("emails").Pluck("thread", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { + term = term.InnerJoin(gorethink.Db(t.GetDBName()).Table("emails").Pluck("thread", "manifest"), func(thread gorethink.Term, email gorethink.Term) gorethink.Term { return thread.Field("id").Eq(email.Field("thread")) }).Without(map[string]interface{}{ "right": map[string]interface{}{ From c7e916ac454d8f5db28b98e0a592daf9645cae6e Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 19:56:05 +0100 Subject: [PATCH 21/33] I didn't expect that to be the issue --- models/thread.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/thread.go b/models/thread.go index 73da028..250f145 100644 --- a/models/thread.go +++ b/models/thread.go @@ -17,8 +17,8 @@ type Thread struct { IsRead bool `json:"is_read" gorethink:"is_read"` LastRead string `json:"last_read" gorethink:"last_read"` - FilesCount *int `json:"files_count,omitempty" gorethink:"-"` - FirstManifest string `json:"first_manifest,omitempty" gorethink:"-"` + FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + Manifest string `json:"manifest,omitempty" gorethink:"-"` // SHA256 hash of the raw subject without prefixes SubjectHash string `json:"subject_hash" gorethink:"subject_hash"` From a5206a18add9dcef027d0bef96ff3bc0b6d4cee3 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 20:04:37 +0100 Subject: [PATCH 22/33] Added a gorethink tag definition --- models/thread.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/thread.go b/models/thread.go index 250f145..b32f35e 100644 --- a/models/thread.go +++ b/models/thread.go @@ -17,8 +17,8 @@ type Thread struct { IsRead bool `json:"is_read" gorethink:"is_read"` LastRead string `json:"last_read" gorethink:"last_read"` - FilesCount *int `json:"files_count,omitempty" gorethink:"-"` - Manifest string `json:"manifest,omitempty" gorethink:"-"` + FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + Manifest *string `json:"manifest,omitempty" gorethink:"manifest"` // SHA256 hash of the raw subject without prefixes SubjectHash string `json:"subject_hash" gorethink:"subject_hash"` From 5dc4a052b2e786b1a5601772e91d1d4b61152fd0 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 20:13:51 +0100 Subject: [PATCH 23/33] gorethink sucks --- models/thread.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/thread.go b/models/thread.go index b32f35e..2270965 100644 --- a/models/thread.go +++ b/models/thread.go @@ -17,8 +17,8 @@ type Thread struct { IsRead bool `json:"is_read" gorethink:"is_read"` LastRead string `json:"last_read" gorethink:"last_read"` - FilesCount *int `json:"files_count,omitempty" gorethink:"-"` - Manifest *string `json:"manifest,omitempty" gorethink:"manifest"` + FilesCount *int `json:"files_count,omitempty" gorethink:"-"` + Manifest string `json:"manifest,omitempty" gorethink:"manifest"` // SHA256 hash of the raw subject without prefixes SubjectHash string `json:"subject_hash" gorethink:"subject_hash"` From f9718cdef80dc8f5cea19556aee9de9bfee68728 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 20:33:23 +0100 Subject: [PATCH 24/33] Manifest in threads.get --- db/setup.go | 2 +- db/table_emails.go | 21 +++++++++++++++++++++ routes/threads.go | 10 ++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/db/setup.go b/db/setup.go index bf5bd96..f67d9d9 100644 --- a/db/setup.go +++ b/db/setup.go @@ -14,7 +14,7 @@ var ( var tableIndexes = map[string][]string{ "accounts": []string{"name"}, "contacts": []string{"owner"}, - "emails": []string{"owner", "label_ids"}, + "emails": []string{"owner", "label_ids", "date_created", "thread"}, "files": []string{"owner"}, "keys": []string{"owner", "key_id"}, "labels": []string{"owner"}, diff --git a/db/table_emails.go b/db/table_emails.go index e4229a4..29e35ad 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -128,3 +128,24 @@ func (e *EmailsTable) DeleteByThread(id string) error { "thread": id, }) } + +func (e *EmailsTable) GetThreadManifest(thread string) (string, error) { + cursor, err := e.GetTable(). + GetAllByIndex("thread", thread). + OrderBy(gorethink.OrderByOpts{Index: "date_created"}). + Limit(1). + Pluck("manifest"). + Field("manifest"). + Run(e.GetSession()) + if err != nil { + return "", err + } + + var manifest string + err = cursor.One(&manifest) + if err != nil { + return "", err + } + + return manifest, nil +} diff --git a/routes/threads.go b/routes/threads.go index 9cb0a17..95b4484 100644 --- a/routes/threads.go +++ b/routes/threads.go @@ -160,6 +160,16 @@ func ThreadsGet(c web.C, w http.ResponseWriter, r *http.Request) { return } + manifest, err := env.Emails.GetThreadManifest(thread.ID) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "id": thread.ID, + }).Error("Unable to get a manifest") + } else { + thread.Manifest = manifest + } + var emails []*models.Email if ok := r.URL.Query().Get("list_emails"); ok == "true" || ok == "1" { emails, err = env.Emails.GetByThread(thread.ID) From 017ddda7758e4d87339c66f26d7879ae5d37a5c0 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Fri, 20 Feb 2015 20:43:22 +0100 Subject: [PATCH 25/33] Ordering fix --- db/table_emails.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/table_emails.go b/db/table_emails.go index 29e35ad..ec2d8e3 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -132,7 +132,7 @@ func (e *EmailsTable) DeleteByThread(id string) error { func (e *EmailsTable) GetThreadManifest(thread string) (string, error) { cursor, err := e.GetTable(). GetAllByIndex("thread", thread). - OrderBy(gorethink.OrderByOpts{Index: "date_created"}). + OrderBy("date_created"). Limit(1). Pluck("manifest"). Field("manifest"). From cf7f9ea1a8ea4e3c9cedd55f6c37dcd17f2abfec Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sat, 21 Feb 2015 11:24:01 +0100 Subject: [PATCH 26/33] Fixed attachments count query --- db/table_files.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/table_files.go b/db/table_files.go index 4dc2c3d..07ea9f4 100644 --- a/db/table_files.go +++ b/db/table_files.go @@ -79,7 +79,7 @@ func (f *FilesTable) CountByEmail(id string) (int, error) { func (f *FilesTable) CountByThread(id ...interface{}) (int, error) { query, err := f.GetTable().Filter(func(row gorethink.Term) gorethink.Term { - return gorethink.Table("emails").GetAllByIndex("owner", id...).Field("files").Contains(row.Field("id")) + return gorethink.Table("emails").GetAllByIndex("thread", id...).Field("files").Contains(row.Field("id")) }).Count().Run(f.GetSession()) if err != nil { return 0, err From 85b78a09974f29672a4787066b00224563a5430a Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Tue, 24 Feb 2015 23:52:03 +0100 Subject: [PATCH 27/33] Added raw error logging to NATS pushing --- routes/emails.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/routes/emails.go b/routes/emails.go index c98f9a4..84da696 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -352,7 +352,8 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { }) env.Log.WithFields(logrus.Fields{ - "error": err.Error(), + "error": err.Error(), + "rawerr": err, }).Error("Could not publish an email send request") return } From 144ebb40c5a655e7c945489d0d6210384b0fcd66 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Wed, 25 Feb 2015 19:46:14 +0100 Subject: [PATCH 28/33] Swapped out NATS for nsq --- .drone.yml | 2 +- circle.yml | 40 ---------------- env/config.go | 3 +- env/env.go | 6 +-- main.go | 20 +++++--- routes/emails.go | 9 ++-- routes/init_test.go | 3 +- setup/setup.go | 107 +++++++++++++++++++++++++++++++------------ setup/setup_test.go_ | 3 +- 9 files changed, 106 insertions(+), 87 deletions(-) delete mode 100644 circle.yml diff --git a/.drone.yml b/.drone.yml index fc923c0..59ae393 100644 --- a/.drone.yml +++ b/.drone.yml @@ -3,7 +3,7 @@ env: - GOPATH=/var/cache/drone services: - redis - - apcera/gnatsd + - mikedewar/nsqd - dockerfile/rethinkdb script: - pip install fabric diff --git a/circle.yml b/circle.yml deleted file mode 100644 index acf9014..0000000 --- a/circle.yml +++ /dev/null @@ -1,40 +0,0 @@ -machine: - timezone: - Europe/Berlin - services: - - docker - -dependencies: - cache_directories: - - /home/ubuntu/api/redis-2.8.18 - pre: - - source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list - - wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add - - - sudo apt-get update - - sudo apt-get install rethinkdb - - if [[ ! -e redis-2.8.18/src/redis-server ]]; then wget http://download.redis.io/releases/redis-2.8.18.tar.gz && tar xvzf redis-2.8.18.tar.gz; fi - - if [[ ! -e redis-2.8.18/src/redis-server ]]; then cd redis-2.8.18 && make; fi - - go get github.com/apcera/gnatsd - - go get golang.org/x/tools/cmd/cover - - go get github.com/mattn/goveralls - post: - - rethinkdb --bind all: - background: true - - src/redis-server: - background: true - - gnatsd: - background: true - -test: - override: - - go test -v github.com/lavab/api/setup - - go test -v -covermode=count -coverprofile=coverage.out github.com/lavab/api/routes - - goveralls -coverprofile=coverage.out -service=circleci -repotoken $COVERALLS_TOKEN - -deployment: - hub: - branch: master - commands: - - docker login -e circleci@lavaboom.io -u $DOCKER_USER -p $DOCKER_PASS https://registry.lavaboom.io - - docker build -t registry.lavaboom.io/lavaboom/api . - - docker push registry.lavaboom.io/lavaboom/api \ No newline at end of file diff --git a/env/config.go b/env/config.go index 2ca5ddd..e1f2a67 100644 --- a/env/config.go +++ b/env/config.go @@ -18,7 +18,8 @@ type Flags struct { RethinkDBKey string RethinkDBDatabase string - NATSAddress string + LookupdAddress string + NSQdAddress string YubiCloudID string YubiCloudKey string diff --git a/env/env.go b/env/env.go index 9a40254..00ab65c 100644 --- a/env/env.go +++ b/env/env.go @@ -2,7 +2,7 @@ package env import ( "github.com/Sirupsen/logrus" - "github.com/apcera/nats" + "github.com/bitly/go-nsq" "github.com/dancannon/gorethink" "github.com/lavab/api/cache" @@ -39,6 +39,6 @@ var ( Threads *db.ThreadsTable // Factors contains all currently registered factors Factors map[string]factor.Factor - // NATS is the encoded connection to the NATS queue - NATS *nats.EncodedConn + // Producer is the nsq producer used to send messages to other components of the system + Producer *nsq.Producer ) diff --git a/main.go b/main.go index 2466249..cfec8b8 100644 --- a/main.go +++ b/main.go @@ -54,14 +54,21 @@ var ( } return database }(), "Database name on the RethinkDB server") - // NATS address - natsAddress = flag.String("nats_address", func() string { - address := os.Getenv("NATS_PORT_4222_TCP_ADDR") + // nsq and lookupd addresses + nsqdAddress = flag.String("nsqd_address", func() string { + address := os.Getenv("NSQD_PORT_4150_TCP_ADDR") if address == "" { address = "127.0.0.1" } - return "nats://" + address + ":4222" - }(), "Address of the NATS server") + return address + ":4150" + }(), "Address of the nsqd server") + lookupdAddress = flag.String("lookupd_address", func() string { + address := os.Getenv("NSQLOOKUPD_PORT_4160_TCP_ADDR") + if address == "" { + address = "127.0.0.1" + } + return address + ":4160" + }(), "Address of the lookupd server") // YubiCloud params yubiCloudID = flag.String("yubicloud_id", "", "YubiCloud API id") yubiCloudKey = flag.String("yubicloud_key", "", "YubiCloud API key") @@ -103,7 +110,8 @@ func main() { RethinkDBKey: *rethinkdbKey, RethinkDBDatabase: *rethinkdbDatabase, - NATSAddress: *natsAddress, + NSQdAddress: *nsqdAddress, + LookupdAddress: *lookupdAddress, YubiCloudID: *yubiCloudID, YubiCloudKey: *yubiCloudKey, diff --git a/routes/emails.go b/routes/emails.go index 84da696..302aa39 100644 --- a/routes/emails.go +++ b/routes/emails.go @@ -344,7 +344,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { }*/ // Add a send request to the queue - err = env.NATS.Publish("send", email.ID) + err = env.Producer.Publish("send_email", []byte(`"`+email.ID+`"`)) if err != nil { utils.JSONResponse(w, 500, &EmailsCreateResponse{ Success: false, @@ -352,8 +352,7 @@ func EmailsCreate(c web.C, w http.ResponseWriter, r *http.Request) { }) env.Log.WithFields(logrus.Fields{ - "error": err.Error(), - "rawerr": err, + "error": err.Error(), }).Error("Could not publish an email send request") return } @@ -656,7 +655,7 @@ func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { } // Send notifications - err = env.NATS.Publish("delivery", map[string]interface{}{ + err = env.Producer.Publish("email_delivery", map[string]interface{}{ "id": email.ID, "owner": email.Owner, }) @@ -667,7 +666,7 @@ func EmailsDelete(c web.C, w http.ResponseWriter, r *http.Request) { }).Error("Unable to publish a delivery message") } - err = env.NATS.Publish("receipt", map[string]interface{}{ + err = env.NATS.Publish("email_receipt", map[string]interface{}{ "id": newEmail.ID, "owner": newEmail.Owner, }) diff --git a/routes/init_test.go b/routes/init_test.go index 227cfbb..c201fe8 100644 --- a/routes/init_test.go +++ b/routes/init_test.go @@ -27,7 +27,8 @@ func init() { RedisAddress: "127.0.0.1:6379", - NATSAddress: "nats://127.0.0.1:4222", + NSQAddress: "127.0.0.1:4150", + LookupdAddress: "127.0.0.1:4160", RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", diff --git a/setup/setup.go b/setup/setup.go index 7e9b3bf..c9be7a7 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -5,13 +5,14 @@ import ( "io" "net/http" "net/http/httptest" + "os" "regexp" "strings" "sync" "time" "github.com/Sirupsen/logrus" - "github.com/apcera/nats" + "github.com/bitly/go-nsq" "github.com/dancannon/gorethink" "github.com/johntdyer/slackrus" "github.com/pzduniak/glogrus" @@ -214,34 +215,52 @@ func PrepareMux(flags *env.Flags) *web.Mux { ), } - // NATS queue connection - nc, err := nats.Connect(flags.NATSAddress) + // Create a producer + producer, err := nsq.NewProducer(flags.NSQdAddress, nsq.NewConfig()) if err != nil { env.Log.WithFields(logrus.Fields{ - "error": err, - "address": flags.NATSAddress, - }).Fatal("Unable to connect to NATS") + "error": err.Error(), + }).Fatal("Unable to create a new nsq producer") } + defer producer.Stop() - c, err := nats.NewEncodedConn(nc, "json") + env.Producer = producer + + // Get the hostname + hostname, err := os.Hostname() + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Fatal("Unable to get the hostname") + } + + // Create a delivery consumer + deliveryConsumer, err := nsq.NewConsumer("email_delivery", hostname, nsq.NewConfig()) if err != nil { env.Log.WithFields(logrus.Fields{ - "error": err, - "address": flags.NATSAddress, - }).Fatal("Unable to initialize a JSON NATS connection") + "error": err.Error(), + "topic": "email_delivery", + }).Fatal("Unable to create a new nsq consumer") } + defer deliveryConsumer.Stop() + + deliveryConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error { + var msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + } + + if err := json.Unmarshal(m.Body, &msg); err != nil { + return err + } - c.Subscribe("delivery", func(msg *struct { - ID string `json:"id"` - Owner string `json:"owner"` - }) { // Check if we are handling owner's session if _, ok := sessions[msg.Owner]; !ok { - return + return nil } if len(sessions[msg.Owner]) == 0 { - return + return nil } // Resolve the email @@ -251,7 +270,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { "error": err.Error(), "id": msg.ID, }).Error("Unable to resolve an email from queue") - return + return nil } // Resolve the thread @@ -262,7 +281,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { "id": msg.ID, "thread": email.Thread, }).Error("Unable to resolve a thread from queue") - return + return nil } // Send notifications to subscribers @@ -282,19 +301,43 @@ func PrepareMux(flags *env.Flags) *web.Mux { }).Warn("Error while writing to a WebSocket") } } - }) - c.Subscribe("receipt", func(msg *struct { - ID string `json:"id"` - Owner string `json:"owner"` - }) { + return nil + }), 10) + + if err := deliveryConsumer.ConnectToNSQLookupd(flags.LookupdAddress); err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Fatal("Unable to connect to nsqlookupd") + } + + // Create a receipt consumer + receiptConsumer, err := nsq.NewConsumer("email_receipt", hostname, nsq.NewConfig()) + if err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + "topic": "email_receipt", + }).Fatal("Unable to create a new nsq consumer") + } + defer receiptConsumer.Stop() + + receiptConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error { + var msg *struct { + ID string `json:"id"` + Owner string `json:"owner"` + } + + if err := json.Unmarshal(m.Body, &msg); err != nil { + return err + } + // Check if we are handling owner's session if _, ok := sessions[msg.Owner]; !ok { - return + return nil } if len(sessions[msg.Owner]) == 0 { - return + return nil } // Resolve the email @@ -304,7 +347,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { "error": err.Error(), "id": msg.ID, }).Error("Unable to resolve an email from queue") - return + return nil } // Resolve the thread @@ -315,7 +358,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { "id": msg.ID, "thread": email.Thread, }).Error("Unable to resolve a thread from queue") - return + return nil } // Send notifications to subscribers @@ -335,9 +378,15 @@ func PrepareMux(flags *env.Flags) *web.Mux { }).Warn("Error while writing to a WebSocket") } } - }) - env.NATS = c + return nil + }), 10) + + if err := receiptConsumer.ConnectToNSQLookupd(flags.LookupdAddress); err != nil { + env.Log.WithFields(logrus.Fields{ + "error": err.Error(), + }).Fatal("Unable to connect to nsqlookupd") + } // Create a new goji mux mux := web.New() diff --git a/setup/setup_test.go_ b/setup/setup_test.go_ index c10f95d..e44739d 100644 --- a/setup/setup_test.go_ +++ b/setup/setup_test.go_ @@ -19,7 +19,8 @@ func TestSetup(t *testing.T) { RedisAddress: "127.0.0.1:6379", - NATSAddress: "nats://127.0.0.1:4222", + NSQAddress: "127.0.0.1:4150", + LookupdAddress: "127.0.0.1:4160", RethinkDBAddress: "127.0.0.1:28015", RethinkDBKey: "", From 1cc10426af0c9747fa8bd616a28a7177bf707e26 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Wed, 25 Feb 2015 20:01:41 +0100 Subject: [PATCH 29/33] Fixed tests --- routes/init_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routes/init_test.go b/routes/init_test.go index c201fe8..1dbb6ff 100644 --- a/routes/init_test.go +++ b/routes/init_test.go @@ -27,7 +27,7 @@ func init() { RedisAddress: "127.0.0.1:6379", - NSQAddress: "127.0.0.1:4150", + NSQdAddress: "127.0.0.1:4150", LookupdAddress: "127.0.0.1:4160", RethinkDBAddress: "127.0.0.1:28015", From 76462495917c20841c465d27ad3acf3a1cebd27f Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Wed, 25 Feb 2015 20:46:53 +0100 Subject: [PATCH 30/33] huge hack --- setup/setup.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/setup/setup.go b/setup/setup.go index c9be7a7..85f76a1 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -222,7 +222,10 @@ func PrepareMux(flags *env.Flags) *web.Mux { "error": err.Error(), }).Fatal("Unable to create a new nsq producer") } - defer producer.Stop() + + defer func(producer *nsq.Producer) { + producer.Stop() + }(producer) env.Producer = producer From 1feaafd2d5225139e38d792a7c612094e26c6a53 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Wed, 25 Feb 2015 21:08:27 +0100 Subject: [PATCH 31/33] Removed .Stop() from nsq code --- setup/setup.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/setup/setup.go b/setup/setup.go index 85f76a1..19c320f 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -223,9 +223,9 @@ func PrepareMux(flags *env.Flags) *web.Mux { }).Fatal("Unable to create a new nsq producer") } - defer func(producer *nsq.Producer) { + /*defer func(producer *nsq.Producer) { producer.Stop() - }(producer) + }(producer)*/ env.Producer = producer @@ -245,7 +245,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { "topic": "email_delivery", }).Fatal("Unable to create a new nsq consumer") } - defer deliveryConsumer.Stop() + //defer deliveryConsumer.Stop() deliveryConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error { var msg *struct { @@ -322,7 +322,7 @@ func PrepareMux(flags *env.Flags) *web.Mux { "topic": "email_receipt", }).Fatal("Unable to create a new nsq consumer") } - defer receiptConsumer.Stop() + //defer receiptConsumer.Stop() receiptConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error { var msg *struct { From 81fad1d2f8629a880a90d21a8019ff20802c7fb6 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Thu, 26 Feb 2015 22:24:16 +0100 Subject: [PATCH 32/33] Remove queued emails from emails list --- db/table_emails.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/table_emails.go b/db/table_emails.go index ec2d8e3..6d388d4 100644 --- a/db/table_emails.go +++ b/db/table_emails.go @@ -65,7 +65,7 @@ func (e *EmailsTable) List( filter["thread"] = thread } - term := e.GetTable().Filter(filter) + term := e.GetTable().Filter(filter).Filter(gorethink.Not(gorethink.Row.Field("status").Eq(gorethink.Expr("queued")))) // If sort array has contents, parse them and add to the term if sort != nil && len(sort) > 0 { From 1a2504512a33f540bcfcb17c857eed43de5fd394 Mon Sep 17 00:00:00 2001 From: Piotr Zduniak Date: Sun, 1 Mar 2015 14:00:24 +0100 Subject: [PATCH 33/33] Added utility field to the account model --- models/account.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/models/account.go b/models/account.go index 02b39b6..d82327b 100644 --- a/models/account.go +++ b/models/account.go @@ -4,6 +4,7 @@ import ( "github.com/gyepisam/mcf" _ "github.com/gyepisam/mcf/scrypt" // Required to have mcf hash the password into scrypt "github.com/lavab/api/factor" + "golang.org/x/crypto/openpgp" ) // Account stores essential data for a Lavaboom user, and is thus not encrypted. @@ -38,6 +39,8 @@ type Account struct { FactorValue []string `json:"-" gorethink:"factor_value"` Status string `json:"status" gorethink:"status"` + + Key *openpgp.Entity `json:"-" gorethink:"-"` } // SetPassword changes the account's password