Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# kcat v1.9.0

* Added new formatter to decode consumer offset

# kcat v1.8.0

* Added new mock cluster mode
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ Decode key as 32-bit signed integer and value as 16-bit signed integer followed

$ kcat -b mybroker -t mytopic -s key='i$' -s value='hB s'

Decode consumer offsets (uses the special "offset" format):

$ kcat -b mybroker -t mytopic -s key=offset -s value=offset -f '%k -> %s'


*Hint: see `kcat -h` for all available deserializer options.*

Expand Down
232 changes: 215 additions & 17 deletions format.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "kcat.h"
#include "rdendian.h"
#include <time.h>

static void fmt_add (fmt_type_t type, const char *str, int len) {
if (conf.fmt_cnt == KC_FMT_MAX_SIZE)
Expand Down Expand Up @@ -203,7 +204,7 @@ static int print_headers (FILE *fp, const rd_kafka_headers_t *hdrs) {
*/
void pack_check (const char *what, const char *fmt) {
const char *f = fmt;
static const char *valid = " <>bBhHiIqQcs$";
static const char *valid = " :,<>bBhHiIqQcsSvCt$";

if (!*fmt)
KC_FATAL("%s pack-format must not be empty", what);
Expand All @@ -217,6 +218,23 @@ void pack_check (const char *what, const char *fmt) {
}
}

#define expect(sz) do { \
if ((sz) > remaining) { \
snprintf(errstr, errstr_size, \
"%s truncated, expected %d bytes " \
"to unpack %c but only %d bytes remaining", \
what, (int)(sz), (int)(*f), \
(int)remaining); \
return -1; \
} \
} while (0)

#define fup_copy(dst,sz) do { \
expect(sz); \
memcpy(dst, b, sz); \
b += (sz); \
} while (0)


