From d0960ba7b89ae9b9b86fc4110f5fd11affff981c Mon Sep 17 00:00:00 2001 From: weishaolun Date: Thu, 13 Jul 2023 10:34:52 +0800 Subject: [PATCH] Feature: support dispatch extra data Extending the query dispatch mechanism, allowing extra data to be dispatched. To dispatch extra data, you just need to register an extra dispatch with a unique name and a pair of functions for generating and parsing the data to be dispatched. --- src/backend/cdb/dispatcher/Makefile | 1 + src/backend/cdb/dispatcher/cdbdisp_extra.c | 170 +++++++++++++++++++++ src/backend/cdb/dispatcher/cdbdisp_query.c | 14 ++ src/backend/tcop/postgres.c | 3 + src/include/cdb/cdbdisp_extra.h | 15 ++ 5 files changed, 203 insertions(+) create mode 100644 src/backend/cdb/dispatcher/cdbdisp_extra.c create mode 100644 src/include/cdb/cdbdisp_extra.h diff --git a/src/backend/cdb/dispatcher/Makefile b/src/backend/cdb/dispatcher/Makefile index e8ac7582898..31d9718ee58 100644 --- a/src/backend/cdb/dispatcher/Makefile +++ b/src/backend/cdb/dispatcher/Makefile @@ -12,4 +12,5 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS += -I$(libpq_srcdir) -I$(top_srcdir)/src/port -I$(top_srcdir)/src/backend/utils/misc OBJS = cdbconn.o cdbdisp.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_async.o cdbpq.o +OBJS += cdbdisp_extra.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/cdb/dispatcher/cdbdisp_extra.c b/src/backend/cdb/dispatcher/cdbdisp_extra.c new file mode 100644 index 00000000000..8a032222a29 --- /dev/null +++ b/src/backend/cdb/dispatcher/cdbdisp_extra.c @@ -0,0 +1,170 @@ +#include "postgres.h" + +#include "cdb/cdbdisp_extra.h" +#include "libpq/pqformat.h" +#include "utils/hsearch.h" + + +static HTAB *ExtraDispTable = NULL; + +typedef struct ExtraDispEntry +{ + char extraDispName[EXTRADISPNAME_MAX_LEN]; + PackFunc packFunc; + UnpackFunc unpackFunc; +} ExtraDispEntry; + +void +RegisterExtraDispatch(const char *extraDispName, PackFunc packFunc, UnpackFunc unpackFunc) +{ + ExtraDispEntry *entry; + bool found; + + if (ExtraDispTable == NULL) + { + HASHCTL ctl; + + ctl.keysize = EXTRADISPNAME_MAX_LEN; + ctl.entrysize = sizeof(ExtraDispEntry); + + ExtraDispTable = hash_create("extra dispatch info", 8, &ctl, + HASH_ELEM | HASH_STRINGS); + } + + if (strlen(extraDispName) >= EXTRADISPNAME_MAX_LEN) + elog(ERROR, "extra dispatch name is too long"); + + entry = (ExtraDispEntry *) hash_search(ExtraDispTable, + extraDispName, + HASH_ENTER, &found); + if (found) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("extra dispatch name \"%s\" already exists", + extraDispName))); + + entry->packFunc = packFunc; + entry->unpackFunc = unpackFunc; +} + +/* Return packaged messages. each message has the same format: + * ("%d%s\0%s", totalLen, name, payload). totalLen is the message + * length not including itself. name is the name of this message, + * a following '\0' marks the end. payload is the main body of the + * message. + */ +char * +PackExtraMsgs(int *len) +{ + HASH_SEQ_STATUS status; + ExtraDispEntry *hentry; + char **payloads; + int *lengths; + int payloadLen; + char **names; + int nameLen; + char *total; + int totalLen; + char *pos; + int tmp; + int n; + int i; + + if (!ExtraDispTable) + { + *len = 0; + return NULL; + } + + n = hash_get_num_entries(ExtraDispTable); + payloads = (char **) palloc(n * sizeof(char *)); + lengths = (int *) palloc(n * sizeof(int)); + names = (char **) palloc(n * sizeof(char *)); + + i = 0; + totalLen = 0; + hash_seq_init(&status, ExtraDispTable); + while ((hentry = (ExtraDispEntry *) hash_seq_search(&status)) != NULL) + { + payloads[i] = (*(hentry->packFunc))(lengths + i); + names[i] = hentry->extraDispName; + totalLen += sizeof(int) + strlen(names[i]) + 1 + *(lengths + i); + i++; + } + Assert(i = n); + + total = palloc(totalLen); + pos = total; + + for(i=0; i < n; i++) + { + payloadLen = *(lengths + i); + nameLen = strlen(names[i]); + + /* lenth */ + tmp = htonl(payloadLen + nameLen + 1); + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); + + /* name */ + memcpy(pos, names[i], nameLen + 1); + pos += nameLen + 1; + + /* payload */ + memcpy(pos, payloads[i], payloadLen); + pos += payloadLen; + + pfree(payloads[i]); + } + + Assert(pos - total == totalLen); + + pfree(names); + pfree(payloads); + pfree(lengths); + + *len = totalLen; + return total; +} + +void +UnPackExtraMsgs(StringInfo inputMsgs) +{ + ExtraDispEntry *entry; + const char *name; + const char *payload; + int payloadLen; + int totalLen; + bool found; + int n; + int i; + + if (!ExtraDispTable) + return; + + n = hash_get_num_entries(ExtraDispTable); + i = n; + + while (inputMsgs->cursor < inputMsgs->len) + { + totalLen = pq_getmsgint(inputMsgs, 4); + name = pq_getmsgstring(inputMsgs); + payloadLen = totalLen - strlen(name) - 1; + payload = pq_getmsgbytes(inputMsgs, payloadLen); + + entry = (ExtraDispEntry *) hash_search(ExtraDispTable, + name, + HASH_FIND, &found); + if (!found) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("extra dispatch %s not found", name))); + + (*(entry->unpackFunc))(payload, payloadLen); + i--; + } + if (i != 0) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("extra dispatch count mismatch, registered %d, get %d", n, i))); +} diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 9bbfd10dd2d..f5762c8f4df 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -42,6 +42,7 @@ #include "mb/pg_wchar.h" #include "cdb/cdbdisp.h" +#include "cdb/cdbdisp_extra.h" #include "cdb/cdbdisp_query.h" #include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */ #include "cdb/cdbdispatchresult.h" @@ -871,6 +872,9 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, int total_query_len; char *shared_query, *pos; + char *extraMsgs; + int extraLen; + MemoryContext oldContext; /* @@ -917,6 +921,9 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, sizeof(resgroupInfo.len) + resgroupInfo.len; + extraMsgs = PackExtraMsgs(&extraLen); + total_query_len += extraLen; + shared_query = palloc(total_query_len); pos = shared_query; @@ -1010,6 +1017,13 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, pos += resgroupInfo.len; } + if (extraLen > 0) + { + memcpy(pos, extraMsgs, extraLen); + pos += extraLen; + pfree(extraMsgs); + } + len = pos - shared_query - 1; /* diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index c545c42298c..cf689e91367 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -97,6 +97,7 @@ #include "cdb/cdbsrlz.h" #include "cdb/cdbtm.h" #include "cdb/cdbdtxcontextinfo.h" +#include "cdb/cdbdisp_extra.h" #include "cdb/cdbdisp_query.h" #include "cdb/cdbdispatchresult.h" #include "cdb/cdbendpoint.h" @@ -5680,6 +5681,8 @@ PostgresMain(int argc, char *argv[], if (resgroupInfoLen > 0) resgroupInfoBuf = pq_getmsgbytes(&input_message, resgroupInfoLen); + UnPackExtraMsgs(&input_message); + pq_getmsgend(&input_message); elog((Debug_print_full_dtm ? LOG : DEBUG5), "MPP dispatched stmt from QD: %s.",query_string); diff --git a/src/include/cdb/cdbdisp_extra.h b/src/include/cdb/cdbdisp_extra.h new file mode 100644 index 00000000000..b6bac03b3cd --- /dev/null +++ b/src/include/cdb/cdbdisp_extra.h @@ -0,0 +1,15 @@ +#ifndef CDBDISP_EXTRA_H +#define CDBDISP_EXTRA_H + +#include "lib/stringinfo.h" + +#define EXTRADISPNAME_MAX_LEN 64 + +typedef char *(*PackFunc) (int *len); +typedef void (*UnpackFunc) (const char *msg, int len); + +extern void RegisterExtraDispatch(const char *extraDispName, PackFunc packFunc, UnpackFunc unpackFunc); +extern char *PackExtraMsgs(int *len); +extern void UnPackExtraMsgs(StringInfo strInfo); + +#endif /* CDBDISP_EXTRA_H */