Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions _research/nats_receipt/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"github.com/apcera/nats"
"log"
)

func main() {
nc, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}

c, err := nats.NewEncodedConn(nc, "json")
if err != nil {
log.Fatal(err)
}

c.Publish("receipt", &struct {
ID string `json:"id"`
Owner string `json:"owner"`
}{
ID: "helloworld",
Owner: "k5TuCXomSMEeCdXw1aXl",
})

log.Print("x")
}
177 changes: 177 additions & 0 deletions db/table_threads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package db

import (
"github.com/dancannon/gorethink"

"github.com/lavab/api/models"
)

type ThreadsTable struct {
RethinkCRUD
}

func (t *ThreadsTable) GetThread(id string) (*models.Thread, error) {
var result models.Thread

if err := t.FindFetchOne(id, &result); err != nil {
return nil, err
}

return &result, nil
}

func (t *ThreadsTable) GetOwnedBy(id string) ([]*models.Thread, error) {
var result []*models.Thread

err := t.WhereAndFetch(map[string]interface{}{
"owner": id,
}, &result)
if err != nil {
return nil, err
}

return result, nil
}

func (t *ThreadsTable) DeleteOwnedBy(id string) error {
return t.Delete(map[string]interface{}{
"owner": id,
})
}

func (t *ThreadsTable) CountOwnedBy(id string) (int, error) {
return t.FindByAndCount("owner", id)
}

func (t *ThreadsTable) List(
owner string,
sort []string,
offset int,
limit int,
label string,
) ([]*models.Thread, error) {

var term gorethink.Term

if owner != "" && label != "" {
term = t.GetTable().Filter(func(row gorethink.Term) gorethink.Term {
return gorethink.And(
row.Field("owner").Eq(owner),
row.Field("labels").Contains(label),
)
})
}

if owner != "" && label == "" {
term = t.GetTable().Filter(map[string]interface{}{
"owner": owner,
})
}

if owner == "" && label != "" {
term = t.GetTable().Filter(func(row gorethink.Term) gorethink.Term {
return row.Field("labels").Contains(label)
})
}

// If sort array has contents, parse them and add to the term
if sort != nil && len(sort) > 0 {
var conds []interface{}
for _, cond := range sort {
if cond[0] == '-' {
conds = append(conds, gorethink.Desc(cond[1:]))
} else if cond[0] == '+' || cond[0] == ' ' {
conds = append(conds, gorethink.Asc(cond[1:]))
} else {
conds = append(conds, gorethink.Asc(cond))
}
}

term = term.OrderBy(conds...)
}

// Slice the result in 3 cases
if offset != 0 && limit == 0 {
term = term.Skip(offset)
}

if offset == 0 && limit != 0 {
term = term.Limit(limit)
}

if offset != 0 && limit != 0 {
term = term.Slice(offset, offset+limit)
}

// Run the query
cursor, err := term.Run(t.GetSession())
if err != nil {
return nil, err
}

// Fetch the cursor
var resp []*models.Thread
err = cursor.All(&resp)
if err != nil {
return nil, err
}

return resp, nil
}

func (t *ThreadsTable) GetByLabel(label string) ([]*models.Thread, error) {
var result []*models.Thread

cursor, err := t.GetTable().Filter(func(row gorethink.Term) gorethink.Term {
return row.Field("labels").Contains(label)
}).GetAll().Run(t.GetSession())
if err != nil {
return nil, err
}

err = cursor.All(&result)
if err != nil {
return nil, err
}

return result, nil
}

func (t *ThreadsTable) CountByLabel(label string) (int, error) {
var result int

cursor, err := t.GetTable().Filter(func(row gorethink.Term) gorethink.Term {
return row.Field("labels").Contains(label)
}).Count().Run(t.GetSession())
if err != nil {
return 0, err
}

err = cursor.One(&result)
if err != nil {
return 0, err
}

return result, nil
}

func (t *ThreadsTable) CountByLabelUnread(label string) (int, error) {
var result int

cursor, err := t.GetTable().Filter(func(row gorethink.Term) gorethink.Term {
return gorethink.And(
row.Field("labels").Contains(label),
row.Field("is_read").Eq(false),
)
}).Count().Run(t.GetSession())
if err != nil {
return 0, err
}

err = cursor.One(&result)
if err != nil {
return 0, err
}

return result, nil
}
2 changes: 2 additions & 0 deletions env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
Labels *db.LabelsTable
// Attachments is the global instance of AttachmentsTable
Attachments *db.AttachmentsTable
// Threads is the global instance of ThreadsTable
Threads *db.ThreadsTable
// Factors contains all currently registered factors
Factors map[string]factor.Factor
// NATS is the encoded connection to the NATS queue
Expand Down
5 changes: 2 additions & 3 deletions models/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ type Thread struct {
// Members is a slice containing userIDs or email addresses for all members of the thread
Members []string `json:"members" gorethink:"members"`

// Snippet is a bit of text from the conversation, for context. It's only visible to the user.
Snippet Encrypted `json:"snippet" gorethink:"snippet"`

// Subject is the subject of the thread.
Subject string `json:"subject" gorethink:"subject"`

IsRead bool `json:"is_read" gorethink:"is_body"`
}
19 changes: 18 additions & 1 deletion setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package setup

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"regexp"
Expand Down Expand Up @@ -31,6 +32,12 @@ var (
sessionsLock sync.Mutex
)

type nopCloser struct {
io.Reader
}

func (nopCloser) Close() error { return nil }

// PrepareMux sets up the API
func PrepareMux(flags *env.Flags) *web.Mux {
// Set up a new logger
Expand Down Expand Up @@ -155,6 +162,13 @@ func PrepareMux(flags *env.Flags) *web.Mux {
"emails",
),
}
env.Threads = &db.ThreadsTable{
RethinkCRUD: db.NewCRUDTable(
rethinkSession,
rethinkOpts.Database,
"threads",
),
}
env.Labels = &db.LabelsTable{
RethinkCRUD: db.NewCRUDTable(
rethinkSession,
Expand Down Expand Up @@ -233,6 +247,7 @@ func PrepareMux(flags *env.Flags) *web.Mux {
ID string `json:"id"`
Owner string `json:"owner"`
}) {
log.Print(msg)
// Check if we are handling owner's session
if _, ok := sessions[msg.Owner]; !ok {
return
Expand Down Expand Up @@ -625,7 +640,7 @@ func PrepareMux(flags *env.Flags) *web.Mux {
} else if input.Type == "request" {
// Perform the request
w := httptest.NewRecorder()
r, err := http.NewRequest(input.Method, "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body))
r, err := http.NewRequest(strings.ToUpper(input.Method), "http://api.lavaboom.io"+input.Path, strings.NewReader(input.Body))
if err != nil {
env.Log.WithFields(logrus.Fields{
"id": session.ID(),
Expand All @@ -648,6 +663,8 @@ func PrepareMux(flags *env.Flags) *web.Mux {
continue
}

r.Body = nopCloser{strings.NewReader(input.Body)}

r.RequestURI = input.Path

for key, value := range input.Headers {
Expand Down