-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathpqueue.go
More file actions
133 lines (118 loc) · 3.24 KB
/
pqueue.go
File metadata and controls
133 lines (118 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package boltqueue
import (
"encoding/binary"
"errors"
"fmt"
"github.com/boltdb/bolt"
)
// TODO: Interfacification of messages
var foundItem = errors.New("item found")
// aKey singleton for assigning keys to messages
var aKey = new(atomicKey)
// PQueue is a priority queue backed by a Bolt database on disk
type PQueue struct {
conn *bolt.DB
}
// NewPQueue loads or creates a new PQueue with the given filename
func NewPQueue(filename string) (*PQueue, error) {
db, err := bolt.Open(filename, 0600, nil)
if err != nil {
return nil, err
}
return &PQueue{db}, nil
}
func (b *PQueue) enqueueMessage(priority int, key []byte, message *Message) error {
if priority < 0 || priority > 255 {
return fmt.Errorf("Invalid priority %d on Enqueue", priority)
}
p := make([]byte, 1)
p[0] = byte(uint8(priority))
return b.conn.Update(func(tx *bolt.Tx) error {
// Get bucket for this priority level
pb, err := tx.CreateBucketIfNotExists(p)
if err != nil {
return err
}
err = pb.Put(key, message.value)
if err != nil {
return err
}
return nil
})
}
// Enqueue adds a message to the queue
func (b *PQueue) Enqueue(priority int, message *Message) error {
k := make([]byte, 8)
binary.BigEndian.PutUint64(k, aKey.Get())
return b.enqueueMessage(priority, k, message)
}
// Requeue adds a message back into the queue, keeping its precedence.
// If added at the same priority, it should be among the first to dequeue.
// If added at a different priority, it will dequeue before newer messages
// of that priority.
func (b *PQueue) Requeue(priority int, message *Message) error {
if message.key == nil {
return fmt.Errorf("Cannot requeue new message")
}
return b.enqueueMessage(priority, message.key, message)
}
// Dequeue removes the oldest, highest priority message from the queue and
// returns it
func (b *PQueue) Dequeue() (*Message, error) {
var m *Message
err := b.conn.Update(func(tx *bolt.Tx) error {
err := tx.ForEach(func(bname []byte, bucket *bolt.Bucket) error {
if bucket.Stats().KeyN == 0 { //empty bucket
return nil
}
cur := bucket.Cursor()
k, v := cur.First() //Should not be empty by definition
priority, _ := binary.Uvarint(bname)
m = &Message{priority: int(priority), key: cloneBytes(k), value: cloneBytes(v)}
// Remove message
if err := cur.Delete(); err != nil {
return err
}
return foundItem //to stop the iteration
})
if err != nil && err != foundItem {
return err
}
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
// Size returns the number of entries of a given priority from 1 to 5
func (b *PQueue) Size(priority int) (int, error) {
if priority < 0 || priority > 255 {
return 0, fmt.Errorf("Invalid priority %d for Size()", priority)
}
tx, err := b.conn.Begin(false)
if err != nil {
return 0, err
}
bucket := tx.Bucket([]byte{byte(uint8(priority))})
if bucket == nil {
return 0, nil
}
count := bucket.Stats().KeyN
tx.Rollback()
return count, nil
}
// Close closes the queue and releases all resources
func (b *PQueue) Close() error {
err := b.conn.Close()
if err != nil {
return err
}
return nil
}
// taken from boltDB. Avoids corruption when re-queueing
func cloneBytes(v []byte) []byte {
var clone = make([]byte, len(v))
copy(clone, v)
return clone
}