Closed
Commits
317 commits
64ad847
fix indent
ymatsuda Jul 22, 2015
0f174c4
RecordCollector api
ymatsuda Jul 22, 2015
4b20d92
send(record) delegates to send(record, keySerializer, valSerializer)
ymatsuda Jul 22, 2015
72bf9f7
Small changes on the ProcessorKStreamJob
guozhangwang Jul 22, 2015
87d3a8c
compiler warning
ymatsuda Jul 22, 2015
27be100
minor fixes
guozhangwang Jul 22, 2015
8ec18bf
fix commit behavior; add record metadata to processor context
guozhangwang Jul 22, 2015
ad21788
use poll(0) for non-blocking poll
ymatsuda Jul 22, 2015
7b32e1a
typo
ymatsuda Jul 22, 2015
90178b0
address Yasuhiro's comments
guozhangwang Jul 22, 2015
f1d23d4
pause immediately
ymatsuda Jul 23, 2015
4b93cc2
simplify multi-topic stream
ymatsuda Jul 23, 2015
7cb30aa
we can add partition->streamGroup mappings to the ingestor after init
ymatsuda Jul 23, 2015
c6f9147
forbid kstream construction outside of init call
ymatsuda Jul 23, 2015
5e1e64d
simplify Coordinator
ymatsuda Jul 23, 2015
63ea4c2
bug fix
ymatsuda Jul 23, 2015
2b5663e
remove unused member variable
ymatsuda Jul 23, 2015
0246618
blocking poll only when there is no buffered record at all
ymatsuda Jul 24, 2015
bfbf9e0
added KStreamThread
ymatsuda Jul 27, 2015
9a598bf
fix vararg in KStreamContextImpl.from()
ymatsuda Jul 27, 2015
5ef2073
removed Coordinator
ymatsuda Jul 27, 2015
180e760
fixed comments
ymatsuda Jul 27, 2015
fbe0003
do not subscribe until run()
ymatsuda Jul 27, 2015
04354d7
wip
guozhangwang Jul 27, 2015
506eebc
cleanup
ymatsuda Jul 27, 2015
0174ebf
fix mergeStreamGroup
ymatsuda Jul 27, 2015
dc14dcc
wip
guozhangwang Jul 27, 2015
8f5c195
refactored API
guozhangwang Jul 28, 2015
d6c00da
add an example for using local state store
guozhangwang Jul 28, 2015
39a45aa
Address Yasu's comments
guozhangwang Jul 28, 2015
bad7c83
close stream groups in context
ymatsuda Jul 28, 2015
71c9e5e
rename getConsumedOffsets to putConsumedOffsetsTo
ymatsuda Jul 28, 2015
05035ec
renamed WindowByTime to SlidingWindow
ymatsuda Jul 28, 2015
5370b79
complete the state flush / restore workflow
guozhangwang Jul 30, 2015
9c6f392
Add the change log topics checking upon registration
guozhangwang Jul 30, 2015
15f212b
minor comment fix
guozhangwang Jul 30, 2015
beac8f3
add rocksDB: wip
guozhangwang Jul 31, 2015
ee422e7
new api model
ymatsuda Jul 31, 2015
2062d7a
wip
ymatsuda Jul 31, 2015
12c2a54
wip
ymatsuda Jul 31, 2015
103c18b
Address comments, finish RocksDB restore logic
guozhangwang Jul 31, 2015
f9f0185
wip
ymatsuda Jul 31, 2015
9964374
wip
ymatsuda Jul 31, 2015
e2f612c
fix examples
ymatsuda Jul 31, 2015
cdbe845
fix examples
ymatsuda Jul 31, 2015
3b6b3b4
removed ProcessorContext
ymatsuda Aug 3, 2015
0cff31b
remove streamTime from Receiver
ymatsuda Aug 3, 2015
7a75a5d
fix parameter name
ymatsuda Aug 3, 2015
736da10
KStream.tranform method for generalized transformation
ymatsuda Aug 3, 2015
49ed1d7
new api model
ymatsuda Jul 31, 2015
f934266
wip
ymatsuda Jul 31, 2015
3309e20
wip
ymatsuda Jul 31, 2015
a715bda
wip
ymatsuda Jul 31, 2015
e964bf3
wip
ymatsuda Jul 31, 2015
37f83a6
fix examples
ymatsuda Jul 31, 2015
37e6e6d
fix examples
ymatsuda Jul 31, 2015
392e255
removed ProcessorContext
ymatsuda Aug 3, 2015
914a149
remove streamTime from Receiver
ymatsuda Aug 3, 2015
6a93f33
fix parameter name
ymatsuda Aug 3, 2015
1d72ea5
KStream.tranform method for generalized transformation
ymatsuda Aug 3, 2015
e64eceb
remove restore function
guozhangwang Aug 4, 2015
c07d61c
first re-org
guozhangwang Aug 5, 2015
dd15e13
add rocksdb dependency
guozhangwang Aug 5, 2015
91caf0d
removing io.confluent imports: wip
guozhangwang Aug 5, 2015
336a346
removing io.confluent imports: wip
guozhangwang Aug 5, 2015
13a670f
removing io.confluent imports: wip
guozhangwang Aug 5, 2015
6320cf9
removing io.confluent imports: wip
guozhangwang Aug 5, 2015
680fe3d
compile and test passed
guozhangwang Aug 5, 2015
44cd09f
added missing files
guozhangwang Aug 5, 2015
c7cffdd
SingleProcessorTopology implements Processor
guozhangwang Aug 6, 2015
a329fce
implement StateStore methods in SlidingWindow
ymatsuda Aug 6, 2015
41b7855
Address Yasu's comments
guozhangwang Aug 6, 2015
4ea4aad
wip
guozhangwang Aug 7, 2015
3df3756
wip
guozhangwang Aug 7, 2015
0d9bcba
add one file
guozhangwang Aug 7, 2015
23f3259
wip
guozhangwang Aug 7, 2015
58379f3
wip
guozhangwang Aug 7, 2015
378b88f
wip
guozhangwang Aug 7, 2015
80ac445
wip
guozhangwang Aug 7, 2015
9b31888
wip
guozhangwang Aug 7, 2015
23ce16e
wip
guozhangwang Aug 7, 2015
6e34220
wip
guozhangwang Aug 8, 2015
5eb0240
add folder
guozhangwang Aug 8, 2015
746fec1
wip
guozhangwang Aug 8, 2015
5502aa7
wip
guozhangwang Aug 8, 2015
7019874
compiles
guozhangwang Aug 8, 2015
e7859ff
minor renaming
guozhangwang Aug 8, 2015
6ed0104
adding files
guozhangwang Aug 8, 2015
e293481
adding files
guozhangwang Aug 9, 2015
548651c
minor API changes
guozhangwang Aug 11, 2015
0952e65
Refactor Processor and KStream APIs
guozhangwang Aug 13, 2015
b1844bd
Further working on KStreamImpl
guozhangwang Aug 13, 2015
77c1ba2
Add two files
guozhangwang Aug 13, 2015
36a4eda
Continue on KStreamImpl
guozhangwang Aug 14, 2015
4c5f724
Remove ProcessorProperties
guozhangwang Aug 17, 2015
e13b2d0
Some package and class renaming, fix KafkaSource constrcution at builder
guozhangwang Aug 17, 2015
ac02867
move some classes to internals, hide Chooser / RecordQueue / StampedR…
guozhangwang Aug 17, 2015
3cee6ab
Use reflection to clone KafkaProcessor
guozhangwang Aug 18, 2015
5c41e82
Add SinkNode, unify context.send() with processor.forward()
guozhangwang Aug 18, 2015
070ab00
Fix flatMap by adding a KeyValueFlatMap
guozhangwang Aug 19, 2015
24f36c0
WIP towards the stateless plan API
guozhangwang Aug 20, 2015
63eee5d
wip: refactor StreamTask and ProcessorContext, fix RecordQueue timest…
guozhangwang Aug 25, 2015
60e953b
record current Node and current Record in StreamTask, add ProcessorFa…
guozhangwang Aug 26, 2015
0f59d16
address Yasu's comments
guozhangwang Aug 26, 2015
a94c328
refactor TopologyBuilder
Aug 26, 2015
8210865
fix sourceTopicNames
Aug 26, 2015
f5a37bd
kstream refactored
Aug 27, 2015
16e760f
remove ProcessorMetadata
Aug 27, 2015
0674b74
adress comments
Aug 27, 2015
cae41cd
comment
Aug 27, 2015
4b920e7
wip: commit task states
guozhangwang Aug 26, 2015
237415f
Fix fetch / commit / clean logic
guozhangwang Aug 27, 2015
1c3a86e
kstream refactored
Aug 27, 2015
8515ec7
rename factory to def, move some unit test files
guozhangwang Aug 27, 2015
372c66c
rename define() to instance()
guozhangwang Aug 27, 2015
f1070fd
rebased
guozhangwang Aug 27, 2015
3902e17
rebased continue
guozhangwang Aug 27, 2015
7865633
kstream test fix
Aug 27, 2015
33febf9
fix StreamTaskTest
guozhangwang Aug 27, 2015
98f525a
minor fixes, now compiles
guozhangwang Aug 28, 2015
50c2779
Address comments
guozhangwang Aug 28, 2015
93e36e0
imports
guozhangwang Aug 28, 2015
9b7f66f
unify KeyValueMapper and KeyValueFlatMap
Aug 28, 2015
e4d7309
make testing work
guozhangwang Aug 28, 2015
fd0e805
disallow SinkNode.addChild()
Aug 28, 2015
0c48d53
cleanup KStreamJoin
Aug 28, 2015
ead02e5
fix KStreamBranchTest
Aug 28, 2015
26a5d17
Let KafkaProcess not extending from Runnable to be started and closed…
guozhangwang Aug 28, 2015
b9dc1a5
fix context.forward() using a stack of nodes
Aug 28, 2015
9df86f1
simplified join processor
Aug 28, 2015
5198b02
fix MinTimestampTrackerTest
guozhangwang Aug 28, 2015
cd481a8
OS comments
guozhangwang Aug 28, 2015
0106d3f
fix typo
Sep 1, 2015
20fecf9
get source topic names from builer and subscribe topics
Sep 1, 2015
957ed18
update comments
Sep 1, 2015
6dfa339
unnecessary check
Sep 2, 2015
c4e92f5
make the close method work
Sep 2, 2015
e821386
remove sync from StreamThread.run()
Sep 2, 2015
e01c822
make sure Processor.close() is called at the end of task
Sep 2, 2015
998a60b
no need for recordcollector to know default serializers
Sep 2, 2015
6d3010e
enable default SerDe, optimize record inserts
Sep 3, 2015
bcc1464
logging exceptions
Sep 3, 2015
93baa6c
add thread id and task id to error messages
Sep 3, 2015
7cae4d0
capitalize the first letters of messages
Sep 3, 2015
03271ff
TopologyBuilder methods return the builder to allow methods to be cha…
rhauch Sep 2, 2015
b9489dc
Added JavaDoc to the public methods in the TopologyBuilder class.
rhauch Sep 4, 2015
0614d90
fix unused imports
Sep 4, 2015
74455f2
Modified the gradle configuration for the project to include the tes…
rhauch Sep 3, 2015
0da9a16
fix ProcessorJob; rename streaming to streams
guozhangwang Sep 9, 2015
5f442ad
remove unused imports
guozhangwang Sep 9, 2015
a78a9b6
address comments
guozhangwang Sep 9, 2015
ef0ff2e
revert some changes
guozhangwang Sep 9, 2015
d7501e0
rebased on trunk
guozhangwang Sep 9, 2015
b05f4ac
fix KStreamFlatMapTest.java
guozhangwang Sep 10, 2015
d2f1d55
fix KStreamFlatMapTest
guozhangwang Sep 10, 2015
050aade
misc fixes after rebase
guozhangwang Sep 10, 2015
32c870c
hide node stack in streamtask
Sep 10, 2015
d05c562
fix whitespace
Sep 10, 2015
d221eb0
Merge pull request #43 from ymatsuda/punctuation
ymatsuda Sep 10, 2015
2c645b6
change timestampextractor signature
Sep 10, 2015
74c58e3
Merge pull request #44 from ymatsuda/timestampextractor_signature
ymatsuda Sep 10, 2015
39c8256
remove nodestack
Sep 11, 2015
42a4cfd
Merge pull request #45 from ymatsuda/remove_nodestack
ymatsuda Sep 11, 2015
96066bd
remove nodestack from test code
Sep 11, 2015
1a99204
Merge branch 'streaming' of github.com:confluentinc/kafka into remove…
Sep 11, 2015
087e270
Merge pull request #47 from ymatsuda/remove_nodestack
ymatsuda Sep 11, 2015
be49a97
make ProcessorTopology immutable
Sep 11, 2015
3dc28f5
flush records before offset commit
Sep 11, 2015
ab814c6
Merge pull request #49 from ymatsuda/streaming
ymatsuda Sep 11, 2015
de80cc7
Merge pull request #48 from ymatsuda/processortopology
ymatsuda Sep 11, 2015
c459e9e
use restoredOffset if ack'ed offset is not available
Sep 11, 2015
6fdad6d
remove processor arg from schedule call
Sep 11, 2015
d493d27
Merge pull request #50 from ymatsuda/punctuation
ymatsuda Sep 14, 2015
8c8a5b2
Merge branch 'streaming' of github.com:confluentinc/kafka into chkpt_…
Sep 14, 2015
b978271
Merge pull request #51 from ymatsuda/chkpt_offsets
ymatsuda Sep 14, 2015
395ca48
call statemgr.close()
Sep 14, 2015
9dab4c6
ProcessorTopology unit tests
guozhangwang Sep 14, 2015
2ab6576
address comments
guozhangwang Sep 14, 2015
a088297
Merge pull request #52 from ymatsuda/call_statemgr_close
ymatsuda Sep 14, 2015
f1f7f4e
Merge pull request #53 from guozhangwang/ProcessorUnitTests
guozhangwang Sep 14, 2015
60a26a2
fix state directory cleanup
Sep 14, 2015
04bcf7e
Merge branch 'streaming' of github.com:confluentinc/kafka into fix_ma…
Sep 14, 2015
2164002
change error messages
Sep 14, 2015
5a781c7
Merge pull request #54 from ymatsuda/fix_maybeClean
ymatsuda Sep 14, 2015
0fc5c0b
add unit tests for PartitionGroup and RecordQueue
guozhangwang Sep 15, 2015
fe73ac8
merged from streaming
guozhangwang Sep 15, 2015
e0c04e1
Merge pull request #55 from guozhangwang/PartitionGroupUnitTests
guozhangwang Sep 15, 2015
1306509
merge nextQueue and getRecord into nextRecord
Sep 15, 2015
be24263
move mayPunctuate call out of a try block
Sep 16, 2015
60dca26
Merge pull request #56 from ymatsuda/nextRecord
ymatsuda Sep 16, 2015
cea5bb1
add ProcessorStateManagerTest
Sep 17, 2015
fa508ef
The greater than cannot be used in JavaDoc.
eljefe6a Sep 17, 2015
d586061
Merge pull request #59 from eljefe6a/streaming
guozhangwang Sep 18, 2015
bb09ba9
address some comments
guozhangwang Sep 18, 2015
6bb2f3e
Merge pull request #58 from ymatsuda/procStateMgrTest
ymatsuda Sep 21, 2015
789a33f
Yasu's comments
guozhangwang Sep 21, 2015
e016b15
Merge pull request #60 from guozhangwang/addressComments
guozhangwang Sep 21, 2015
f438e8b
add StreamThreadTest
Sep 21, 2015
91db671
fix generics
Sep 21, 2015
ffec5bb
Merge pull request #61 from ymatsuda/streamThreadTest
ymatsuda Sep 21, 2015
5b76f95
Merge branch 'trunk' of https://github.com/apache/kafka into streaming
guozhangwang Sep 21, 2015
edc2186
rebase on apache trunk
guozhangwang Sep 21, 2015
37db015
revork changes of unsubscribe()
guozhangwang Sep 21, 2015
b4e1f19
use commitSync()
guozhangwang Sep 21, 2015
ec8bd8a
Merge pull request #62 from guozhangwang/rebaseApache
guozhangwang Sep 21, 2015
1aff413
add a test for statedir cleanup and fix a bug
Sep 21, 2015
4ff39da
check commits
Sep 21, 2015
516d0c9
remove redundant lines
Sep 21, 2015
aee5991
Merge branch 'streamThreadTest' of github.com:ymatsuda/kafka into tes…
Sep 21, 2015
f0453c9
Added serializer and deserializers for longs.
eljefe6a Sep 21, 2015
fdcc91c
Merge pull request #63 from ymatsuda/streamThreadTest
ymatsuda Sep 21, 2015
e8e9810
added testMaybeCommit
Sep 21, 2015
58d6f1e
Merge pull request #65 from ymatsuda/testPeriodicalCommit
ymatsuda Sep 21, 2015
699b5c1
Merge pull request #64 from eljefe6a/streaming
guozhangwang Sep 21, 2015
3c33383
add unit tests for KStreamImpl
guozhangwang Sep 22, 2015
e9a526e
move statemgr construction to streamtask
Sep 22, 2015
4a2e188
rename sendTo -> to
guozhangwang Sep 22, 2015
58c7413
Merge pull request #66 from guozhangwang/KStreamImplTest
guozhangwang Sep 22, 2015
783d806
Merge pull request #67 from ymatsuda/moveStateMgrConstruction
ymatsuda Sep 22, 2015
2c26b9f
set the initialized flag in context
Sep 22, 2015
8b48440
Docs say that iterators must be closed. Maybe need to emphasize with …
eljefe6a Sep 22, 2015
e9dd3fc
Merge pull request #68 from ymatsuda/ctxInitFlag
ymatsuda Sep 22, 2015
0f85cf7
fix metrics names
guozhangwang Sep 22, 2015
09d7360
Merge branch 'streaming' of https://github.com/confluentinc/kafka int…
guozhangwang Sep 22, 2015
8243789
finer process / punctuate metric sensors
guozhangwang Sep 22, 2015
5bac8ca
comments addressed
guozhangwang Sep 22, 2015
ac842f1
Merge pull request #71 from guozhangwang/moreComments
guozhangwang Sep 22, 2015
3eb4da9
Merge branch 'streaming' of https://github.com/confluentinc/kafka int…
guozhangwang Sep 22, 2015
b7554ba
revert to use static thread ids
guozhangwang Sep 22, 2015
e83d54b
Merge pull request #70 from guozhangwang/fixMetrics
guozhangwang Sep 22, 2015
5046ec6
Merge branch 'trunk' of https://github.com/apache/kafka into streaming
guozhangwang Sep 23, 2015
2633c13
merge latest commits from apache
guozhangwang Sep 23, 2015
25a869b
Merge pull request #72 from guozhangwang/rebaseAgain
guozhangwang Sep 23, 2015
1c43863
Added a new ProcessorTopologyTestDriver class that makes it easier to…
rhauch Sep 23, 2015
fc6cc9a
Allowed the restore consumer to be passed into the StreamThread insta…
rhauch Sep 23, 2015
5dc7fee
Corrected StreamThreadTest, and moved tests for ProcessorTopologyTest…
rhauch Sep 23, 2015
69638c4
Corrected incorrect package name in JavaDoc
rhauch Sep 23, 2015
5b8e74f
Include testJar when uploading to Maven
guozhangwang Sep 23, 2015
de38c05
Merge pull request #69 from eljefe6a/streaming
guozhangwang Sep 23, 2015
c2ddd9a
Created an AbstractProcessor and used it in ProcessorTopologyTest and…
rhauch Sep 23, 2015
6466989
Fix delete in in-memory store, and let delete return old value
guozhangwang Sep 23, 2015
2393720
Merge pull request #57 from rhauch/streaming-processortopology-test-d…
guozhangwang Sep 23, 2015
c18bb2c
Merge pull request #73 from guozhangwang/fixInMemoryKVDelete
guozhangwang Sep 23, 2015
425df16
flush records and state changelog before local state
Sep 25, 2015
f332436
Merge pull request #76 from ymatsuda/flushRecordsFirst
ymatsuda Sep 25, 2015
d1fe347
Merge branch 'trunk' of https://github.com/apache/kafka into streaming
guozhangwang Sep 25, 2015
8812e12
rebase on apache, fixes some issues in MockRestoreConsumer
guozhangwang Sep 25, 2015
20934d1
revert some changes
guozhangwang Sep 25, 2015
7871687
add one TODO to the restoration process
guozhangwang Sep 25, 2015
6eac054
Merge pull request #78 from guozhangwang/rebaseAgain
guozhangwang Sep 25, 2015
5 changes: 5 additions & 0 deletions bin/kafka-run-class.sh
@@ -56,6 +56,11 @@ do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/stream/build/libs/kafka-streams*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file
76 changes: 69 additions & 7 deletions build.gradle
@@ -215,25 +215,22 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}

def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + copycatPkgs

tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) {
}
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }

tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { }

tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { }

tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {}
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) {
}
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) { }

tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
}

tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) {
}
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) { }

project(':core') {
println "Building project 'core' with Scala version $scalaVersion"
@@ -518,6 +515,71 @@ project(':tools') {
dependsOn 'copyDependantLibs'
}

artifacts {
archives testJar
}

configurations {
archives.extendsFrom (testCompile)
}

checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':streams') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-streams"

dependencies {
compile project(':clients')
compile "$slf4jlog4j"
compile 'org.rocksdb:rocksdbjni:3.10.1'

testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')
}

task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}

test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}

javadoc {
include "**/org/apache/kafka/streams/*"
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${scalaVersion}"
}

jar {
dependsOn 'copyDependantLibs'
}

artifacts {
archives testJar
}

configurations {
archives.extendsFrom (testCompile)
}

checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
8 changes: 6 additions & 2 deletions checkstyle/checkstyle.xml
@@ -53,9 +53,13 @@
</module>
<module name="LocalVariableName"/>
<module name="LocalFinalVariableName"/>
<module name="ClassTypeParameterName"/>
<module name="MemberName"/>
<module name="MethodTypeParameterName"/>
<module name="ClassTypeParameterName">
<property name="format" value="^[A-Z0-9]*$"/>
</module>
<module name="MethodTypeParameterName">
<property name="format" value="^[A-Z0-9]*$"/>
</module>
<module name="PackageName"/>
<module name="ParameterName"/>
<module name="StaticVariableName"/>
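The relaxed `^[A-Z0-9]*$` format in the checkstyle change above admits digit-suffixed type parameters such as `K1`, `V1`, `V2`, which the default single-upper-case-letter rule for `ClassTypeParameterName`/`MethodTypeParameterName` would reject. A hypothetical class (not from this PR) showing the kind of signature this enables:

```java
// Hypothetical illustration only: type parameters K1, V1, V2 match the
// relaxed checkstyle pattern ^[A-Z0-9]*$, but would fail the default
// single-letter ClassTypeParameterName rule.
public class JoinedValue<K1, V1, V2> {
    public final K1 key;
    public final V1 left;
    public final V2 right;

    public JoinedValue(K1 key, V1 left, V2 right) {
        this.key = key;
        this.left = left;
        this.right = right;
    }

    public static void main(String[] args) {
        JoinedValue<String, Integer, Long> jv = new JoinedValue<>("k", 1, 2L);
        System.out.println(jv.key + " " + jv.left + " " + jv.right);
    }
}
```

Multi-parameter names like these are typical in join code, where the two input streams carry distinct key and value types.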
16 changes: 15 additions & 1 deletion checkstyle/import-control.xml
@@ -90,8 +90,8 @@
</subpackage>

<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />

@@ -111,6 +111,20 @@
</subpackage>
</subpackage>

<subpackage name="streams">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.test"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
<allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>

<allow pkg="org.apache.kafka.streams"/>

<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
</subpackage>

<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
@@ -150,11 +150,11 @@ public class ConsumerConfig extends AbstractConfig {

/** <code>key.deserializer</code> */
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";

/** <code>value.deserializer</code> */
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";

/** <code>connections.max.idle.ms</code> */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -550,13 +550,15 @@ private KafkaConsumer(ConsumerConfig config,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
this.fetcher = new Fetcher<K, V>(this.client,
@@ -46,6 +46,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final Map<String, List<PartitionInfo>> partitions;
private final SubscriptionState subscriptions;
private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private Set<TopicPartition> paused;
private boolean closed;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
@@ -57,8 +58,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {

public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.partitions = new HashMap<String, List<PartitionInfo>>();
this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
this.partitions = new HashMap<>();
this.records = new HashMap<>();
this.paused = new HashSet<>();
this.closed = false;
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
@@ -288,14 +290,18 @@ public void updatePartitions(String topic, List<PartitionInfo> partitions) {

@Override
public void pause(TopicPartition... partitions) {
for (TopicPartition partition : partitions)
for (TopicPartition partition : partitions) {
subscriptions.pause(partition);
paused.add(partition);
}
}

@Override
public void resume(TopicPartition... partitions) {
for (TopicPartition partition : partitions)
for (TopicPartition partition : partitions) {
subscriptions.resume(partition);
paused.remove(partition);
}
}

@Override
@@ -332,6 +338,10 @@ public void waitForPollThen(Runnable task, long timeoutMs) {
}
}

public Set<TopicPartition> paused() {
return Collections.unmodifiableSet(new HashSet<>(paused));
}

private void ensureNotClosed() {
if (this.closed)
throw new IllegalStateException("This consumer has already been closed.");
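The MockConsumer change above keeps a `paused` set in sync with `pause()`/`resume()` and exposes it through `paused()` as a copy wrapped in `Collections.unmodifiableSet`. A standalone sketch of that bookkeeping, using plain `String` ids in place of `TopicPartition` for illustration:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Standalone sketch of MockConsumer's pause bookkeeping (String ids
// stand in for TopicPartition; not the real class).
public class PauseTracker {
    private final Set<String> paused = new HashSet<>();

    public void pause(String... partitions) {
        for (String p : partitions)
            paused.add(p);
    }

    public void resume(String... partitions) {
        for (String p : partitions)
            paused.remove(p);
    }

    // Copy first, then wrap: the caller gets a read-only snapshot that
    // does not reflect later pause()/resume() calls.
    public Set<String> paused() {
        return Collections.unmodifiableSet(new HashSet<>(paused));
    }

    public static void main(String[] args) {
        PauseTracker consumer = new PauseTracker();
        consumer.pause("topic-0", "topic-1");
        Set<String> snapshot = consumer.paused();
        consumer.resume("topic-0");
        System.out.println(snapshot.contains("topic-0"));          // true: snapshot is a copy
        System.out.println(consumer.paused().contains("topic-0")); // false
    }
}
```

Because `paused()` copies before wrapping, a snapshot taken in a test stays stable even if the code under test later resumes partitions, which is the behavior the diff's defensive copy buys.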
@@ -259,13 +259,15 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
config.logUnused();
@@ -164,11 +164,11 @@ public class ProducerConfig extends AbstractConfig {

/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";

/** <code>value.serializer</code> */
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";

/** <code>connections.max.idle.ms</code> */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -62,6 +62,10 @@ protected Object get(String key) {
return values.get(key);
}

public void ignore(String key) {
used.add(key);
}

public Short getShort(String key) {
return (Short) get(key);
}
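The new `ignore(key)` marks a config key as used without reading it. The KafkaConsumer and KafkaProducer hunks above call it when a serializer or deserializer instance is passed in directly, so that `logUnused()` does not warn about a `key.serializer`/`key.deserializer` entry that was supplied but never looked up. A simplified sketch of this used-key tracking (hypothetical `ConfigTracker`; the real `AbstractConfig` does considerably more):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of AbstractConfig's used-key tracking.
public class ConfigTracker {
    private final Map<String, Object> values = new HashMap<>();
    private final Set<String> used = new HashSet<>();

    public ConfigTracker(Map<String, Object> originals) {
        values.putAll(originals);
    }

    // Reading a value marks its key as used.
    public Object get(String key) {
        used.add(key);
        return values.get(key);
    }

    // Mirrors the new ignore(): mark a key as used without reading it.
    public void ignore(String key) {
        used.add(key);
    }

    // Keys that were supplied but never touched; logUnused() warns on these.
    public Set<String> unused() {
        Set<String> u = new HashSet<>(values.keySet());
        u.removeAll(used);
        return u;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("key.serializer", "SomeSerializer");
        props.put("acks", "all");
        ConfigTracker config = new ConfigTracker(props);
        config.get("acks");              // read normally -> used
        config.ignore("key.serializer"); // object supplied directly -> mark used
        System.out.println("unused: " + config.unused()); // unused: []
    }
}
```

Without the `ignore()` call, the supplied-but-unread key would land in `unused()` and trigger a spurious "not used" warning at client construction time.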
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

public class LongDeserializer implements Deserializer<Long> {

public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}

public Long deserialize(String topic, byte[] data) {
if (data == null)
return null;
if (data.length != 8) {
throw new SerializationException("Size of data received by LongDeserializer is " +
"not 8");
}

long value = 0;
for (byte b : data) {
value <<= 8;
value |= b & 0xFF;
}
return value;
}

public void close() {
// nothing to do
}
}
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.serialization;

import java.util.Map;

public class LongSerializer implements Serializer<Long> {

public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}

public byte[] serialize(String topic, Long data) {
if (data == null)
return null;

return new byte[] {
(byte) (data >>> 56),
(byte) (data >>> 48),
(byte) (data >>> 40),
(byte) (data >>> 32),
(byte) (data >>> 24),
(byte) (data >>> 16),
(byte) (data >>> 8),
data.byteValue()
};
}

public void close() {
// nothing to do
}
}
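The two new classes above encode a `long` as eight big-endian bytes (most significant byte first) and decode by shifting and OR-ing. A standalone round-trip sketch of the same logic (class name `LongCodec` is ours, for illustration only):

```java
// Standalone copy of the big-endian long encoding used by the new
// LongSerializer / LongDeserializer above (illustration only).
public class LongCodec {
    // Encode a long as 8 bytes, most significant byte first.
    public static byte[] serialize(long value) {
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) (value & 0xFF);
            value >>>= 8;
        }
        return out;
    }

    // Decode 8 big-endian bytes back into a long; the & 0xFF masks off
    // sign extension from the byte-to-long promotion.
    public static long deserialize(byte[] data) {
        if (data.length != 8)
            throw new IllegalArgumentException("Size of data is not 8");
        long value = 0;
        for (byte b : data) {
            value <<= 8;
            value |= b & 0xFF;
        }
        return value;
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, -1L, Long.MAX_VALUE, Long.MIN_VALUE, 1234567890123L};
        for (long s : samples) {
            if (deserialize(serialize(s)) != s)
                throw new AssertionError("round-trip failed for " + s);
        }
        System.out.println("round-trip ok");
    }
}
```

Negative values survive the round trip because `>>>` is an unsigned shift and the deserializer masks each byte with `0xFF` before OR-ing it in.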