diff --git a/kafka-impl/conf/bookkeeper.conf b/kafka-impl/conf/bookkeeper.conf deleted file mode 100644 index 880b82e1f9..0000000000 --- a/kafka-impl/conf/bookkeeper.conf +++ /dev/null @@ -1,656 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -## Bookie settings - -############################################################################# -## Server parameters -############################################################################# - -# Port that bookie server listen on -bookiePort=3181 - -# Directories BookKeeper outputs its write ahead log. -# Could define multi directories to store write head logs, separated by ','. -# For example: -# journalDirectories=/tmp/bk-journal1,/tmp/bk-journal2 -# If journalDirectories is set, bookies will skip journalDirectory and use -# this setting directory. -# journalDirectories=/tmp/bk-journal - -# Directory Bookkeeper outputs its write ahead log -# @deprecated since 4.5.0. journalDirectories is preferred over journalDirectory. -journalDirectory=data/bookkeeper/journal - -# Configure the bookie to allow/disallow multiple ledger/index/journal directories -# in the same filesystem disk partition -# allowMultipleDirsUnderSameDiskPartition=false - -# Minimum safe Usable size to be available in index directory for bookie to create -# Index File while replaying journal at the time of bookie Start in Readonly Mode (in bytes) -minUsableSizeForIndexFileCreation=1073741824 - -# Set the network interface that the bookie should listen on. -# If not set, the bookie will listen on all interfaces. -# listeningInterface=eth0 - -# Configure a specific hostname or IP address that the bookie should use to advertise itself to -# clients. If not set, bookie will advertised its own IP address or hostname, depending on the -# listeningInterface and useHostNameAsBookieID settings. -advertisedAddress= - -# Whether the bookie allowed to use a loopback interface as its primary -# interface(i.e. the interface it uses to establish its identity)? -# By default, loopback interfaces are not allowed as the primary -# interface. -# Using a loopback interface as the primary interface usually indicates -# a configuration error. For example, its fairly common in some VPS setups -# to not configure a hostname, or to have the hostname resolve to -# 127.0.0.1. If this is the case, then all bookies in the cluster will -# establish their identities as 127.0.0.1:3181, and only one will be able -# to join the cluster. For VPSs configured like this, you should explicitly -# set the listening interface. -allowLoopback=false - -# Interval to watch whether bookie is dead or not, in milliseconds -bookieDeathWatchInterval=1000 - -# When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happens -# when a new active entrylog is created / previous one is rolled over. -# Instead SyncThread checkpoints periodically with 'flushInterval' delay -# (in milliseconds) in between executions. Checkpoint flushes both ledger -# entryLogs and ledger index pages to disk. 
-# Flushing entrylog and index files will introduce much random disk I/O. -# If separating journal dir and ledger dirs each on different devices, -# flushing would not affect performance. But if putting journal dir -# and ledger dirs on same device, performance degrade significantly -# on too frequent flushing. You can consider increment flush interval -# to get better performance, but you need to pay more time on bookie -# server restart after failure. -# This config is used only when entryLogPerLedgerEnabled is enabled. -flushInterval=60000 - -# Allow the expansion of bookie storage capacity. Newly added ledger -# and index dirs must be empty. -# allowStorageExpansion=false - -# Whether the bookie should use its hostname to register with the -# co-ordination service(eg: Zookeeper service). -# When false, bookie will use its ipaddress for the registration. -# Defaults to false. -useHostNameAsBookieID=false - -# Whether the bookie is allowed to use an ephemeral port (port 0) as its -# server port. By default, an ephemeral port is not allowed. -# Using an ephemeral port as the service port usually indicates a configuration -# error. However, in unit tests, using an ephemeral port will address port -# conflict problems and allow running tests in parallel. -# allowEphemeralPorts=false - -# Whether allow the bookie to listen for BookKeeper clients executed on the local JVM. -# enableLocalTransport=false - -# Whether allow the bookie to disable bind on network interfaces, -# this bookie will be available only to BookKeeper clients executed on the local JVM. -# disableServerSocketBind=false - -# The number of bytes we should use as chunk allocation for -# org.apache.bookkeeper.bookie.SkipListArena -# skipListArenaChunkSize=4194304 - -# The max size we should allocate from the skiplist arena. Allocations -# larger than this should be allocated directly by the VM to avoid fragmentation. -# skipListArenaMaxAllocSize=131072 - -# The bookie authentication provider factory class name. -# If this is null, no authentication will take place. -# bookieAuthProviderFactoryClass=null - -############################################################################# -## Garbage collection settings -############################################################################# - -# How long the interval to trigger next garbage collection, in milliseconds -# Since garbage collection is running in background, too frequent gc -# will heart performance. It is better to give a higher number of gc -# interval if there is enough disk capacity. -gcWaitTime=900000 - -# How long the interval to trigger next garbage collection of overreplicated -# ledgers, in milliseconds [Default: 1 day]. This should not be run very frequently -# since we read the metadata for all the ledgers on the bookie from zk -gcOverreplicatedLedgerWaitTime=86400000 - -# Number of threads that should handle write requests. if zero, the writes would -# be handled by netty threads directly. -numAddWorkerThreads=0 - -# Number of threads that should handle read requests. if zero, the reads would -# be handled by netty threads directly. -numReadWorkerThreads=8 - -# Number of threads that should be used for high priority requests -# (i.e. recovery reads and adds, and fencing). 
-numHighPriorityWorkerThreads=8 - -# If read workers threads are enabled, limit the number of pending requests, to -# avoid the executor queue to grow indefinitely -maxPendingReadRequestsPerThread=2500 - -# If add workers threads are enabled, limit the number of pending requests, to -# avoid the executor queue to grow indefinitely -maxPendingAddRequestsPerThread=10000 - -# Whether force compaction is allowed when the disk is full or almost full. -# Forcing GC may get some space back, but may also fill up disk space more quickly. -# This is because new log files are created before GC, while old garbage -# log files are deleted after GC. -# isForceGCAllowWhenNoSpace=false - -# True if the bookie should double check readMetadata prior to gc -# verifyMetadataOnGC=false - -############################################################################# -## TLS settings -############################################################################# - -# TLS Provider (JDK or OpenSSL). -# tlsProvider=OpenSSL - -# The path to the class that provides security. -# tlsProviderFactoryClass=org.apache.bookkeeper.security.SSLContextFactory - -# Type of security used by server. -# tlsClientAuthentication=true - -# Bookie Keystore type. -# tlsKeyStoreType=JKS - -# Bookie Keystore location (path). -# tlsKeyStore=null - -# Bookie Keystore password path, if the keystore is protected by a password. -# tlsKeyStorePasswordPath=null - -# Bookie Truststore type. -# tlsTrustStoreType=null - -# Bookie Truststore location (path). -# tlsTrustStore=null - -# Bookie Truststore password path, if the trust store is protected by a password. -# tlsTrustStorePasswordPath=null - -############################################################################# -## Long poll request parameter settings -############################################################################# - -# The number of threads that should handle long poll requests. -# numLongPollWorkerThreads=10 - -# The tick duration in milliseconds for long poll requests. -# requestTimerTickDurationMs=10 - -# The number of ticks per wheel for the long poll request timer. -# requestTimerNumTicks=1024 - -############################################################################# -## AutoRecovery settings -############################################################################# - -# The interval between auditor bookie checks. -# The auditor bookie check, checks ledger metadata to see which bookies should -# contain entries for each ledger. If a bookie which should contain entries is -# unavailable, then the ledger containing that entry is marked for recovery. -# Setting this to 0 disabled the periodic check. Bookie checks will still -# run when a bookie fails. -# The interval is specified in seconds. -auditorPeriodicBookieCheckInterval=86400 - -# The number of entries that a replication will rereplicate in parallel. -rereplicationEntryBatchSize=100 - -# Auto-replication -# The grace period, in seconds, that the replication worker waits before fencing and -# replicating a ledger fragment that's still being written to upon bookie failure. 
-openLedgerRereplicationGracePeriod=30 - -# Whether the bookie itself can start auto-recovery service also or not -autoRecoveryDaemonEnabled=true - -# How long to wait, in seconds, before starting auto recovery of a lost bookie -lostBookieRecoveryDelay=0 - -############################################################################# -## Netty server settings -############################################################################# - -# This settings is used to enabled/disabled Nagle's algorithm, which is a means of -# improving the efficiency of TCP/IP networks by reducing the number of packets -# that need to be sent over the network. -# If you are sending many small messages, such that more than one can fit in -# a single IP packet, setting server.tcpnodelay to false to enable Nagle algorithm -# can provide better performance. -# Default value is true. -serverTcpNoDelay=true - -# This setting is used to send keep-alive messages on connection-oriented sockets. -# serverSockKeepalive=true - -# The socket linger timeout on close. -# When enabled, a close or shutdown will not return until all queued messages for -# the socket have been successfully sent or the linger timeout has been reached. -# Otherwise, the call returns immediately and the closing is done in the background. -# serverTcpLinger=0 - -# The Recv ByteBuf allocator initial buf size. -# byteBufAllocatorSizeInitial=65536 - -# The Recv ByteBuf allocator min buf size. -# byteBufAllocatorSizeMin=65536 - -# The Recv ByteBuf allocator max buf size. -# byteBufAllocatorSizeMax=1048576 - -############################################################################# -## Journal settings -############################################################################# - -# The journal format version to write. -# Available formats are 1-6: -# 1: no header -# 2: a header section was added -# 3: ledger key was introduced -# 4: fencing key was introduced -# 5: expanding header to 512 and padding writes to align sector size configured by `journalAlignmentSize` -# 6: persisting explicitLac is introduced -# By default, it is `6`. -# If you'd like to disable persisting ExplicitLac, you can set this config to < `6` and also -# fileInfoFormatVersionToWrite should be set to 0. If there is mismatch then the serverconfig is considered invalid. -# You can disable `padding-writes` by setting journal version back to `4`. This feature is available in 4.5.0 -# and onward versions. -journalFormatVersionToWrite=5 - -# Max file size of journal file, in mega bytes -# A new journal file will be created when the old one reaches the file size limitation -journalMaxSizeMB=2048 - -# Max number of old journal file to kept -# Keep a number of old journal files would help data recovery in specia case -journalMaxBackups=5 - -# How much space should we pre-allocate at a time in the journal. -journalPreAllocSizeMB=16 - -# Size of the write buffers used for the journal -journalWriteBufferSizeKB=64 - -# Should we remove pages from page cache after force write -journalRemoveFromPageCache=true - -# Should the data be fsynced on journal before acknowledgment. -# By default, data sync is enabled to guarantee durability of writes. -# Beware: while disabling data sync in the Bookie journal might improve the bookie write performance, it will also -# introduce the possibility of data loss. With no sync, the journal entries are written in the OS page cache but -# not flushed to disk. In case of power failure, the affected bookie might lose the unflushed data. 
If the ledger -# is replicated to multiple bookies, the chances of data loss are reduced though still present. -journalSyncData=true - -# Should we group journal force writes, which optimize group commit -# for higher throughput -journalAdaptiveGroupWrites=true - -# Maximum latency to impose on a journal write to achieve grouping -journalMaxGroupWaitMSec=1 - -# Maximum writes to buffer to achieve grouping -journalBufferedWritesThreshold=524288 - -# The number of threads that should handle journal callbacks -numJournalCallbackThreads=8 - -# All the journal writes and commits should be aligned to given size. -# If not, zeros will be padded to align to given size. -# It only takes effects when journalFormatVersionToWrite is set to 5 -journalAlignmentSize=4096 - -# Maximum entries to buffer to impose on a journal write to achieve grouping. -# journalBufferedEntriesThreshold=0 - -# If we should flush the journal when journal queue is empty -journalFlushWhenQueueEmpty=false - -############################################################################# -## Ledger storage settings -############################################################################# - -# Ledger storage implementation class -ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - -# Directory Bookkeeper outputs ledger snapshots -# could define multi directories to store snapshots, separated by ',' -# For example: -# ledgerDirectories=/tmp/bk1-data,/tmp/bk2-data -# -# Ideally ledger dirs and journal dir are each in a differet device, -# which reduce the contention between random i/o and sequential write. -# It is possible to run with a single disk, but performance will be significantly lower. -ledgerDirectories=data/bookkeeper/ledgers -# Directories to store index files. If not specified, will use ledgerDirectories to store. -# indexDirectories=data/bookkeeper/ledgers - -# Interval at which the auditor will do a check of all ledgers in the cluster. -# By default this runs once a week. The interval is set in seconds. -# To disable the periodic check completely, set this to 0. -# Note that periodic checking will put extra load on the cluster, so it should -# not be run more frequently than once a day. -auditorPeriodicCheckInterval=604800 - -# Whether sorted-ledger storage enabled (default true) -# sortedLedgerStorageEnabled=ture - -# The skip list data size limitation (default 64MB) in EntryMemTable -# skipListSizeLimit=67108864L - -############################################################################# -## Ledger cache settings -############################################################################# - -# Max number of ledger index files could be opened in bookie server -# If number of ledger index files reaches this limitation, bookie -# server started to swap some ledgers from memory to disk. -# Too frequent swap will affect performance. You can tune this number -# to gain performance according your requirements. -openFileLimit=0 - -# The fileinfo format version to write. -# Available formats are 0-1: -# 0: Initial version -# 1: persisting explicitLac is introduced -# By default, it is `1`. -# If you'd like to disable persisting ExplicitLac, you can set this config to 0 and -# also journalFormatVersionToWrite should be set to < 6. If there is mismatch then the -# serverconfig is considered invalid. 
-fileInfoFormatVersionToWrite=0 - -# Size of a index page in ledger cache, in bytes -# A larger index page can improve performance writing page to disk, -# which is efficent when you have small number of ledgers and these -# ledgers have similar number of entries. -# If you have large number of ledgers and each ledger has fewer entries, -# smaller index page would improve memory usage. -# pageSize=8192 - -# How many index pages provided in ledger cache -# If number of index pages reaches this limitation, bookie server -# starts to swap some ledgers from memory to disk. You can increment -# this value when you found swap became more frequent. But make sure -# pageLimit*pageSize should not more than JVM max memory limitation, -# otherwise you would got OutOfMemoryException. -# In general, incrementing pageLimit, using smaller index page would -# gain bettern performance in lager number of ledgers with fewer entries case -# If pageLimit is -1, bookie server will use 1/3 of JVM memory to compute -# the limitation of number of index pages. -pageLimit=0 - -############################################################################# -## Ledger manager settings -############################################################################# - -# Ledger Manager Class -# What kind of ledger manager is used to manage how ledgers are stored, managed -# and garbage collected. Try to read 'BookKeeper Internals' for detail info. -# ledgerManagerFactoryClass=org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory - -# @Drepcated - `ledgerManagerType` is deprecated in favor of using `ledgerManagerFactoryClass`. -# ledgerManagerType=hierarchical - -# Root Zookeeper path to store ledger metadata -# This parameter is used by zookeeper-based ledger manager as a root znode to -# store all ledgers. -zkLedgersRootPath=/ledgers - -############################################################################# -## Entry log settings -############################################################################# - -# Max file size of entry logger, in bytes -# A new entry log file will be created when the old one reaches the file size limitation -logSizeLimit=1073741824 - -# Enable/Disable entry logger preallocation -entryLogFilePreallocationEnabled=true - -# Entry log flush interval in bytes. -# Default is 0. 0 or less disables this feature and effectively flush -# happens on log rotation. -# Flushing in smaller chunks but more frequently reduces spikes in disk -# I/O. Flushing too frequently may also affect performance negatively. -# flushEntrylogBytes=0 - -# The number of bytes we should use as capacity for BufferedReadChannel. Default is 512 bytes. -readBufferSizeBytes=4096 - -# The number of bytes used as capacity for the write buffer. Default is 64KB. -writeBufferSizeBytes=65536 - -# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be a -# active entrylog for each ledger. It would be ideal to enable this feature if the underlying -# storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer -# number of active ledgers are written to a bookie. -# entryLogPerLedgerEnabled=false - -############################################################################# -## Entry log compaction settings -############################################################################# - -# Set the rate at which compaction will readd entries. The unit is adds per second. 
-compactionRate=1000 - -# If bookie is using hostname for registration and in ledger metadata then -# whether to use short hostname or FQDN hostname. Defaults to false. -# useShortHostName=false - -# Threshold of minor compaction -# For those entry log files whose remaining size percentage reaches below -# this threshold will be compacted in a minor compaction. -# If it is set to less than zero, the minor compaction is disabled. -minorCompactionThreshold=0.2 - -# Interval to run minor compaction, in seconds -# If it is set to less than zero, the minor compaction is disabled. -minorCompactionInterval=3600 - -# Set the maximum number of entries which can be compacted without flushing. -# When compacting, the entries are written to the entrylog and the new offsets -# are cached in memory. Once the entrylog is flushed the index is updated with -# the new offsets. This parameter controls the number of entries added to the -# entrylog before a flush is forced. A higher value for this parameter means -# more memory will be used for offsets. Each offset consists of 3 longs. -# This parameter should _not_ be modified unless you know what you're doing. -# The default is 100,000. -compactionMaxOutstandingRequests=100000 - -# Threshold of major compaction -# For those entry log files whose remaining size percentage reaches below -# this threshold will be compacted in a major compaction. -# Those entry log files whose remaining size percentage is still -# higher than the threshold will never be compacted. -# If it is set to less than zero, the minor compaction is disabled. -majorCompactionThreshold=0.5 - -# Interval to run major compaction, in seconds -# If it is set to less than zero, the major compaction is disabled. -majorCompactionInterval=86400 - -# Throttle compaction by bytes or by entries. -isThrottleByBytes=false - -# Set the rate at which compaction will readd entries. The unit is adds per second. -compactionRateByEntries=1000 - -# Set the rate at which compaction will readd entries. The unit is bytes added per second. -compactionRateByBytes=1000000 - -############################################################################# -## Statistics -############################################################################# - -# Whether statistics are enabled -# enableStatistics=true - -# Stats Provider Class (if statistics are enabled) -statsProviderClass=org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider - -# Default port for Prometheus metrics exporter -prometheusStatsHttpPort=8000 - -############################################################################# -## Read-only mode support -############################################################################# - -# If all ledger directories configured are full, then support only read requests for clients. -# If "readOnlyModeEnabled=true" then on all ledger disks full, bookie will be converted -# to read-only mode and serve only read requests. Otherwise the bookie will be shutdown. -# By default this will be disabled. -readOnlyModeEnabled=true - -# Whether the bookie is force started in read only mode or not -# forceReadOnlyBookie=false - -# Persiste the bookie status locally on the disks. 
So the bookies can keep their status upon restarts -# @Since 4.6 -# persistBookieStatusEnabled=false - -############################################################################# -## Disk utilization -############################################################################# - -# For each ledger dir, maximum disk space which can be used. -# Default is 0.95f. i.e. 95% of disk can be used at most after which nothing will -# be written to that partition. If all ledger dir partions are full, then bookie -# will turn to readonly mode if 'readOnlyModeEnabled=true' is set, else it will -# shutdown. -# Valid values should be in between 0 and 1 (exclusive). -diskUsageThreshold=0.95 - -# The disk free space low water mark threshold. -# Disk is considered full when usage threshold is exceeded. -# Disk returns back to non-full state when usage is below low water mark threshold. -# This prevents it from going back and forth between these states frequently -# when concurrent writes and compaction are happening. This also prevent bookie from -# switching frequently between read-only and read-writes states in the same cases. -# diskUsageWarnThreshold=0.95 - -# Set the disk free space low water mark threshold. Disk is considered full when -# usage threshold is exceeded. Disk returns back to non-full state when usage is -# below low water mark threshold. This prevents it from going back and forth -# between these states frequently when concurrent writes and compaction are -# happening. This also prevent bookie from switching frequently between -# read-only and read-writes states in the same cases. -# diskUsageLwmThreshold=0.90 - -# Disk check interval in milli seconds, interval to check the ledger dirs usage. -# Default is 10000 -diskCheckInterval=10000 - -############################################################################# -## ZooKeeper parameters -############################################################################# - -# A list of one of more servers on which Zookeeper is running. -# The server list can be comma separated values, for example: -# zkServers=zk1:2181,zk2:2181,zk3:2181 -zkServers=localhost:2181 - -# ZooKeeper client session timeout in milliseconds -# Bookie server will exit if it received SESSION_EXPIRED because it -# was partitioned off from ZooKeeper for more than the session timeout -# JVM garbage collection, disk I/O will cause SESSION_EXPIRED. -# Increment this value could help avoiding this issue -zkTimeout=30000 - -# The Zookeeper client backoff retry start time in millis. -# zkRetryBackoffStartMs=1000 - -# The Zookeeper client backoff retry max time in millis. -# zkRetryBackoffMaxMs=10000 - -# Set ACLs on every node written on ZooKeeper, this way only allowed users -# will be able to read and write BookKeeper metadata stored on ZooKeeper. -# In order to make ACLs work you need to setup ZooKeeper JAAS authentication -# all the bookies and Client need to share the same user, and this is usually -# done using Kerberos authentication. See ZooKeeper documentation -zkEnableSecurity=false - -############################################################################# -## Server parameters -############################################################################# - -# The flag enables/disables starting the admin http server. Default value is 'false'. -httpServerEnabled=false - -# The http server port to listen on. Default value is 8080. 
-# Use `8000` as the port to keep it consistent with prometheus stats provider -httpServerPort=8000 - -# The http server class -httpServerClass=org.apache.bookkeeper.http.vertx.VertxHttpServer - -# Configure a list of server components to enable and load on a bookie server. -# This provides the plugin run extra services along with a bookie server. -# -# extraServerComponents= - - -############################################################################# -## DB Ledger storage configuration -############################################################################# - -# These configs are used when the selected 'ledgerStorageClass' is -# org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - -# Size of Write Cache. Memory is allocated from JVM direct memory. -# Write cache is used to buffer entries before flushing into the entry log -# For good performance, it should be big enough to hold a substantial amount -# of entries in the flush interval -# By default it will be allocated to 1/4th of the available direct memory -dbStorage_writeCacheMaxSizeMb= - -# Size of Read cache. Memory is allocated from JVM direct memory. -# This read cache is pre-filled doing read-ahead whenever a cache miss happens -# By default it will be allocated to 1/4th of the available direct memory -dbStorage_readAheadCacheMaxSizeMb= - -# How many entries to pre-fill in cache after a read cache miss -dbStorage_readAheadCacheBatchSize=1000 - -## RocksDB specific configurations -## DbLedgerStorage uses RocksDB to store the indexes from -## (ledgerId, entryId) -> (entryLog, offset) - -# Size of RocksDB block-cache. For best performance, this cache -# should be big enough to hold a significant portion of the index -# database which can reach ~2GB in some cases -# Default is to use 10% of the direct memory size -dbStorage_rocksDB_blockCacheSize= - -# Other RocksDB specific tunables -dbStorage_rocksDB_writeBufferSizeMB=64 -dbStorage_rocksDB_sstSizeInMB=64 -dbStorage_rocksDB_blockSize=65536 -dbStorage_rocksDB_bloomFilterBitsPerKey=10 -dbStorage_rocksDB_numLevels=-1 -dbStorage_rocksDB_numFilesInLevel0=4 -dbStorage_rocksDB_maxSizeInLevel1MB=256 diff --git a/kafka-impl/conf/global_zookeeper.conf b/kafka-impl/conf/global_zookeeper.conf deleted file mode 100644 index b763e9f630..0000000000 --- a/kafka-impl/conf/global_zookeeper.conf +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# The number of milliseconds of each tick -tickTime=2000 -# The number of ticks that the initial -# synchronization phase can take -initLimit=10 -# The number of ticks that can pass between -# sending a request and getting an acknowledgement -syncLimit=5 -# the directory where the snapshot is stored. -dataDir=data/global-zookeeper -# the port at which the clients will connect -clientPort=2184 - -# the port at which the admin will listen -admin.enableServer=true -admin.serverPort=9991 - -# the maximum number of client connections. 
-# increase this if you need to handle more clients -#maxClientCnxns=60 -# -# Be sure to read the maintenance section of the -# administrator guide before turning on autopurge. -# -# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance -# -# The number of snapshots to retain in dataDir -autopurge.snapRetainCount=3 -# Purge task interval in hours -# Set to "0" to disable auto purge feature -autopurge.purgeInterval=1 diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf deleted file mode 100755 index 1ce5f66829..0000000000 --- a/kafka-impl/conf/kop.conf +++ /dev/null @@ -1,892 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -### --- Kafka broker settings --- ### - -# The messaging Protocols that avilabale when loaded by Pulsar Broker. -messagingProtocols=kafka - -# ListenersProp for Kafka service(host should follow the advertisedAddress). -# e.g. PLAINTEXT://localhost:9092,SSL://localhost:9093 -# when KoP runs as a plugin in Pulsar, if it is not set, kop will use PLAINTEXT://advertisedAddress:9092 -listeners=PLAINTEXT://127.0.0.1:9092 - -# Kafka on Pulsar Broker tenant -kafkaTenant=public - -# Kafka on Pulsar Broker namespace -kafkaNamespace=default - -# The tenant used for storing Kafka metadata topics -kafkaMetadataTenant=public - -# The namespace used for storing Kafka metadata topics -kafkaMetadataNamespace=__kafka - -# Flag to enable group coordinator -enableGroupCoordinator=true - -# The minimum allowed session timeout for registered consumers. -# Shorter timeouts result in quicker failure detection at the cost -# of more frequent consumer heartbeating, which can overwhelm broker resources. -groupMinSessionTimeoutMs=6000 - -# The maximum allowed session timeout for registered consumers. -# Longer timeouts give consumers more time to process messages in -# between heartbeats at the cost of a longer time to detect failures. -groupMaxSessionTimeoutMs=300000 - -# The amount of time the group coordinator will wait for more consumers -# to join a new group before performing the first rebalance. A longer -# delay means potentially fewer rebalances, but increases the time until -# processing begins -groupInitialRebalanceDelayMs=3000 - -# Compression codec for the offsets topic - compression may be used to achieve "atomic" commits -offsetsTopicCompressionCodec=NONE - -# The maximum size in Bytes for a metadata entry associated with an offset commit -offsetMetadataMaxSize=4096 - -# Offsets older than this retention period will be discarded, default 7 days -offsetsRetentionMinutes=10080 - -# Frequency at which to check for stale offsets -offsetsRetentionCheckIntervalMs=600000 - -# Number of partitions for the offsets topic -offsetsTopicNumPartitions=8 - -# Maximum number of entries that are read from cursor once per time -maxReadEntriesNum=5 - -# The format of an entry. The default value is pulsar. 
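For reference, the kop.conf keys above define what an ordinary Kafka client sees when it talks to KoP: the PLAINTEXT listener on 127.0.0.1:9092, the group coordinator bounds (groupMinSessionTimeoutMs / groupMaxSessionTimeoutMs), and the compacted offsets topic in the __kafka namespace. The sketch below is a minimal, hypothetical Kafka consumer wired against those defaults; the topic name "my-topic" and group id "my-group" are placeholders, and it assumes a broker running this configuration locally.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KopConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Matches listeners=PLAINTEXT://127.0.0.1:9092 in kop.conf
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Group membership is handled by KoP's group coordinator
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Must fall inside [groupMinSessionTimeoutMs, groupMaxSessionTimeoutMs] = [6000, 300000]
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
            // Committed offsets are stored in the offsets topic configured above
            consumer.commitSync();
        }
    }
}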
-# Optional values: [pulsar, kafka] -# -# pulsar: -# When KoP receives messages from kafka producer, it will serialize these messages to -# the format so that pulsar consumer can read directly. -# When KoP sends entries to kafka consumer, it will treat each entry as pulsar's -# format and deserialize each entry to kafka's format. -# -# kafka: -# When KoP receives messages from kafka producer, add a header which is PulsarApi.Metadata -# before the messages' bytes, and then write to BK directly. -# When KoP sends entries to kafka consumer, it will treat each entry as kafka's format and -# just discard the pulsar header and send left bytes to Kafka consumer. -# This mode means that current pulsar clients cannot interact with kafka clients, but -# kafka producer works well with kafka consumer. -entryFormat=pulsar - -# Zookeeper path for storing kop consumer group -groupIdZooKeeperPath=/client_group_id - -### --- KoP SSL configs--- ### - -# Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol -kopSslProtocol=TLS - -# Kafka ssl configuration map with: SSL_PROVIDER_CONFIG = ssl.provider -kopSslProvider= - -# Kafka ssl configuration map with: SSL_CIPHER_SUITES_CONFIG = ssl.cipher.suites -kopSslCipherSuites= - -# Kafka ssl configuration map with: SSL_ENABLED_PROTOCOLS_CONFIG = ssl.enabled.protocols -kopSslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1 - -# Kafka ssl configuration map with: SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type -kopSslKeystoreType=JKS - -# Kafka ssl configuration map with: SSL_KEYSTORE_LOCATION_CONFIG = ssl.keystore.location -kopSslKeystoreLocation= - -# Kafka ssl configuration map with: SSL_KEYSTORE_PASSWORD_CONFIG = ssl.keystore.password -kopSslKeystorePassword= - -# Kafka ssl configuration map with: SSL_KEY_PASSWORD_CONFIG = ssl.key.password -kopSslKeyPassword= - -# Kafka ssl configuration map with: SSL_TRUSTSTORE_TYPE_CONFIG = ssl.truststore.type -kopSslTruststoreType=JKS - -# Kafka ssl configuration map with: SSL_TRUSTSTORE_LOCATION_CONFIG = ssl.truststore.location -kopSslTruststoreLocation= - -# Kafka ssl configuration map with: SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password -kopSslTruststorePassword= - -# Kafka ssl configuration map with: SSL_KEYMANAGER_ALGORITHM_CONFIG = ssl.keymanager.algorithm -kopSslKeymanagerAlgorithm=SunX509 - -# Kafka ssl configuration map with: SSL_TRUSTMANAGER_ALGORITHM_CONFIG = ssl.trustmanager.algorithm -kopSslTrustmanagerAlgorithm=SunX509 - -# Kafka ssl configuration map with: -# SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = ssl.secure.random.implementation -kopSslSecureRandomImplementation= - -# supported SASL mechanisms exposed by broker -saslAllowedMechanisms= - -### --- Changed for KoP --- ### - -# Enable the deletion of inactive topics -brokerDeleteInactiveTopicsEnabled=false - -allowAutoTopicCreation=true - -allowAutoTopicCreationType=partitioned - -# Name of the cluster to which this broker belongs to -clusterName=kafka-cluster - -### --- General broker settings --- ### - -# Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled -# (Disable message rate limit with value 0) -brokerPublisherThrottlingMaxMessageRate=0 - -# Max Rate(in 1 seconds) of Byte allowed to publish for a broker if broker publish rate limiting enabled. 
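The kopSsl* keys above mirror Kafka's broker-side ssl.* options; a client connecting to an SSL listener supplies the corresponding client-side properties. A minimal sketch, assuming a hypothetical SSL listener on port 9093 and a client truststore at /etc/kop/client.truststore.jks (both placeholders, not part of this configuration):

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;

public class KopSslClientProps {
    static Properties sslClientProps() {
        Properties props = new Properties();
        // Hypothetical SSL listener, e.g. listeners=PLAINTEXT://...,SSL://localhost:9093
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Client-side counterparts of kopSslTruststoreLocation / kopSslTruststorePassword
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kop/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        // Keep the negotiated protocols in line with kopSslEnabledProtocols
        props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");
        return props;
    }
}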
-# (Disable byte rate limit with value 0) -brokerPublisherThrottlingMaxByteRate=0 - -# Zookeeper quorum connection string -zookeeperServers=localhost:2181 - -# Configuration Store connection string -configurationStoreServers= - -# Broker data port -brokerServicePort=6650 - -# Broker data port for TLS - By default TLS is disabled -brokerServicePortTls= - -# Port to use to server HTTP request -webServicePort=8080 - -# Port to use to server HTTPS request - By default TLS is disabled -webServicePortTls= - -# Hostname or IP address the service binds on, default is 0.0.0.0. -bindAddress=0.0.0.0 - -# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used. -advertisedAddress= - -# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors() -numIOThreads= - -# Number of threads to use for HTTP requests processing. Default is set to 2 * Runtime.getRuntime().availableProcessors() -numHttpServerThreads= - -# Flag to control features that are meant to be used when running in standalone mode -isRunningStandalone= - -# Enable cluster's failure-domain which can distribute brokers into logical region -failureDomainsEnabled=false - -# Zookeeper session timeout in milliseconds -zooKeeperSessionTimeoutMillis=30000 - -# ZooKeeper operation timeout in seconds -zooKeeperOperationTimeoutSeconds=30 - -# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed -brokerShutdownTimeoutMs=60000 - -# Enable backlog quota check. Enforces action on topic when the quota is reached -backlogQuotaCheckEnabled=true - -# How often to check for topics that have reached the quota -backlogQuotaCheckIntervalInSeconds=60 - -# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1. -backlogQuotaDefaultLimitGB=-1 - -# Default backlog quota retention policy. Default is producer_request_hold -# 'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out) -# 'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer -# 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog -backlogQuotaDefaultRetentionPolicy=producer_request_hold - -# Default ttl for namespaces if ttl is not already configured at namespace policies. 
(disable default-ttl with value 0) -ttlDurationDefaultInSeconds=0 - -# Enable the deletion of inactive topics -brokerDeleteInactiveTopicsEnabled=true - -# How often to check for inactive topics -brokerDeleteInactiveTopicsFrequencySeconds=60 - -# How frequently to proactively check and purge expired messages -messageExpiryCheckIntervalInMinutes=5 - -# How long to delay rewinding cursor and dispatching messages when active consumer is changed -activeConsumerFailoverDelayTimeMillis=1000 - -# How long to delete inactive subscriptions from last consuming -# When it is 0, inactive subscriptions are not deleted automatically -subscriptionExpirationTimeMinutes=0 - -# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled) -subscriptionRedeliveryTrackerEnabled=true - -# How frequently to proactively check and purge expired subscription -subscriptionExpiryCheckIntervalInMinutes=5 - -# Enable Key_Shared subscription (default is enabled) -subscriptionKeySharedEnable=true - -# Set the default behavior for message deduplication in the broker -# This can be overridden per-namespace. If enabled, broker will reject -# messages that were already stored in the topic -brokerDeduplicationEnabled=false - -# Maximum number of producer information that it's going to be -# persisted for deduplication purposes -brokerDeduplicationMaxNumberOfProducers=10000 - -# Number of entries after which a dedup info snapshot is taken. -# A larger interval will lead to fewer snapshots being taken, though it would -# increase the topic recovery time when the entries published after the -# snapshot need to be replayed. -brokerDeduplicationEntriesInterval=1000 - -# Time of inactivity after which the broker will discard the deduplication information -# relative to a disconnected producer. Default is 6 hours. -brokerDeduplicationProducerInactivityTimeoutMinutes=360 - -# When a namespace is created without specifying the number of bundle, this -# value will be used as the default -defaultNumberOfNamespaceBundles=4 - -# Enable check for minimum allowed client library version -clientLibraryVersionCheckEnabled=false - -# Path for the file used to determine the rotation status for the broker when responding -# to service discovery health checks -statusFilePath= - -# If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to -# use only brokers running the latest software version (to minimize impact to bundles) -preferLaterVersions=false - -# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending -# messages to consumer once, this limit reaches until consumer starts acknowledging messages back. -# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction -maxUnackedMessagesPerConsumer=50000 - -# Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to -# all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and -# unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit -# check and dispatcher can dispatch messages without any restriction -maxUnackedMessagesPerSubscription=200000 - -# Max number of unacknowledged messages allowed per broker. 
Once this limit reaches, broker will stop dispatching -# messages to all shared subscription which has higher number of unack messages until subscriptions start -# acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling -# unackedMessage-limit check and broker doesn't block dispatchers -maxUnackedMessagesPerBroker=0 - -# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages -# than this percentage limit and subscription will not receive any new messages until that subscription acks back -# limit/2 messages -maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 - -# Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies, -# hence causing high network bandwidth usage -# When the positive value is set, broker will throttle the subscribe requests for one consumer. -# Otherwise, the throttling will be disabled. The default value of this setting is 0 - throttling is disabled. -subscribeThrottlingRatePerConsumer=0 - -# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s. -subscribeRatePeriodPerConsumerInSecond=30 - -# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default -# message dispatch-throttling -dispatchThrottlingRatePerTopicInMsg=0 - -# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling -# default message-byte dispatch-throttling -dispatchThrottlingRatePerTopicInByte=0 - -# Default number of message dispatching throttling-limit for a subscription. -# Using a value of 0, is disabling default message dispatch-throttling. -dispatchThrottlingRatePerSubscriptionInMsg=0 - -# Default number of message-bytes dispatching throttling-limit for a subscription. -# Using a value of 0, is disabling default message-byte dispatch-throttling. -dispatchThrottlingRatePerSubscriptionInByte=0 - -# Default messages per second dispatch throttling-limit for every replicator in replication. -# Using a value of 0, is disabling replication message dispatch-throttling -dispatchThrottlingRatePerReplicatorInMsg=0 - -# Default bytes per second dispatch throttling-limit for every replicator in replication. -# Using a value of 0, is disabling replication message-byte dispatch-throttling -dispatchThrottlingRatePerReplicatorInByte=0 - -# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have -# backlog. -dispatchThrottlingOnNonBacklogConsumerEnabled=true - -# Max number of entries to read from bookkeeper. By default it is 100 entries. -dispatcherMaxReadBatchSize=100 - -# Min number of entries to read from bookkeeper. By default it is 1 entries. -# When there is an error occurred on reading entries from bookkeeper, the broker -# will backoff the batch size to this minimum number." -dispatcherMinReadBatchSize=1 - -# Max number of entries to dispatch for a shared subscription. By default it is 20 entries. 
-dispatcherMaxRoundRobinBatchSize=20 - -# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic -maxConcurrentLookupRequest=50000 - -# Max number of concurrent topic loading request broker allows to control number of zk-operations -maxConcurrentTopicLoadRequest=5000 - -# Max concurrent non-persistent message can be processed per connection -maxConcurrentNonPersistentMessagePerConnection=1000 - -# Number of worker threads to serve non-persistent topic -numWorkerThreadsForNonPersistentTopic=8 - -# Enable broker to load persistent topics -enablePersistentTopics=true - -# Enable broker to load non-persistent topics -enableNonPersistentTopics=true - -# Enable to run bookie along with broker -enableRunBookieTogether=false - -# Enable to run bookie autorecovery along with broker -enableRunBookieAutoRecoveryTogether=false - -# Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers -# until the number of connected producers decrease. -# Using a value of 0, is disabling maxProducersPerTopic-limit check. -maxProducersPerTopic=0 - -# Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers -# until the number of connected consumers decrease. -# Using a value of 0, is disabling maxConsumersPerTopic-limit check. -maxConsumersPerTopic=0 - -# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers -# until the number of connected consumers decrease. -# Using a value of 0, is disabling maxConsumersPerSubscription-limit check. -maxConsumersPerSubscription=0 - -# Max size of messages. -maxMessageSize=5242880 - -# Interval between checks to see if topics with compaction policies need to be compacted -brokerServiceCompactionMonitorIntervalInSeconds=60 - -# Whether to enable the delayed delivery for messages. -# If disabled, messages will be immediately delivered and there will -# be no tracking overhead. -delayedDeliveryEnabled=true - -# Control the tick time for when retrying on delayed delivery, -# affecting the accuracy of the delivery time compared to the scheduled time. -# Default is 1 second. -delayedDeliveryTickTimeMillis=1000 - -# Enable tracking of replicated subscriptions state across clusters. -enableReplicatedSubscriptions=true - -# Frequency of snapshots for replicated subscriptions tracking. -replicatedSubscriptionsSnapshotFrequencyMillis=1000 - -# Timeout for building a consistent snapshot for tracking replicated subscriptions state. -replicatedSubscriptionsSnapshotTimeoutSeconds=30 - -# Max number of snapshot to be cached per subscription. -replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10 - -### --- Authentication --- ### -# Role names that are treated as "proxy roles". If the broker sees a request with -#role as proxyRoles - it will demand to see a valid original principal. -proxyRoles= - -# If this flag is set then the broker authenticates the original Auth data -# else it just accepts the originalPrincipal and authorizes it (if required). -authenticateOriginalAuthData=false - -# Deprecated - Use webServicePortTls and brokerServicePortTls instead -tlsEnabled=false - -# Tls cert refresh duration in seconds (set 0 to check on every new connection) -tlsCertRefreshCheckDurationSec=300 - -# Path for the TLS certificate file -tlsCertificateFilePath= - -# Path for the TLS private key file -tlsKeyFilePath= - -# Path for the trusted TLS certificate file. 
-# This cert is used to verify that any certs presented by connecting clients -# are signed by a certificate authority. If this verification -# fails, then the certs are untrusted and the connections are dropped. -tlsTrustCertsFilePath= - -# Accept untrusted TLS certificate from client. -# If true, a client with a cert which cannot be verified with the -# 'tlsTrustCertsFilePath' cert will allowed to connect to the server, -# though the cert will not be used for client authentication. -tlsAllowInsecureConnection=false - -# Specify the tls protocols the broker will use to negotiate during TLS handshake -# (a comma-separated list of protocol names). -# Examples:- [TLSv1.2, TLSv1.1, TLSv1] -tlsProtocols= - -# Specify the tls cipher the broker will use to negotiate during TLS Handshake -# (a comma-separated list of ciphers). -# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256] -tlsCiphers= - -# Trusted client certificates are required for to connect TLS -# Reject the Connection if the Client Certificate is not trusted. -# In effect, this requires that all connecting clients perform TLS client -# authentication. -tlsRequireTrustedClientCertOnConnect=false - -### --- Authentication --- ### - -# Enable authentication -authenticationEnabled=false - -# Autentication provider name list, which is comma separated list of class names -authenticationProviders= - -# Enforce authorization -authorizationEnabled=false - -# Authorization provider fully qualified class-name -authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider - -# Allow wildcard matching in authorization -# (wildcard matching only applicable if wildcard-char: -# * presents at first or last position eg: *.pulsar.service, pulsar.service.*) -authorizationAllowWildcardsMatching=false - -# Role names that are treated as "super-user", meaning they will be able to do all admin -# operations and publish/consume from all topics -superUserRoles= - -# Authentication settings of the broker itself. Used when the broker connects to other brokers, -# either in same or other clusters -brokerClientTlsEnabled=false -brokerClientAuthenticationPlugin= -brokerClientAuthenticationParameters= -brokerClientTrustCertsFilePath= - -# Supported Athenz provider domain names(comma separated) for authentication -athenzDomainNames= - -# When this parameter is not empty, unauthenticated users perform as anonymousUserRole -anonymousUserRole= - -### --- Token Authentication Provider --- ### - -## Symmetric key -# Configure the secret key to be used to validate auth tokens -# The key can be specified like: -# tokenSecretKey=data:base64,xxxxxxxxx -# tokenSecretKey=file:///my/secret.key -tokenSecretKey= - -## Asymmetric public/private key pair -# Configure the public key to be used to validate auth tokens -# The key can be specified like: -# tokenPublicKey=data:base64,xxxxxxxxx -# tokenPublicKey=file:///my/public.key -tokenPublicKey= - -# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank) -tokenAuthClaim= - -### --- SASL Authentication Provider --- ### - -# This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL. -# Default value: `SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT`, which is ".*pulsar.*", -# so only clients whose id contains 'pulsar' are allowed to connect. -saslJaasClientAllowedIds= - -# Service Principal, for login context name. 
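The token settings above (tokenSecretKey / tokenPublicKey) only tell the broker how to validate tokens; a client presents its token through an authentication plugin. A minimal sketch using the Pulsar Java client, with the service URL taken from brokerServicePort=6650 and the token value as a placeholder:

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;

public class TokenAuthClientExample {
    public static void main(String[] args) throws Exception {
        // The broker verifies this token against tokenSecretKey or tokenPublicKey
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")                 // matches brokerServicePort=6650
                .authentication(AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9...")) // placeholder JWT
                .build();
        client.close();
    }
}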
-# Default value `SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME`, which is "Broker". -saslJaasBrokerSectionName= - -### --- BookKeeper Client --- ### - -# Authentication plugin to use when connecting to bookies -bookkeeperClientAuthenticationPlugin= - -# BookKeeper auth plugin implementatation specifics parameters name and values -bookkeeperClientAuthenticationParametersName= -bookkeeperClientAuthenticationParameters= - -# Timeout for BK add / read operations -bookkeeperClientTimeoutInSeconds=30 - -# Speculative reads are initiated if a read request doesn't complete within a certain time -# Using a value of 0, is disabling the speculative reads -bookkeeperClientSpeculativeReadTimeoutInMillis=0 - -# Use older Bookkeeper wire protocol with bookie -bookkeeperUseV2WireProtocol=true - -# Enable bookies health check. Bookies that have more than the configured number of failure within -# the interval will be quarantined for some time. During this period, new ledgers won't be created -# on these bookies -bookkeeperClientHealthCheckEnabled=true -bookkeeperClientHealthCheckIntervalSeconds=60 -bookkeeperClientHealthCheckErrorThresholdPerInterval=5 -bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 - -# Enable rack-aware bookie selection policy. BK will chose bookies from different racks when -# forming a new bookie ensemble -bookkeeperClientRackawarePolicyEnabled=true - -# Enable region-aware bookie selection policy. BK will chose bookies from -# different regions and racks when forming a new bookie ensemble -# If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored -bookkeeperClientRegionawarePolicyEnabled=false - -# Enable/disable reordering read sequence on reading entries. -bookkeeperClientReorderReadSequenceEnabled=false - -# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie -# outside the specified groups will not be used by the broker -bookkeeperClientIsolationGroups= - -# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't -# have enough bookie available. -bookkeeperClientSecondaryIsolationGroups= - -# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups -# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list. -bookkeeperClientMinAvailableBookiesInIsolationGroups= - -# Enable/disable having read operations for a ledger to be sticky to a single bookie. -# If this flag is enabled, the client will use one single bookie (by preference) to read -# all entries for a ledger. -# -# Disable Sticy Read until {@link https://github.com/apache/bookkeeper/issues/1970} is fixed -bookkeeperEnableStickyReads=false - -### --- Managed Ledger --- ### - -# Number of bookies to use when creating a ledger -managedLedgerDefaultEnsembleSize=1 - -# Number of copies to store for each message -managedLedgerDefaultWriteQuorum=1 - -# Number of guaranteed copies (acks to wait before write is complete) -managedLedgerDefaultAckQuorum=1 - -# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" -# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). -managedLedgerDigestType=CRC32C - -# Number of threads to be used for managed ledger tasks dispatching -managedLedgerNumWorkerThreads=8 - -# Number of threads to be used for managed ledger scheduled tasks -managedLedgerNumSchedulerThreads=8 - -# Amount of memory to use for caching data payload in managed ledger. 
This memory -# is allocated from JVM direct memory and it's shared across all the topics -# running in the same broker. By default, uses 1/5th of available direct memory -managedLedgerCacheSizeMB= - -# Whether we should make a copy of the entry payloads when inserting in cache -managedLedgerCacheCopyEntries=false - -# Threshold to which bring down the cache level when eviction is triggered -managedLedgerCacheEvictionWatermark=0.9 - -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 - -# All entries that have stayed in cache for more than the configured time, will be evicted -managedLedgerCacheEvictionTimeThresholdMillis=1000 - -# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged' -# and thus should be set as inactive. -managedLedgerCursorBackloggedThreshold=1000 - -# Rate limit the amount of writes per second generated by consumer acking the messages -managedLedgerDefaultMarkDeleteRateLimit=1.0 - -# Max number of entries to append to a ledger before triggering a rollover -# A ledger rollover is triggered on these conditions -# * Either the max rollover time has been reached -# * or max entries have been written to the ledged and at least min-time -# has passed -managedLedgerMaxEntriesPerLedger=50000 - -# Minimum time between ledger rollover for a topic -managedLedgerMinLedgerRolloverTimeMinutes=10 - -# Maximum time before forcing a ledger rollover for a topic -managedLedgerMaxLedgerRolloverTimeMinutes=240 - -# Delay between a ledger being successfully offloaded to long term storage -# and the ledger being deleted from bookkeeper (default is 4 hours) -managedLedgerOffloadDeletionLagMs=14400000 - -# Max number of entries to append to a cursor ledger -managedLedgerCursorMaxEntriesPerLedger=50000 - -# Max time before triggering a rollover on a cursor ledger -managedLedgerCursorRolloverTimeInSeconds=14400 - -# Max number of "acknowledgment holes" that are going to be persistently stored. -# When acknowledging out of order, a consumer will leave holes that are supposed -# to be quickly filled by acking all the messages. The information of which -# messages are acknowledged is persisted by compressing in "ranges" of messages -# that were acknowledged. After the max number of ranges is reached, the information -# will only be tracked in memory and messages will be redelivered in case of -# crashes. -managedLedgerMaxUnackedRangesToPersist=10000 - -# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher -# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into -# zookeeper. -managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 - -# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets -# corrupted at bookkeeper and managed-cursor is stuck at that ledger. -autoSkipNonRecoverableData=false - -# operation timeout while updating managed-ledger metadata. -managedLedgerMetadataOperationsTimeoutSeconds=60 - -# Read entries timeout when broker tries to read messages from bookkeeper. -managedLedgerReadEntryTimeoutSeconds=0 - -# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). 
-managedLedgerAddEntryTimeoutSeconds=0 - -### --- Load balancer --- ### - -# Enable load balancer -loadBalancerEnabled=true - -# Percentage of change to trigger load report update -loadBalancerReportUpdateThresholdPercentage=10 - -# maximum interval to update load report -loadBalancerReportUpdateMaxIntervalMinutes=15 - -# Frequency of report to collect -loadBalancerHostUsageCheckIntervalMinutes=1 - -# Enable/disable automatic bundle unloading for load-shedding -loadBalancerSheddingEnabled=true - -# Load shedding interval. Broker periodically checks whether some traffic should be offload from -# some over-loaded broker to other under-loaded brokers -loadBalancerSheddingIntervalMinutes=1 - -# Prevent the same topics to be shed and moved to other broker more that once within this timeframe -loadBalancerSheddingGracePeriodMinutes=30 - -# Usage threshold to allocate max number of topics to broker -loadBalancerBrokerMaxTopics=50000 - -# Usage threshold to determine a broker as over-loaded -loadBalancerBrokerOverloadedThresholdPercentage=85 - -# Interval to flush dynamic resource quota to ZooKeeper -loadBalancerResourceQuotaUpdateIntervalMinutes=15 - -# enable/disable namespace bundle auto split -loadBalancerAutoBundleSplitEnabled=true - -# enable/disable automatic unloading of split bundles -loadBalancerAutoUnloadSplitBundlesEnabled=true - -# maximum topics in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxTopics=1000 - -# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxSessions=1000 - -# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxMsgRate=30000 - -# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxBandwidthMbytes=100 - -# maximum number of bundles in a namespace -loadBalancerNamespaceMaximumBundles=128 - -# Override the auto-detection of the network interfaces max speed. -# This option is useful in some environments (eg: EC2 VMs) where the max speed -# reported by Linux is not reflecting the real bandwidth available to the broker. -# Since the network usage is employed by the load manager to decide when a broker -# is overloaded, it is important to make sure the info is correct or override it -# with the right value here. The configured value can be a double (eg: 0.8) and that -# can be used to trigger load-shedding even before hitting on NIC limits. -loadBalancerOverrideBrokerNicSpeedGbps= - -# Name of load manager to use -loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - -### --- Replication --- ### - -# Enable replication metrics -replicationMetricsEnabled=true - -# Max number of connections to open for each broker in a remote cluster -# More connections host-to-host lead to better throughput over high-latency -# links. 
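The four loadBalancerNamespaceBundleMax* thresholds above all feed one decision: a bundle is split as soon as any of them is exceeded, as long as the namespace stays under loadBalancerNamespaceMaximumBundles. A minimal sketch of that check using the default values listed; the broker's actual heuristics live in the configured load manager, so this is only an illustration:

    /** Illustrative bundle-split check mirroring the thresholds documented above. */
    final class BundleSplitRule {
        static boolean shouldSplit(int topics, int sessions, double msgRateInOut, double bandwidthMbytes) {
            return topics > 1000              // loadBalancerNamespaceBundleMaxTopics
                    || sessions > 1000        // loadBalancerNamespaceBundleMaxSessions (producers + consumers)
                    || msgRateInOut > 30000   // loadBalancerNamespaceBundleMaxMsgRate (in + out)
                    || bandwidthMbytes > 100; // loadBalancerNamespaceBundleMaxBandwidthMbytes (in + out)
        }

        private BundleSplitRule() { }
    }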
-replicationConnectionsPerBroker=16 - -# Replicator producer queue size -replicationProducerQueueSize=1000 - -# Replicator prefix used for replicator producer name and cursor name -replicatorPrefix=pulsar.repl - -# Default message retention time -defaultRetentionTimeInMinutes=600 - -# Default retention size -defaultRetentionSizeInMB=1000 - -# How often to check whether the connections are still alive -keepAliveIntervalSeconds=30 - -# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) -brokerServicePurgeInactiveFrequencyInSeconds=60 - -# bootstrap namespaces -bootstrapNamespaces= - -### --- WebSocket --- ### - -# Enable the WebSocket API service in broker -webSocketServiceEnabled=false - -# Number of IO threads in Pulsar Client used in WebSocket proxy -webSocketNumIoThreads=8 - -# Number of connections per Broker in Pulsar Client used in WebSocket proxy -webSocketConnectionsPerBroker=8 - -# Time in milliseconds that idle WebSocket session times out -webSocketSessionIdleTimeoutMillis=300000 - -### --- Metrics --- ### - -# Enable topic level metrics -exposeTopicLevelMetricsInPrometheus=true - -# Enable consumer level metrics. default is false -exposeConsumerLevelMetricsInPrometheus=false - -# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics -# jvmGCMetricsLoggerClassName= - -### --- Functions --- ### - -# Enable Functions Worker Service in Broker -functionsWorkerEnabled=false - -### --- Broker Web Stats --- ### - -# Enable topic level metrics -exposePublisherStats=true -statsUpdateFrequencyInSecs=60 -statsUpdateInitialDelayInSecs=60 - -### --- Schema storage --- ### -# The schema storage implementation used by this broker -schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory - -# Enforce schema validation on following cases: -# -# - if a producer without a schema attempts to produce to a topic with schema, the producer will be -# failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema. -# if you enable this setting, it will cause non-java clients failed to produce. -isSchemaValidationEnforced=false - -### --- Ledger Offloading --- ### - -# The directory for all the offloader implementations -offloadersDirectory=./offloaders - -# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage) -# When using google-cloud-storage, Make sure both Google Cloud Storage and Google Cloud Storage JSON API are enabled for -# the project (check from Developers Console -> Api&auth -> APIs). -managedLedgerOffloadDriver= - -# Maximum number of thread pool threads for ledger offloading -managedLedgerOffloadMaxThreads=2 - -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - -# For Amazon S3 ledger offload, AWS region -s3ManagedLedgerOffloadRegion= - -# For Amazon S3 ledger offload, Bucket to place offloaded ledger into -s3ManagedLedgerOffloadBucket= - -# For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) -s3ManagedLedgerOffloadServiceEndpoint= - -# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) -s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 - -# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) -s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 - -# For Google Cloud Storage ledger offload, region where offload bucket is located. 
-# reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations -gcsManagedLedgerOffloadRegion= - -# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into -gcsManagedLedgerOffloadBucket= - -# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) -gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864 - -# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default) -gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576 - -# For Google Cloud Storage, path to json file containing service account credentials. -# For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849 -gcsManagedLedgerOffloadServiceAccountKeyFile= - -### --- Deprecated config variables --- ### - -# Deprecated. Use configurationStoreServers -globalZookeeperServers= - -# Deprecated - Enable TLS when talking with other clusters to replicate messages -replicationTlsEnabled=false - diff --git a/kafka-impl/conf/kop_env.sh b/kafka-impl/conf/kop_env.sh deleted file mode 100755 index e6fa35a3ed..0000000000 --- a/kafka-impl/conf/kop_env.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -# Set JAVA_HOME here to override the environment setting -# JAVA_HOME= - -# Log4j configuration file -# KOP_LOG_CONF= - -# Configuration file of settings used in Kop -# KOP_CONF= - -# Extra options to be passed to the jvm -KOP_MEM=${KOP_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"} - -# Garbage collection options -KOP_GC=" -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB" - -# Extra options to be passed to the jvm -KOP_EXTRA_OPTS="${KOP_EXTRA_OPTS} ${KOP_MEM} ${KOP_GC} -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024" - diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf deleted file mode 100644 index 92a027c55a..0000000000 --- a/kafka-impl/conf/kop_standalone.conf +++ /dev/null @@ -1,699 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -### --- Kafka broker settings --- ### - -# The messaging Protocols that avilabale when loaded by Pulsar Broker. 
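KOP_CONF in kop_env.sh points the broker at a properties-style file such as the kop_standalone.conf shown next; the real loader is the ConfigurationUtils/KafkaServiceConfiguration code that appears later in this patch. As a rough illustration of the file format only, the same keys can be read with plain java.util.Properties (the path below is hypothetical):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    public class KopConfReader {
        public static void main(String[] args) throws IOException {
            Properties conf = new Properties();
            // Hypothetical location; in a real deployment KOP_CONF selects the file.
            try (FileInputStream in = new FileInputStream("conf/kop_standalone.conf")) {
                conf.load(in);
            }
            System.out.println("messagingProtocols = " + conf.getProperty("messagingProtocols"));
            System.out.println("listeners          = " + conf.getProperty("listeners"));
        }
    }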
-messagingProtocols=kafka - -# ListenersProp for Kafka service(host should follow the advertisedAddress). -# e.g. PLAINTEXT://localhost:9092,SSL://localhost:9093 -# when KoP runs as a plugin in Pulsar, if it is not set, kop will use PLAINTEXT://advertisedAddress:9092 -listeners=PLAINTEXT://127.0.0.1:9092 - -# Kafka on Pulsar Broker tenant -kafkaTenant=public - -# Kafka on Pulsar Broker namespace -kafkaNamespace=default - -# The tenant used for storing Kafka metadata topics -kafkaMetadataTenant=public - -# The namespace used for storing Kafka metadata topics -kafkaMetadataNamespace=__kafka - -# Flag to enable group coordinator -enableGroupCoordinator=true - -# The minimum allowed session timeout for registered consumers. -# Shorter timeouts result in quicker failure detection at the cost -# of more frequent consumer heartbeating, which can overwhelm broker resources. -groupMinSessionTimeoutMs=6000 - -# The maximum allowed session timeout for registered consumers. -# Longer timeouts give consumers more time to process messages in -# between heartbeats at the cost of a longer time to detect failures. -groupMaxSessionTimeoutMs=300000 - -# The amount of time the group coordinator will wait for more consumers -# to join a new group before performing the first rebalance. A longer -# delay means potentially fewer rebalances, but increases the time until -# processing begins -groupInitialRebalanceDelayMs=3000 - -# Compression codec for the offsets topic - compression may be used to achieve "atomic" commits -offsetsTopicCompressionCodec=NONE - -# The maximum size in Bytes for a metadata entry associated with an offset commit -offsetMetadataMaxSize=4096 - -# Offsets older than this retention period will be discarded, default 7 days -offsetsRetentionMinutes=10080 - -# Frequency at which to check for stale offsets -offsetsRetentionCheckIntervalMs=600000 - -# Number of partitions for the offsets topic -offsetsTopicNumPartitions=8 - -# Maximum number of entries that are read from cursor once per time -maxReadEntriesNum=1 - -# The format of an entry. The default value is pulsar. -# Optional values: [pulsar] -# -# pulsar: -# When KoP receives messages from kafka producer, it will serialize these messages to -# the format so that pulsar consumer can read directly. -# When KoP sends entries to kafka consumer, it will treat each entry as pulsar's -# format and deserialize each entry to kafka's format. -# -# kafka: -# When KoP receives messages from kafka producer, add a header which is PulsarApi.Metadata -# before the messages' bytes, and then write to BK directly. -# When KoP sends entries to kafka consumer, it will treat each entry as kafka's format and -# just discard the pulsar header and send left bytes to Kafka consumer. -# This mode means that current pulsar clients cannot interact with kafka clients, but -# kafka producer works well with kafka consumer. 
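Because the listener above is advertised as PLAINTEXT://127.0.0.1:9092 and the group coordinator enforces the session-timeout bounds just described, a standard Kafka client needs nothing KoP-specific. An illustrative consumer; topic and group names are hypothetical:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class KopConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");  // must match the advertised listener above
            props.put("group.id", "example-group");
            // session.timeout.ms has to fall within [groupMinSessionTimeoutMs, groupMaxSessionTimeoutMs]
            props.put("session.timeout.ms", "10000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
            }
        }
    }

Offsets committed by such a consumer are kept in the offsets topic governed by the offsetsTopicNumPartitions and offsetsRetentionMinutes settings above.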
-entryFormat=pulsar - -### --- KoP SSL configs--- ### - -# Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol -kopSslProtocol=TLS - -# Kafka ssl configuration map with: SSL_PROVIDER_CONFIG = ssl.provider -kopSslProvider= - -# Kafka ssl configuration map with: SSL_CIPHER_SUITES_CONFIG = ssl.cipher.suites -kopSslCipherSuites= - -# Kafka ssl configuration map with: SSL_ENABLED_PROTOCOLS_CONFIG = ssl.enabled.protocols -kopSslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1 - -# Kafka ssl configuration map with: SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type -kopSslKeystoreType=JKS - -# Kafka ssl configuration map with: SSL_KEYSTORE_LOCATION_CONFIG = ssl.keystore.location -kopSslKeystoreLocation= - -# Kafka ssl configuration map with: SSL_KEYSTORE_PASSWORD_CONFIG = ssl.keystore.password -kopSslKeystorePassword= - -# Kafka ssl configuration map with: SSL_KEY_PASSWORD_CONFIG = ssl.key.password -kopSslKeyPassword= - -# Kafka ssl configuration map with: SSL_TRUSTSTORE_TYPE_CONFIG = ssl.truststore.type -kopSslTruststoreType=JKS - -# Kafka ssl configuration map with: SSL_TRUSTSTORE_LOCATION_CONFIG = ssl.truststore.location -kopSslTruststoreLocation= - -# Kafka ssl configuration map with: SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password -kopSslTruststorePassword= - -# Kafka ssl configuration map with: SSL_KEYMANAGER_ALGORITHM_CONFIG = ssl.keymanager.algorithm -kopSslKeymanagerAlgorithm=SunX509 - -# Kafka ssl configuration map with: SSL_TRUSTMANAGER_ALGORITHM_CONFIG = ssl.trustmanager.algorithm -kopSslTrustmanagerAlgorithm=SunX509 - -# Kafka ssl configuration map with: -# SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = ssl.secure.random.implementation -kopSslSecureRandomImplementation= - -# supported SASL mechanisms exposed by broker -saslAllowedMechanisms= - -### --- Changed for KoP --- ### - -# Enable the deletion of inactive topics -brokerDeleteInactiveTopicsEnabled=false - -allowAutoTopicCreation=true - -allowAutoTopicCreationType=partitioned - -### --- General broker settings --- ### - -# Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled -# (Disable message rate limit with value 0) -brokerPublisherThrottlingMaxMessageRate=0 - -# Max Rate(in 1 seconds) of Byte allowed to publish for a broker if broker publish rate limiting enabled. -# (Disable byte rate limit with value 0) -brokerPublisherThrottlingMaxByteRate=0 - -# Zookeeper quorum connection string -zookeeperServers= - -# Configuration Store connection string -configurationStoreServers= - -brokerServicePort=6650 - -# Port to use to server HTTP request -webServicePort=8080 - -# Hostname or IP address the service binds on, default is 0.0.0.0. -bindAddress=0.0.0.0 - -# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used. -advertisedAddress=127.0.0.1 - -# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors() -numIOThreads= - -# Number of threads to use for HTTP requests processing. 
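The kopSsl* keys above configure the broker side of the TLS listener; a Kafka client connects to it with the standard ssl.* client properties they mirror. A minimal sketch with placeholder paths and passwords:

    import java.util.Properties;

    public class KopSslClientProps {
        static Properties sslProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9093");   // an SSL:// listener, as in the example above
            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.type", "JKS");             // counterpart of kopSslTruststoreType
            props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
            props.put("ssl.truststore.password", "changeit");
            props.put("ssl.enabled.protocols", "TLSv1.2");       // counterpart of kopSslEnabledProtocols
            return props;
        }
    }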
Default is set to 2 * Runtime.getRuntime().availableProcessors() -numHttpServerThreads= - -# Name of the cluster to which this broker belongs to -clusterName=kafka - -# Enable cluster's failure-domain which can distribute brokers into logical region -failureDomainsEnabled=false - -# Zookeeper session timeout in milliseconds -zooKeeperSessionTimeoutMillis=30000 - -# ZooKeeper operation timeout in seconds -zooKeeperOperationTimeoutSeconds=30 - -# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed -brokerShutdownTimeoutMs=60000 - -# Enable backlog quota check. Enforces action on topic when the quota is reached -backlogQuotaCheckEnabled=true - -# How often to check for topics that have reached the quota -backlogQuotaCheckIntervalInSeconds=60 - -# Default per-topic backlog quota limit -backlogQuotaDefaultLimitGB=10 - -# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) -ttlDurationDefaultInSeconds=0 - -# How often to check for inactive topics -brokerDeleteInactiveTopicsFrequencySeconds=60 - -# How frequently to proactively check and purge expired messages -messageExpiryCheckIntervalInMinutes=5 - -# How long to delay rewinding cursor and dispatching messages when active consumer is changed -activeConsumerFailoverDelayTimeMillis=1000 - -# How long to delete inactive subscriptions from last consuming -# When it is 0, inactive subscriptions are not deleted automatically -subscriptionExpirationTimeMinutes=0 - -# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled) -subscriptionRedeliveryTrackerEnabled=true - -# How frequently to proactively check and purge expired subscription -subscriptionExpiryCheckIntervalInMinutes=5 - -# Set the default behavior for message deduplication in the broker -# This can be overridden per-namespace. If enabled, broker will reject -# messages that were already stored in the topic -brokerDeduplicationEnabled=false - -# Maximum number of producer information that it's going to be -# persisted for deduplication purposes -brokerDeduplicationMaxNumberOfProducers=10000 - -# Number of entries after which a dedup info snapshot is taken. -# A bigger interval will lead to less snapshots being taken though it would -# increase the topic recovery time, when the entries published after the -# snapshot need to be replayed -brokerDeduplicationEntriesInterval=1000 - -# Time of inactivity after which the broker will discard the deduplication information -# relative to a disconnected producer. Default is 6 hours. -brokerDeduplicationProducerInactivityTimeoutMinutes=360 - -# When a namespace is created without specifying the number of bundle, this -# value will be used as the default -defaultNumberOfNamespaceBundles=4 - -# Enable check for minimum allowed client library version -clientLibraryVersionCheckEnabled=false - -# Path for the file used to determine the rotation status for the broker when responding -# to service discovery health checks -statusFilePath=/usr/local/apache/htdocs - -# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. 
Broker will stop sending -# messages to consumer once, this limit reaches until consumer starts acknowledging messages back -# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction -maxUnackedMessagesPerConsumer=50000 - -# Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to -# all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and -# unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit -# check and dispatcher can dispatch messages without any restriction -maxUnackedMessagesPerSubscription=200000 - -# Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching -# messages to all shared subscription which has higher number of unack messages until subscriptions start -# acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling -# unackedMessage-limit check and broker doesn't block dispatchers -maxUnackedMessagesPerBroker=0 - -# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages -# than this percentage limit and subscription will not receive any new messages until that subscription acks back -# limit/2 messages -maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 - -# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default -# message dispatch-throttling -dispatchThrottlingRatePerTopicInMsg=0 - -# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling -# default message-byte dispatch-throttling -dispatchThrottlingRatePerTopicInByte=0 - -# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have -# backlog. -dispatchThrottlingOnNonBacklogConsumerEnabled=true - -# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic -maxConcurrentLookupRequest=50000 - -# Max number of concurrent topic loading request broker allows to control number of zk-operations -maxConcurrentTopicLoadRequest=5000 - -# Max concurrent non-persistent message can be processed per connection -maxConcurrentNonPersistentMessagePerConnection=1000 - -# Number of worker threads to serve non-persistent topic -numWorkerThreadsForNonPersistentTopic=8 - -# Enable broker to load persistent topics -enablePersistentTopics=true - -# Enable broker to load non-persistent topics -enableNonPersistentTopics=true - -# Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers -# until the number of connected producers decrease. -# Using a value of 0, is disabling maxProducersPerTopic-limit check. -maxProducersPerTopic=0 - -# Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers -# until the number of connected consumers decrease. -# Using a value of 0, is disabling maxConsumersPerTopic-limit check. -maxConsumersPerTopic=0 - -# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers -# until the number of connected consumers decrease. -# Using a value of 0, is disabling maxConsumersPerSubscription-limit check. -maxConsumersPerSubscription=0 - -### --- Authentication --- ### -# Role names that are treated as "proxy roles". 
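The maxUnackedMessagesPer* settings above all follow the same hysteresis: dispatch stops once the unacked count reaches the limit and resumes only after consumers ack back down to limit/2, with 0 disabling the check. A minimal sketch of that rule, illustrative rather than the broker's actual code:

    /** Illustrative restatement of the unacked-message hysteresis described above. */
    final class UnackedDispatchGate {
        private final int limit;   // e.g. maxUnackedMessagesPerSubscription=200000
        private boolean blocked;

        UnackedDispatchGate(int limit) {
            this.limit = limit;
        }

        /** @return true if dispatch may continue for the given unacked-message count. */
        boolean allowDispatch(int unackedCount) {
            if (limit == 0) {
                return true;                           // 0 disables the check entirely
            }
            if (unackedCount >= limit) {
                blocked = true;                        // stop dispatching at the limit
            } else if (blocked && unackedCount <= limit / 2) {
                blocked = false;                       // resume once acks bring the count down to limit/2
            }
            return !blocked;
        }
    }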
If the broker sees a request with -#role as proxyRoles - it will demand to see a valid original principal. -proxyRoles= - -# If this flag is set then the broker authenticates the original Auth data -# else it just accepts the originalPrincipal and authorizes it (if required). -authenticateOriginalAuthData=false - -# Enable authentication -authenticationEnabled=false - -# Autentication provider name list, which is comma separated list of class names -authenticationProviders= - -# Enforce authorization -authorizationEnabled=false - -# Authorization provider fully qualified class-name -authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider - -# Allow wildcard matching in authorization -# (wildcard matching only applicable if wildcard-char: -# * presents at first or last position eg: *.pulsar.service, pulsar.service.*) -authorizationAllowWildcardsMatching=false - -# Role names that are treated as "super-user", meaning they will be able to do all admin -# operations and publish/consume from all topics -superUserRoles= - -# Authentication settings of the broker itself. Used when the broker connects to other brokers, -# either in same or other clusters -brokerClientAuthenticationPlugin= -brokerClientAuthenticationParameters= - -# Supported Athenz provider domain names(comma separated) for authentication -athenzDomainNames= - -# When this parameter is not empty, unauthenticated users perform as anonymousUserRole -anonymousUserRole= - -# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank) -tokenAuthClaim= - -### --- BookKeeper Client --- ### - -# Authentication plugin to use when connecting to bookies -bookkeeperClientAuthenticationPlugin= - -# BookKeeper auth plugin implementatation specifics parameters name and values -bookkeeperClientAuthenticationParametersName= -bookkeeperClientAuthenticationParameters= - -# Timeout for BK add / read operations -bookkeeperClientTimeoutInSeconds=30 - -# Speculative reads are initiated if a read request doesn't complete within a certain time -# Using a value of 0, is disabling the speculative reads -bookkeeperClientSpeculativeReadTimeoutInMillis=0 - -# Enable bookies health check. Bookies that have more than the configured number of failure within -# the interval will be quarantined for some time. During this period, new ledgers won't be created -# on these bookies -bookkeeperClientHealthCheckEnabled=true -bookkeeperClientHealthCheckIntervalSeconds=60 -bookkeeperClientHealthCheckErrorThresholdPerInterval=5 -bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 - -# Enable rack-aware bookie selection policy. BK will chose bookies from different racks when -# forming a new bookie ensemble -bookkeeperClientRackawarePolicyEnabled=true - -# Enable region-aware bookie selection policy. BK will chose bookies from -# different regions and racks when forming a new bookie ensemble. -# If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored -bookkeeperClientRegionawarePolicyEnabled=false - -# Enable/disable reordering read sequence on reading entries. -bookkeeperClientReorderReadSequenceEnabled=false - -# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie -# outside the specified groups will not be used by the broker -bookkeeperClientIsolationGroups= - -# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't -# have enough bookie available. 
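As described above, bookkeeperClientIsolationGroups restricts new ensembles to bookies in the listed groups, and bookkeeperClientSecondaryIsolationGroups is only consulted when the primary groups cannot supply bookkeeperClientMinAvailableBookiesInIsolationGroups bookies. A rough sketch of that selection with hypothetical types; the real behaviour lives in the BookKeeper placement policy:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    final class IsolationGroupFilter {
        /** Candidate bookies given a bookie-to-group mapping and the configured isolation groups. */
        static List<String> eligibleBookies(Map<String, String> bookieToGroup,
                                            Set<String> primaryGroups,
                                            Set<String> secondaryGroups,
                                            int minAvailable) {
            List<String> primary = inGroups(bookieToGroup, primaryGroups);
            if (primary.size() >= minAvailable) {
                return primary;                        // primary isolation groups are sufficient
            }
            // Too few bookies in the primary groups: widen the search to the secondary groups.
            List<String> widened = new ArrayList<>(primary);
            widened.addAll(inGroups(bookieToGroup, secondaryGroups));
            return widened;
        }

        private static List<String> inGroups(Map<String, String> bookieToGroup, Set<String> groups) {
            return bookieToGroup.entrySet().stream()
                    .filter(e -> groups.contains(e.getValue()))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        }

        private IsolationGroupFilter() { }
    }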
-bookkeeperClientSecondaryIsolationGroups= - -# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups -# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list. -bookkeeperClientMinAvailableBookiesInIsolationGroups= - -### --- Managed Ledger --- ### - -# Number of bookies to use when creating a ledger -managedLedgerDefaultEnsembleSize=1 - -# Number of copies to store for each message -managedLedgerDefaultWriteQuorum=1 - -# Number of guaranteed copies (acks to wait before write is complete) -managedLedgerDefaultAckQuorum=1 - -# Default type of checksum to use when writing to BookKeeper. Default is "CRC32" -# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum). -managedLedgerDigestType=CRC32 - -# Number of threads to be used for managed ledger tasks dispatching -managedLedgerNumWorkerThreads=4 - -# Number of threads to be used for managed ledger scheduled tasks -managedLedgerNumSchedulerThreads=4 - -# Amount of memory to use for caching data payload in managed ledger. This memory -# is allocated from JVM direct memory and it's shared across all the topics -# running in the same broker. By default, uses 1/5th of available direct memory -managedLedgerCacheSizeMB= - -# Whether we should make a copy of the entry payloads when inserting in cache -managedLedgerCacheCopyEntries=false - -# Threshold to which bring down the cache level when eviction is triggered -managedLedgerCacheEvictionWatermark=0.9 - -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 - -# All entries that have stayed in cache for more than the configured time, will be evicted -managedLedgerCacheEvictionTimeThresholdMillis=1000 - -# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged' -# and thus should be set as inactive. -managedLedgerCursorBackloggedThreshold=1000 - -# Rate limit the amount of writes generated by consumer acking the messages -managedLedgerDefaultMarkDeleteRateLimit=0.1 - -# Max number of entries to append to a ledger before triggering a rollover -# A ledger rollover is triggered on these conditions -# * Either the max rollover time has been reached -# * or max entries have been written to the ledged and at least min-time -# has passed -managedLedgerMaxEntriesPerLedger=50000 - -# Minimum time between ledger rollover for a topic -managedLedgerMinLedgerRolloverTimeMinutes=10 - -# Maximum time before forcing a ledger rollover for a topic -managedLedgerMaxLedgerRolloverTimeMinutes=240 - -# Max number of entries to append to a cursor ledger -managedLedgerCursorMaxEntriesPerLedger=50000 - -# Max time before triggering a rollover on a cursor ledger -managedLedgerCursorRolloverTimeInSeconds=14400 - -# Max number of "acknowledgment holes" that are going to be persistently stored. -# When acknowledging out of order, a consumer will leave holes that are supposed -# to be quickly filled by acking all the messages. The information of which -# messages are acknowledged is persisted by compressing in "ranges" of messages -# that were acknowledged. After the max number of ranges is reached, the information -# will only be tracked in memory and messages will be redelivered in case of -# crashes. -managedLedgerMaxUnackedRangesToPersist=10000 - -# Max number of "acknowledgment holes" that can be stored in Zookeeper. 
If number of unack message range is higher -# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into -# zookeeper. -managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 - -# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets -# corrupted at bookkeeper and managed-cursor is stuck at that ledger. -autoSkipNonRecoverableData=false - -# operation timeout while updating managed-ledger metadata. -managedLedgerMetadataOperationsTimeoutSeconds=60 - -# Read entries timeout when broker tries to read messages from bookkeeper. -managedLedgerReadEntryTimeoutSeconds=0 - -# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). -managedLedgerAddEntryTimeoutSeconds=0 - -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - -### --- Load balancer --- ### - -loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager - -# Enable load balancer -loadBalancerEnabled=false - -# Percentage of change to trigger load report update -loadBalancerReportUpdateThresholdPercentage=10 - -# maximum interval to update load report -loadBalancerReportUpdateMaxIntervalMinutes=15 - -# Frequency of report to collect -loadBalancerHostUsageCheckIntervalMinutes=1 - -# Load shedding interval. Broker periodically checks whether some traffic should be offload from -# some over-loaded broker to other under-loaded brokers -loadBalancerSheddingIntervalMinutes=1 - -# Prevent the same topics to be shed and moved to other broker more that once within this timeframe -loadBalancerSheddingGracePeriodMinutes=30 - -# Usage threshold to allocate max number of topics to broker -loadBalancerBrokerMaxTopics=50000 - -# Interval to flush dynamic resource quota to ZooKeeper -loadBalancerResourceQuotaUpdateIntervalMinutes=15 - -# enable/disable namespace bundle auto split -loadBalancerAutoBundleSplitEnabled=true - -# enable/disable automatic unloading of split bundles -loadBalancerAutoUnloadSplitBundlesEnabled=true - -# maximum topics in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxTopics=1000 - -# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxSessions=1000 - -# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxMsgRate=30000 - -# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered -loadBalancerNamespaceBundleMaxBandwidthMbytes=100 - -# maximum number of bundles in a namespace -loadBalancerNamespaceMaximumBundles=128 - -### --- Replication --- ### - -# Enable replication metrics -replicationMetricsEnabled=true - -# Max number of connections to open for each broker in a remote cluster -# More connections host-to-host lead to better throughput over high-latency -# links. 
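The "acknowledgment holes" described above (managedLedgerMaxUnackedRangesToPersist, managedLedgerUnackedRangesOpenCacheSetEnabled) are tracked as ranges of acknowledged entry ids that coalesce as the holes are filled. A small illustration with Guava's range set, which is only a stand-in for the broker's own data structure:

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeSet;
    import com.google.common.collect.TreeRangeSet;

    public class AckRangeExample {
        public static void main(String[] args) {
            RangeSet<Long> acked = TreeRangeSet.create();
            acked.add(Range.closedOpen(0L, 100L));    // entries 0-99 acked in order
            acked.add(Range.closedOpen(150L, 200L));  // out-of-order acks leave a hole at 100-149
            acked.add(Range.closedOpen(100L, 150L));  // hole filled: adjacent ranges coalesce
            System.out.println(acked.asRanges());     // prints a single range [0..200)
            // Only the distinct ranges need to be persisted; managedLedgerMaxUnackedRangesToPersist
            // caps how many are stored durably before the rest is tracked in memory only.
        }
    }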
-replicationConnectionsPerBroker=16 - -# Replicator producer queue size -replicationProducerQueueSize=1000 - -# Default message retention time -defaultRetentionTimeInMinutes=60 - -# Default retention size -defaultRetentionSizeInMB=100 - -# How often to check whether the connections are still alive -keepAliveIntervalSeconds=30 - -# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) -brokerServicePurgeInactiveFrequencyInSeconds=60 - -### --- WebSocket --- ### - -# Enable the WebSocket API service in broker -webSocketServiceEnabled=true - -# Number of IO threads in Pulsar Client used in WebSocket proxy -webSocketNumIoThreads=8 - -# Number of connections per Broker in Pulsar Client used in WebSocket proxy -webSocketConnectionsPerBroker=8 - -# Time in milliseconds that idle WebSocket session times out -webSocketSessionIdleTimeoutMillis=300000 - -### --- Metrics --- ### - -# Enable topic level metrics -exposeTopicLevelMetricsInPrometheus=true - -# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics -# jvmGCMetricsLoggerClassName= - -### --- Broker Web Stats --- ### - -# Enable topic level metrics -exposePublisherStats=true - -### --- Deprecated config variables --- ### - -# Deprecated. Use configurationStoreServers -globalZookeeperServers= - - -### --- BookKeeper Configuration --- ##### - -ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - -# Size of Write Cache. Memory is allocated from JVM direct memory. -# Write cache is used to buffer entries before flushing into the entry log -# For good performance, it should be big enough to hold a substantial amount -# of entries in the flush interval -# By default it will be allocated to 1/4th of the available direct memory -dbStorage_writeCacheMaxSizeMb= - -# Size of Read cache. Memory is allocated from JVM direct memory. -# This read cache is pre-filled doing read-ahead whenever a cache miss happens -# By default it will be allocated to 1/4th of the available direct memory -dbStorage_readAheadCacheMaxSizeMb= - -# How many entries to pre-fill in cache after a read cache miss -dbStorage_readAheadCacheBatchSize=1000 - -flushInterval=60000 - -## RocksDB specific configurations -## DbLedgerStorage uses RocksDB to store the indexes from -## (ledgerId, entryId) -> (entryLog, offset) - -# Size of RocksDB block-cache. For best performance, this cache -# should be big enough to hold a significant portion of the index -# database which can reach ~2GB in some cases -# Default is to use 10% of the direct memory size -dbStorage_rocksDB_blockCacheSize= - -# Other RocksDB specific tunables -dbStorage_rocksDB_writeBufferSizeMB=4 -dbStorage_rocksDB_sstSizeInMB=4 -dbStorage_rocksDB_blockSize=4096 -dbStorage_rocksDB_bloomFilterBitsPerKey=10 -dbStorage_rocksDB_numLevels=-1 -dbStorage_rocksDB_numFilesInLevel0=4 -dbStorage_rocksDB_maxSizeInLevel1MB=256 - -# Maximum latency to impose on a journal write to achieve grouping -journalMaxGroupWaitMSec=1 - -# Should the data be fsynced on journal before acknowledgment. -journalSyncData=false - - -# For each ledger dir, maximum disk space which can be used. -# Default is 0.95f. i.e. 95% of disk can be used at most after which nothing will -# be written to that partition. If all ledger dir partions are full, then bookie -# will turn to readonly mode if 'readOnlyModeEnabled=true' is set, else it will -# shutdown. -# Valid values should be in between 0 and 1 (exclusive). 
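The DbLedgerStorage defaults quoted above are all fractions of the JVM's direct memory: roughly a quarter each for the write and read-ahead caches and a tenth for the RocksDB block cache. A small worked example, assuming the 4 GB -XX:MaxDirectMemorySize set in kop_env.sh:

    public class DbStorageCacheDefaults {
        public static void main(String[] args) {
            long directMemory = 4L * 1024 * 1024 * 1024;  // -XX:MaxDirectMemorySize=4g from kop_env.sh
            long writeCache = directMemory / 4;            // dbStorage_writeCacheMaxSizeMb default (1/4)
            long readAheadCache = directMemory / 4;        // dbStorage_readAheadCacheMaxSizeMb default (1/4)
            long rocksDbBlockCache = directMemory / 10;    // dbStorage_rocksDB_blockCacheSize default (10%)
            System.out.printf("writeCache=%dMB readAhead=%dMB rocksDbBlockCache=%dMB%n",
                    writeCache >> 20, readAheadCache >> 20, rocksDbBlockCache >> 20);
        }
    }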
-diskUsageThreshold=0.99 - -# The disk free space low water mark threshold. -# Disk is considered full when usage threshold is exceeded. -# Disk returns back to non-full state when usage is below low water mark threshold. -# This prevents it from going back and forth between these states frequently -# when concurrent writes and compaction are happening. This also prevent bookie from -# switching frequently between read-only and read-writes states in the same cases. -diskUsageWarnThreshold=0.99 - -# Whether the bookie allowed to use a loopback interface as its primary -# interface(i.e. the interface it uses to establish its identity)? -# By default, loopback interfaces are not allowed as the primary -# interface. -# Using a loopback interface as the primary interface usually indicates -# a configuration error. For example, its fairly common in some VPS setups -# to not configure a hostname, or to have the hostname resolve to -# 127.0.0.1. If this is the case, then all bookies in the cluster will -# establish their identities as 127.0.0.1:3181, and only one will be able -# to join the cluster. For VPSs configured like this, you should explicitly -# set the listening interface. -allowLoopback=true - -# How long the interval to trigger next garbage collection, in milliseconds -# Since garbage collection is running in background, too frequent gc -# will heart performance. It is better to give a higher number of gc -# interval if there is enough disk capacity. -gcWaitTime=300000 diff --git a/kafka-impl/conf/log4j2.yaml b/kafka-impl/conf/log4j2.yaml deleted file mode 100755 index 2b49fa3685..0000000000 --- a/kafka-impl/conf/log4j2.yaml +++ /dev/null @@ -1,53 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -Configuration: - status: INFO - monitorInterval: 30 - name: kop - - Appenders: - Console: - name: Console - target: SYSTEM_OUT - PatternLayout: - Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" - - Loggers: - Root: - level: info - additivity: false - AppenderRef: - - ref: Console - level: info - - Logger: - - name: io.streamnative.pulsar.handlers.kop - level: info - additivity: false - AppenderRef: - - ref: Console - - - name: verbose - level: info - additivity: false - AppenderRef: - - ref: Console - - - name: org.apache.pulsar.broker.service - level: info - additivity: false - AppenderRef: - - ref: Console diff --git a/kafka-impl/conf/zookeeper.conf b/kafka-impl/conf/zookeeper.conf deleted file mode 100644 index c6fcc4bf62..0000000000 --- a/kafka-impl/conf/zookeeper.conf +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -# The number of milliseconds of each tick -tickTime=2000 -# The number of ticks that the initial -# synchronization phase can take -initLimit=10 -# The number of ticks that can pass between -# sending a request and getting an acknowledgement -syncLimit=5 -# the directory where the snapshot is stored. -dataDir=data/zookeeper -# the port at which the clients will connect -clientPort=2181 - -# the port at which the admin will listen -admin.enableServer=true -admin.serverPort=9990 - -# the maximum number of client connections. -# increase this if you need to handle more clients -#maxClientCnxns=60 -# -# Be sure to read the maintenance section of the -# administrator guide before turning on autopurge. -# -# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance -# -# The number of snapshots to retain in dataDir -autopurge.snapRetainCount=3 -# Purge task interval in hours -# Set to "0" to disable auto purge feature -autopurge.purgeInterval=1 - -# Requires updates to be synced to media of the transaction log before finishing -# processing the update. If this option is set to 'no', ZooKeeper will not require -# updates to be synced to the media. -# WARNING: it's not recommended to run a production ZK cluster with forceSync disabled. -forceSync=yes diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaBrokerStarter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaBrokerStarter.java deleted file mode 100644 index 737398a90d..0000000000 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaBrokerStarter.java +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.streamnative.pulsar.handlers.kop; - - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; -import java.io.File; -import java.io.FileInputStream; -import java.net.MalformedURLException; -import java.nio.file.Paths; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.util.ReflectionUtils; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.discover.BookieServiceInfo; -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.bookkeeper.replication.AutoRecoveryMain; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.DirectMemoryUtils; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; -import org.apache.pulsar.common.protocol.Commands; -import org.slf4j.bridge.SLF4JBridgeHandler; - -/** - * A starter to start Kafka-on-Pulsar broker. - */ -@Slf4j -public class KafkaBrokerStarter { - - private static class BrokerStarter { - private final KafkaServiceConfiguration brokerConfig; - private final KafkaService kafkaService; - private final BookieServer bookieServer; - private final AutoRecoveryMain autoRecoveryMain; - private final StatsProvider bookieStatsProvider; - private final ServerConfiguration bookieConfig; - - BrokerStarter(String[] args) throws Exception { - StarterArguments starterArguments = new StarterArguments(); - JCommander jcommander = new JCommander(starterArguments); - jcommander.setProgramName("KafkaBrokerStarter"); - - // parse args by JCommander - jcommander.parse(args); - if (starterArguments.help) { - jcommander.usage(); - Runtime.getRuntime().exit(-1); - } - - // init broker config - if (isBlank(starterArguments.brokerConfigFile)) { - jcommander.usage(); - throw new IllegalArgumentException("Need to specify a configuration file for kafkaBroker"); - } else { - brokerConfig = loadConfig(starterArguments.brokerConfigFile); - } - - int maxFrameSize = brokerConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING; - if (maxFrameSize >= DirectMemoryUtils.maxDirectMemory()) { - throw new IllegalArgumentException("Max message size need smaller than jvm directMemory"); - } - - - if (brokerConfig.getAdvertisedAddress() != null - && !brokerConfig.getListeners().contains(brokerConfig.getAdvertisedAddress())) { - String err = "Error config: advertisedAddress - " + brokerConfig.getAdvertisedAddress() - + " and listeners - " + brokerConfig.getListeners() + " not match."; - log.error(err); - throw new IllegalArgumentException(err); - } - - // init kafka broker service - kafkaService = new KafkaService(brokerConfig); - - // if no argument to run bookie in cmd line, read from pulsar config - if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) { - checkState(!starterArguments.runBookie, - "runBookie should 
be false if has no argument specified"); - starterArguments.runBookie = brokerConfig.isEnableRunBookieTogether(); - } - if (!argsContains(args, "-ra") && !argsContains(args, "--run-bookie-autorecovery")) { - checkState(!starterArguments.runBookieAutoRecovery, - "runBookieAutoRecovery should be false if has no argument specified"); - starterArguments.runBookieAutoRecovery = brokerConfig.isEnableRunBookieAutoRecoveryTogether(); - } - - if ((starterArguments.runBookie || starterArguments.runBookieAutoRecovery) - && isBlank(starterArguments.bookieConfigFile)) { - jcommander.usage(); - throw new IllegalArgumentException("No configuration file for Bookie"); - } - - // init stats provider - if (starterArguments.runBookie || starterArguments.runBookieAutoRecovery) { - checkState(isNotBlank(starterArguments.bookieConfigFile), - "No configuration file for Bookie"); - bookieConfig = readBookieConfFile(starterArguments.bookieConfigFile); - Class statsProviderClass = bookieConfig.getStatsProviderClass(); - bookieStatsProvider = ReflectionUtils.newInstance(statsProviderClass); - } else { - bookieConfig = null; - bookieStatsProvider = null; - } - - // init bookie server - if (starterArguments.runBookie) { - checkNotNull(bookieConfig, "No ServerConfiguration for Bookie"); - checkNotNull(bookieStatsProvider, "No Stats Provider for Bookie"); - bookieServer = new BookieServer( - bookieConfig, bookieStatsProvider.getStatsLogger(""), BookieServiceInfo.NO_INFO); - } else { - bookieServer = null; - } - - // init bookie AutorecoveryMain - if (starterArguments.runBookieAutoRecovery) { - checkNotNull(bookieConfig, "No ServerConfiguration for Bookie Autorecovery"); - autoRecoveryMain = new AutoRecoveryMain(bookieConfig); - } else { - autoRecoveryMain = null; - } - } - - public void start() throws Exception { - if (bookieStatsProvider != null) { - bookieStatsProvider.start(bookieConfig); - log.info("started bookieStatsProvider."); - } - if (bookieServer != null) { - bookieServer.start(); - log.info("started bookieServer."); - } - if (autoRecoveryMain != null) { - autoRecoveryMain.start(); - log.info("started bookie autoRecoveryMain."); - } - - kafkaService.start(); - log.info("KafkaService started."); - } - - public void join() throws InterruptedException { - kafkaService.waitUntilClosed(); - - if (bookieServer != null) { - bookieServer.join(); - } - if (autoRecoveryMain != null) { - autoRecoveryMain.join(); - } - } - - @SuppressFBWarnings("RU_INVOKE_RUN") - public void shutdown() { - kafkaService.getShutdownService().run(); - log.info("Shut down kafkaBroker service successfully."); - - if (bookieStatsProvider != null) { - bookieStatsProvider.stop(); - log.info("Shut down bookieStatsProvider successfully."); - } - if (bookieServer != null) { - bookieServer.shutdown(); - log.info("Shut down bookieServer successfully."); - } - if (autoRecoveryMain != null) { - autoRecoveryMain.shutdown(); - log.info("Shut down autoRecoveryMain successfully."); - } - } - } - - public static void main(String[] args) throws Exception { - DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS"); - Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> { - System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", - dateFormat.format(new Date()), - thread.getContextClassLoader(), - thread.getName(), - exception.getMessage())); - }); - - BrokerStarter starter = new BrokerStarter(args); - Runtime.getRuntime().addShutdownHook( - new Thread(() -> { - starter.shutdown(); - }) - ); - - 
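        // Register an out-of-memory listener with Pulsar's byte-buffer allocator so that an OOM on
        // direct memory is logged and triggers the same shutdown path as the shutdown hook above.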
PulsarByteBufAllocator.registerOOMListener(oomException -> { - log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException); - starter.shutdown(); - }); - - try { - starter.start(); - } catch (Exception e) { - log.error("Failed to start pulsar service.", e); - Runtime.getRuntime().halt(1); - } - - starter.join(); - } - - @VisibleForTesting - private static class StarterArguments { - @Parameter(names = { - "-c", "--kop-conf" - }, description = "Configuration file for Kafka on Pulsar Broker") - private String brokerConfigFile = - Paths.get("").toAbsolutePath().normalize().toString() + "/conf/kop.conf"; - - @Parameter(names = { - "-rb", "--run-bookie" - }, description = "Run Bookie together with Broker") - private boolean runBookie = false; - - @Parameter(names = { - "-ra", "--run-bookie-autorecovery" - }, description = "Run Bookie Autorecovery together with kafkaBroker") - private boolean runBookieAutoRecovery = false; - - @Parameter(names = { - "-bc", "--bookie-conf" - }, description = "Configuration file for Bookie") - private String bookieConfigFile = - Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf"; - - @Parameter(names = { - "-h", "--help" - }, description = "Show this help message") - private boolean help = false; - } - - private static boolean argsContains(String[] args, String arg) { - return Arrays.asList(args).contains(arg); - } - - private static KafkaServiceConfiguration loadConfig(String configFile) throws Exception { - SLF4JBridgeHandler.removeHandlersForRootLogger(); - SLF4JBridgeHandler.install(); - KafkaServiceConfiguration config = ConfigurationUtils.create( - new FileInputStream(configFile), - KafkaServiceConfiguration.class); - // it validates provided configuration is completed - isComplete(config); - return config; - } - - private static ServerConfiguration readBookieConfFile(String bookieConfigFile) throws IllegalArgumentException { - ServerConfiguration bookieConf = new ServerConfiguration(); - try { - bookieConf.loadConf(new File(bookieConfigFile).toURI().toURL()); - bookieConf.validate(); - log.info("Using bookie configuration file {}", bookieConfigFile); - } catch (MalformedURLException e) { - log.error("Could not open configuration file: {}", bookieConfigFile, e); - throw new IllegalArgumentException("Could not open configuration file"); - } catch (ConfigurationException e) { - log.error("Malformed configuration file: {}", bookieConfigFile, e); - throw new IllegalArgumentException("Malformed configuration file"); - } - - if (bookieConf.getMaxPendingReadRequestPerThread() < bookieConf.getRereplicationEntryBatchSize()) { - throw new IllegalArgumentException( - "rereplicationEntryBatchSize should be smaller than " + "maxPendingReadRequestPerThread"); - } - return bookieConf; - } -} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaService.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaService.java deleted file mode 100644 index e0926440ea..0000000000 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaService.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.util.OrderedExecutor; -import org.apache.commons.lang3.builder.ReflectionToStringBuilder; -import org.apache.pulsar.ZookeeperSessionExpiredHandlers; -import org.apache.pulsar.broker.BookKeeperClientFactory; -import org.apache.pulsar.broker.ManagedLedgerClientFactory; -import org.apache.pulsar.broker.PulsarServerException; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.loadbalance.LoadManager; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.schema.SchemaRegistryService; -import org.apache.pulsar.broker.stats.MetricsGenerator; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; -import org.apache.pulsar.broker.web.WebService; -import org.apache.pulsar.common.configuration.VipStatus; -import org.apache.pulsar.common.policies.data.OffloadPolicies; -import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; -import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; -import org.eclipse.jetty.servlet.ServletHolder; - -/** - * Main class for Kafka-on-Pulsar broker service. - */ -@Slf4j -public class KafkaService extends PulsarService { - - @Getter - private final KafkaServiceConfiguration kafkaConfig; - @Getter - @Setter - private GroupCoordinator groupCoordinator; - - public KafkaService(KafkaServiceConfiguration config) { - super(config); - kafkaConfig = config; - } - - @Override - public Map getProtocolDataToAdvertise() { - return ImmutableMap.builder() - .put("kafka", kafkaConfig.getListeners()) - .build(); - } - - @Override - public void start() throws PulsarServerException { - ReentrantLock lock = getMutex(); - - lock.lock(); - - try { - // TODO: add Kafka on Pulsar Version support -- https://github.com/streamnative/kop/issues/3 - log.info("Starting Pulsar Broker service powered by Pulsar version: '{}'", - (getBrokerVersion() != null ? 
getBrokerVersion() : "unknown")); - - if (getState() != State.Init) { - throw new PulsarServerException("Cannot start the service once it was stopped"); - } - - if (kafkaConfig.getListeners() == null || kafkaConfig.getListeners().isEmpty()) { - throw new IllegalArgumentException("Kafka Listeners should be provided through brokerConf.listeners"); - } - - if (kafkaConfig.getAdvertisedAddress() != null - && !kafkaConfig.getListeners().contains(kafkaConfig.getAdvertisedAddress())) { - String err = "Error config: advertisedAddress - " + kafkaConfig.getAdvertisedAddress() - + " and listeners - " + kafkaConfig.getListeners() + " not match."; - log.error(err); - throw new IllegalArgumentException(err); - } - - setOrderedExecutor(OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered") - .build()); - - // init KafkaProtocolHandler - KafkaProtocolHandler kafkaProtocolHandler = new KafkaProtocolHandler(); - kafkaProtocolHandler.initialize(kafkaConfig); - - // Now we are ready to start services - setLocalZooKeeperConnectionProvider(new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), - kafkaConfig.getZookeeperServers(), kafkaConfig.getZooKeeperSessionTimeoutMillis())); - // TODO: check shutdown policy and reconnect policy - ZookeeperSessionExpiredHandler expiredHandler = - ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(getShutdownService()); - getLocalZooKeeperConnectionProvider().start(expiredHandler); - - // Initialize and start service to access configuration repository. - startZkCacheService(); - - BookKeeperClientFactory bkClientFactory = newBookKeeperClientFactory(); - setBkClientFactory(bkClientFactory); - final ManagedLedgerClientFactory managedLedgerClientFactory = new ManagedLedgerClientFactory(); - managedLedgerClientFactory.initialize(kafkaConfig, getZkClient(), bkClientFactory); - setManagedLedgerClientFactory(managedLedgerClientFactory); - setBrokerService(new BrokerService(this)); - - // Start load management service (even if load balancing is disabled) - getLoadManager().set(LoadManager.create(this)); - - setDefaultOffloader(createManagedLedgerOffloader(OffloadPolicies.create(kafkaConfig.getProperties()))); - - getBrokerService().start(); - - WebService webService = new WebService(this); - setWebService(webService); - Map attributeMap = Maps.newHashMap(); - attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); - Map vipAttributeMap = Maps.newHashMap(); - vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, kafkaConfig.getStatusFilePath()); - vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier() { - @Override - public Boolean get() { - // Ensure the VIP status is only visible when the broker is fully initialized - return getState() == State.Started; - } - }); - webService.addRestResources("/", - VipStatus.class.getPackage().getName(), false, vipAttributeMap); - webService.addRestResources("/", - "org.apache.pulsar.broker.web", false, attributeMap); - webService.addRestResources("/admin", - "org.apache.pulsar.broker.admin.v1", true, attributeMap); - webService.addRestResources("/admin/v2", - "org.apache.pulsar.broker.admin.v2", true, attributeMap); - webService.addRestResources("/admin/v3", - "org.apache.pulsar.broker.admin.v3", true, attributeMap); - webService.addRestResources("/lookup", - "org.apache.pulsar.broker.lookup", true, attributeMap); - - webService.addServlet("/metrics", - new ServletHolder( - new PrometheusMetricsServlet( - this, - false, - kafkaConfig.isExposeTopicLevelMetricsInPrometheus(), - 
kafkaConfig.isExposeConsumerLevelMetricsInPrometheus())), - false, attributeMap); - - if (log.isDebugEnabled()) { - log.debug("Attempting to add static directory"); - } - webService.addStaticResources("/static", "/static"); - - // TODO: Configure SchemaStorage later, currently set it to null just for DefaultSchemaRegistryService - setSchemaRegistryService(SchemaRegistryService.create(null, new HashSet<>())); - - webService.start(); - - // Refresh addresses, since the port might have been dynamically assigned - setWebServiceAddress(webAddress(kafkaConfig)); - setWebServiceAddressTls(webAddressTls(kafkaConfig)); - setBrokerServiceUrl(kafkaConfig.getBrokerServicePort().isPresent() - ? brokerUrl(advertisedAddress(kafkaConfig), getBrokerListenPort().get()) - : null); - setBrokerServiceUrlTls(brokerUrlTls(kafkaConfig)); - - // needs load management service - this.startNamespaceService(); - - // Start the leader election service - startLeaderElectionService(); - - // Register heartbeat and bootstrap namespaces. - getNsService().registerBootstrapNamespaces(); - - setMetricsGenerator(new MetricsGenerator(this)); - - // By starting the Load manager service, the broker will also become visible - // to the rest of the broker by creating the registration z-node. This needs - // to be done only when the broker is fully operative. - startLoadManagementService(); - - acquireSLANamespace(); - - final String bootstrapMessage = "bootstrap service " - + (kafkaConfig.getWebServicePort().isPresent() - ? "port = " + kafkaConfig.getWebServicePort().get() : "") - + (kafkaConfig.getWebServicePortTls().isPresent() - ? "tls-port = " + kafkaConfig.getWebServicePortTls() : "") - + ("kafka listener url= " + kafkaConfig.getListeners()); - - // start Kafka protocol handler. - // put after load manager for the use of existing broker service to create internal topics. - kafkaProtocolHandler.start(this.getBrokerService()); - - Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializer = - kafkaProtocolHandler.newChannelInitializers(); - Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers = ImmutableMap - .<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>>builder() - .put("kafka", channelInitializer) - .build(); - getBrokerService().startProtocolHandlers(protocolHandlers); - - this.groupCoordinator = kafkaProtocolHandler.getGroupCoordinator(); - - setState(State.Started); - - log.info("Kafka messaging service is ready, {}, cluster={}, configs={}", - bootstrapMessage, kafkaConfig.getClusterName(), - ReflectionToStringBuilder.toString(kafkaConfig)); - } catch (Exception e) { - log.error(e.getMessage(), e); - throw new PulsarServerException(e); - } finally { - lock.unlock(); - } - } - - @Override - public void close() throws PulsarServerException { - if (groupCoordinator != null) { - this.groupCoordinator.shutdown(); - } - super.close(); - } - -} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaStandalone.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaStandalone.java deleted file mode 100644 index 1752edf15e..0000000000 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaStandalone.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop; - -import com.beust.jcommander.Parameter; -import com.google.common.collect.Sets; -import java.io.File; -import java.net.URL; -import java.util.Set; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; - -/** - * A standalone instance includes all the components for running Kafka-on-Pulsar. - */ -@Slf4j -public class KafkaStandalone implements AutoCloseable { - KafkaService kafkaBroker; - PulsarAdmin admin; - LocalBookkeeperEnsemble bkEnsemble; - KafkaServiceConfiguration config; - - public void setKafkaBroker(KafkaService kafkaBroker) { - this.kafkaBroker = kafkaBroker; - } - - public void setAdmin(PulsarAdmin admin) { - this.admin = admin; - } - - public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) { - this.bkEnsemble = bkEnsemble; - } - - public void setBkPort(int bkPort) { - this.bkPort = bkPort; - } - - public void setBkDir(String bkDir) { - this.bkDir = bkDir; - } - - public void setAdvertisedAddress(String advertisedAddress) { - this.advertisedAddress = advertisedAddress; - } - - public void setConfig(KafkaServiceConfiguration config) { - this.config = config; - } - - public void setConfigFile(String configFile) { - this.configFile = configFile; - } - - public void setWipeData(boolean wipeData) { - this.wipeData = wipeData; - } - - public void setNumOfBk(int numOfBk) { - this.numOfBk = numOfBk; - } - - public void setZkPort(int zkPort) { - this.zkPort = zkPort; - } - - public void setZkDir(String zkDir) { - this.zkDir = zkDir; - } - - public void setNoBroker(boolean noBroker) { - this.noBroker = noBroker; - } - - public void setOnlyBroker(boolean onlyBroker) { - this.onlyBroker = onlyBroker; - } - - public void setNoStreamStorage(boolean noStreamStorage) { - this.noStreamStorage = noStreamStorage; - } - - public void setStreamStoragePort(int streamStoragePort) { - this.streamStoragePort = streamStoragePort; - } - - public void setHelp(boolean help) { - this.help = help; - } - - public ServiceConfiguration getConfig() { - return config; - } - - public String getConfigFile() { - return configFile; - } - - public boolean isWipeData() { - return wipeData; - } - - public int getNumOfBk() { - return numOfBk; - } - - public int getZkPort() { - return zkPort; - } - - public int getBkPort() { - return bkPort; - } - - public String getZkDir() { - return zkDir; - } - - public String getBkDir() { - return bkDir; - } - - public boolean isNoBroker() { - return noBroker; - } - - public boolean isOnlyBroker() { - return onlyBroker; - } - - public boolean isNoStreamStorage() { - return noStreamStorage; - } - - public int 
getStreamStoragePort() { - return streamStoragePort; - } - - public String getAdvertisedAddress() { - return advertisedAddress; - } - - public boolean isHelp() { - return help; - } - - @Parameter(names = { "-c", "--config" }, description = "Configuration file path", required = true) - private String configFile; - - @Parameter(names = { "--wipe-data" }, description = "Clean up previous ZK/BK data") - private boolean wipeData = false; - - @Parameter(names = { "--num-bookies" }, description = "Number of local Bookies") - private int numOfBk = 1; - - @Parameter(names = { "--zookeeper-port" }, description = "Local zookeeper's port") - private int zkPort = 2181; - - @Parameter(names = { "--bookkeeper-port" }, description = "Local bookies base port") - private int bkPort = 3181; - - @Parameter(names = { "--zookeeper-dir" }, description = "Local zooKeeper's data directory") - private String zkDir = "data/standalone/zookeeper"; - - @Parameter(names = { "--bookkeeper-dir" }, description = "Local bookies base data directory") - private String bkDir = "data/standalone/bookkeeper"; - - @Parameter(names = { "--no-kafkaBroker" }, description = "Only start ZK and BK services, no kafkaBroker") - private boolean noBroker = false; - - @Parameter(names = { "--only-kafkaBroker" }, description = "Only start Pulsar kafkaBroker service (no ZK, BK)") - private boolean onlyBroker = false; - - @Parameter(names = {"-nss", "--no-stream-storage"}, description = "Disable stream storage") - private boolean noStreamStorage = true; - - @Parameter(names = { "--stream-storage-port" }, description = "Local bookies stream storage port") - private int streamStoragePort = 4181; - - @Parameter(names = { "-a", "--advertised-address" }, description = "Standalone kafkaBroker advertised address") - private String advertisedAddress = null; - - @Parameter(names = { "-h", "--help" }, description = "Show this help message") - private boolean help = false; - - public void start() throws Exception { - - if (config == null) { - throw new IllegalArgumentException("Null configuration is provided"); - } - - if (config.getAdvertisedAddress() != null && !config.getListeners().contains(config.getAdvertisedAddress())) { - String err = "Error config: advertisedAddress - " + config.getAdvertisedAddress() + " and listeners - " - + config.getListeners() + " not match."; - log.error(err); - throw new IllegalArgumentException(err); - } - - log.info("--- setup KafkaStandaloneStarter ---"); - - if (!this.isOnlyBroker()) { - ServerConfiguration bkServerConf = new ServerConfiguration(); - bkServerConf.loadConf(new File(configFile).toURI().toURL()); - - // Start LocalBookKeeper - bkEnsemble = new LocalBookkeeperEnsemble( - this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(), - this.getBkDir(), this.isWipeData(), "127.0.0.1"); - bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage()); - } - - if (this.isNoBroker()) { - return; - } - - // Start Broker - kafkaBroker = new KafkaService(config); - kafkaBroker.start(); - - URL webServiceUrl = new URL( - String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort().get())); - final String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(), - config.getBrokerServicePort().get()); - - admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication( - config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build(); - - final String cluster = 
config.getClusterName(); - - createDefaultNameSpace(webServiceUrl, brokerServiceUrl, cluster); - - log.info("--- setup completed ---"); - } - - private void createDefaultNameSpace(URL webServiceUrl, String brokerServiceUrl, String cluster) { - // Create a public tenant and default namespace - final String publicTenant = TopicName.PUBLIC_TENANT; - final String defaultNamespace = TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE; - try { - ClusterData clusterData = new ClusterData(webServiceUrl.toString(), null /* serviceUrlTls */, - brokerServiceUrl, null /* brokerServiceUrlTls */); - if (!admin.clusters().getClusters().contains(cluster)) { - admin.clusters().createCluster(cluster, clusterData); - } else { - admin.clusters().updateCluster(cluster, clusterData); - } - - if (!admin.tenants().getTenants().contains(publicTenant)) { - admin.tenants().createTenant(publicTenant, - new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster))); - } - if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) { - Set<String> clusters = Sets.newHashSet(config.getClusterName()); - admin.namespaces().createNamespace(defaultNamespace, clusters); - admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, clusters); - admin.namespaces().setRetention(defaultNamespace, - new RetentionPolicies(20, 100)); - } - } catch (PulsarAdminException e) { - log.info("error while create default namespace: {}", e.getMessage()); - } - } - - @Override - public void close() { - try { - if (kafkaBroker != null) { - kafkaBroker.close(); - } - - if (bkEnsemble != null) { - bkEnsemble.stop(); - } - } catch (Exception e) { - log.error("Shutdown failed: {}", e.getMessage()); - } - } -} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaStandaloneStarter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaStandaloneStarter.java deleted file mode 100644 index dc411f895d..0000000000 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaStandaloneStarter.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop; - - -import static org.apache.commons.lang3.StringUtils.isBlank; - -import com.beust.jcommander.JCommander; -import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; -import java.io.FileInputStream; -import java.util.Arrays; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.ServiceConfigurationUtils; - -/** - * Starter to start kafka-on-pulsar broker.
- */ -@Slf4j -public class KafkaStandaloneStarter extends KafkaStandalone { - - public KafkaStandaloneStarter(String[] args) throws Exception { - - JCommander jcommander = new JCommander(); - try { - jcommander.addObject(this); - jcommander.parse(args); - if (this.isHelp() || isBlank(this.getConfigFile())) { - jcommander.usage(); - return; - } - - if (this.isNoBroker() && this.isOnlyBroker()) { - log.error("Only one option is allowed between '--no-broker' and '--only-broker'"); - jcommander.usage(); - return; - } - } catch (Exception e) { - jcommander.usage(); - log.error(e.getMessage()); - return; - } - - this.config = ConfigurationUtils.create( - new FileInputStream(this.getConfigFile()), - KafkaServiceConfiguration.class); - - String zkServers = "127.0.0.1"; - - if (this.getAdvertisedAddress() != null) { - // Use advertised address from command line - config.setAdvertisedAddress(this.getAdvertisedAddress()); - zkServers = this.getAdvertisedAddress(); - } else if (isBlank(config.getAdvertisedAddress())) { - // Use advertised address as local hostname - config.setAdvertisedAddress(ServiceConfigurationUtils.unsafeLocalhostResolve()); - } else { - // Use advertised address from config file - } - - // Set ZK server's host to localhost - // Priority: args > conf > default - if (argsContains(args, "--zookeeper-port")) { - config.setZookeeperServers(zkServers + ":" + this.getZkPort()); - } else { - if (config.getZookeeperServers() != null) { - this.setZkPort(Integer.parseInt(config.getZookeeperServers().split(":")[1])); - } - config.setZookeeperServers(zkServers + ":" + this.getZkPort()); - } - - if (config.getConfigurationStoreServers() == null) { - config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort()); - } - - config.setRunningStandalone(true); - - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - try { - if (kafkaBroker != null) { - kafkaBroker.close(); - } - - if (bkEnsemble != null) { - bkEnsemble.stop(); - } - } catch (Exception e) { - log.error("Shutdown failed: {}", e.getMessage()); - } - } - }); - } - - private static boolean argsContains(String[] args, String arg) { - return Arrays.asList(args).contains(arg); - } - - public static void main(String args[]) throws Exception { - // Start standalone - KafkaStandaloneStarter standalone = new KafkaStandaloneStarter(args); - try { - standalone.start(); - } catch (Throwable th) { - log.error("Failed to start pulsar service.", th); - Runtime.getRuntime().exit(1); - } - - } -}
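
For reference, the sketch below shows how the removed standalone pieces could be driven programmatically, mirroring what KafkaStandaloneStarter does from the CLI. It is illustrative only and not taken from the deleted files: the config path and listener value are placeholders, and setListeners/setConfigurationStoreServers are assumed to exist on KafkaServiceConfiguration alongside the getters exercised above; the remaining calls (ConfigurationUtils.create, setZookeeperServers, setRunningStandalone, the KafkaStandalone setters, start, close) appear verbatim in the code being deleted.

    import java.io.FileInputStream;

    import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
    import io.streamnative.pulsar.handlers.kop.KafkaStandalone;
    import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;

    public class KopStandaloneExample {
        public static void main(String[] args) throws Exception {
            // Placeholder path; any file loadable into KafkaServiceConfiguration works here.
            String configFile = "conf/kop_standalone.conf";
            KafkaServiceConfiguration config;
            try (FileInputStream input = new FileInputStream(configFile)) {
                config = ConfigurationUtils.create(input, KafkaServiceConfiguration.class);
            }

            // Keep advertisedAddress consistent with listeners; KafkaService.start() rejects a mismatch.
            config.setAdvertisedAddress("127.0.0.1");
            config.setListeners("PLAINTEXT://127.0.0.1:9092");    // assumed setter for the 'listeners' field read above
            config.setZookeeperServers("127.0.0.1:2181");         // local ZK started by the embedded ensemble below
            config.setConfigurationStoreServers("127.0.0.1:2181"); // assumed setter, as used by the starter
            config.setRunningStandalone(true);

            KafkaStandalone standalone = new KafkaStandalone();
            standalone.setConfig(config);
            standalone.setConfigFile(configFile);   // also loaded as the embedded bookies' ServerConfiguration
            standalone.setWipeData(true);           // equivalent to --wipe-data
            try {
                // Starts local ZK/BK, the KafkaService broker, and creates the public/default namespace.
                standalone.start();
            } finally {
                standalone.close();
            }
        }
    }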