-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdataflow.py
More file actions
289 lines (237 loc) · 10.9 KB
/
dataflow.py
File metadata and controls
289 lines (237 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import operator
### Setting up a quick dataflow structure model for handling
### Originally inspired for handling transformations in importing mail
### messages for the mail database project (see Projects/MailSys) but
### useful enough that I wanted it global.
### XXX: There probably are improvements to make in the base abstraction
### around handling multiple input and output links. Oh, well.
### XXX: Some logging in the base abstraction would be good. For the
### current implementation, central logging showing input and output
### along each link would be grand. I think this requires inputs to
### pass through the base class and call up, which is fine.
### Naming conventions for internally/externally called functions and
### should/shouldn't be overridden:
### If they're external I want them to be easy to call, so I don't want
### to distinguish override/non-override in the name. I'll follow that
### pattern internally as well. Internal functions will have an _ prepended.
### XXX: Should have classes export schema and have that schema checked on
### linkage.
### XXX: You may want tagged inputs and outputs. Heck, you may want
### both tagged and numbered outputs; numbered for multiple items of
### the same type, and tagged for different categories.
### XXX: Specify interface more completely (specifically to superclasses, and
### to external functions).
### XXX: Might want to think about operator overloading to link DFNs
### (possibly mimic building a list; stream DFN container? Any good
### syntactic sugar for splits?)
### XXX: I need a clearer distinction in states between "figuring out
### linkages" and "flowing". I need to know whether I can trust
### the linkage info.
### XXX: Why am I assuming a single input before inputs get attached?
class DataflowNode(object):
    """Base class for a node in a dataflow network.

    A node takes an input record, does some type of transformation on
    it, and outputs some other record; the default just passes things
    through.  Note that only input, _localEos, and _validate_link are
    intended to be overridden by descendants."""
    def __init__(self):
        # Parallel lists indexed by output link number; a None entry is
        # an unconnected (or broken) link.
        self.outputFunctions = []
        self.outputEos = []
        # Default to a single input. If there are more from other DFNs,
        # the lists will expand automatically, and it currently doesn't
        # make sense to have no inputs for a DFN.
        self.upstreamInfo = []      # Tuples of (upstream obj, output#)
        self.eosSeen = [False,]     # None marks a never-linked slot
        self.shutdown = False
    # input and eos are both called by both user and internal links
    def input(self, record, inputLink=0):
        "Default behavior is assertion exception; descendants should override."
        assert False, "DataflowNode class not meant to be used directly."
    def eos(self, inputLink=0):
        """Note end-of-stream on inputLink.  Once every linked input has
        seen eos, notify the subclass (_localEos) and pass eos down all
        output links."""
        self.eosSeen[inputLink] = True
        # Skip None slots (never-linked inputs).  all() also behaves
        # sanely (True) on an empty sequence, where the previous
        # reduce(operator.and_, ...) formulation would raise.
        if all(seen for seen in self.eosSeen if seen is not None):
            self._localEos()
            for f in self.outputEos:
                if f: f()
    def setupInput(self, inputLink):
        """Setup a specific external input for multi-external input
        nodes.  Input 0 always exists, so only links >= 1 need setup."""
        # BUG FIX: was "inputLink > 1", which made it impossible to set
        # up input link 1 -- the first extra input beyond the default.
        assert inputLink >= 1
        self.eosSeen += \
            [None,] * max(0, inputLink - len(self.eosSeen) + 1)
        self.eosSeen[inputLink] = False
    def _firstOpenOutput(self):
        """Used by subclasses to do auto-linking of multiple outputs.
        Returns the lowest unconnected output link number."""
        for i in range(len(self.outputFunctions)):
            # BUG FIX: was "if self.outputFunctions is None", which
            # tested the whole list and thus never found an open slot.
            if self.outputFunctions[i] is None:
                return i
        return len(self.outputFunctions)
    def _validate_link(self, linknum, input_p):
        """Should be overridden if only some links are valid."""
        return True
    def _localEos(self):
        """Internal function called when eos has been seen on all inputs.
        Descendants may override to get this notification."""
        pass
    def _output(self, record, links=None):
        """Internal method for outputting a record conditional on output
        func.  links is a list of outputs to output on; defaults to the
        special value None, meaning all of them."""
        if links is None: links = range(len(self.outputFunctions))
        for l in links:
            if self.outputFunctions[l]:
                self.outputFunctions[l](record)
    def _shutdown(self):
        """Inform upstream nodes that we're going away and they shouldn't
        bother us anymore.  Note that this is independent from sending
        eos downstream."""
        self.shutdown = True
        for usn in self.upstreamInfo:
            if usn is None:
                continue    # BUG FIX: padded slot that was never linked
            (node, port) = usn
            node._breakPipe(port)
    def _breakPipe(self, port):
        """Called by a downstream node to disconnect from output 'port'.
        Shuts down recursively once no connected outputs remain."""
        self.outputFunctions[port] = None
        self.outputEos[port] = None
        # any() works on both Python 2 and 3; "not filter(None, ...)" is
        # always False on Python 3, where filter returns an iterator.
        if not any(self.outputFunctions):
            # We're done; we've got no more customers
            self._shutdown()
    @staticmethod
    def link(outputNode, inputNode, outputLink=0, inputLink=0):
        """Connect outputNode's output link to inputNode's input link,
        growing each side's bookkeeping lists as needed."""
        assert outputNode._validate_link(outputLink, False), (outputNode, outputLink)
        assert inputNode._validate_link(inputLink, True), (inputNode, inputLink)
        outputNode.outputFunctions += \
            [None,] * max(0,outputLink - len(outputNode.outputFunctions) + 1)
        outputNode.outputEos += \
            [None,] * max(0,outputLink - len(outputNode.outputEos) + 1)
        inputNode.eosSeen += \
            [None,] * max(0,inputLink - len(inputNode.eosSeen) + 1)
        inputNode.upstreamInfo += \
            [None,] * max(0,inputLink - len(inputNode.upstreamInfo) + 1)
        outputNode.outputFunctions[outputLink] = \
            lambda record: inputNode.input(record, inputLink=inputLink)
        outputNode.outputEos[outputLink] = \
            lambda: inputNode.eos(inputLink=inputLink)
        inputNode.eosSeen[inputLink] = False
        inputNode.upstreamInfo[inputLink] = (outputNode, outputLink)
# Utility dataflow classes
class StreamDFN(DataflowNode):
    """Convenience container binding a linear chain of dataflow nodes
    together behind a single input/output node."""
    def __init__(self):
        DataflowNode.__init__(self)
        self.start = None   # first node in the internal chain
        self.end = None     # last node in the internal chain
    def prepend(self, node):
        """Insert node at the head of the internal chain."""
        if not self.start:
            self.start = self.end = node
        else:
            DataflowNode.link(node, self.start)
            self.start = node
    def append(self, node):
        """Attach node at the tail of the internal chain."""
        if not self.end:
            self.start = self.end = node
        else:
            DataflowNode.link(self.end, node)
            self.end = node
    def _validate_link(self, linknum, input_p):
        # Exactly one input and one output.
        return linknum == 0
    def input(self, record, inputLink=0):
        assert inputLink == 0
        head = self.start
        if head:
            head.input(record)
        else:
            # Empty chain degenerates to a pass-through.
            self._output(record)
    def _localEos(self):
        if self.start:
            self.start.eos()
class SplitDFN(DataflowNode):
    """Copy every input record to each linked output."""
    def __init__(self):
        DataflowNode.__init__(self)
    def _validate_link(self, linknum, input_p):
        # A single input; any number of outputs.
        if input_p:
            return linknum == 0
        return True
    def input(self, record, inputLink=0):
        self._output(record)
    def addOutput(self, downstreamNode, downstreamlink=0):
        """Link this node's next free output to downstreamNode."""
        DataflowNode.link(self, downstreamNode,
                          self._firstOpenOutput(), downstreamlink)
class FilterDFN(DataflowNode):
    """Pass each record through a user-supplied function on its way out."""
    def __init__(self, filterFunc=None, eosFunc=None):
        DataflowNode.__init__(self)
        self.filterFunc = filterFunc    # applied to every record
        self.eosFunc = eosFunc          # invoked once at end of stream
    def _validate_link(self, linknum, input_p):
        # One input, zero or one output.
        return linknum == 0
    def input(self, record, inputLink=0):
        func = self.filterFunc
        if func:
            self._output(func(record))
    def _localEos(self):
        if self.eosFunc:
            self.eosFunc()
class SinkDFN(FilterDFN):
    """Terminal node: hands every record to a function, emits nothing."""
    # Implemented as a FilterDFN that refuses all output links.
    def _validate_link(self, linknum, input_p):
        # Input link 0 only; no outputs at all.
        return input_p and linknum == 0
class RecordIntervalDFN(DataflowNode):
    """Pass through only records whose index lies in a given interval."""
    def __init__(self, interval):
        """interval is a (first, limit) pair of record numbers; records
        numbered first..limit-1 are transmitted.  A limit of -1 means
        no upper bound."""
        DataflowNode.__init__(self)
        assert isinstance(interval[0], int) and isinstance(interval[1], int)
        self.interval = interval
        self.recordNum = 0      # index of the next record to arrive
    def _validate_link(self, linknum, input_p):
        return linknum == 0     # One input, one output
    def input(self, record, inputLink=0):
        (lo, hi) = self.interval
        inRange = self.recordNum >= lo and (hi == -1 or self.recordNum < hi)
        if inRange:
            self._output(record)
        self.recordNum += 1
        # Past the upper bound nothing more can be sent: signal eos
        # downstream and detach from upstream.
        if hi != -1 and self.recordNum >= hi:
            self.eos()
            self._shutdown()
class ByteIntervalDFN(DataflowNode):
    """Only transmit a specified byte interval (where input/output is in
    text strings); output records are the surviving slices."""
    def __init__(self, interval):
        """Only transmit bytes whose position in the stream falls in the
        given (start, end) interval from input to output.  -1 for the
        end of the interval means no limit."""
        DataflowNode.__init__(self)
        self.interval = interval
        self.byteNum = 0    # stream position of the start of the next record
    def _validate_link(self, linknum, input_p):
        return linknum == 0     # One input, one output
    def input(self, record, inputLink=0):
        strlen = len(record)
        # Map the byte interval into this record's coordinates and
        # clamp to the record's boundaries.
        startInStr = self.interval[0] - self.byteNum
        startInStr = min(strlen, max(0, startInStr))
        endInStr = self.interval[1] - self.byteNum if self.interval[1] != -1 else strlen
        endInStr = min(strlen, max(0, endInStr))
        self.byteNum += strlen
        if endInStr - startInStr > 0:
            self._output(record[startInStr:endInStr])
        # BUG FIX: was "byteNum > interval[1]", which delayed eos/shutdown
        # by one record when a record ended exactly at the interval limit;
        # ">=" matches RecordIntervalDFN's parallel logic.
        if self.interval[1] != -1 and self.byteNum >= self.interval[1]:
            self.eos()
            self._shutdown()
class BatchRecordDFN(DataflowNode):
    """Group input records into fixed-size lists before passing them on.
    A batchsize of 0 means to wait until end of stream."""
    def __init__(self, batchsize):
        DataflowNode.__init__(self)
        self.batchsize = batchsize
        self.recordlist = []    # records accumulated for the current batch
    def _validate_link(self, linknum, input_p):
        return linknum == 0     # One input, one output
    def _push(self):
        # Emit the pending batch and start a fresh one.
        (batch, self.recordlist) = (self.recordlist, [])
        self._output(batch)
    def input(self, record, inputLink=0):
        self.recordlist.append(record)
        if self.batchsize and len(self.recordlist) >= self.batchsize:
            self._push()
    def _localEos(self):
        if self.recordlist:
            self._push()