From f6c2ecf73ff915247980e185fe4cd652c5633a7f Mon Sep 17 00:00:00 2001 From: husen Date: Thu, 10 Aug 2023 19:55:04 +0800 Subject: [PATCH 1/9] Support separation of catalog and compute. core changes: 1. add macro serverless, configure with --enable-serverless 2. add hooks to get control in transaction/dispatch management 3. add transaction processing framework 4. add session state dispatch framework --- configure | 34 +++++ configure.ac | 10 ++ src/Makefile.global.in | 1 + src/backend/Makefile | 3 + src/backend/access/transam/slru.c | 47 ++++++ src/backend/access/transam/varsup.c | 8 + src/backend/access/transam/xact.c | 43 ++++++ src/backend/access/transam/xlog.c | 54 +++++++ src/backend/cdb/cdbtm.c | 8 + src/backend/cdb/dispatcher/Makefile | 2 +- src/backend/cdb/dispatcher/cdbdisp_extra.c | 170 +++++++++++++++++++++ src/backend/cdb/dispatcher/cdbdisp_query.c | 39 ++++- src/backend/executor/execMain.c | 5 + src/backend/postmaster/autovacuum.c | 8 + src/backend/storage/buffer/bufmgr.c | 18 +++ src/backend/storage/ipc/procarray.c | 16 +- src/backend/tcop/postgres.c | 21 +++ src/backend/tcop/utility.c | 13 +- src/backend/utils/cache/relmapper.c | 7 + src/backend/utils/init/postinit.c | 7 + src/include/access/slru.h | 8 + src/include/access/transam.h | 4 + src/include/access/xact.h | 43 ++++++ src/include/access/xlog.h | 26 +++- src/include/cdb/cdbdisp_extra.h | 15 ++ src/include/cdb/cdbdisp_query.h | 12 ++ src/include/executor/executor.h | 3 + src/include/pg_config.h.in | 3 + src/include/postgres.h | 5 + src/include/postmaster/autovacuum.h | 4 + src/include/storage/procarray.h | 4 + src/include/tcop/utility.h | 4 + src/include/utils/relmapper.h | 4 + src/include/utils/snapmgr.h | 5 + 34 files changed, 643 insertions(+), 11 deletions(-) create mode 100644 src/backend/cdb/dispatcher/cdbdisp_extra.c create mode 100644 src/include/cdb/cdbdisp_extra.h diff --git a/configure b/configure index 19b49cb9975..8c063b4d265 100755 --- a/configure +++ b/configure @@ -761,6 +761,7 @@ enable_gpcloud enable_mapreduce enable_orca enable_catalog_ext +enable_serverless autodepend PKG_CONFIG_LIBDIR PKG_CONFIG_PATH @@ -894,6 +895,7 @@ enable_depend enable_cassert enable_orca enable_catalog_ext +enable_serverless enable_mapreduce enable_gpcloud enable_external_fts @@ -1604,6 +1606,7 @@ Optional Features: --enable-cassert enable assertion checks (for debugging) --disable-orca disable ORCA optimizer --enable-catalog-ext enable CloudberryDB catalog extension + --enable-serverless use Cloudberry serverless architecture --enable-mapreduce enable CloudberryDB Mapreduce support --enable-gpcloud enable gpcloud support --enable-external-fts enable external fts support @@ -8340,6 +8343,37 @@ fi { $as_echo "$as_me:${as_lineno-$LINENO}: result: checking whether to build with CloudberryDB catalog extension... $enable_catalog_ext" >&5 $as_echo "checking whether to build with CloudberryDB catalog extension ... $enable_catalog_ext" >&6; } +# +# Enable serverless architecture +# + + +# Check whether --enable-serverless was given. +if test "${enable_serverless+set}" = set; then : + enableval=$enable_serverless; + case $enableval in + yes) + +$as_echo "#define SERVERLESS 1" >>confdefs.h + + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --enable-serverless option" "$LINENO" 5 + ;; + esac + +else + enable_serverless=no + +fi + + +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: checking whether to use serverless architecture of Cloudberry ... $enable_serverless" >&5 +$as_echo "checking whether to use serverless architecture of Cloudberry ... $enable_serverless" >&6; } + # # --enable-mapreduce enables GPMapreduce support diff --git a/configure.ac b/configure.ac index dbe87362bf6..142f4ab63ca 100644 --- a/configure.ac +++ b/configure.ac @@ -846,6 +846,16 @@ PGAC_ARG_BOOL(enable, catalog-ext, no, [enable Cloudberry catalog extension], AC_MSG_RESULT([checking whether to build with catalog extension... $enable_catalog_ext]) AC_SUBST(enable_catalog_ext) +# +# +# --enable-serverless uses serverless architecture of Cloudberry +# +PGAC_ARG_BOOL(enable, serverless, no, [use serverless architecture of Cloudberry], + [AC_DEFINE([SERVERLESS], 1, + [Define to 1 to use serverless architecture of Cloudberry. (--enable-serverless)])]) +AC_MSG_RESULT([checking whether to use serverless architecture of Cloudberry... $enable_serverless]) +AC_SUBST(enable_serverless) + # # --enable-mapreduce enables GPMapreduce support # diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 9e03d53d547..a6de4048d36 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -223,6 +223,7 @@ enable_strong_random = @enable_strong_random@ enable_largefile = @enable_largefile@ enable_orca = @enable_orca@ enable_catalog_ext = @enable_catalog_ext@ +enable_serverless = @enable_serverless@ enable_gpfdist = @enable_gpfdist@ enable_pxf = @enable_pxf@ enable_debug_extensions = @enable_debug_extensions@ diff --git a/src/backend/Makefile b/src/backend/Makefile index 4190d0d683c..ed0495d708d 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -32,6 +32,9 @@ ifeq ($(enable_catalog_ext),yes) SUBDIRS += catalog-extension LDFLAGS += -lprotobuf -lstdc++ endif +ifeq ($(enable_serverless),yes) +LDFLAGS += -lprotobuf -lstdc++ -ljansson +endif include $(srcdir)/common.mk diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index a1950fd0944..284d998ad11 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -54,6 +54,7 @@ #include "access/slru.h" #include "access/transam.h" #include "access/xlog.h" +#include "cdb/cdbvars.h" #include "miscadmin.h" #include "pgstat.h" #include "storage/fd.h" @@ -133,6 +134,11 @@ typedef enum static SlruErrorCause slru_errcause; static int slru_errno; +/* + * Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage + */ +SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook = NULL; +SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook = NULL; static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); static void SimpleLruWaitIO(SlruCtl ctl, int slotno); @@ -421,6 +427,17 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, /* Now we must recheck state from the top */ continue; } +#ifdef SERVERLESS + /* + * TODO: add hook/GUC instead? + * The page in buffer may be out of date, we need to check the buffer + * and refresh the buffer if the page has been modified. + */ + if (Gp_role == GP_ROLE_EXECUTE) + { + goto PageRead; + } +#endif /* Otherwise, it's ready to use */ SlruRecentlyUsed(shared, slotno); @@ -435,6 +452,10 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, (shared->page_status[slotno] == SLRU_PAGE_VALID && !shared->page_dirty[slotno])); +#ifdef SERVERLESS +PageRead: +#endif + /* Mark the slot read-busy */ shared->page_number[slotno] = pageno; shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS; @@ -506,6 +527,18 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) shared->page_status[slotno] != SLRU_PAGE_EMPTY && shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS) { +#ifdef SERVERLESS + /* + * TODO: add hook/GUC instead? + * The page in buffer may be out of date, we need to check the buffer + * and refresh the buffer if the page has been modified. + */ + if (Gp_role == GP_ROLE_EXECUTE) + { + break; + } +#endif + /* See comments for SlruRecentlyUsed macro */ SlruRecentlyUsed(shared, slotno); @@ -688,6 +721,13 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno) off_t offset = rpageno * BLCKSZ; char path[MAXPGPATH]; int fd; + bool result; + + if (SlruPhysicalReadPage_hook && + SlruPhysicalReadPage_hook(ctl, pageno, slotno, &result)) + { + return result; + } SlruFileName(ctl, path, segno); @@ -760,6 +800,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) off_t offset = rpageno * BLCKSZ; char path[MAXPGPATH]; int fd = -1; + bool result; /* update the stats counter of written pages */ pgstat_count_slru_page_written(shared->slru_stats_idx); @@ -806,6 +847,12 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) } } + if (SlruPhysicalWritePage_hook && + SlruPhysicalWritePage_hook(ctl, pageno, slotno, &result)) + { + return result; + } + /* * During a WriteAll, we may already have the desired file open. */ diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 90788509f8a..adc042c87db 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -43,6 +43,11 @@ int xid_warn_limit; NewSegRelfilenode_assign_hook_type NewSegRelfilenode_assign_hook = NULL; +/* + * Hook for plugins to get control in GetNewTransactionId. + */ +GetNewTransactionId_hook_type GetNewTransactionId_hook = NULL; + /* * Allocate the next FullTransactionId for a new transaction or * subtransaction. @@ -61,6 +66,9 @@ GetNewTransactionId(bool isSubXact) FullTransactionId full_xid; TransactionId xid; + if (GetNewTransactionId_hook) + return (*GetNewTransactionId_hook) (isSubXact); + /* * Workers synchronize transaction state at the beginning of each parallel * operation, so we can't account for new XIDs after that point. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0df851f2fd0..19601b13c9d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -298,6 +298,14 @@ static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS]; static TransactionState CurrentTransactionState = &TopTransactionStateData; +/* + * Hooks for plugins to get control in Transaction Management. + */ +TransactionParticipateEnd_hook_type TransactionParticipateEnd_hook = NULL; +NotifySubTransaction_hook_type NotifySubTransaction_hook = NULL; +XactLogCommitRecord_hook_type XactLogCommitRecord_hook = NULL; +XactLogAbortRecord_hook_type XactLogAbortRecord_hook = NULL; + /* * The subtransaction ID and command ID assignment counters are global * to a whole transaction, so we do not keep them in the state stack. @@ -902,6 +910,12 @@ GetCurrentCommandId(bool used) return currentCommandId; } +void +SetCurrentCommandId(CommandId cid) +{ + currentCommandId = cid; +} + /* * SetParallelStartTimestamps * @@ -2945,6 +2959,9 @@ CommitTransaction(void) if (notifyCommittedDtxTransactionIsNeeded()) notifyCommittedDtxTransaction(); + if (TransactionParticipateEnd_hook) + TransactionParticipateEnd_hook(true); + /* * Let others know about no transaction in progress by me. Note that this * must be done _before_ releasing locks we hold and _after_ @@ -3594,6 +3611,9 @@ AbortTransaction(void) */ rollbackDtxTransaction(); + if (TransactionParticipateEnd_hook) + TransactionParticipateEnd_hook(false); + /* * Let others know about no transaction in progress by me. Note that this * must be done _before_ releasing locks we hold and _after_ @@ -5183,6 +5203,9 @@ DefineSavepoint(const char *name) { TransactionState s = CurrentTransactionState; + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_BEGIN); + /* * Workers synchronize transaction state at the beginning of each parallel * operation, so we can't account for new subtransactions after that @@ -5551,6 +5574,9 @@ BeginInternalSubTransaction(const char *name) } } + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_BEGIN); + /* * Workers synchronize transaction state at the beginning of each parallel * operation, so we can't account for new subtransactions after that @@ -6044,6 +6070,9 @@ CommitSubTransaction(void) /* Must CCI to ensure commands of subtransaction are seen as done */ CommandCounterIncrement(); + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_RELEASE); + /* * Prior to 8.4 we marked subcommit in clog at this point. We now only * perform that step, if required, as part of the atomic update of the @@ -6215,6 +6244,9 @@ AbortSubTransaction(void) s->parallelModeLevel = 0; } + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_ROLLBACK); + /* * We can skip all this stuff if the subxact failed before creating a * ResourceOwner... @@ -6834,6 +6866,12 @@ XactLogCommitRecord(TimestampTz commit_time, Assert(CritSectionCount > 0); + if (XactLogCommitRecord_hook) + return (* XactLogCommitRecord_hook) (commit_time, tablespace_oid_to_delete_on_commit, + nsubxacts, subxacts, nrels, rels, nmsgs, msgs, + ndeldbs, deldbs, relcacheInval, xactflags, + twophase_xid, twophase_gid); + xl_xinfo.xinfo = 0; /* decide between a plain and 2pc commit */ @@ -7024,6 +7062,11 @@ XactLogAbortRecord(TimestampTz abort_time, Assert(CritSectionCount > 0); + if (XactLogAbortRecord_hook) + (*XactLogAbortRecord_hook) (abort_time, tablespace_oid_to_delete_on_abort, + nsubxacts, subxacts, nrels, rels, ndeldbs, deldbs, + xactflags, twophase_xid, twophase_gid); + xl_xinfo.xinfo = 0; /* decide between a plain and 2pc abort */ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4d331ed7a7c..d69b3fe577b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -101,6 +101,21 @@ extern uint32 bootstrap_data_checksum_version; extern int bootstrap_file_encryption_method; +/* + * Hook for plugins to get control in StartupXLOG. + */ +StartupXLOG_hook_type StartupXLOG_hook = NULL; + +/* + * Hook for plugins to get control in XLogFlush. + */ +XLogFlush_hook_type XLogFlush_hook = NULL; + +/* + * Hook for plugins to get control in CreateCheckPoint. + */ +CreateCheckPoint_hook_type CreateCheckPoint_hook = NULL; + /* Unsupported old recovery command file names (relative to $PGDATA) */ #define RECOVERY_COMMAND_FILE "recovery.conf" #define RECOVERY_COMMAND_DONE "recovery.done" @@ -2938,6 +2953,9 @@ XLogFlush(XLogRecPtr record) XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; + if (XLogFlush_hook) + return (*XLogFlush_hook) (record); + /* * During REDO, we are reading not writing WAL. Therefore, instead of * trying to flush the WAL, we should update minRecoveryPoint instead. We @@ -3130,6 +3148,15 @@ XLogBackgroundFlush(void) TimestampTz now; int flushbytes; +#ifdef SERVERLESS + /* + * TODO: use GUC/hook instead of macro. + * + * Indeed, walwriter is not needed in serverless, we have no WAL in buffer. + */ + return true; +#endif + /* XLOG doesn't need flushing during recovery */ if (RecoveryInProgress()) return false; @@ -6755,6 +6782,9 @@ StartupXLOG(void) bool promoted = false; struct stat st; + if (StartupXLOG_hook) + return (*StartupXLOG_hook) (); + /* * We should have an aux process resource owner to use, and we should not * be in a transaction that's installed some other resowner. @@ -9409,6 +9439,12 @@ CreateCheckPoint(int flags) VirtualTransactionId *vxids; int nvxids; + if (CreateCheckPoint_hook) + { + (*CreateCheckPoint_hook) (flags); + return; + } + /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just * issued at a different time. @@ -13957,3 +13993,21 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Return pointer to pg_control in shared memory; + */ +ControlFileData * +GetControlFile(void) +{ + return ControlFile; +} + +/* + * Return pointer to XLogCtlData in shared memory; + */ +XLogCtlData * +GetXLogCtl(void) +{ + return XLogCtl; +} diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index acd6168cc9a..f8128eded26 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -1608,6 +1608,14 @@ doQEDistributedExplicitBegin() static bool isDtxQueryDispatcher(void) { +#ifdef SERVERLESS + /* + * TODO: use GUC/hook instead of macro. + * + * Distributed transaction is not necessary in serverless. + */ + return false; +#endif bool isDtmStarted; bool isSharedLocalSnapshotSlotPresent; diff --git a/src/backend/cdb/dispatcher/Makefile b/src/backend/cdb/dispatcher/Makefile index e8ac7582898..8aaf74b1970 100644 --- a/src/backend/cdb/dispatcher/Makefile +++ b/src/backend/cdb/dispatcher/Makefile @@ -11,5 +11,5 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS += -I$(libpq_srcdir) -I$(top_srcdir)/src/port -I$(top_srcdir)/src/backend/utils/misc -OBJS = cdbconn.o cdbdisp.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_async.o cdbpq.o +OBJS = cdbconn.o cdbdisp.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_async.o cdbpq.o cdbdisp_extra.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/cdb/dispatcher/cdbdisp_extra.c b/src/backend/cdb/dispatcher/cdbdisp_extra.c new file mode 100644 index 00000000000..8a032222a29 --- /dev/null +++ b/src/backend/cdb/dispatcher/cdbdisp_extra.c @@ -0,0 +1,170 @@ +#include "postgres.h" + +#include "cdb/cdbdisp_extra.h" +#include "libpq/pqformat.h" +#include "utils/hsearch.h" + + +static HTAB *ExtraDispTable = NULL; + +typedef struct ExtraDispEntry +{ + char extraDispName[EXTRADISPNAME_MAX_LEN]; + PackFunc packFunc; + UnpackFunc unpackFunc; +} ExtraDispEntry; + +void +RegisterExtraDispatch(const char *extraDispName, PackFunc packFunc, UnpackFunc unpackFunc) +{ + ExtraDispEntry *entry; + bool found; + + if (ExtraDispTable == NULL) + { + HASHCTL ctl; + + ctl.keysize = EXTRADISPNAME_MAX_LEN; + ctl.entrysize = sizeof(ExtraDispEntry); + + ExtraDispTable = hash_create("extra dispatch info", 8, &ctl, + HASH_ELEM | HASH_STRINGS); + } + + if (strlen(extraDispName) >= EXTRADISPNAME_MAX_LEN) + elog(ERROR, "extra dispatch name is too long"); + + entry = (ExtraDispEntry *) hash_search(ExtraDispTable, + extraDispName, + HASH_ENTER, &found); + if (found) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("extra dispatch name \"%s\" already exists", + extraDispName))); + + entry->packFunc = packFunc; + entry->unpackFunc = unpackFunc; +} + +/* Return packaged messages. each message has the same format: + * ("%d%s\0%s", totalLen, name, payload). totalLen is the message + * length not including itself. name is the name of this message, + * a following '\0' marks the end. payload is the main body of the + * message. + */ +char * +PackExtraMsgs(int *len) +{ + HASH_SEQ_STATUS status; + ExtraDispEntry *hentry; + char **payloads; + int *lengths; + int payloadLen; + char **names; + int nameLen; + char *total; + int totalLen; + char *pos; + int tmp; + int n; + int i; + + if (!ExtraDispTable) + { + *len = 0; + return NULL; + } + + n = hash_get_num_entries(ExtraDispTable); + payloads = (char **) palloc(n * sizeof(char *)); + lengths = (int *) palloc(n * sizeof(int)); + names = (char **) palloc(n * sizeof(char *)); + + i = 0; + totalLen = 0; + hash_seq_init(&status, ExtraDispTable); + while ((hentry = (ExtraDispEntry *) hash_seq_search(&status)) != NULL) + { + payloads[i] = (*(hentry->packFunc))(lengths + i); + names[i] = hentry->extraDispName; + totalLen += sizeof(int) + strlen(names[i]) + 1 + *(lengths + i); + i++; + } + Assert(i = n); + + total = palloc(totalLen); + pos = total; + + for(i=0; i < n; i++) + { + payloadLen = *(lengths + i); + nameLen = strlen(names[i]); + + /* lenth */ + tmp = htonl(payloadLen + nameLen + 1); + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); + + /* name */ + memcpy(pos, names[i], nameLen + 1); + pos += nameLen + 1; + + /* payload */ + memcpy(pos, payloads[i], payloadLen); + pos += payloadLen; + + pfree(payloads[i]); + } + + Assert(pos - total == totalLen); + + pfree(names); + pfree(payloads); + pfree(lengths); + + *len = totalLen; + return total; +} + +void +UnPackExtraMsgs(StringInfo inputMsgs) +{ + ExtraDispEntry *entry; + const char *name; + const char *payload; + int payloadLen; + int totalLen; + bool found; + int n; + int i; + + if (!ExtraDispTable) + return; + + n = hash_get_num_entries(ExtraDispTable); + i = n; + + while (inputMsgs->cursor < inputMsgs->len) + { + totalLen = pq_getmsgint(inputMsgs, 4); + name = pq_getmsgstring(inputMsgs); + payloadLen = totalLen - strlen(name) - 1; + payload = pq_getmsgbytes(inputMsgs, payloadLen); + + entry = (ExtraDispEntry *) hash_search(ExtraDispTable, + name, + HASH_FIND, &found); + if (!found) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("extra dispatch %s not found", name))); + + (*(entry->unpackFunc))(payload, payloadLen); + i--; + } + if (i != 0) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("extra dispatch count mismatch, registered %d, get %d", n, i))); +} diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 9bbfd10dd2d..4aa6129ce90 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -42,6 +42,7 @@ #include "mb/pg_wchar.h" #include "cdb/cdbdisp.h" +#include "cdb/cdbdisp_extra.h" #include "cdb/cdbdisp_query.h" #include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */ #include "cdb/cdbdispatchresult.h" @@ -98,6 +99,12 @@ typedef struct DispatchCommandQueryParms int serializedDtxContextInfolen; } DispatchCommandQueryParms; +/* + * Hooks for plugins to get control in command dispatch + */ +CdbNeedDispatchCommand_hook_type CdbNeedDispatchCommand_hook = NULL; +CdbNeedDispatchUtility_hook_type CdbNeedDispatchUtility_hook = NULL; + static int fillSliceVector(SliceTable *sliceTable, int sliceIndex, SliceVec *sliceVector, @@ -391,7 +398,12 @@ CdbDispatchCommandToSegments(const char *strCommand, CdbPgResults *cdb_pgresults) { DispatchCommandQueryParms *pQueryParms; - bool needTwoPhase = flags & DF_NEED_TWO_PHASE; + bool needTwoPhase; + + if (CdbNeedDispatchCommand_hook && !CdbNeedDispatchCommand_hook(strCommand, &flags, segments, cdb_pgresults)) + return; + + needTwoPhase = flags & DF_NEED_TWO_PHASE; if (needTwoPhase) setupDtxTransaction(); @@ -429,9 +441,14 @@ CdbDispatchUtilityStatement(struct Node *stmt, CdbPgResults *cdb_pgresults) { DispatchCommandQueryParms *pQueryParms; - bool needTwoPhase = flags & DF_NEED_TWO_PHASE; + bool needTwoPhase; Assert(Gp_role == GP_ROLE_DISPATCH && ENABLE_DISPATCH()); + + if (CdbNeedDispatchUtility_hook && !CdbNeedDispatchUtility_hook(stmt, &flags)) + return; + + needTwoPhase = flags & DF_NEED_TWO_PHASE; if (needTwoPhase) setupDtxTransaction(); @@ -871,6 +888,8 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, int total_query_len; char *shared_query, *pos; + char *extraMsgs; + int extraLen; MemoryContext oldContext; /* @@ -917,6 +936,9 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, sizeof(resgroupInfo.len) + resgroupInfo.len; + extraMsgs = PackExtraMsgs(&extraLen); + total_query_len += extraLen; + shared_query = palloc(total_query_len); pos = shared_query; @@ -1010,6 +1032,13 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, pos += resgroupInfo.len; } + if (extraLen > 0) + { + memcpy(pos, extraMsgs, extraLen); + pos += extraLen; + pfree(extraMsgs); + } + len = pos - shared_query - 1; /* @@ -1306,8 +1335,12 @@ CdbDispatchCopyStart(struct CdbCopy *cdbCopy, Node *stmt, int flags) CdbDispatcherState *ds; Gang *primaryGang; ErrorData *error = NULL; - bool needTwoPhase = flags & DF_NEED_TWO_PHASE; + bool needTwoPhase; + + if (CdbNeedDispatchUtility_hook && !CdbNeedDispatchUtility_hook(stmt, &flags)) + return; + needTwoPhase = flags & DF_NEED_TWO_PHASE; if (needTwoPhase) setupDtxTransaction(); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b21ef9c9f0f..4908b707706 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -120,6 +120,9 @@ ExecutorEnd_hook_type ExecutorEnd_hook = NULL; /* Hook for plugin to get control in ExecCheckRTPerms() */ ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook = NULL; +/* Hook for plugins to get control in DtxTransaction Management */ +SetDtxFlag_hook_type SetDtxFlag_hook = NULL; + /* decls for local routines only used within this module */ static void InitPlan(QueryDesc *queryDesc, int eflags); static void CheckValidRowMarkRel(Relation rel, RowMarkType markType); @@ -562,6 +565,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * work for this query. */ needDtx = ExecutorSaysTransactionDoesWrites(); + if (SetDtxFlag_hook) + needDtx = SetDtxFlag_hook(needDtx); if (needDtx) setupDtxTransaction(); diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 36879297d9b..6da2a2b524f 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -207,6 +207,11 @@ static int default_multixact_freeze_table_age; /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; +/* + * Hook for plugins to get control in AutoVacLauncher. + */ +AutoVacLauncherMain_hook_type AutoVacLauncherMain_hook = NULL; + /* struct to keep track of databases in launcher */ typedef struct avl_dbase { @@ -489,6 +494,9 @@ AutoVacLauncherMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; + if (AutoVacLauncherMain_hook) + (*AutoVacLauncherMain_hook) (argc, argv); + am_autovacuum_launcher = true; MyBackendType = B_AUTOVAC_LAUNCHER; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e796faddf31..ce5c5ef2c6b 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -43,6 +43,7 @@ #include "catalog/catalog.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "cdb/cdbvars.h" #include "crypto/bufenc.h" #include "executor/instrument.h" #include "lib/binaryheap.h" @@ -1255,6 +1256,23 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, *foundPtr = true; +#ifdef SERVERLESS + /* + * TODO: use GUC/hook instead of macro + * + * The page in buffer may be out of date, we need to check the buffer + * and refresh the buffer if the page has been modified. + */ + if (Gp_role == GP_ROLE_EXECUTE && valid) + { + uint32 buf_state = LockBufHdr(buf); + + buf_state &= ~BM_VALID; + UnlockBufHdr(buf, buf_state); + + valid = false; + } +#endif if (!valid) { /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 93e6152ae84..06e70b5f675 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -83,6 +83,11 @@ CountDBSession_hook_type CountDBSession_hook = NULL; +/* + * Hook for plugins to get control in GetSnapshotData + */ +GetSnapshotData_hook_type GetSnapshotData_hook = NULL; + /* Our shared memory area */ typedef struct ProcArrayStruct { @@ -297,10 +302,10 @@ static TransactionId standbySnapshotPendingXmin; * GlobalVisState for details. As shared, catalog, normal and temporary * relations can have different horizons, one such state exists for each. */ -static GlobalVisState GlobalVisSharedRels; -static GlobalVisState GlobalVisCatalogRels; -static GlobalVisState GlobalVisDataRels; -static GlobalVisState GlobalVisTempRels; +GlobalVisState GlobalVisSharedRels; +GlobalVisState GlobalVisCatalogRels; +GlobalVisState GlobalVisDataRels; +GlobalVisState GlobalVisTempRels; /* * This backend's RecentXmin at the last time the accurate xmin horizon was @@ -2942,6 +2947,9 @@ GetSnapshotData(Snapshot snapshot, DtxContext distributedTransactionContext) errmsg("out of memory"))); } + if (GetSnapshotData_hook) + return (*GetSnapshotData_hook) (snapshot, distributedTransactionContext); + /* * GP: Distributed snapshot. */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index c545c42298c..6e821734f2f 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -97,6 +97,7 @@ #include "cdb/cdbsrlz.h" #include "cdb/cdbtm.h" #include "cdb/cdbdtxcontextinfo.h" +#include "cdb/cdbdisp_extra.h" #include "cdb/cdbdisp_query.h" #include "cdb/cdbdispatchresult.h" #include "cdb/cdbendpoint.h" @@ -148,6 +149,11 @@ int client_connection_check_interval = 0; */ cancel_pending_hook_type cancel_pending_hook = NULL; +/* + * Hook for plugins to handle the txn message + */ +HandleTxnCommand_hook_type HandleTxnCommand_hook = NULL; + /* ---------------- * private typedefs etc * ---------------- @@ -509,6 +515,7 @@ SocketBackend(StringInfo inBuf) break; + case 't': case 'T': /* Cloudberry Database dispatched transaction protocol from QD */ maxmsglen = PQ_LARGE_MESSAGE_LIMIT; doing_extended_query_message = false; @@ -5680,6 +5687,8 @@ PostgresMain(int argc, char *argv[], if (resgroupInfoLen > 0) resgroupInfoBuf = pq_getmsgbytes(&input_message, resgroupInfoLen); + UnPackExtraMsgs(&input_message); + pq_getmsgend(&input_message); elog((Debug_print_full_dtm ? LOG : DEBUG5), "MPP dispatched stmt from QD: %s.",query_string); @@ -5794,6 +5803,18 @@ PostgresMain(int argc, char *argv[], } break; + case 't': /* handle plugin's MPP dispatched txn protocol command from QD */ + { + if (HandleTxnCommand_hook) + HandleTxnCommand_hook(&input_message, &send_ready_for_query); + else + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid frontend message type %d", firstchar), + errdetail("HandleTxnCommand_hook is NULL"))); + } + break; + case 'P': /* parse */ { const char *stmt_name; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index b558ae1db4d..863ca30090c 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -87,6 +87,9 @@ /* Hook for plugins to get control in ProcessUtility() */ ProcessUtility_hook_type ProcessUtility_hook = NULL; +/* Hook for plugins to send explicit begin command */ +SendTxnExplicitBegin_hook_type SendTxnExplicitBegin_hook = NULL; + /* counter to disable dispatch */ int dispatch_nest_level = 0; @@ -668,7 +671,10 @@ standard_ProcessUtility(PlannedStmt *pstmt, /* gp_dispatch */ false); } - sendDtxExplicitBegin(); + if (SendTxnExplicitBegin_hook) + SendTxnExplicitBegin_hook(); + else + sendDtxExplicitBegin(); } break; @@ -730,7 +736,10 @@ standard_ProcessUtility(PlannedStmt *pstmt, * that the BEGIN has been dispatched * before we start dispatching our savepoint. */ - sendDtxExplicitBegin(); + if (SendTxnExplicitBegin_hook) + SendTxnExplicitBegin_hook(); + else + sendDtxExplicitBegin(); DefineDispatchSavepoint(stmt->savepoint_name); break; diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c index 338505d63e2..f8afdba4b5f 100644 --- a/src/backend/utils/cache/relmapper.c +++ b/src/backend/utils/cache/relmapper.c @@ -138,6 +138,10 @@ static RelMapFile active_local_updates; static RelMapFile pending_shared_updates; static RelMapFile pending_local_updates; +/* + * Hook for plugins to get control in load_relmap_file + */ +LoadRelMap_hook_type LoadRelMap_hook = NULL; /* non-export function prototypes */ static void apply_map_update(RelMapFile *map, Oid relationId, RelFileNodeId fileNode, @@ -724,6 +728,9 @@ load_relmap_file(bool shared, bool lock_held) map = &local_map; } + if (LoadRelMap_hook) + return (*LoadRelMap_hook) (shared, lock_held, map); + /* Read data ... */ fd = OpenTransientFile(mapfilename, O_RDONLY | PG_BINARY); if (fd < 0) diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index e482e303940..27257a5d6e2 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -1143,6 +1143,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, */ fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace); +#ifndef SERVERLESS + /* + * TODO: use GUC instead of macro. + * + * No database directories/files in serverless, skip sanity check. + */ if (!bootstrap) { if (access(fullpath, F_OK) == -1) @@ -1163,6 +1169,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ValidatePgVersion(fullpath); } +#endif SetDatabasePath(fullpath); diff --git a/src/include/access/slru.h b/src/include/access/slru.h index 88653ae7e66..c8136f3c1a7 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -138,6 +138,14 @@ typedef struct SlruCtlData typedef SlruCtlData *SlruCtl; +/* + * Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage + */ +typedef bool (*SlruPhysicalReadPage_hook_type)(SlruCtl ctl, int pageno, int slotno, bool *result); +extern PGDLLIMPORT SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook; + +typedef bool (*SlruPhysicalWritePage_hook_type)(SlruCtl ctl, int pageno, int slotno, bool *result); +extern PGDLLIMPORT SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook; extern Size SimpleLruShmemSize(int nslots, int nlsns); extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 5a61cdc9ebe..dd6d5f2903d 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -308,6 +308,10 @@ extern bool gp_pause_on_restore_point_replay; typedef RelFileNodeId (*NewSegRelfilenode_assign_hook_type)(void); extern PGDLLIMPORT NewSegRelfilenode_assign_hook_type NewSegRelfilenode_assign_hook; +/* Hook for plugins to get control in GetNewTransactionId */ +typedef FullTransactionId (*GetNewTransactionId_hook_type)(bool isSubXact); +extern PGDLLIMPORT GetNewTransactionId_hook_type GetNewTransactionId_hook; + /* * prototypes for functions in transam/transam.c */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5a35d7de88f..a35ceabbb29 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -189,6 +189,48 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_DISTRIB (1U << 8) #define XACT_XINFO_HAS_DELDBS (1U << 9) +typedef enum +{ + TXN_PROTOCOL_COMMAND_BEGIN = 0, + TXN_PROTOCOL_COMMAND_ABORT, + TXN_PROTOCOL_COMMAND_COMMIT, + TXN_PROTOCOL_COMMAND_POST_COMMIT, + TXN_PROTOCOL_COMMAND_SUB_BEGIN, + TXN_PROTOCOL_COMMAND_SUB_RELEASE, + TXN_PROTOCOL_COMMAND_SUB_ROLLBACK, +} TxnProtocolCommand; + +/* + * Hooks for plugins to get control in Transaction Management + */ +typedef void(*TransactionParticipateEnd_hook_type)(bool commit); +extern PGDLLIMPORT TransactionParticipateEnd_hook_type TransactionParticipateEnd_hook; + +typedef bool(*NotifySubTransaction_hook_type)(TxnProtocolCommand command); +extern PGDLLIMPORT NotifySubTransaction_hook_type NotifySubTransaction_hook; + +typedef XLogRecPtr +(*XactLogCommitRecord_hook_type) (TimestampTz commit_time, + Oid tablespace_oid_to_delete_on_commit, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNodePendingDelete *rels, + int nmsgs, SharedInvalidationMessage *msgs, + int ndeldbs, DbDirNode *deldbs, + bool relcacheInval, + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); +extern PGDLLIMPORT XactLogCommitRecord_hook_type XactLogCommitRecord_hook; + +typedef XLogRecPtr +(*XactLogAbortRecord_hook_type) (TimestampTz abort_time, + Oid tablespace_oid_to_delete_on_abort, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNodePendingDelete *rels, + int ndeldbs, DbDirNode *deldbs, + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); +extern PGDLLIMPORT XactLogAbortRecord_hook_type XactLogAbortRecord_hook; + /* * Also stored in xinfo, these indicating a variety of additional actions that * need to occur when emulating transaction effects during recovery. @@ -452,6 +494,7 @@ extern void MarkCurrentTransactionIdLoggedIfAny(void); extern void MarkTopTransactionWriteXLogOnExecutor(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); +extern void SetCurrentCommandId(CommandId cid); extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts); extern TimestampTz GetCurrentTransactionStartTimestamp(void); extern TimestampTz GetCurrentStatementStartTimestamp(void); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cec7f91a3ba..6f548e1fc54 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -162,10 +162,22 @@ extern bool StandbyMode; /* tde feature enable or not */ extern int FileEncryptionEnabled; -/* Hook for plugins to do some startup job */ +/* Hook for plugins to get control in StartupXLOG */ +typedef void (*StartupXLOG_hook_type) (void); +extern PGDLLIMPORT StartupXLOG_hook_type StartupXLOG_hook; + +/* Hook for plugins to do additional startup works */ typedef void (*Startup_hook_type) (void); extern PGDLLIMPORT Startup_hook_type Startup_hook; +/* Hook for plugins to get control in XLogFlush */ +typedef void (*XLogFlush_hook_type) (XLogRecPtr record); +extern PGDLLIMPORT XLogFlush_hook_type XLogFlush_hook; + +/* Hook for plugins to get control in CreateCheckPoint */ +typedef void (*CreateCheckPoint_hook_type)(int flags); +extern PGDLLIMPORT CreateCheckPoint_hook_type CreateCheckPoint_hook; + /* Archive modes */ typedef enum ArchiveMode { @@ -229,7 +241,16 @@ extern PGDLLIMPORT int wal_level; (DataChecksumsEnabled() || FileEncryptionEnabled || wal_log_hints) /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ +#ifdef SERVERLESS +/* + * This is not necessary. + * + * Standby is not needed in serverless, so we do not need to WAL-log anything. + */ +#define XLogStandbyInfoActive() (false) +#else #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) +#endif /* Do we need to WAL-log information required only for logical replication? */ #define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) @@ -306,6 +327,7 @@ typedef enum WALAvailability } WALAvailability; struct XLogRecData; +typedef struct XLogCtlData XLogCtlData; extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, @@ -438,5 +460,7 @@ extern bool IsRoleMirror(void); extern void SignalPromote(void); extern XLogRecPtr XLogLastInsertBeginLoc(void); extern void initialize_wal_bytes_written(void); +extern ControlFileData *GetControlFile(void); +extern XLogCtlData *GetXLogCtl(void); #endif /* XLOG_H */ diff --git a/src/include/cdb/cdbdisp_extra.h b/src/include/cdb/cdbdisp_extra.h new file mode 100644 index 00000000000..b6bac03b3cd --- /dev/null +++ b/src/include/cdb/cdbdisp_extra.h @@ -0,0 +1,15 @@ +#ifndef CDBDISP_EXTRA_H +#define CDBDISP_EXTRA_H + +#include "lib/stringinfo.h" + +#define EXTRADISPNAME_MAX_LEN 64 + +typedef char *(*PackFunc) (int *len); +typedef void (*UnpackFunc) (const char *msg, int len); + +extern void RegisterExtraDispatch(const char *extraDispName, PackFunc packFunc, UnpackFunc unpackFunc); +extern char *PackExtraMsgs(int *len); +extern void UnPackExtraMsgs(StringInfo strInfo); + +#endif /* CDBDISP_EXTRA_H */ diff --git a/src/include/cdb/cdbdisp_query.h b/src/include/cdb/cdbdisp_query.h index 34c1deea4d8..cb05c49a842 100644 --- a/src/include/cdb/cdbdisp_query.h +++ b/src/include/cdb/cdbdisp_query.h @@ -40,6 +40,18 @@ struct CdbDispatcherState; struct CdbPgResults; struct CdbCopy; +/* + * Hooks for plugins to get control in command dispatch + */ +typedef bool (*CdbNeedDispatchCommand_hook_type) (const char *strCommand, + int *flags, + List *segments, + struct CdbPgResults *cdb_pgresults); +extern PGDLLIMPORT CdbNeedDispatchCommand_hook_type CdbNeedDispatchCommand_hook; + +typedef bool (*CdbNeedDispatchUtility_hook_type) (struct Node *stmt, int *flags); +extern PGDLLIMPORT CdbNeedDispatchUtility_hook_type CdbNeedDispatchUtility_hook; + /* Compose and dispatch the MPPEXEC commands corresponding to a plan tree * within a complete parallel plan. * diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index e53ed647a93..fde52ec6e34 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -97,6 +97,9 @@ extern PGDLLIMPORT ExecutorEnd_hook_type ExecutorEnd_hook; typedef bool (*ExecutorCheckPerms_hook_type) (List *, bool); extern PGDLLIMPORT ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook; +/* Hook for plugins to get control in DtxTransaction Management */ +typedef bool (*SetDtxFlag_hook_type) (bool needDxt); +extern PGDLLIMPORT SetDtxFlag_hook_type SetDtxFlag_hook; /* * prototypes from functions in execAmi.c diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 5557c826fe0..9f6e22a60cc 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -966,6 +966,9 @@ RELSEG_SIZE requires an initdb. */ #undef RELSEG_SIZE +/* Define to 1 to use serverless architecture of Cloudberry. (--enable-serverless) */ +#undef SERVERLESS + /* The size of `bool', as computed by sizeof. */ #undef SIZEOF_BOOL diff --git a/src/include/postgres.h b/src/include/postgres.h index 3561e2ed40b..244024f5d73 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -45,6 +45,7 @@ #define POSTGRES_H #include "c.h" +#include "lib/stringinfo.h" #include "utils/elog.h" #include "utils/palloc.h" #include "storage/itemptr.h" @@ -460,6 +461,10 @@ typedef struct NullableDatum /* due to alignment padding this could be used for flags for free */ } NullableDatum; +/* Hook for plugins to handle the txn message */ +typedef void(*HandleTxnCommand_hook_type)(StringInfo input_message, volatile bool *send_ready_for_query); +extern PGDLLIMPORT HandleTxnCommand_hook_type HandleTxnCommand_hook; + #define SIZEOF_DATUM SIZEOF_VOID_P StaticAssertDecl(SIZEOF_DATUM == 8, "sizeof datum is not 8"); /* diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h index aacdd0f5753..5810ce55890 100644 --- a/src/include/postmaster/autovacuum.h +++ b/src/include/postmaster/autovacuum.h @@ -55,6 +55,10 @@ extern bool IsAutoVacuumWorkerProcess(void); #define IsAnyAutoVacuumProcess() \ (IsAutoVacuumLauncherProcess() || IsAutoVacuumWorkerProcess()) +/* Hook for plugins to get control in AutoVacLauncher */ +typedef void (*AutoVacLauncherMain_hook_type)(int argc, char *argv[]); +extern PGDLLIMPORT AutoVacLauncherMain_hook_type AutoVacLauncherMain_hook; + /* Functions to start autovacuum process, called from postmaster */ extern void autovac_init(void); extern int StartAutoVacLauncher(void); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a7b0eff7b22..9f1e60f2f2a 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -48,6 +48,10 @@ extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid); extern int GetMaxSnapshotXidCount(void); extern int GetMaxSnapshotSubxidCount(void); +/* Hook for plugins to get control in GetSnapshotData */ +typedef Snapshot (*GetSnapshotData_hook_type)(Snapshot snapshot, DtxContext distributedTransactionContext); +extern PGDLLIMPORT GetSnapshotData_hook_type GetSnapshotData_hook; + extern Snapshot GetSnapshotData(Snapshot snapshot, DtxContext distributedTransactionContext); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h index 212e9b32806..3a7bf2c390c 100644 --- a/src/include/tcop/utility.h +++ b/src/include/tcop/utility.h @@ -77,6 +77,10 @@ typedef void (*ProcessUtility_hook_type) (PlannedStmt *pstmt, DestReceiver *dest, QueryCompletion *qc); extern PGDLLIMPORT ProcessUtility_hook_type ProcessUtility_hook; +/* Hook for plugins to send explicit begin command */ +typedef void (*SendTxnExplicitBegin_hook_type)(void); +extern PGDLLIMPORT SendTxnExplicitBegin_hook_type SendTxnExplicitBegin_hook; + extern void ProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, ParamListInfo params, diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h index c0c772cce14..419730c8a9e 100644 --- a/src/include/utils/relmapper.h +++ b/src/include/utils/relmapper.h @@ -35,6 +35,10 @@ typedef struct xl_relmap_update #define MinSizeOfRelmapUpdate offsetof(xl_relmap_update, data) +typedef struct RelMapFile RelMapFile; +/* Hook for plugins to get control in load_relmap_file */ +typedef void (*LoadRelMap_hook_type)(bool shared, bool lock_held, RelMapFile *map); +extern PGDLLIMPORT LoadRelMap_hook_type LoadRelMap_hook; extern RelFileNodeId RelationMapOidToFilenode(Oid relationId, bool shared); diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 6b959fa04fa..eaaddc0c7ea 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -162,6 +162,11 @@ extern TransactionId GlobalVisTestNonRemovableHorizon(GlobalVisState *state); extern bool GlobalVisCheckRemovableXid(Relation rel, TransactionId xid); extern bool GlobalVisCheckRemovableFullXid(Relation rel, FullTransactionId fxid); +extern GlobalVisState GlobalVisSharedRels; +extern GlobalVisState GlobalVisCatalogRels; +extern GlobalVisState GlobalVisDataRels; +extern GlobalVisState GlobalVisTempRels; + /* * Utility functions for implementing visibility routines in table AMs. */ From 4dbcda72a10224d74c0b1c9bd82d40437819af82 Mon Sep 17 00:00:00 2001 From: husen Date: Fri, 11 Aug 2023 17:23:48 +0800 Subject: [PATCH 2/9] 1. add global variable enable_serverless, default to false, set to true in plugin. 2. add SimpleLruReadPage_hook for plugin to read SLRU page. 3. add StartChildProcess_hook for plugin to get control in child process startup. --- configure | 2 +- src/backend/access/transam/slru.c | 56 +++++++++-------------------- src/backend/access/transam/xlog.c | 20 ----------- src/backend/cdb/cdbtm.c | 11 ++---- src/backend/postmaster/postmaster.c | 14 ++++++++ src/backend/storage/buffer/bufmgr.c | 8 ++--- src/backend/utils/init/postinit.c | 9 +---- src/include/access/slru.h | 8 +++++ src/include/access/xlog.h | 13 ------- src/include/postmaster/postmaster.h | 9 +++++ 10 files changed, 54 insertions(+), 96 deletions(-) diff --git a/configure b/configure index 8c063b4d265..d1ec8649731 100755 --- a/configure +++ b/configure @@ -1606,7 +1606,7 @@ Optional Features: --enable-cassert enable assertion checks (for debugging) --disable-orca disable ORCA optimizer --enable-catalog-ext enable CloudberryDB catalog extension - --enable-serverless use Cloudberry serverless architecture + --enable-serverless use CloudberryDB serverless architecture --enable-mapreduce enable CloudberryDB Mapreduce support --enable-gpcloud enable gpcloud support --enable-external-fts enable external fts support diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 284d998ad11..a5ce17e7a30 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -54,7 +54,6 @@ #include "access/slru.h" #include "access/transam.h" #include "access/xlog.h" -#include "cdb/cdbvars.h" #include "miscadmin.h" #include "pgstat.h" #include "storage/fd.h" @@ -135,20 +134,15 @@ static SlruErrorCause slru_errcause; static int slru_errno; /* - * Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage + * Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage/SimpleLruReadPage */ SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook = NULL; SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook = NULL; +SimpleLruReadPage_hook_type SimpleLruReadPage_hook = NULL; -static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); -static void SimpleLruWaitIO(SlruCtl ctl, int slotno); static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata); -static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno); static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata); -static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid); -static int SlruSelectLRUPage(SlruCtl ctl, int pageno); - static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data); static void SlruInternalDeleteSegment(SlruCtl ctl, int segno); @@ -325,7 +319,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno) * * This assumes that InvalidXLogRecPtr is bitwise-all-0. */ -static void +void SimpleLruZeroLSNs(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; @@ -342,7 +336,7 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno) * * Control lock must be held at entry, and will be held at exit. */ -static void +void SimpleLruWaitIO(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; @@ -402,6 +396,9 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, { SlruShared shared = ctl->shared; + if (SimpleLruReadPage_hook) + return (*SimpleLruReadPage_hook) (ctl, pageno, write_ok, xid); + /* Outer loop handles restart if we must wait for someone else's I/O */ for (;;) { @@ -427,17 +424,6 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, /* Now we must recheck state from the top */ continue; } -#ifdef SERVERLESS - /* - * TODO: add hook/GUC instead? - * The page in buffer may be out of date, we need to check the buffer - * and refresh the buffer if the page has been modified. - */ - if (Gp_role == GP_ROLE_EXECUTE) - { - goto PageRead; - } -#endif /* Otherwise, it's ready to use */ SlruRecentlyUsed(shared, slotno); @@ -452,10 +438,6 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, (shared->page_status[slotno] == SLRU_PAGE_VALID && !shared->page_dirty[slotno])); -#ifdef SERVERLESS -PageRead: -#endif - /* Mark the slot read-busy */ shared->page_number[slotno] = pageno; shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS; @@ -517,6 +499,12 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) SlruShared shared = ctl->shared; int slotno; + if (SimpleLruReadPage_hook) + { + LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + return (*SimpleLruReadPage_hook) (ctl, pageno, true, xid); + } + /* Try to find the page while holding only shared lock */ LWLockAcquire(shared->ControlLock, LW_SHARED); @@ -527,18 +515,6 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) shared->page_status[slotno] != SLRU_PAGE_EMPTY && shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS) { -#ifdef SERVERLESS - /* - * TODO: add hook/GUC instead? - * The page in buffer may be out of date, we need to check the buffer - * and refresh the buffer if the page has been modified. - */ - if (Gp_role == GP_ROLE_EXECUTE) - { - break; - } -#endif - /* See comments for SlruRecentlyUsed macro */ SlruRecentlyUsed(shared, slotno); @@ -712,7 +688,7 @@ SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno) * For now, assume it's not worth keeping a file pointer open across * read/write operations. We could cache one virtual file pointer ... */ -static bool +bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno) { SlruShared shared = ctl->shared; @@ -973,7 +949,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) * Issue the error message after failure of SlruPhysicalReadPage or * SlruPhysicalWritePage. Call this after cleaning up shared-memory state. */ -static void +void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid) { int segno = pageno / SLRU_PAGES_PER_SEGMENT; @@ -1058,7 +1034,7 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid) * * Control lock must be held at entry, and will be held at exit. */ -static int +int SlruSelectLRUPage(SlruCtl ctl, int pageno) { SlruShared shared = ctl->shared; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d69b3fe577b..de72cd7de48 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -111,11 +111,6 @@ StartupXLOG_hook_type StartupXLOG_hook = NULL; */ XLogFlush_hook_type XLogFlush_hook = NULL; -/* - * Hook for plugins to get control in CreateCheckPoint. - */ -CreateCheckPoint_hook_type CreateCheckPoint_hook = NULL; - /* Unsupported old recovery command file names (relative to $PGDATA) */ #define RECOVERY_COMMAND_FILE "recovery.conf" #define RECOVERY_COMMAND_DONE "recovery.done" @@ -3148,15 +3143,6 @@ XLogBackgroundFlush(void) TimestampTz now; int flushbytes; -#ifdef SERVERLESS - /* - * TODO: use GUC/hook instead of macro. - * - * Indeed, walwriter is not needed in serverless, we have no WAL in buffer. - */ - return true; -#endif - /* XLOG doesn't need flushing during recovery */ if (RecoveryInProgress()) return false; @@ -9439,12 +9425,6 @@ CreateCheckPoint(int flags) VirtualTransactionId *vxids; int nvxids; - if (CreateCheckPoint_hook) - { - (*CreateCheckPoint_hook) (flags); - return; - } - /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just * issued at a different time. diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index f8128eded26..ef558d05001 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -1608,14 +1608,6 @@ doQEDistributedExplicitBegin() static bool isDtxQueryDispatcher(void) { -#ifdef SERVERLESS - /* - * TODO: use GUC/hook instead of macro. - * - * Distributed transaction is not necessary in serverless. - */ - return false; -#endif bool isDtmStarted; bool isSharedLocalSnapshotSlotPresent; @@ -1624,7 +1616,8 @@ isDtxQueryDispatcher(void) return (Gp_role == GP_ROLE_DISPATCH && isDtmStarted && - isSharedLocalSnapshotSlotPresent); + isSharedLocalSnapshotSlotPresent && + !enable_serverless); } /* diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 3d9a199c8cb..f3fbd6248a6 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -282,6 +282,9 @@ bool remove_temp_files_after_crash = true; /* Hook for plugins to start background workers */ start_bgworkers_hook_type start_bgworkers_hook = NULL; +/* Hook for plugins to get control in StartChildProcess */ +StartChildProcess_hook_type StartChildProcess_hook = NULL; + /* * PIDs of special child processes; 0 when not running. When adding a new PID * to the list, remember to add the process title to GetServerProcessTitle() @@ -466,6 +469,8 @@ bool ClientAuthInProgress = false; /* T during new-client bool redirection_done = false; /* stderr redirected for syslogger? */ +bool enable_serverless = false; /* use CloudberryDB serverless architecture */ + /* received START_AUTOVAC_LAUNCHER signal */ static volatile sig_atomic_t start_autovac_launcher = false; @@ -5990,6 +5995,15 @@ CountChildren(int target) */ static pid_t StartChildProcess(AuxProcType type) +{ + if (StartChildProcess_hook) + return (*StartChildProcess_hook) (type); + + return StartChildProcessInternal(type); +} + +pid_t +StartChildProcessInternal(AuxProcType type) { pid_t pid; char *av[10]; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index ce5c5ef2c6b..810e6ed96a2 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -51,6 +51,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "postmaster/postmaster.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -1256,14 +1257,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, *foundPtr = true; -#ifdef SERVERLESS /* - * TODO: use GUC/hook instead of macro - * * The page in buffer may be out of date, we need to check the buffer * and refresh the buffer if the page has been modified. */ - if (Gp_role == GP_ROLE_EXECUTE && valid) + if (enable_serverless && Gp_role == GP_ROLE_EXECUTE && valid) { uint32 buf_state = LockBufHdr(buf); @@ -1272,7 +1270,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, valid = false; } -#endif + if (!valid) { /* diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 27257a5d6e2..cf78cd717e8 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -1143,13 +1143,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, */ fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace); -#ifndef SERVERLESS - /* - * TODO: use GUC instead of macro. - * - * No database directories/files in serverless, skip sanity check. - */ - if (!bootstrap) + if (!bootstrap && !enable_serverless) { if (access(fullpath, F_OK) == -1) { @@ -1169,7 +1163,6 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ValidatePgVersion(fullpath); } -#endif SetDatabasePath(fullpath); diff --git a/src/include/access/slru.h b/src/include/access/slru.h index c8136f3c1a7..542df07f586 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -147,16 +147,22 @@ extern PGDLLIMPORT SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook; typedef bool (*SlruPhysicalWritePage_hook_type)(SlruCtl ctl, int pageno, int slotno, bool *result); extern PGDLLIMPORT SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook; +typedef int (*SimpleLruReadPage_hook_type)(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); +extern PGDLLIMPORT SimpleLruReadPage_hook_type SimpleLruReadPage_hook; + extern Size SimpleLruShmemSize(int nslots, int nlsns); extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler); extern int SimpleLruZeroPage(SlruCtl ctl, int pageno); +extern void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); +extern void SimpleLruWaitIO(SlruCtl ctl, int slotno); extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid); extern void SimpleLruWritePage(SlruCtl ctl, int slotno); +extern int SlruSelectLRUPage(SlruCtl ctl, int pageno); extern void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied); #ifdef USE_ASSERT_CHECKING extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page); @@ -166,6 +172,8 @@ extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page); extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage); extern void SimpleLruTruncateWithLock(SlruCtl ctl, int cutoffPage); extern bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno); +extern bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno); +extern void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid); typedef bool (*SlruScanCallback) (SlruCtl ctl, char *filename, int segpage, void *data); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 6f548e1fc54..42821d60d29 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -174,10 +174,6 @@ extern PGDLLIMPORT Startup_hook_type Startup_hook; typedef void (*XLogFlush_hook_type) (XLogRecPtr record); extern PGDLLIMPORT XLogFlush_hook_type XLogFlush_hook; -/* Hook for plugins to get control in CreateCheckPoint */ -typedef void (*CreateCheckPoint_hook_type)(int flags); -extern PGDLLIMPORT CreateCheckPoint_hook_type CreateCheckPoint_hook; - /* Archive modes */ typedef enum ArchiveMode { @@ -241,16 +237,7 @@ extern PGDLLIMPORT int wal_level; (DataChecksumsEnabled() || FileEncryptionEnabled || wal_log_hints) /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ -#ifdef SERVERLESS -/* - * This is not necessary. - * - * Standby is not needed in serverless, so we do not need to WAL-log anything. - */ -#define XLogStandbyInfoActive() (false) -#else #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) -#endif /* Do we need to WAL-log information required only for logical replication? */ #define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 8d4d84582da..1426e626705 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -14,6 +14,8 @@ #ifndef _POSTMASTER_H #define _POSTMASTER_H +#include "miscadmin.h" + /* GUC options */ extern bool EnableSSL; extern int ReservedBackends; @@ -35,6 +37,8 @@ extern bool remove_temp_files_after_crash; extern int terminal_fd; +extern bool enable_serverless; + #ifdef WIN32 extern HANDLE PostmasterHandle; #else @@ -57,6 +61,10 @@ extern int postmaster_alive_fds[2]; extern PGDLLIMPORT const char *progname; +/* Hook for plugins to get control in StartChildProcess */ +typedef pid_t (*StartChildProcess_hook_type) (AuxProcType type); +extern PGDLLIMPORT StartChildProcess_hook_type StartChildProcess_hook; + extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn(); extern void ClosePostmasterPorts(bool am_syslogger); extern void InitProcessGlobals(void); @@ -78,6 +86,7 @@ extern void ShmemBackendArrayAllocation(void); extern void load_auxiliary_libraries(void); extern bool amAuxiliaryBgWorker(void); +extern pid_t StartChildProcessInternal(AuxProcType type); #ifdef ENABLE_IC_PROXY # define IC_PROXY_NUM_BGWORKER 1 #else /* ENABLE_IC_PROXY */ From e3ddd0f3af6592f9ab59c7a9b999e0adb18dcb02 Mon Sep 17 00:00:00 2001 From: husen Date: Tue, 15 Aug 2023 10:19:49 +0800 Subject: [PATCH 3/9] rename json_object to pg_json_object to resolve conflict with library jansson --- src/backend/utils/adt/json.c | 2 +- src/include/catalog/pg_proc.dat | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/utils/adt/json.c b/src/backend/utils/adt/json.c index 30ca2cf6c81..c29c8b7a1c7 100644 --- a/src/backend/utils/adt/json.c +++ b/src/backend/utils/adt/json.c @@ -1102,7 +1102,7 @@ json_build_array_noargs(PG_FUNCTION_ARGS) * for a json object. */ Datum -json_object(PG_FUNCTION_ARGS) +pg_json_object(PG_FUNCTION_ARGS) { ArrayType *in_array = PG_GETARG_ARRAYTYPE_P(0); int ndims = ARR_NDIM(in_array); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index bd4a20242c4..e72142f0af7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -8770,7 +8770,7 @@ prosrc => 'json_build_object_noargs' }, { oid => '3202', descr => 'map text array of key value pairs to json object', proname => 'json_object', prorettype => 'json', proargtypes => '_text', - prosrc => 'json_object' }, + prosrc => 'pg_json_object' }, { oid => '3203', descr => 'map text arrays of keys and values to json object', proname => 'json_object', prorettype => 'json', proargtypes => '_text _text', prosrc => 'json_object_two_arg' }, From 60e9089eb39ec228a0e9672f5433ad7a2acf0352 Mon Sep 17 00:00:00 2001 From: husen Date: Wed, 16 Aug 2023 10:00:44 +0800 Subject: [PATCH 4/9] disable WAL-log information required only for Hot Standby in serverless --- src/backend/postmaster/postmaster.c | 2 -- src/backend/storage/buffer/bufmgr.c | 1 - src/backend/utils/init/globals.c | 5 +++++ src/include/access/xlog.h | 2 +- src/include/miscadmin.h | 1 + src/include/postmaster/postmaster.h | 2 -- 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index f3fbd6248a6..7cb7a53413b 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -469,8 +469,6 @@ bool ClientAuthInProgress = false; /* T during new-client bool redirection_done = false; /* stderr redirected for syslogger? */ -bool enable_serverless = false; /* use CloudberryDB serverless architecture */ - /* received START_AUTOVAC_LAUNCHER signal */ static volatile sig_atomic_t start_autovac_launcher = false; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 810e6ed96a2..8f0bce789b9 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -51,7 +51,6 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" -#include "postmaster/postmaster.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/ipc.h" diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 5fafe83ca4c..f98293ff1d9 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -149,6 +149,11 @@ double hash_mem_multiplier = 1.0; int maintenance_work_mem = 65536; int max_parallel_maintenance_workers = 2; +/* + * use CloudberryDB serverless architecture + */ +bool enable_serverless = false; + /* * Primary determinants of sizes of shared-memory structures. * diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 42821d60d29..f67120f0d1d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -237,7 +237,7 @@ extern PGDLLIMPORT int wal_level; (DataChecksumsEnabled() || FileEncryptionEnabled || wal_log_hints) /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ -#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) +#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA && !enable_serverless) /* Do we need to WAL-log information required only for logical replication? */ #define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 07e375dfea0..da5af4e875e 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -336,6 +336,7 @@ extern PGDLLIMPORT int work_mem; extern PGDLLIMPORT double hash_mem_multiplier; extern PGDLLIMPORT int maintenance_work_mem; extern PGDLLIMPORT int max_parallel_maintenance_workers; +extern PGDLLIMPORT bool enable_serverless; extern PGDLLIMPORT int statement_mem; extern PGDLLIMPORT int max_statement_mem; extern PGDLLIMPORT int gp_vmem_limit_per_query; diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 1426e626705..9df3db4c8ec 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -37,8 +37,6 @@ extern bool remove_temp_files_after_crash; extern int terminal_fd; -extern bool enable_serverless; - #ifdef WIN32 extern HANDLE PostmasterHandle; #else From adc9b32e77b3c13b20d5412b0a40114ac97d0fc6 Mon Sep 17 00:00:00 2001 From: HuSen8891 Date: Sun, 20 Aug 2023 17:09:33 +0800 Subject: [PATCH 5/9] Add support for creating cluster with single master, and only query on catalog is permitted on single master without warehouse. --- contrib/interconnect/udp/ic_udpifc.c | 20 ++++++++++++++++++++ src/backend/cdb/cdbtm.c | 2 +- src/backend/cdb/cdbutil.c | 4 ++-- src/backend/utils/cache/relcache.c | 10 ++++++++++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 187696386eb..3cb3a1338e4 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -1599,6 +1599,14 @@ initConnHashTable(ConnHashTable *ht, MemoryContext cxt) ht->cxt = cxt; ht->size = Gp_role == GP_ROLE_DISPATCH ? (getgpsegmentCount() * 2) : ic_htab_size; + + /* + * In serverless architecture, the cluster may have only one QD, skip Initialization. + * Initialization will be done later. + */ + if (enable_serverless && Gp_role == GP_ROLE_DISPATCH && ht->size == 0) + return true; + Assert(ht->size > 0); if (ht->cxt) @@ -1635,6 +1643,18 @@ connAddHash(ConnHashTable *ht, MotionConn *mConn) MemoryContext old = NULL; MotionConnUDP *conn = NULL; + /* + * Initialize connection hash table if needed. + */ + if (enable_serverless && Gp_role == GP_ROLE_DISPATCH && ht->size == 0) + { + old = MemoryContextSwitchTo(ht->cxt); + initConnHashTable(ht, ht->cxt); + MemoryContextSwitchTo(old); + + Assert(ht->size > 0); + } + conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); hashcode = CONN_HASH_VALUE(&conn->conn_info) % ht->size; diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index ef558d05001..c6e8d4a6711 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -1114,7 +1114,7 @@ tmShmemInit(void) /* Initialize locks and shared memory area */ { *shmNextSnapshotId = 0; - *shmDtmStarted = false; + *shmDtmStarted = enable_serverless; *shmCleanupBackends = false; *shmDtxRecoveryPid = 0; *shmDtxRecoveryEvents = DTX_RECOVERY_EVENT_ABORT_PREPARED; diff --git a/src/backend/cdb/cdbutil.c b/src/backend/cdb/cdbutil.c index 0d6f7147d30..f8b7232d1ed 100644 --- a/src/backend/cdb/cdbutil.c +++ b/src/backend/cdb/cdbutil.c @@ -442,7 +442,7 @@ getCdbComponentInfo(void) * Validate that there exists at least one entry and one segment database * in the configuration */ - if (component_databases->total_segment_dbs == 0) + if (component_databases->total_segment_dbs == 0 && !enable_serverless) { ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), @@ -2809,7 +2809,7 @@ getCdbComponentInfo(void) * Validate that there exists at least one entry and one segment database * in the configuration */ - if (component_databases->total_segment_dbs == 0) + if (component_databases->total_segment_dbs == 0 && !enable_serverless) { ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 65d341c6806..d06aa66261a 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -1269,6 +1269,16 @@ RelationBuildDesc(Oid targetRelId, bool insertIt) /* make sure relation is marked as having no open file yet */ relation->rd_smgr = NULL; + if (enable_serverless && !OidIsValid(GetCurrentWarehouseId()) && + Gp_role == GP_ROLE_DISPATCH && !IsSystemRelation(relation)) + { + ereport(ERROR, + (errcode(ERRCODE_GP_FEATURE_NOT_YET), + errmsg("cannot access table \"%s\" in current transaction", + get_rel_name(targetRelId)), + errhint("Switch to exist warehouse before any query."))); + } + /* * initialize Cloudberry Database partitioning info */ From 85ba4136344953ca6792be3a12b8c1b1ffc26f67 Mon Sep 17 00:00:00 2001 From: HuSen8891 Date: Wed, 23 Aug 2023 17:33:10 +0800 Subject: [PATCH 6/9] Add: Support to create hashdata table with randomly distribution. Currently, we use randomly distribution for hashdata table, and the number of segments is set to 0. When we query on hashdata table, the distribution policy's segment number is set to number of segments of current warehouse. --- contrib/interconnect/udp/ic_udpifc.c | 11 +++++++---- src/backend/cdb/cdbcat.c | 14 +++++++++++--- src/backend/utils/cache/relcache.c | 10 ---------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 3cb3a1338e4..38ba381d87d 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -1825,10 +1825,13 @@ destroyConnHashTable(ConnHashTable *ht) } } - if (ht->cxt) - pfree(ht->table); - else - free(ht->table); + if (ht->size > 0) + { + if (ht->cxt) + pfree(ht->table); + else + free(ht->table); + } ht->table = NULL; ht->size = 0; diff --git a/src/backend/cdb/cdbcat.c b/src/backend/cdb/cdbcat.c index dc5a93bb556..d3c81c673bc 100644 --- a/src/backend/cdb/cdbcat.c +++ b/src/backend/cdb/cdbcat.c @@ -97,7 +97,7 @@ makeGpPolicy(GpPolicyType ptype, int nattrs, int numsegments) policy->numsegments = numsegments; policy->nattrs = nattrs; - Assert(numsegments > 0 || + Assert(numsegments >= 0 || (ptype == POLICYTYPE_ENTRY && numsegments == -1)); return policy; @@ -458,8 +458,16 @@ GpPolicyFetch(Oid tbloid) } /* Create a GpPolicy object. */ - policy = makeGpPolicy(POLICYTYPE_PARTITIONED, - nattrs, policyform->numsegments); + if (policyform->numsegments == 0) + { + policy = makeGpPolicy(POLICYTYPE_PARTITIONED, + nattrs, getgpsegmentCount()); + } + else + { + policy = makeGpPolicy(POLICYTYPE_PARTITIONED, + nattrs, policyform->numsegments); + } for (i = 0; i < nattrs; i++) { diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index d06aa66261a..65d341c6806 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -1269,16 +1269,6 @@ RelationBuildDesc(Oid targetRelId, bool insertIt) /* make sure relation is marked as having no open file yet */ relation->rd_smgr = NULL; - if (enable_serverless && !OidIsValid(GetCurrentWarehouseId()) && - Gp_role == GP_ROLE_DISPATCH && !IsSystemRelation(relation)) - { - ereport(ERROR, - (errcode(ERRCODE_GP_FEATURE_NOT_YET), - errmsg("cannot access table \"%s\" in current transaction", - get_rel_name(targetRelId)), - errhint("Switch to exist warehouse before any query."))); - } - /* * initialize Cloudberry Database partitioning info */ From b5f55251d841c00e657ee2e0af1f4075bf04e1b4 Mon Sep 17 00:00:00 2001 From: HuSen8891 Date: Tue, 29 Aug 2023 16:46:45 +0800 Subject: [PATCH 7/9] 1. set distributedXid to LocalTransactionId 2. do not send FTS Probe Request --- src/backend/cdb/cdbdtxcontextinfo.c | 3 +++ src/backend/cdb/cdbfts.c | 6 ++++++ src/backend/tcop/postgres.c | 3 +-- src/include/tcop/tcopprot.h | 1 + 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/backend/cdb/cdbdtxcontextinfo.c b/src/backend/cdb/cdbdtxcontextinfo.c index 1a3c1b8f295..9227d844b74 100644 --- a/src/backend/cdb/cdbdtxcontextinfo.c +++ b/src/backend/cdb/cdbdtxcontextinfo.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "utils/guc.h" #include "utils/session_state.h" +#include "storage/proc.h" /* * process local cache used to identify "dispatch units" @@ -46,6 +47,8 @@ DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo, bool inCursor, DtxContextInfo_Reset(dtxContextInfo); dtxContextInfo->distributedXid = getDistributedTransactionId(); + if (enable_serverless) + dtxContextInfo->distributedXid = MyProc->lxid; if (dtxContextInfo->distributedXid != InvalidDistributedTransactionId) dtxContextInfo->curcid = curcid; diff --git a/src/backend/cdb/cdbfts.c b/src/backend/cdb/cdbfts.c index 754d3054cbb..8155663e984 100644 --- a/src/backend/cdb/cdbfts.c +++ b/src/backend/cdb/cdbfts.c @@ -87,6 +87,9 @@ FtsNotifyProber(void) if (am_ftsprobe) return; + if (enable_serverless) + return; + SpinLockAcquire(&ftsProbeInfo->lock); initial_started = ftsProbeInfo->start_count; SpinLockRelease(&ftsProbeInfo->lock); @@ -177,6 +180,9 @@ getFtsVersion(void) void FtsNotifyProber(void) { + if (enable_serverless) + return; + Assert(Gp_role == GP_ROLE_DISPATCH); SendPostmasterSignal(PMSIGNAL_WAKEN_FTS); SIMPLE_FAULT_INJECTOR("ftsNotify_before"); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 6e821734f2f..3c62cc77660 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -264,7 +264,6 @@ static int errdetail_params(ParamListInfo params); static int errdetail_abort(void); static int errdetail_recovery_conflict(void); static void bind_param_error_callback(void *arg); -static void start_xact_command(void); static void finish_xact_command(void); static bool IsTransactionExitStmt(Node *parsetree); static bool IsTransactionExitStmtList(List *pstmts); @@ -3468,7 +3467,7 @@ exec_describe_portal_message(const char *portal_name) /* * Convenience routines for starting/committing a single command. */ -static void +void start_xact_command(void) { if (!xact_started) diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 761c6c03baa..52ad2434a23 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -64,6 +64,7 @@ extern List *pg_plan_queries(List *querytrees, const char *query_string, extern bool check_max_stack_depth(int *newval, void **extra, GucSource source); extern void assign_max_stack_depth(int newval, void *extra); +extern void start_xact_command(void); extern void die(SIGNAL_ARGS); extern void quickdie(SIGNAL_ARGS) pg_attribute_noreturn(); extern void StatementCancelHandler(SIGNAL_ARGS); From d3fa198ad5587c0e6da68af8e6ec7f7f69e97cc1 Mon Sep 17 00:00:00 2001 From: HuSen8891 Date: Thu, 31 Aug 2023 10:16:52 +0800 Subject: [PATCH 8/9] Feature: support subtransaction and savepoint --- src/backend/access/transam/clog.c | 4 ++ src/backend/access/transam/subtrans.c | 9 +++ src/backend/access/transam/xact.c | 85 ++++++++++++++++++++++++--- src/backend/cdb/cdbtm.c | 3 + src/backend/storage/buffer/bufmgr.c | 3 +- src/include/access/subtrans.h | 4 ++ src/include/access/xact.h | 2 + 7 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 86e61aee5a2..a14a1f36c80 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -38,6 +38,7 @@ #include "access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" +#include "cdb/cdbvars.h" #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" @@ -167,6 +168,9 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, int pageno = TransactionIdToPage(xid); /* get page of parent */ int i; + if (enable_serverless && Gp_role != GP_ROLE_DISPATCH) + return; + Assert(status == TRANSACTION_STATUS_COMMITTED || status == TRANSACTION_STATUS_ABORTED); diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 5339eeaa1c0..88e8bc5f8ef 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -379,3 +379,12 @@ SubTransPagePrecedes(int page1, int page2) return (TransactionIdPrecedes(xid1, xid2) && TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1)); } + +/* + * Get SUBTRANS control data + */ +SlruCtl +SUBTRANS_Ctl(void) +{ + return SubTransCtl; +} \ No newline at end of file diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 19601b13c9d..d1855475d76 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5566,17 +5566,19 @@ BeginInternalSubTransaction(const char *name) if (Gp_role == GP_ROLE_DISPATCH) { - if (!doDispatchSubtransactionInternalCmd( - DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL)) + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_BEGIN); + else { - elog(ERROR, - "Could not BeginInternalSubTransaction dispatch failed"); + if (!doDispatchSubtransactionInternalCmd( + DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL)) + { + elog(ERROR, + "Could not BeginInternalSubTransaction dispatch failed"); + } } } - if (NotifySubTransaction_hook) - NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_BEGIN); - /* * Workers synchronize transaction state at the beginning of each parallel * operation, so we can't account for new subtransactions after that @@ -5749,7 +5751,7 @@ RollbackAndReleaseCurrentSubTransaction(void) if (Gp_role == GP_ROLE_DISPATCH) { - if (!doDispatchSubtransactionInternalCmd( + if (!NotifySubTransaction_hook && !doDispatchSubtransactionInternalCmd( DTX_PROTOCOL_COMMAND_SUBTRANSACTION_ROLLBACK_INTERNAL)) { ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), @@ -7604,3 +7606,70 @@ MarkSubTransactionAssigned(void) CurrentTransactionState->assigned = true; } + +/* + * Get all xids of top level transaction and subtransactons + */ +FullTransactionId * +GetAllXids(int *nxids) +{ + FullTransactionId *xids = NULL; + int len = PGPROC_MAX_CACHED_SUBXIDS; + + *nxids = 0; + + if (FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId)) + { + TransactionState xact = CurrentTransactionState; + + if (xids == NULL) + xids = (FullTransactionId *)palloc(sizeof(FullTransactionId) * len); + xids[(*nxids)++] = xact->fullTransactionId; + + while (xact->parent) + { + xact = xact->parent; + xids[(*nxids)++] = xact->fullTransactionId; + + if ((*nxids) >= len) + { + len *= 2; + xids = (FullTransactionId *)repalloc(xids, sizeof(FullTransactionId) * len); + } + } + } + + return xids; +} + +/* + * Get number of transaction and subtransactions which have no xid. + */ +int +GetNumOfTxnStatesWithoutXid(void) +{ + int nlevels = 0; + + if (!FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId)) + { + TransactionState xact = CurrentTransactionState; + + nlevels++; + + while (xact->parent) + { + xact = xact->parent; + + if (!FullTransactionIdIsValid(xact->fullTransactionId)) + { + nlevels++; + } + else + { + break; + } + } + } + + return nlevels; +} \ No newline at end of file diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index c6e8d4a6711..6b70abd9eaf 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -1385,6 +1385,9 @@ dispatchDtxCommand(const char *cmd) elog(DTM_DEBUG5, "dispatchDtxCommand: '%s'", cmd); + if (enable_serverless) + return true; + if (currentGxactWriterGangLost()) { ereport(WARNING, diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 8f0bce789b9..ff821e126a4 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1263,8 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (enable_serverless && Gp_role == GP_ROLE_EXECUTE && valid) { uint32 buf_state = LockBufHdr(buf); - - buf_state &= ~BM_VALID; + buf_state &= ~(BM_VALID | BM_DIRTY); UnlockBufHdr(buf, buf_state); valid = false; diff --git a/src/include/access/subtrans.h b/src/include/access/subtrans.h index 9a54dc0fb3b..9ad1ecc31b9 100644 --- a/src/include/access/subtrans.h +++ b/src/include/access/subtrans.h @@ -20,6 +20,9 @@ typedef struct SubTransData TransactionId topMostParent; } SubTransData; +struct SlruCtlData; +typedef struct SlruCtlData *SlruCtl; + extern void SubTransSetParent(TransactionId xid, TransactionId parent); extern TransactionId SubTransGetParent(TransactionId xid); extern TransactionId SubTransGetTopmostTransaction(TransactionId xid); @@ -31,5 +34,6 @@ extern void StartupSUBTRANS(TransactionId oldestActiveXID); extern void CheckPointSUBTRANS(void); extern void ExtendSUBTRANS(TransactionId newestXact); extern void TruncateSUBTRANS(TransactionId oldestXact); +extern SlruCtl SUBTRANS_Ctl(void); #endif /* SUBTRANS_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index a35ceabbb29..5938e4320eb 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -548,6 +548,8 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); extern void RecordDistributedForgetCommitted(DistributedTransactionId gxid); extern bool IsSubTransactionAssignmentPending(void); extern void MarkSubTransactionAssigned(void); +extern FullTransactionId *GetAllXids(int *nxids); +extern int GetNumOfTxnStatesWithoutXid(void); extern int xactGetCommittedChildren(TransactionId **ptr); From 8e698c16b5967c9d48ceb7e4fbdefe9ba699f972 Mon Sep 17 00:00:00 2001 From: HuSen8891 Date: Fri, 1 Sep 2023 09:15:34 +0800 Subject: [PATCH 9/9] Fix: Only master can set transaction status --- src/backend/access/transam/clog.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index a14a1f36c80..a6466683cc5 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -168,7 +168,10 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, int pageno = TransactionIdToPage(xid); /* get page of parent */ int i; - if (enable_serverless && Gp_role != GP_ROLE_DISPATCH) + /* + * Only master can set transaction status + */ + if (enable_serverless && (Gp_role != GP_ROLE_DISPATCH && GpIdentity.segindex != MASTER_CONTENT_ID)) return; Assert(status == TRANSACTION_STATUS_COMMITTED ||