/**
* @brief Unpack (deserialize) the data at \p buf using the
Expand All @@ -238,22 +256,9 @@ static int unpack (FILE *fp, const char *what, const char *fmt,
big_endian,
little_endian
} endian = big_endian;

#define endian_swap(val,to_little,to_big) \
(endian == big_endian ? to_big(val) : to_little(val))

#define fup_copy(dst,sz) do { \
if ((sz) > remaining) { \
snprintf(errstr, errstr_size, \
"%s truncated, expected %d bytes " \
"to unpack %c but only %d bytes remaining", \
what, (int)(sz), (int)(*f), \
(int)remaining); \
return -1; \
} \
memcpy(dst, b, sz); \
b += (sz); \
} while (0)

while (*f) {
size_t remaining = (int)(end - b);
Expand All @@ -263,6 +268,12 @@ static int unpack (FILE *fp, const char *what, const char *fmt,
case ' ':
fprintf(fp, " ");
break;
case ':':
fprintf(fp, ":");
break;
case ',':
fprintf(fp, ",");
break;
case '<':
endian = little_endian;
break;
Expand All @@ -287,15 +298,15 @@ static int unpack (FILE *fp, const char *what, const char *fmt,
{
int16_t v;
fup_copy(&v, sizeof(v));
v = endian_swap(v, be16toh, be16toh);
v = endian_swap(v, be16toh, htobe16);
fprintf(fp, "%hd", v);
}
break;
case 'H':
{
uint16_t v;
fup_copy(&v, sizeof(v));
v = endian_swap(v, be16toh, be16toh);
v = endian_swap(v, be16toh, htobe16);
fprintf(fp, "%hu", v);
}
break;
Expand Down Expand Up @@ -341,6 +352,66 @@ static int unpack (FILE *fp, const char *what, const char *fmt,
b += remaining;
}
break;
case 'S':
{
int16_t v;
fup_copy(&v, sizeof(v));
v = be16toh(v);
if (v < 0)
fprintf(fp, "(null)");
else {
fprintf(fp, "%.*s", v, b);
expect((unsigned) v);
b += v;
}
}
break;
case 'v':
{
uint8_t v;
uint64_t value = 0;
int i = 0;
do {
fup_copy(&v, sizeof(v));
value |= ((uint64_t) (v & 0x7f)) << i;
i += 7;
} while (v & 0x80);
fprintf(fp, "%"PRIu64, value);
}
break;
case 'C':
{
uint8_t v;
uint64_t value = 0;
int i = 0;
do {
fup_copy(&v, sizeof(v));
value |= ((uint64_t) (v & 0x7f)) << i;
i += 7;
} while (v & 0x80);
expect((unsigned) value);
fprintf(fp, "%.*s", (unsigned) value, b);
b += value;
}
break;
case 't':
{
char* buf;
int64_t v;
time_t timestamp;
fup_copy(&v, sizeof(v));
v = be64toh(v);
if (v == -1) {
fprintf(fp, "(none)");
} else {
timestamp = (time_t) (v / 1000);
buf = ctime(&timestamp);
/* this is totally vile but it works */
*(buf + strlen(buf) - 1) = '\0';
fprintf(fp, "%s", buf);
}
}
break;
case '$':
{
if (remaining > 0) {
Expand All @@ -366,10 +437,100 @@ static int unpack (FILE *fp, const char *what, const char *fmt,
return 0;

#undef endian_swap
#undef fup_copy
}

static int unpack_offset_key(FILE *fp,
const char *buf, size_t len,
char *errstr, size_t errstr_size) {
const char *b = buf;
const char *end = buf + len;
size_t remaining = (int)(end - b);
const char *what = "key";
const char *f = "h";

uint16_t version = 0;
int retval = 0;

fup_copy(&version, sizeof(version));
version = be16toh(version);
buf += 2;
len -= 2;
remaining = (int)(end - b);

if (version < 0 || version > 2) {
snprintf(errstr, errstr_size,
"Unknown key offset version V%d", (int) version);
return -1;
}
retval = version;
if (version < 2) {
fprintf(fp, "[offset] ");
if (unpack(fp, what, "S: S:i", buf, len, errstr, errstr_size) == -1)
retval = -1;
} else {
fprintf(fp, "[group] ");
if (unpack(fp, what, "S", buf, len, errstr, errstr_size) == -1)
retval = -1;
}
return retval;
}

static int unpack_offset_value(FILE *fp,
int offset_key_type,
const char *buf, size_t len,
char *errstr, size_t errstr_size) {
const char *b = buf;
const char *end = buf + len;
size_t remaining = (int)(end - b);
const char *what = "value";
const char *f = "h";

uint16_t version = 0;

fup_copy(&version, sizeof(version));
version = be16toh(version);
buf += 2;
len -= 2;
remaining = (int)(end - b);


if (version >= 0 && version <= 3)
fprintf(fp, "V%d: ", (int) version);
if (offset_key_type == 0 || offset_key_type == 1) {
switch(version) {
case 0:
return unpack(fp, what, "q S,t", buf, len, errstr, errstr_size);
case 1:
return unpack(fp, what, "q S,t,t", buf, len, errstr, errstr_size);
case 2:
return unpack(fp, what, "q S,t", buf, len, errstr, errstr_size);
case 3:
return unpack(fp, what, "q i,S,t", buf, len, errstr, errstr_size);
default:
snprintf(errstr, errstr_size,
"Unknown offset version V%d", (int) version);
return -1;
}
} else if (offset_key_type == 2) {
switch (version) {
case 0:
case 1:
return unpack(fp, what, "S i S S", buf, len, errstr, errstr_size);
case 2:
case 3:
return unpack(fp, what, "S i S S t", buf, len, errstr, errstr_size);
default:
/* cannot decode this yet */
return 0;
}
} else {
fprintf(fp, "(cannot decode)");
return 0;
}
}

#undef fup_copy
#undef expect


/**
Expand All @@ -381,12 +542,18 @@ static void fmt_msg_output_str (FILE *fp,
char errstr[256];

*errstr = '\0';
int value_reprocess = -1;
int continue_after = -1;
int offset_key_type = -1;

for (i = 0 ; i < conf.fmt_cnt ; i++) {
int r = 1;
uint32_t belen;
const char *what_failed = "";

if (i < continue_after && i != value_reprocess)
continue;

switch (conf.fmt[i].type)
{
case KC_FMT_OFFSET:
Expand Down Expand Up @@ -415,6 +582,18 @@ static void fmt_msg_output_str (FILE *fp,
#else
KC_FATAL("NOTREACHED");
#endif
} else if (conf.flags & CONF_F_FMT_OFFSET_KEY) {
offset_key_type = unpack_offset_key(fp,
rkmessage->key,
rkmessage->key_len,
errstr,
sizeof(errstr));
if (offset_key_type == -1)
goto fail;
else if (value_reprocess != -1) {
continue_after = i;
i = value_reprocess;
}
} else if (conf.pack[KC_MSG_FIELD_KEY]) {
if (unpack(fp,
"key",
Expand Down Expand Up @@ -463,6 +642,25 @@ static void fmt_msg_output_str (FILE *fp,
#else
KC_FATAL("NOTREACHED");
#endif
} else if (conf.flags & CONF_F_FMT_OFFSET_VALUE) {
if ((conf.flags & CONF_F_FMT_OFFSET_KEY)) {
if (offset_key_type == -1) {
value_reprocess = i;
break;
}
} else {
/* assume "regular" offset, will work 99% of the time */
offset_key_type = 1;
}

if (unpack_offset_value(fp,
offset_key_type,
rkmessage->payload,
rkmessage->len,
errstr,
sizeof(errstr)) ==
-1)
goto fail;
} else if (conf.pack[KC_MSG_FIELD_VALUE]) {
if (unpack(fp,
"value",
Expand Down
17 changes: 14 additions & 3 deletions kcat.c
Original file line number Diff line number Diff line change
Expand Up @@ -1446,10 +1446,15 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
" Q: unsigned 64-bit integer\n"
" c: ASCII character\n"
" s: remaining data is string\n"
" S: short count followed by string\n"
" v: VARINT\n"
" C: Compact string (VARINT count + string)\n"
" t: 64-bit timestamp from epoch\n"
" $: match end-of-input (no more bytes remaining or a parse error is raised).\n"
" Not including this token skips any\n"
" remaining data after the pack-str is\n"
" exhausted.\n"
" offset - Binary format for consumer offset\n"
#if ENABLE_AVRO
" avro - Avro-formatted with schema in Schema-Registry (requires -r)\n"
" E.g.: -s key=i -s value=avro - key is 32-bit integer, value is Avro.\n"
Expand Down Expand Up @@ -2203,13 +2208,13 @@ static void argparse (int argc, char **argv,
}

if (field == -1 || field == KC_MSG_FIELD_KEY) {
if (strcmp(t, "avro"))
if (strcmp(t, "avro") && strcmp(t, "offset"))
pack_check("key", t);
conf.pack[KC_MSG_FIELD_KEY] = t;
}

if (field == -1 || field == KC_MSG_FIELD_VALUE) {
if (strcmp(t, "avro"))
if (strcmp(t, "avro") && strcmp(t, "offset"))
pack_check("value", t);
conf.pack[KC_MSG_FIELD_VALUE] = t;
}
Expand Down Expand Up @@ -2430,7 +2435,13 @@ static void argparse (int argc, char **argv,
else if (i == KC_MSG_FIELD_KEY)
conf.flags |= CONF_F_FMT_AVRO_KEY;
continue;
}
} else if (conf.pack[i] && !strcmp(conf.pack[i], "offset")) {
if (i == KC_MSG_FIELD_KEY)
conf.flags |= CONF_F_FMT_OFFSET_KEY;
else if (i == KC_MSG_FIELD_VALUE)
conf.flags |= CONF_F_FMT_OFFSET_VALUE;
continue;
}
}


Expand Down
Loading