forked from flowchartsman/boltqueue
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpqueue.go
More file actions
176 lines (156 loc) · 4.5 KB
/
pqueue.go
File metadata and controls
176 lines (156 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package boltqueue
import (
"encoding/binary"
"time"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
// TODO: Interfacification of messages
// TODO: add Peek(index) and Drop(n)
// foundItem is a sentinel error returned from the bucket-iteration callback in
// getMessage to stop tx.ForEach as soon as a message has been handled.
// getMessage filters it out again, so it is not reported to callers.
var foundItem = errors.New("item found")
// PQueue is a priority queue backed by a Bolt database on disk.
// Messages of each priority level are kept in their own top-level bucket,
// whose name is produced by getBucketName.
type PQueue struct {
// conn is the open Bolt database handle used for every queue operation;
// it is released by Close.
conn *bolt.DB
}
// NewPQueue opens the Bolt database stored at filename (file mode 0644) and
// wraps it in a PQueue. The open call is given a 5 second timeout. Any
// failure from bolt.Open is returned wrapped with "open db failed".
func NewPQueue(filename string) (*PQueue, error) {
	opts := &bolt.Options{Timeout: 5 * time.Second}

	conn, err := bolt.Open(filename, 0644, opts)
	if err != nil {
		return nil, errors.Wrap(err, "open db failed")
	}

	queue := &PQueue{conn: conn}
	return queue, nil
}
// getBucketName maps a priority to the name of the bucket that stores its
// messages: a single byte holding the priority value. Priorities outside
// 0..255 do not fit in one byte and are rejected with an error.
func getBucketName(priority int) ([]byte, error) {
	if 0 <= priority && priority <= 255 {
		name := []byte{byte(priority)}
		return name, nil
	}
	return nil, errors.Errorf("invalid priority %d, should in range 0~255", priority)
}
// enqueueMessage stores message.value in the bucket for the given priority,
// creating the bucket on first use. Everything happens inside one read-write
// transaction.
//
// When key is empty, a key is generated from the bucket's next sequence
// number, encoded as 8 big-endian bytes. When key is supplied it is used
// verbatim.
// NOTE(review): a supplied key is written with Put, which replaces any entry
// already stored under the same key in the target bucket — confirm callers
// that requeue into a different priority cannot collide with live messages.
func (b *PQueue) enqueueMessage(priority int, key []byte, message *Message) error {
	bucketName, err := getBucketName(priority)
	if err != nil {
		return err
	}

	store := func(tx *bolt.Tx) error {
		// Look up (or lazily create) the bucket for this priority level.
		bucket, err := tx.CreateBucketIfNotExists(bucketName)
		if err != nil {
			return errors.Wrap(err, "create bucket failed")
		}

		// No key supplied: allocate one from the bucket's sequence.
		if len(key) == 0 {
			seq, err := bucket.NextSequence()
			if err != nil {
				return errors.Wrap(err, "gen sequence failed")
			}
			key = make([]byte, 8)
			binary.BigEndian.PutUint64(key, seq)
		}

		if err := bucket.Put(key, message.value); err != nil {
			return errors.Wrap(err, "put message failed")
		}
		return nil
	}

	return b.conn.Update(store)
}
// Enqueue stores message in the queue at the given priority. The message is
// written under a newly generated sequence key for that priority's bucket.
// An error is returned if priority is rejected by getBucketName or the
// database write fails.
func (b *PQueue) Enqueue(priority int, message *Message) error {
	// An empty key tells enqueueMessage to allocate the next sequence number.
	var autoKey []byte
	return b.enqueueMessage(priority, autoKey, message)
}
// RequeueAs adds a message back into the queue at the given priority,
// reusing the key the message already carries so it keeps its precedence.
// If added at the same priority, it should be among the first to dequeue.
// If added at a different priority, it will dequeue before newer messages
// of that priority.
//
// It returns an error when the message carries no key (it was never stored
// in the queue), when priority is invalid, or when the database write fails.
func (b *PQueue) RequeueAs(priority int, message *Message) error {
	// Check the length rather than comparing against nil: enqueueMessage
	// treats every zero-length key as "new message" and would silently assign
	// a fresh sequence key to an empty, non-nil key.
	if len(message.key) == 0 {
		return errors.New("cannot requeue new message")
	}
	return b.enqueueMessage(priority, message.key, message)
}
// Requeue puts a message back into the queue at the priority recorded on the
// message itself. It is shorthand for RequeueAs(message.priority, message),
// so the message should be among the first to dequeue at that priority.
func (b *PQueue) Requeue(message *Message) error {
	samePriority := message.priority
	return b.RequeueAs(samePriority, message)
}
// Dequeue takes the first message out of the first non-empty priority bucket
// and returns it. The returned message is nil when no message was found.
func (b *PQueue) Dequeue() (*Message, error) {
	const decode, remove = true, true
	return b.getMessage(decode, remove)
}
// Peek returns the message that Dequeue would return next, but leaves it in
// the queue. The returned message is nil when no message was found.
func (b *PQueue) Peek() (*Message, error) {
	const decode, remove = true, false
	return b.getMessage(decode, remove)
}
// Drop discards the next message in the queue without decoding it. Only one
// message is removed per call; call it repeatedly to remove more.
func (b *PQueue) Drop() error {
	const decode, remove = false, true
	if _, err := b.getMessage(decode, remove); err != nil {
		return err
	}
	return nil
}
// getMessage finds the first entry of the first non-empty bucket and, as
// requested by the flags, decodes it into a Message and/or deletes it.
//
// decode: build and return a Message (priority, key and value are copied out
// of the transaction). remove: delete the entry from its bucket. At least one
// of the two flags must be set, otherwise an error is returned.
//
// Buckets are visited in the order tx.ForEach yields them; Bolt keeps keys in
// byte-sorted order, so numerically lower priorities are examined first.
//
// The returned message is nil when the queue is empty, when decode is false,
// or when an error occurred.
func (b *PQueue) getMessage(decode, remove bool) (*Message, error) {
	if !decode && !remove {
		return nil, errors.New("param invalid, decode or remove flag must be set")
	}

	// Only a removal has to modify the database. A pure peek runs in a
	// read-only transaction so it neither takes the writer lock nor commits.
	runTx := b.conn.View
	if remove {
		runTx = b.conn.Update
	}

	var m *Message
	err := runTx(func(tx *bolt.Tx) error {
		err := tx.ForEach(func(bname []byte, bucket *bolt.Bucket) error {
			cur := bucket.Cursor()
			k, v := cur.First()
			if k == nil { // empty bucket, check next
				return nil
			}
			if decode {
				// getBucketName stores the priority as one raw byte, so read it
				// back the same way. Decoding it as a varint yields 0 for every
				// priority >= 128, because such a byte is a varint continuation
				// byte with nothing following it.
				var priority int
				if len(bname) > 0 {
					priority = int(bname[0])
				}
				// k and v are only valid inside the transaction; copy them.
				m = &Message{priority: priority, key: cloneBytes(k), value: cloneBytes(v)}
			}
			if remove {
				if err := cur.Delete(); err != nil {
					return errors.Wrap(err, "remove message failed")
				}
			}
			return foundItem // stop the iteration after the first message
		})
		// foundItem only signals early termination; it is not a failure.
		if err != nil && err != foundItem {
			return err
		}
		return nil
	})
	if err != nil {
		// Do not hand out a message whose removal did not go through.
		return nil, err
	}
	return m, nil // m is nil when not found
}
// Size reports how many entries countBucket finds in the bucket for the given
// priority. The priority must be accepted by getBucketName (0 to 255);
// otherwise its error is returned. A priority whose bucket does not exist
// yet yields 0. The count is taken inside a read-only transaction.
func (b *PQueue) Size(priority int) (int, error) {
	name, err := getBucketName(priority)
	if err != nil {
		return 0, err
	}

	var total int
	err = b.conn.View(func(tx *bolt.Tx) error {
		n, countErr := countBucket(tx.Bucket(name))
		total = n
		return countErr
	})
	return total, err
}
// countBucket walks every entry of bk and counts those whose key and value
// are both non-empty. A nil bucket counts as zero entries. Any error reported
// by the bucket iteration is returned together with the count reached so far.
func countBucket(bk *bolt.Bucket) (count int, err error) {
	if bk != nil {
		err = bk.ForEach(func(key, val []byte) error {
			// Entries with an empty key or an empty value are not counted.
			if len(key) == 0 || len(val) == 0 {
				return nil
			}
			count++
			return nil
		})
	}
	return count, err
}
// Close shuts down the underlying Bolt database handle and returns whatever
// error that produces.
func (b *PQueue) Close() error {
	db := b.conn
	return db.Close()
}
// cloneBytes returns a freshly allocated copy of v that shares no memory with
// it. The result is always non-nil, even for a nil or empty input.
// Taken from boltDB; avoids corruption when re-queueing.
func cloneBytes(v []byte) []byte {
	return append(make([]byte, 0, len(v)), v...)
}