diff --git a/configure b/configure index 19b49cb9975..d1ec8649731 100755 --- a/configure +++ b/configure @@ -761,6 +761,7 @@ enable_gpcloud enable_mapreduce enable_orca enable_catalog_ext +enable_serverless autodepend PKG_CONFIG_LIBDIR PKG_CONFIG_PATH @@ -894,6 +895,7 @@ enable_depend enable_cassert enable_orca enable_catalog_ext +enable_serverless enable_mapreduce enable_gpcloud enable_external_fts @@ -1604,6 +1606,7 @@ Optional Features: --enable-cassert enable assertion checks (for debugging) --disable-orca disable ORCA optimizer --enable-catalog-ext enable CloudberryDB catalog extension + --enable-serverless use CloudberryDB serverless architecture --enable-mapreduce enable CloudberryDB Mapreduce support --enable-gpcloud enable gpcloud support --enable-external-fts enable external fts support @@ -8340,6 +8343,37 @@ fi { $as_echo "$as_me:${as_lineno-$LINENO}: result: checking whether to build with CloudberryDB catalog extension... $enable_catalog_ext" >&5 $as_echo "checking whether to build with CloudberryDB catalog extension ... $enable_catalog_ext" >&6; } +# +# Enable serverless architecture +# + + +# Check whether --enable-serverless was given. +if test "${enable_serverless+set}" = set; then : + enableval=$enable_serverless; + case $enableval in + yes) + +$as_echo "#define SERVERLESS 1" >>confdefs.h + + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --enable-serverless option" "$LINENO" 5 + ;; + esac + +else + enable_serverless=no + +fi + + +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: checking whether to use serverless architecture of Cloudberry ... $enable_serverless" >&5 +$as_echo "checking whether to use serverless architecture of Cloudberry ... 
$enable_serverless" >&6; } + # # --enable-mapreduce enables GPMapreduce support diff --git a/configure.ac b/configure.ac index dbe87362bf6..142f4ab63ca 100644 --- a/configure.ac +++ b/configure.ac @@ -846,6 +846,16 @@ PGAC_ARG_BOOL(enable, catalog-ext, no, [enable Cloudberry catalog extension], AC_MSG_RESULT([checking whether to build with catalog extension... $enable_catalog_ext]) AC_SUBST(enable_catalog_ext) +# +# +# --enable-serverless uses serverless architecture of Cloudberry +# +PGAC_ARG_BOOL(enable, serverless, no, [use serverless architecture of Cloudberry], + [AC_DEFINE([SERVERLESS], 1, + [Define to 1 to use serverless architecture of Cloudberry. (--enable-serverless)])]) +AC_MSG_RESULT([checking whether to use serverless architecture of Cloudberry... $enable_serverless]) +AC_SUBST(enable_serverless) + # # --enable-mapreduce enables GPMapreduce support # diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 187696386eb..38ba381d87d 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -1599,6 +1599,14 @@ initConnHashTable(ConnHashTable *ht, MemoryContext cxt) ht->cxt = cxt; ht->size = Gp_role == GP_ROLE_DISPATCH ? (getgpsegmentCount() * 2) : ic_htab_size; + + /* + * In serverless architecture, the cluster may have only one QD, skip Initialization. + * Initialization will be done later. + */ + if (enable_serverless && Gp_role == GP_ROLE_DISPATCH && ht->size == 0) + return true; + Assert(ht->size > 0); if (ht->cxt) @@ -1635,6 +1643,18 @@ connAddHash(ConnHashTable *ht, MotionConn *mConn) MemoryContext old = NULL; MotionConnUDP *conn = NULL; + /* + * Initialize connection hash table if needed. 
+ */ + if (enable_serverless && Gp_role == GP_ROLE_DISPATCH && ht->size == 0) + { + old = MemoryContextSwitchTo(ht->cxt); + initConnHashTable(ht, ht->cxt); + MemoryContextSwitchTo(old); + + Assert(ht->size > 0); + } + conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); hashcode = CONN_HASH_VALUE(&conn->conn_info) % ht->size; @@ -1805,10 +1825,13 @@ destroyConnHashTable(ConnHashTable *ht) } } - if (ht->cxt) - pfree(ht->table); - else - free(ht->table); + if (ht->size > 0) + { + if (ht->cxt) + pfree(ht->table); + else + free(ht->table); + } ht->table = NULL; ht->size = 0; diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 9e03d53d547..a6de4048d36 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -223,6 +223,7 @@ enable_strong_random = @enable_strong_random@ enable_largefile = @enable_largefile@ enable_orca = @enable_orca@ enable_catalog_ext = @enable_catalog_ext@ +enable_serverless = @enable_serverless@ enable_gpfdist = @enable_gpfdist@ enable_pxf = @enable_pxf@ enable_debug_extensions = @enable_debug_extensions@ diff --git a/src/backend/Makefile b/src/backend/Makefile index 4190d0d683c..ed0495d708d 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -32,6 +32,9 @@ ifeq ($(enable_catalog_ext),yes) SUBDIRS += catalog-extension LDFLAGS += -lprotobuf -lstdc++ endif +ifeq ($(enable_serverless),yes) +LDFLAGS += -lprotobuf -lstdc++ -ljansson +endif include $(srcdir)/common.mk diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 86e61aee5a2..a6466683cc5 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -38,6 +38,7 @@ #include "access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" +#include "cdb/cdbvars.h" #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" @@ -167,6 +168,12 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, int pageno = TransactionIdToPage(xid); /* get page of parent */ int i; + /* + * Only 
master can set transaction status + */ + if (enable_serverless && (Gp_role != GP_ROLE_DISPATCH && GpIdentity.segindex != MASTER_CONTENT_ID)) + return; + Assert(status == TRANSACTION_STATUS_COMMITTED || status == TRANSACTION_STATUS_ABORTED); diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index a1950fd0944..a5ce17e7a30 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -133,16 +133,16 @@ typedef enum static SlruErrorCause slru_errcause; static int slru_errno; +/* + * Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage/SimpleLruReadPage + */ +SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook = NULL; +SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook = NULL; +SimpleLruReadPage_hook_type SimpleLruReadPage_hook = NULL; -static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); -static void SimpleLruWaitIO(SlruCtl ctl, int slotno); static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata); -static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno); static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata); -static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid); -static int SlruSelectLRUPage(SlruCtl ctl, int pageno); - static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data); static void SlruInternalDeleteSegment(SlruCtl ctl, int segno); @@ -319,7 +319,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno) * * This assumes that InvalidXLogRecPtr is bitwise-all-0. */ -static void +void SimpleLruZeroLSNs(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; @@ -336,7 +336,7 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno) * * Control lock must be held at entry, and will be held at exit. 
*/ -static void +void SimpleLruWaitIO(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; @@ -396,6 +396,9 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, { SlruShared shared = ctl->shared; + if (SimpleLruReadPage_hook) + return (*SimpleLruReadPage_hook) (ctl, pageno, write_ok, xid); + /* Outer loop handles restart if we must wait for someone else's I/O */ for (;;) { @@ -496,6 +499,12 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) SlruShared shared = ctl->shared; int slotno; + if (SimpleLruReadPage_hook) + { + LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + return (*SimpleLruReadPage_hook) (ctl, pageno, true, xid); + } + /* Try to find the page while holding only shared lock */ LWLockAcquire(shared->ControlLock, LW_SHARED); @@ -679,7 +688,7 @@ SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno) * For now, assume it's not worth keeping a file pointer open across * read/write operations. We could cache one virtual file pointer ... */ -static bool +bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno) { SlruShared shared = ctl->shared; @@ -688,6 +697,13 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno) off_t offset = rpageno * BLCKSZ; char path[MAXPGPATH]; int fd; + bool result; + + if (SlruPhysicalReadPage_hook && + SlruPhysicalReadPage_hook(ctl, pageno, slotno, &result)) + { + return result; + } SlruFileName(ctl, path, segno); @@ -760,6 +776,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) off_t offset = rpageno * BLCKSZ; char path[MAXPGPATH]; int fd = -1; + bool result; /* update the stats counter of written pages */ pgstat_count_slru_page_written(shared->slru_stats_idx); @@ -806,6 +823,12 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) } } + if (SlruPhysicalWritePage_hook && + SlruPhysicalWritePage_hook(ctl, pageno, slotno, &result)) + { + return result; + } + /* * During a WriteAll, we may already have the desired file 
open. */ @@ -926,7 +949,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) * Issue the error message after failure of SlruPhysicalReadPage or * SlruPhysicalWritePage. Call this after cleaning up shared-memory state. */ -static void +void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid) { int segno = pageno / SLRU_PAGES_PER_SEGMENT; @@ -1011,7 +1034,7 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid) * * Control lock must be held at entry, and will be held at exit. */ -static int +int SlruSelectLRUPage(SlruCtl ctl, int pageno) { SlruShared shared = ctl->shared; diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 5339eeaa1c0..88e8bc5f8ef 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -379,3 +379,12 @@ SubTransPagePrecedes(int page1, int page2) return (TransactionIdPrecedes(xid1, xid2) && TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1)); } + +/* + * Get SUBTRANS control data + */ +SlruCtl +SUBTRANS_Ctl(void) +{ + return SubTransCtl; +} \ No newline at end of file diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 90788509f8a..adc042c87db 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -43,6 +43,11 @@ int xid_warn_limit; NewSegRelfilenode_assign_hook_type NewSegRelfilenode_assign_hook = NULL; +/* + * Hook for plugins to get control in GetNewTransactionId. + */ +GetNewTransactionId_hook_type GetNewTransactionId_hook = NULL; + /* * Allocate the next FullTransactionId for a new transaction or * subtransaction. 
@@ -61,6 +66,9 @@ GetNewTransactionId(bool isSubXact) FullTransactionId full_xid; TransactionId xid; + if (GetNewTransactionId_hook) + return (*GetNewTransactionId_hook) (isSubXact); + /* * Workers synchronize transaction state at the beginning of each parallel * operation, so we can't account for new XIDs after that point. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0df851f2fd0..d1855475d76 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -298,6 +298,14 @@ static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS]; static TransactionState CurrentTransactionState = &TopTransactionStateData; +/* + * Hooks for plugins to get control in Transaction Management. + */ +TransactionParticipateEnd_hook_type TransactionParticipateEnd_hook = NULL; +NotifySubTransaction_hook_type NotifySubTransaction_hook = NULL; +XactLogCommitRecord_hook_type XactLogCommitRecord_hook = NULL; +XactLogAbortRecord_hook_type XactLogAbortRecord_hook = NULL; + /* * The subtransaction ID and command ID assignment counters are global * to a whole transaction, so we do not keep them in the state stack. @@ -902,6 +910,12 @@ GetCurrentCommandId(bool used) return currentCommandId; } +void +SetCurrentCommandId(CommandId cid) +{ + currentCommandId = cid; +} + /* * SetParallelStartTimestamps * @@ -2945,6 +2959,9 @@ CommitTransaction(void) if (notifyCommittedDtxTransactionIsNeeded()) notifyCommittedDtxTransaction(); + if (TransactionParticipateEnd_hook) + TransactionParticipateEnd_hook(true); + /* * Let others know about no transaction in progress by me. Note that this * must be done _before_ releasing locks we hold and _after_ @@ -3594,6 +3611,9 @@ AbortTransaction(void) */ rollbackDtxTransaction(); + if (TransactionParticipateEnd_hook) + TransactionParticipateEnd_hook(false); + /* * Let others know about no transaction in progress by me. 
Note that this * must be done _before_ releasing locks we hold and _after_ @@ -5183,6 +5203,9 @@ DefineSavepoint(const char *name) { TransactionState s = CurrentTransactionState; + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_BEGIN); + /* * Workers synchronize transaction state at the beginning of each parallel * operation, so we can't account for new subtransactions after that @@ -5543,11 +5566,16 @@ BeginInternalSubTransaction(const char *name) if (Gp_role == GP_ROLE_DISPATCH) { - if (!doDispatchSubtransactionInternalCmd( - DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL)) + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_BEGIN); + else { - elog(ERROR, - "Could not BeginInternalSubTransaction dispatch failed"); + if (!doDispatchSubtransactionInternalCmd( + DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL)) + { + elog(ERROR, + "Could not BeginInternalSubTransaction dispatch failed"); + } } } @@ -5723,7 +5751,7 @@ RollbackAndReleaseCurrentSubTransaction(void) if (Gp_role == GP_ROLE_DISPATCH) { - if (!doDispatchSubtransactionInternalCmd( + if (!NotifySubTransaction_hook && !doDispatchSubtransactionInternalCmd( DTX_PROTOCOL_COMMAND_SUBTRANSACTION_ROLLBACK_INTERNAL)) { ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), @@ -6044,6 +6072,9 @@ CommitSubTransaction(void) /* Must CCI to ensure commands of subtransaction are seen as done */ CommandCounterIncrement(); + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_RELEASE); + /* * Prior to 8.4 we marked subcommit in clog at this point. We now only * perform that step, if required, as part of the atomic update of the @@ -6215,6 +6246,9 @@ AbortSubTransaction(void) s->parallelModeLevel = 0; } + if (NotifySubTransaction_hook) + NotifySubTransaction_hook(TXN_PROTOCOL_COMMAND_SUB_ROLLBACK); + /* * We can skip all this stuff if the subxact failed before creating a * ResourceOwner... 
@@ -6834,6 +6868,12 @@ XactLogCommitRecord(TimestampTz commit_time, Assert(CritSectionCount > 0); + if (XactLogCommitRecord_hook) + return (* XactLogCommitRecord_hook) (commit_time, tablespace_oid_to_delete_on_commit, + nsubxacts, subxacts, nrels, rels, nmsgs, msgs, + ndeldbs, deldbs, relcacheInval, xactflags, + twophase_xid, twophase_gid); + xl_xinfo.xinfo = 0; /* decide between a plain and 2pc commit */ @@ -7024,6 +7064,11 @@ XactLogAbortRecord(TimestampTz abort_time, Assert(CritSectionCount > 0); + if (XactLogAbortRecord_hook) + (*XactLogAbortRecord_hook) (abort_time, tablespace_oid_to_delete_on_abort, + nsubxacts, subxacts, nrels, rels, ndeldbs, deldbs, + xactflags, twophase_xid, twophase_gid); + xl_xinfo.xinfo = 0; /* decide between a plain and 2pc abort */ @@ -7561,3 +7606,70 @@ MarkSubTransactionAssigned(void) CurrentTransactionState->assigned = true; } + +/* + * Get all xids of top level transaction and subtransactons + */ +FullTransactionId * +GetAllXids(int *nxids) +{ + FullTransactionId *xids = NULL; + int len = PGPROC_MAX_CACHED_SUBXIDS; + + *nxids = 0; + + if (FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId)) + { + TransactionState xact = CurrentTransactionState; + + if (xids == NULL) + xids = (FullTransactionId *)palloc(sizeof(FullTransactionId) * len); + xids[(*nxids)++] = xact->fullTransactionId; + + while (xact->parent) + { + xact = xact->parent; + xids[(*nxids)++] = xact->fullTransactionId; + + if ((*nxids) >= len) + { + len *= 2; + xids = (FullTransactionId *)repalloc(xids, sizeof(FullTransactionId) * len); + } + } + } + + return xids; +} + +/* + * Get number of transaction and subtransactions which have no xid. 
+ */ +int +GetNumOfTxnStatesWithoutXid(void) +{ + int nlevels = 0; + + if (!FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId)) + { + TransactionState xact = CurrentTransactionState; + + nlevels++; + + while (xact->parent) + { + xact = xact->parent; + + if (!FullTransactionIdIsValid(xact->fullTransactionId)) + { + nlevels++; + } + else + { + break; + } + } + } + + return nlevels; +} \ No newline at end of file diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4d331ed7a7c..de72cd7de48 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -101,6 +101,16 @@ extern uint32 bootstrap_data_checksum_version; extern int bootstrap_file_encryption_method; +/* + * Hook for plugins to get control in StartupXLOG. + */ +StartupXLOG_hook_type StartupXLOG_hook = NULL; + +/* + * Hook for plugins to get control in XLogFlush. + */ +XLogFlush_hook_type XLogFlush_hook = NULL; + /* Unsupported old recovery command file names (relative to $PGDATA) */ #define RECOVERY_COMMAND_FILE "recovery.conf" #define RECOVERY_COMMAND_DONE "recovery.done" @@ -2938,6 +2948,9 @@ XLogFlush(XLogRecPtr record) XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; + if (XLogFlush_hook) + return (*XLogFlush_hook) (record); + /* * During REDO, we are reading not writing WAL. Therefore, instead of * trying to flush the WAL, we should update minRecoveryPoint instead. We @@ -6755,6 +6768,9 @@ StartupXLOG(void) bool promoted = false; struct stat st; + if (StartupXLOG_hook) + return (*StartupXLOG_hook) (); + /* * We should have an aux process resource owner to use, and we should not * be in a transaction that's installed some other resowner. 
@@ -13957,3 +13973,21 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Return pointer to pg_control in shared memory; + */ +ControlFileData * +GetControlFile(void) +{ + return ControlFile; +} + +/* + * Return pointer to XLogCtlData in shared memory; + */ +XLogCtlData * +GetXLogCtl(void) +{ + return XLogCtl; +} diff --git a/src/backend/cdb/cdbcat.c b/src/backend/cdb/cdbcat.c index dc5a93bb556..d3c81c673bc 100644 --- a/src/backend/cdb/cdbcat.c +++ b/src/backend/cdb/cdbcat.c @@ -97,7 +97,7 @@ makeGpPolicy(GpPolicyType ptype, int nattrs, int numsegments) policy->numsegments = numsegments; policy->nattrs = nattrs; - Assert(numsegments > 0 || + Assert(numsegments >= 0 || (ptype == POLICYTYPE_ENTRY && numsegments == -1)); return policy; @@ -458,8 +458,16 @@ GpPolicyFetch(Oid tbloid) } /* Create a GpPolicy object. */ - policy = makeGpPolicy(POLICYTYPE_PARTITIONED, - nattrs, policyform->numsegments); + if (policyform->numsegments == 0) + { + policy = makeGpPolicy(POLICYTYPE_PARTITIONED, + nattrs, getgpsegmentCount()); + } + else + { + policy = makeGpPolicy(POLICYTYPE_PARTITIONED, + nattrs, policyform->numsegments); + } for (i = 0; i < nattrs; i++) { diff --git a/src/backend/cdb/cdbdtxcontextinfo.c b/src/backend/cdb/cdbdtxcontextinfo.c index 1a3c1b8f295..9227d844b74 100644 --- a/src/backend/cdb/cdbdtxcontextinfo.c +++ b/src/backend/cdb/cdbdtxcontextinfo.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "utils/guc.h" #include "utils/session_state.h" +#include "storage/proc.h" /* * process local cache used to identify "dispatch units" @@ -46,6 +47,8 @@ DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo, bool inCursor, DtxContextInfo_Reset(dtxContextInfo); dtxContextInfo->distributedXid = getDistributedTransactionId(); + if (enable_serverless) + dtxContextInfo->distributedXid = MyProc->lxid; if (dtxContextInfo->distributedXid != InvalidDistributedTransactionId) dtxContextInfo->curcid = curcid; diff --git 
a/src/backend/cdb/cdbfts.c b/src/backend/cdb/cdbfts.c index 754d3054cbb..8155663e984 100644 --- a/src/backend/cdb/cdbfts.c +++ b/src/backend/cdb/cdbfts.c @@ -87,6 +87,9 @@ FtsNotifyProber(void) if (am_ftsprobe) return; + if (enable_serverless) + return; + SpinLockAcquire(&ftsProbeInfo->lock); initial_started = ftsProbeInfo->start_count; SpinLockRelease(&ftsProbeInfo->lock); @@ -177,6 +180,9 @@ getFtsVersion(void) void FtsNotifyProber(void) { + if (enable_serverless) + return; + Assert(Gp_role == GP_ROLE_DISPATCH); SendPostmasterSignal(PMSIGNAL_WAKEN_FTS); SIMPLE_FAULT_INJECTOR("ftsNotify_before"); diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index acd6168cc9a..6b70abd9eaf 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -1114,7 +1114,7 @@ tmShmemInit(void) /* Initialize locks and shared memory area */ { *shmNextSnapshotId = 0; - *shmDtmStarted = false; + *shmDtmStarted = enable_serverless; *shmCleanupBackends = false; *shmDtxRecoveryPid = 0; *shmDtxRecoveryEvents = DTX_RECOVERY_EVENT_ABORT_PREPARED; @@ -1385,6 +1385,9 @@ dispatchDtxCommand(const char *cmd) elog(DTM_DEBUG5, "dispatchDtxCommand: '%s'", cmd); + if (enable_serverless) + return true; + if (currentGxactWriterGangLost()) { ereport(WARNING, @@ -1616,7 +1619,8 @@ isDtxQueryDispatcher(void) return (Gp_role == GP_ROLE_DISPATCH && isDtmStarted && - isSharedLocalSnapshotSlotPresent); + isSharedLocalSnapshotSlotPresent && + !enable_serverless); } /* diff --git a/src/backend/cdb/cdbutil.c b/src/backend/cdb/cdbutil.c index 0d6f7147d30..f8b7232d1ed 100644 --- a/src/backend/cdb/cdbutil.c +++ b/src/backend/cdb/cdbutil.c @@ -442,7 +442,7 @@ getCdbComponentInfo(void) * Validate that there exists at least one entry and one segment database * in the configuration */ - if (component_databases->total_segment_dbs == 0) + if (component_databases->total_segment_dbs == 0 && !enable_serverless) { ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), @@ -2809,7 +2809,7 @@ 
getCdbComponentInfo(void)
 	 * Validate that there exists at least one entry and one segment database
 	 * in the configuration
 	 */
-	if (component_databases->total_segment_dbs == 0)
+	if (component_databases->total_segment_dbs == 0 && !enable_serverless)
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
diff --git a/src/backend/cdb/dispatcher/Makefile b/src/backend/cdb/dispatcher/Makefile
index e8ac7582898..8aaf74b1970 100644
--- a/src/backend/cdb/dispatcher/Makefile
+++ b/src/backend/cdb/dispatcher/Makefile
@@ -11,5 +11,5 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS += -I$(libpq_srcdir) -I$(top_srcdir)/src/port -I$(top_srcdir)/src/backend/utils/misc
 
-OBJS = cdbconn.o cdbdisp.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_async.o cdbpq.o
+OBJS = cdbconn.o cdbdisp.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_async.o cdbpq.o cdbdisp_extra.o
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/cdb/dispatcher/cdbdisp_extra.c b/src/backend/cdb/dispatcher/cdbdisp_extra.c
new file mode 100644
index 00000000000..8a032222a29
--- /dev/null
+++ b/src/backend/cdb/dispatcher/cdbdisp_extra.c
@@ -0,0 +1,210 @@
+/*-------------------------------------------------------------------------
+ *
+ * cdbdisp_extra.c
+ *	  Registry of named pack/unpack callbacks for extra dispatch messages.
+ *
+ * A caller registers a name together with a pack and an unpack callback.
+ * PackExtraMsgs() serializes the output of every registered pack callback
+ * into one buffer; UnPackExtraMsgs() parses such a buffer and hands each
+ * payload to the unpack callback registered under the same name.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "cdb/cdbdisp_extra.h"
+#include "libpq/pqformat.h"
+#include "utils/hsearch.h"
+
+
+/* Registered callbacks, keyed by name; created on first registration. */
+static HTAB *ExtraDispTable = NULL;
+
+typedef struct ExtraDispEntry
+{
+	char		extraDispName[EXTRADISPNAME_MAX_LEN];	/* hash key */
+	PackFunc	packFunc;
+	UnpackFunc	unpackFunc;
+} ExtraDispEntry;
+
+/*
+ * Register a pack/unpack callback pair under extraDispName.
+ *
+ * Raises an ERROR if the name does not fit in EXTRADISPNAME_MAX_LEN bytes
+ * (including the terminating NUL) or if the name is already registered.
+ */
+void
+RegisterExtraDispatch(const char *extraDispName, PackFunc packFunc, UnpackFunc unpackFunc)
+{
+	ExtraDispEntry *entry;
+	bool		found;
+
+	/* Validate the name first so a bad call does not create the table. */
+	if (strlen(extraDispName) >= EXTRADISPNAME_MAX_LEN)
+		elog(ERROR, "extra dispatch name is too long");
+
+	if (ExtraDispTable == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = EXTRADISPNAME_MAX_LEN;
+		ctl.entrysize = sizeof(ExtraDispEntry);
+
+		ExtraDispTable = hash_create("extra dispatch info", 8, &ctl,
+									 HASH_ELEM | HASH_STRINGS);
+	}
+
+	entry = (ExtraDispEntry *) hash_search(ExtraDispTable,
+										   extraDispName,
+										   HASH_ENTER, &found);
+	if (found)
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("extra dispatch name \"%s\" already exists",
+						extraDispName)));
+
+	entry->packFunc = packFunc;
+	entry->unpackFunc = unpackFunc;
+}
+
+/*
+ * Return the packed messages of all registered callbacks, and store the
+ * total byte length in *len.  Returns NULL (and *len = 0) if nothing has
+ * ever been registered.
+ *
+ * Each message is laid out as:
+ *		4-byte length in network byte order (name + NUL + payload, i.e. not
+ *			including the length field itself)
+ *		name, followed by a terminating '\0'
+ *		payload bytes as returned by the pack callback
+ *
+ * The result is palloc'd; each payload returned by a pack callback is
+ * pfree'd here after it has been copied.
+ */
+char *
+PackExtraMsgs(int *len)
+{
+	HASH_SEQ_STATUS status;
+	ExtraDispEntry *hentry;
+	char	  **payloads;
+	int		   *lengths;
+	char	  **names;
+	char	   *total;
+	int			totalLen;
+	char	   *pos;
+	int			n;
+	int			i;
+
+	if (!ExtraDispTable)
+	{
+		*len = 0;
+		return NULL;
+	}
+
+	n = hash_get_num_entries(ExtraDispTable);
+	payloads = (char **) palloc(n * sizeof(char *));
+	lengths = (int *) palloc(n * sizeof(int));
+	names = (char **) palloc(n * sizeof(char *));
+
+	/* First pass: run every pack callback and size the output buffer. */
+	i = 0;
+	totalLen = 0;
+	hash_seq_init(&status, ExtraDispTable);
+	while ((hentry = (ExtraDispEntry *) hash_seq_search(&status)) != NULL)
+	{
+		payloads[i] = (*(hentry->packFunc)) (&lengths[i]);
+		names[i] = hentry->extraDispName;
+		totalLen += sizeof(int) + strlen(names[i]) + 1 + lengths[i];
+		i++;
+	}
+	Assert(i == n);
+
+	total = palloc(totalLen);
+	pos = total;
+
+	/* Second pass: emit length, name and payload for each message. */
+	for (i = 0; i < n; i++)
+	{
+		int			payloadLen = lengths[i];
+		int			nameLen = strlen(names[i]);
+		int			tmp;
+
+		/* length */
+		tmp = htonl(payloadLen + nameLen + 1);
+		memcpy(pos, &tmp, sizeof(tmp));
+		pos += sizeof(tmp);
+
+		/* name, including its terminating '\0' */
+		memcpy(pos, names[i], nameLen + 1);
+		pos += nameLen + 1;
+
+		/* payload */
+		memcpy(pos, payloads[i], payloadLen);
+		pos += payloadLen;
+
+		pfree(payloads[i]);
+	}
+
+	Assert(pos - total == totalLen);
+
+	pfree(names);
+	pfree(payloads);
+	pfree(lengths);
+
+	*len = totalLen;
+	return total;
+}
+
+/*
+ * Parse the remainder of inputMsgs as a sequence of messages in the format
+ * produced by PackExtraMsgs() and pass each payload to the unpack callback
+ * registered under the message's name.
+ *
+ * Does nothing if no callback has ever been registered.  Raises an ERROR if
+ * a message names an unregistered callback, or if the number of messages
+ * received differs from the number of registered callbacks.
+ */
+void
+UnPackExtraMsgs(StringInfo inputMsgs)
+{
+	int			nregistered;
+	int			nreceived = 0;
+
+	if (!ExtraDispTable)
+		return;
+
+	nregistered = hash_get_num_entries(ExtraDispTable);
+
+	while (inputMsgs->cursor < inputMsgs->len)
+	{
+		ExtraDispEntry *entry;
+		const char *name;
+		const char *payload;
+		int			payloadLen;
+		int			totalLen;
+		bool		found;
+
+		totalLen = pq_getmsgint(inputMsgs, 4);
+		name = pq_getmsgstring(inputMsgs);
+		payloadLen = totalLen - strlen(name) - 1;
+		payload = pq_getmsgbytes(inputMsgs, payloadLen);
+
+		entry = (ExtraDispEntry *) hash_search(ExtraDispTable,
+											   name,
+											   HASH_FIND, &found);
+		if (!found)
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("extra dispatch %s not found", name)));
+
+		(*(entry->unpackFunc)) (payload, payloadLen);
+		nreceived++;
+	}
+
+	/* The sender packs one message per registered callback; verify the count. */
+	if (nreceived != nregistered)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("extra dispatch count mismatch, registered %d, get %d",
+						nregistered, nreceived)));
+}
diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c
index 9bbfd10dd2d..4aa6129ce90 100644
--- a/src/backend/cdb/dispatcher/cdbdisp_query.c
+++ b/src/backend/cdb/dispatcher/cdbdisp_query.c
@@ -42,6 +42,7 @@
 #include "mb/pg_wchar.h"
 
 #include "cdb/cdbdisp.h"
+#include "cdb/cdbdisp_extra.h"
 #include "cdb/cdbdisp_query.h"
 #include "cdb/cdbdisp_dtx.h"	/* for qdSerializeDtxContextInfo() */
 #include "cdb/cdbdispatchresult.h"
@@ -98,6 +99,12 @@ typedef struct DispatchCommandQueryParms
 	int			serializedDtxContextInfolen;
 } DispatchCommandQueryParms;
 
+/*
+ * Hooks for plugins to get control in command dispatch
+ */
+CdbNeedDispatchCommand_hook_type CdbNeedDispatchCommand_hook = NULL;
+CdbNeedDispatchUtility_hook_type CdbNeedDispatchUtility_hook = NULL;
+
 static int fillSliceVector(SliceTable *sliceTable,
 				int sliceIndex,
 				SliceVec *sliceVector,
@@ -391,7 +398,12 @@ CdbDispatchCommandToSegments(const char *strCommand,
 							 CdbPgResults *cdb_pgresults)
 {
 	DispatchCommandQueryParms *pQueryParms;
-	bool		needTwoPhase = flags & DF_NEED_TWO_PHASE;
+
bool needTwoPhase; + + if (CdbNeedDispatchCommand_hook && !CdbNeedDispatchCommand_hook(strCommand, &flags, segments, cdb_pgresults)) + return; + + needTwoPhase = flags & DF_NEED_TWO_PHASE; if (needTwoPhase) setupDtxTransaction(); @@ -429,9 +441,14 @@ CdbDispatchUtilityStatement(struct Node *stmt, CdbPgResults *cdb_pgresults) { DispatchCommandQueryParms *pQueryParms; - bool needTwoPhase = flags & DF_NEED_TWO_PHASE; + bool needTwoPhase; Assert(Gp_role == GP_ROLE_DISPATCH && ENABLE_DISPATCH()); + + if (CdbNeedDispatchUtility_hook && !CdbNeedDispatchUtility_hook(stmt, &flags)) + return; + + needTwoPhase = flags & DF_NEED_TWO_PHASE; if (needTwoPhase) setupDtxTransaction(); @@ -871,6 +888,8 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, int total_query_len; char *shared_query, *pos; + char *extraMsgs; + int extraLen; MemoryContext oldContext; /* @@ -917,6 +936,9 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, sizeof(resgroupInfo.len) + resgroupInfo.len; + extraMsgs = PackExtraMsgs(&extraLen); + total_query_len += extraLen; + shared_query = palloc(total_query_len); pos = shared_query; @@ -1010,6 +1032,13 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms, pos += resgroupInfo.len; } + if (extraLen > 0) + { + memcpy(pos, extraMsgs, extraLen); + pos += extraLen; + pfree(extraMsgs); + } + len = pos - shared_query - 1; /* @@ -1306,8 +1335,12 @@ CdbDispatchCopyStart(struct CdbCopy *cdbCopy, Node *stmt, int flags) CdbDispatcherState *ds; Gang *primaryGang; ErrorData *error = NULL; - bool needTwoPhase = flags & DF_NEED_TWO_PHASE; + bool needTwoPhase; + + if (CdbNeedDispatchUtility_hook && !CdbNeedDispatchUtility_hook(stmt, &flags)) + return; + needTwoPhase = flags & DF_NEED_TWO_PHASE; if (needTwoPhase) setupDtxTransaction(); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b21ef9c9f0f..4908b707706 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -120,6 +120,9 @@ 
ExecutorEnd_hook_type ExecutorEnd_hook = NULL; /* Hook for plugin to get control in ExecCheckRTPerms() */ ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook = NULL; +/* Hook for plugins to get control in DtxTransaction Management */ +SetDtxFlag_hook_type SetDtxFlag_hook = NULL; + /* decls for local routines only used within this module */ static void InitPlan(QueryDesc *queryDesc, int eflags); static void CheckValidRowMarkRel(Relation rel, RowMarkType markType); @@ -562,6 +565,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * work for this query. */ needDtx = ExecutorSaysTransactionDoesWrites(); + if (SetDtxFlag_hook) + needDtx = SetDtxFlag_hook(needDtx); if (needDtx) setupDtxTransaction(); diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 36879297d9b..6da2a2b524f 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -207,6 +207,11 @@ static int default_multixact_freeze_table_age; /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; +/* + * Hook for plugins to get control in AutoVacLauncher. 
+ */ +AutoVacLauncherMain_hook_type AutoVacLauncherMain_hook = NULL; + /* struct to keep track of databases in launcher */ typedef struct avl_dbase { @@ -489,6 +494,9 @@ AutoVacLauncherMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; + if (AutoVacLauncherMain_hook) + (*AutoVacLauncherMain_hook) (argc, argv); + am_autovacuum_launcher = true; MyBackendType = B_AUTOVAC_LAUNCHER; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 3d9a199c8cb..7cb7a53413b 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -282,6 +282,9 @@ bool remove_temp_files_after_crash = true; /* Hook for plugins to start background workers */ start_bgworkers_hook_type start_bgworkers_hook = NULL; +/* Hook for plugins to get control in StartChildProcess */ +StartChildProcess_hook_type StartChildProcess_hook = NULL; + /* * PIDs of special child processes; 0 when not running. When adding a new PID * to the list, remember to add the process title to GetServerProcessTitle() @@ -5990,6 +5993,15 @@ CountChildren(int target) */ static pid_t StartChildProcess(AuxProcType type) +{ + if (StartChildProcess_hook) + return (*StartChildProcess_hook) (type); + + return StartChildProcessInternal(type); +} + +pid_t +StartChildProcessInternal(AuxProcType type) { pid_t pid; char *av[10]; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e796faddf31..ff821e126a4 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -43,6 +43,7 @@ #include "catalog/catalog.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "cdb/cdbvars.h" #include "crypto/bufenc.h" #include "executor/instrument.h" #include "lib/binaryheap.h" @@ -1255,6 +1256,19 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, *foundPtr = true; + /* + * The page in buffer may be out of date, we need to check the buffer + * and refresh the buffer 
if the page has been modified. + */ + if (enable_serverless && Gp_role == GP_ROLE_EXECUTE && valid) + { + uint32 buf_state = LockBufHdr(buf); + buf_state &= ~(BM_VALID | BM_DIRTY); + UnlockBufHdr(buf, buf_state); + + valid = false; + } + if (!valid) { /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 93e6152ae84..06e70b5f675 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -83,6 +83,11 @@ CountDBSession_hook_type CountDBSession_hook = NULL; +/* + * Hook for plugins to get control in GetSnapshotData + */ +GetSnapshotData_hook_type GetSnapshotData_hook = NULL; + /* Our shared memory area */ typedef struct ProcArrayStruct { @@ -297,10 +302,10 @@ static TransactionId standbySnapshotPendingXmin; * GlobalVisState for details. As shared, catalog, normal and temporary * relations can have different horizons, one such state exists for each. */ -static GlobalVisState GlobalVisSharedRels; -static GlobalVisState GlobalVisCatalogRels; -static GlobalVisState GlobalVisDataRels; -static GlobalVisState GlobalVisTempRels; +GlobalVisState GlobalVisSharedRels; +GlobalVisState GlobalVisCatalogRels; +GlobalVisState GlobalVisDataRels; +GlobalVisState GlobalVisTempRels; /* * This backend's RecentXmin at the last time the accurate xmin horizon was @@ -2942,6 +2947,9 @@ GetSnapshotData(Snapshot snapshot, DtxContext distributedTransactionContext) errmsg("out of memory"))); } + if (GetSnapshotData_hook) + return (*GetSnapshotData_hook) (snapshot, distributedTransactionContext); + /* * GP: Distributed snapshot. 
*/ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index c545c42298c..3c62cc77660 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -97,6 +97,7 @@ #include "cdb/cdbsrlz.h" #include "cdb/cdbtm.h" #include "cdb/cdbdtxcontextinfo.h" +#include "cdb/cdbdisp_extra.h" #include "cdb/cdbdisp_query.h" #include "cdb/cdbdispatchresult.h" #include "cdb/cdbendpoint.h" @@ -148,6 +149,11 @@ int client_connection_check_interval = 0; */ cancel_pending_hook_type cancel_pending_hook = NULL; +/* + * Hook for plugins to handle the txn message + */ +HandleTxnCommand_hook_type HandleTxnCommand_hook = NULL; + /* ---------------- * private typedefs etc * ---------------- @@ -258,7 +264,6 @@ static int errdetail_params(ParamListInfo params); static int errdetail_abort(void); static int errdetail_recovery_conflict(void); static void bind_param_error_callback(void *arg); -static void start_xact_command(void); static void finish_xact_command(void); static bool IsTransactionExitStmt(Node *parsetree); static bool IsTransactionExitStmtList(List *pstmts); @@ -509,6 +514,7 @@ SocketBackend(StringInfo inBuf) break; + case 't': case 'T': /* Cloudberry Database dispatched transaction protocol from QD */ maxmsglen = PQ_LARGE_MESSAGE_LIMIT; doing_extended_query_message = false; @@ -3461,7 +3467,7 @@ exec_describe_portal_message(const char *portal_name) /* * Convenience routines for starting/committing a single command. */ -static void +void start_xact_command(void) { if (!xact_started) @@ -5680,6 +5686,8 @@ PostgresMain(int argc, char *argv[], if (resgroupInfoLen > 0) resgroupInfoBuf = pq_getmsgbytes(&input_message, resgroupInfoLen); + UnPackExtraMsgs(&input_message); + pq_getmsgend(&input_message); elog((Debug_print_full_dtm ? 
LOG : DEBUG5), "MPP dispatched stmt from QD: %s.",query_string); @@ -5794,6 +5802,18 @@ PostgresMain(int argc, char *argv[], } break; + case 't': /* handle plugin's MPP dispatched txn protocol command from QD */ + { + if (HandleTxnCommand_hook) + HandleTxnCommand_hook(&input_message, &send_ready_for_query); + else + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid frontend message type %d", firstchar), + errdetail("HandleTxnCommand_hook is NULL"))); + } + break; + case 'P': /* parse */ { const char *stmt_name; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index b558ae1db4d..863ca30090c 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -87,6 +87,9 @@ /* Hook for plugins to get control in ProcessUtility() */ ProcessUtility_hook_type ProcessUtility_hook = NULL; +/* Hook for plugins to send explicit begin command */ +SendTxnExplicitBegin_hook_type SendTxnExplicitBegin_hook = NULL; + /* counter to disable dispatch */ int dispatch_nest_level = 0; @@ -668,7 +671,10 @@ standard_ProcessUtility(PlannedStmt *pstmt, /* gp_dispatch */ false); } - sendDtxExplicitBegin(); + if (SendTxnExplicitBegin_hook) + SendTxnExplicitBegin_hook(); + else + sendDtxExplicitBegin(); } break; @@ -730,7 +736,10 @@ standard_ProcessUtility(PlannedStmt *pstmt, * that the BEGIN has been dispatched * before we start dispatching our savepoint. */ - sendDtxExplicitBegin(); + if (SendTxnExplicitBegin_hook) + SendTxnExplicitBegin_hook(); + else + sendDtxExplicitBegin(); DefineDispatchSavepoint(stmt->savepoint_name); break; diff --git a/src/backend/utils/adt/json.c b/src/backend/utils/adt/json.c index 30ca2cf6c81..c29c8b7a1c7 100644 --- a/src/backend/utils/adt/json.c +++ b/src/backend/utils/adt/json.c @@ -1102,7 +1102,7 @@ json_build_array_noargs(PG_FUNCTION_ARGS) * for a json object. 
*/ Datum -json_object(PG_FUNCTION_ARGS) +pg_json_object(PG_FUNCTION_ARGS) { ArrayType *in_array = PG_GETARG_ARRAYTYPE_P(0); int ndims = ARR_NDIM(in_array); diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c index 338505d63e2..f8afdba4b5f 100644 --- a/src/backend/utils/cache/relmapper.c +++ b/src/backend/utils/cache/relmapper.c @@ -138,6 +138,10 @@ static RelMapFile active_local_updates; static RelMapFile pending_shared_updates; static RelMapFile pending_local_updates; +/* + * Hook for plugins to get control in load_relmap_file + */ +LoadRelMap_hook_type LoadRelMap_hook = NULL; /* non-export function prototypes */ static void apply_map_update(RelMapFile *map, Oid relationId, RelFileNodeId fileNode, @@ -724,6 +728,9 @@ load_relmap_file(bool shared, bool lock_held) map = &local_map; } + if (LoadRelMap_hook) + return (*LoadRelMap_hook) (shared, lock_held, map); + /* Read data ... */ fd = OpenTransientFile(mapfilename, O_RDONLY | PG_BINARY); if (fd < 0) diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 5fafe83ca4c..f98293ff1d9 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -149,6 +149,11 @@ double hash_mem_multiplier = 1.0; int maintenance_work_mem = 65536; int max_parallel_maintenance_workers = 2; +/* + * use CloudberryDB serverless architecture + */ +bool enable_serverless = false; + /* * Primary determinants of sizes of shared-memory structures. 
* diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index e482e303940..cf78cd717e8 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -1143,7 +1143,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, */ fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace); - if (!bootstrap) + if (!bootstrap && !enable_serverless) { if (access(fullpath, F_OK) == -1) { diff --git a/src/include/access/slru.h b/src/include/access/slru.h index 88653ae7e66..542df07f586 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -138,17 +138,31 @@ typedef struct SlruCtlData typedef SlruCtlData *SlruCtl; +/* + * Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage + */ +typedef bool (*SlruPhysicalReadPage_hook_type)(SlruCtl ctl, int pageno, int slotno, bool *result); +extern PGDLLIMPORT SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook; + +typedef bool (*SlruPhysicalWritePage_hook_type)(SlruCtl ctl, int pageno, int slotno, bool *result); +extern PGDLLIMPORT SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook; + +typedef int (*SimpleLruReadPage_hook_type)(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); +extern PGDLLIMPORT SimpleLruReadPage_hook_type SimpleLruReadPage_hook; extern Size SimpleLruShmemSize(int nslots, int nlsns); extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler); extern int SimpleLruZeroPage(SlruCtl ctl, int pageno); +extern void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); +extern void SimpleLruWaitIO(SlruCtl ctl, int slotno); extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid); extern void SimpleLruWritePage(SlruCtl ctl, int slotno); +extern int SlruSelectLRUPage(SlruCtl ctl, 
int pageno); extern void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied); #ifdef USE_ASSERT_CHECKING extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page); @@ -158,6 +172,8 @@ extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page); extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage); extern void SimpleLruTruncateWithLock(SlruCtl ctl, int cutoffPage); extern bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno); +extern bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno); +extern void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid); typedef bool (*SlruScanCallback) (SlruCtl ctl, char *filename, int segpage, void *data); diff --git a/src/include/access/subtrans.h b/src/include/access/subtrans.h index 9a54dc0fb3b..9ad1ecc31b9 100644 --- a/src/include/access/subtrans.h +++ b/src/include/access/subtrans.h @@ -20,6 +20,9 @@ typedef struct SubTransData TransactionId topMostParent; } SubTransData; +struct SlruCtlData; +typedef struct SlruCtlData *SlruCtl; + extern void SubTransSetParent(TransactionId xid, TransactionId parent); extern TransactionId SubTransGetParent(TransactionId xid); extern TransactionId SubTransGetTopmostTransaction(TransactionId xid); @@ -31,5 +34,6 @@ extern void StartupSUBTRANS(TransactionId oldestActiveXID); extern void CheckPointSUBTRANS(void); extern void ExtendSUBTRANS(TransactionId newestXact); extern void TruncateSUBTRANS(TransactionId oldestXact); +extern SlruCtl SUBTRANS_Ctl(void); #endif /* SUBTRANS_H */ diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 5a61cdc9ebe..dd6d5f2903d 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -308,6 +308,10 @@ extern bool gp_pause_on_restore_point_replay; typedef RelFileNodeId (*NewSegRelfilenode_assign_hook_type)(void); extern PGDLLIMPORT NewSegRelfilenode_assign_hook_type NewSegRelfilenode_assign_hook; +/* Hook for plugins to get control in GetNewTransactionId */ +typedef 
FullTransactionId (*GetNewTransactionId_hook_type)(bool isSubXact); +extern PGDLLIMPORT GetNewTransactionId_hook_type GetNewTransactionId_hook; + /* * prototypes for functions in transam/transam.c */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5a35d7de88f..5938e4320eb 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -189,6 +189,48 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_DISTRIB (1U << 8) #define XACT_XINFO_HAS_DELDBS (1U << 9) +typedef enum +{ + TXN_PROTOCOL_COMMAND_BEGIN = 0, + TXN_PROTOCOL_COMMAND_ABORT, + TXN_PROTOCOL_COMMAND_COMMIT, + TXN_PROTOCOL_COMMAND_POST_COMMIT, + TXN_PROTOCOL_COMMAND_SUB_BEGIN, + TXN_PROTOCOL_COMMAND_SUB_RELEASE, + TXN_PROTOCOL_COMMAND_SUB_ROLLBACK, +} TxnProtocolCommand; + +/* + * Hooks for plugins to get control in Transaction Management + */ +typedef void(*TransactionParticipateEnd_hook_type)(bool commit); +extern PGDLLIMPORT TransactionParticipateEnd_hook_type TransactionParticipateEnd_hook; + +typedef bool(*NotifySubTransaction_hook_type)(TxnProtocolCommand command); +extern PGDLLIMPORT NotifySubTransaction_hook_type NotifySubTransaction_hook; + +typedef XLogRecPtr +(*XactLogCommitRecord_hook_type) (TimestampTz commit_time, + Oid tablespace_oid_to_delete_on_commit, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNodePendingDelete *rels, + int nmsgs, SharedInvalidationMessage *msgs, + int ndeldbs, DbDirNode *deldbs, + bool relcacheInval, + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); +extern PGDLLIMPORT XactLogCommitRecord_hook_type XactLogCommitRecord_hook; + +typedef XLogRecPtr +(*XactLogAbortRecord_hook_type) (TimestampTz abort_time, + Oid tablespace_oid_to_delete_on_abort, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNodePendingDelete *rels, + int ndeldbs, DbDirNode *deldbs, + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); +extern 
PGDLLIMPORT XactLogAbortRecord_hook_type XactLogAbortRecord_hook; + /* * Also stored in xinfo, these indicating a variety of additional actions that * need to occur when emulating transaction effects during recovery. @@ -452,6 +494,7 @@ extern void MarkCurrentTransactionIdLoggedIfAny(void); extern void MarkTopTransactionWriteXLogOnExecutor(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); +extern void SetCurrentCommandId(CommandId cid); extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts); extern TimestampTz GetCurrentTransactionStartTimestamp(void); extern TimestampTz GetCurrentStatementStartTimestamp(void); @@ -505,6 +548,8 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); extern void RecordDistributedForgetCommitted(DistributedTransactionId gxid); extern bool IsSubTransactionAssignmentPending(void); extern void MarkSubTransactionAssigned(void); +extern FullTransactionId *GetAllXids(int *nxids); +extern int GetNumOfTxnStatesWithoutXid(void); extern int xactGetCommittedChildren(TransactionId **ptr); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cec7f91a3ba..f67120f0d1d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -162,10 +162,18 @@ extern bool StandbyMode; /* tde feature enable or not */ extern int FileEncryptionEnabled; -/* Hook for plugins to do some startup job */ +/* Hook for plugins to get control in StartupXLOG */ +typedef void (*StartupXLOG_hook_type) (void); +extern PGDLLIMPORT StartupXLOG_hook_type StartupXLOG_hook; + +/* Hook for plugins to do additional startup works */ typedef void (*Startup_hook_type) (void); extern PGDLLIMPORT Startup_hook_type Startup_hook; +/* Hook for plugins to get control in XLogFlush */ +typedef void (*XLogFlush_hook_type) (XLogRecPtr record); +extern PGDLLIMPORT XLogFlush_hook_type XLogFlush_hook; + /* Archive modes */ typedef enum ArchiveMode { @@ 
-229,7 +237,7 @@ extern PGDLLIMPORT int wal_level; (DataChecksumsEnabled() || FileEncryptionEnabled || wal_log_hints) /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ -#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) +#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA && !enable_serverless) /* Do we need to WAL-log information required only for logical replication? */ #define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) @@ -306,6 +314,7 @@ typedef enum WALAvailability } WALAvailability; struct XLogRecData; +typedef struct XLogCtlData XLogCtlData; extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, @@ -438,5 +447,7 @@ extern bool IsRoleMirror(void); extern void SignalPromote(void); extern XLogRecPtr XLogLastInsertBeginLoc(void); extern void initialize_wal_bytes_written(void); +extern ControlFileData *GetControlFile(void); +extern XLogCtlData *GetXLogCtl(void); #endif /* XLOG_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index bd4a20242c4..e72142f0af7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -8770,7 +8770,7 @@ prosrc => 'json_build_object_noargs' }, { oid => '3202', descr => 'map text array of key value pairs to json object', proname => 'json_object', prorettype => 'json', proargtypes => '_text', - prosrc => 'json_object' }, + prosrc => 'pg_json_object' }, { oid => '3203', descr => 'map text arrays of keys and values to json object', proname => 'json_object', prorettype => 'json', proargtypes => '_text _text', prosrc => 'json_object_two_arg' }, diff --git a/src/include/cdb/cdbdisp_extra.h b/src/include/cdb/cdbdisp_extra.h new file mode 100644 index 00000000000..b6bac03b3cd --- /dev/null +++ b/src/include/cdb/cdbdisp_extra.h @@ -0,0 +1,15 @@ +#ifndef CDBDISP_EXTRA_H +#define CDBDISP_EXTRA_H + +#include "lib/stringinfo.h" + +#define EXTRADISPNAME_MAX_LEN 64 + +typedef char 
*(*PackFunc) (int *len); +typedef void (*UnpackFunc) (const char *msg, int len); + +extern void RegisterExtraDispatch(const char *extraDispName, PackFunc packFunc, UnpackFunc unpackFunc); +extern char *PackExtraMsgs(int *len); +extern void UnPackExtraMsgs(StringInfo strInfo); + +#endif /* CDBDISP_EXTRA_H */ diff --git a/src/include/cdb/cdbdisp_query.h b/src/include/cdb/cdbdisp_query.h index 34c1deea4d8..cb05c49a842 100644 --- a/src/include/cdb/cdbdisp_query.h +++ b/src/include/cdb/cdbdisp_query.h @@ -40,6 +40,18 @@ struct CdbDispatcherState; struct CdbPgResults; struct CdbCopy; +/* + * Hooks for plugins to get control in command dispatch + */ +typedef bool (*CdbNeedDispatchCommand_hook_type) (const char *strCommand, + int *flags, + List *segments, + struct CdbPgResults *cdb_pgresults); +extern PGDLLIMPORT CdbNeedDispatchCommand_hook_type CdbNeedDispatchCommand_hook; + +typedef bool (*CdbNeedDispatchUtility_hook_type) (struct Node *stmt, int *flags); +extern PGDLLIMPORT CdbNeedDispatchUtility_hook_type CdbNeedDispatchUtility_hook; + /* Compose and dispatch the MPPEXEC commands corresponding to a plan tree * within a complete parallel plan. 
* diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index e53ed647a93..fde52ec6e34 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -97,6 +97,9 @@ extern PGDLLIMPORT ExecutorEnd_hook_type ExecutorEnd_hook; typedef bool (*ExecutorCheckPerms_hook_type) (List *, bool); extern PGDLLIMPORT ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook; +/* Hook for plugins to override the needDtx decision made in standard_ExecutorStart() */ +typedef bool (*SetDtxFlag_hook_type) (bool needDtx); +extern PGDLLIMPORT SetDtxFlag_hook_type SetDtxFlag_hook; /* * prototypes from functions in execAmi.c diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 07e375dfea0..da5af4e875e 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -336,6 +336,7 @@ extern PGDLLIMPORT int work_mem; extern PGDLLIMPORT double hash_mem_multiplier; extern PGDLLIMPORT int maintenance_work_mem; extern PGDLLIMPORT int max_parallel_maintenance_workers; +extern PGDLLIMPORT bool enable_serverless; extern PGDLLIMPORT int statement_mem; extern PGDLLIMPORT int max_statement_mem; extern PGDLLIMPORT int gp_vmem_limit_per_query; diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 5557c826fe0..9f6e22a60cc 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -966,6 +966,9 @@ RELSEG_SIZE requires an initdb. */ #undef RELSEG_SIZE +/* Define to 1 to use serverless architecture of Cloudberry. (--enable-serverless) */ +#undef SERVERLESS + /* The size of `bool', as computed by sizeof. 
*/ #undef SIZEOF_BOOL diff --git a/src/include/postgres.h b/src/include/postgres.h index 3561e2ed40b..244024f5d73 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -45,6 +45,7 @@ #define POSTGRES_H #include "c.h" +#include "lib/stringinfo.h" #include "utils/elog.h" #include "utils/palloc.h" #include "storage/itemptr.h" @@ -460,6 +461,10 @@ typedef struct NullableDatum /* due to alignment padding this could be used for flags for free */ } NullableDatum; +/* Hook for plugins to handle the txn message */ +typedef void(*HandleTxnCommand_hook_type)(StringInfo input_message, volatile bool *send_ready_for_query); +extern PGDLLIMPORT HandleTxnCommand_hook_type HandleTxnCommand_hook; + #define SIZEOF_DATUM SIZEOF_VOID_P StaticAssertDecl(SIZEOF_DATUM == 8, "sizeof datum is not 8"); /* diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h index aacdd0f5753..5810ce55890 100644 --- a/src/include/postmaster/autovacuum.h +++ b/src/include/postmaster/autovacuum.h @@ -55,6 +55,10 @@ extern bool IsAutoVacuumWorkerProcess(void); #define IsAnyAutoVacuumProcess() \ (IsAutoVacuumLauncherProcess() || IsAutoVacuumWorkerProcess()) +/* Hook for plugins to get control in AutoVacLauncher */ +typedef void (*AutoVacLauncherMain_hook_type)(int argc, char *argv[]); +extern PGDLLIMPORT AutoVacLauncherMain_hook_type AutoVacLauncherMain_hook; + /* Functions to start autovacuum process, called from postmaster */ extern void autovac_init(void); extern int StartAutoVacLauncher(void); diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 8d4d84582da..9df3db4c8ec 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -14,6 +14,8 @@ #ifndef _POSTMASTER_H #define _POSTMASTER_H +#include "miscadmin.h" + /* GUC options */ extern bool EnableSSL; extern int ReservedBackends; @@ -57,6 +59,10 @@ extern int postmaster_alive_fds[2]; extern PGDLLIMPORT const char *progname; +/* Hook for 
plugins to get control in StartChildProcess */ +typedef pid_t (*StartChildProcess_hook_type) (AuxProcType type); +extern PGDLLIMPORT StartChildProcess_hook_type StartChildProcess_hook; + extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn(); extern void ClosePostmasterPorts(bool am_syslogger); extern void InitProcessGlobals(void); @@ -78,6 +84,7 @@ extern void ShmemBackendArrayAllocation(void); extern void load_auxiliary_libraries(void); extern bool amAuxiliaryBgWorker(void); +extern pid_t StartChildProcessInternal(AuxProcType type); #ifdef ENABLE_IC_PROXY # define IC_PROXY_NUM_BGWORKER 1 #else /* ENABLE_IC_PROXY */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a7b0eff7b22..9f1e60f2f2a 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -48,6 +48,10 @@ extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid); extern int GetMaxSnapshotXidCount(void); extern int GetMaxSnapshotSubxidCount(void); +/* Hook for plugins to get control in GetSnapshotData */ +typedef Snapshot (*GetSnapshotData_hook_type)(Snapshot snapshot, DtxContext distributedTransactionContext); +extern PGDLLIMPORT GetSnapshotData_hook_type GetSnapshotData_hook; + extern Snapshot GetSnapshotData(Snapshot snapshot, DtxContext distributedTransactionContext); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 761c6c03baa..52ad2434a23 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -64,6 +64,7 @@ extern List *pg_plan_queries(List *querytrees, const char *query_string, extern bool check_max_stack_depth(int *newval, void **extra, GucSource source); extern void assign_max_stack_depth(int newval, void *extra); +extern void start_xact_command(void); extern void die(SIGNAL_ARGS); extern void quickdie(SIGNAL_ARGS) pg_attribute_noreturn(); extern void StatementCancelHandler(SIGNAL_ARGS); diff --git 
a/src/include/tcop/utility.h b/src/include/tcop/utility.h index 212e9b32806..3a7bf2c390c 100644 --- a/src/include/tcop/utility.h +++ b/src/include/tcop/utility.h @@ -77,6 +77,10 @@ typedef void (*ProcessUtility_hook_type) (PlannedStmt *pstmt, DestReceiver *dest, QueryCompletion *qc); extern PGDLLIMPORT ProcessUtility_hook_type ProcessUtility_hook; +/* Hook for plugins to send explicit begin command */ +typedef void (*SendTxnExplicitBegin_hook_type)(void); +extern PGDLLIMPORT SendTxnExplicitBegin_hook_type SendTxnExplicitBegin_hook; + extern void ProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, ParamListInfo params, diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h index c0c772cce14..419730c8a9e 100644 --- a/src/include/utils/relmapper.h +++ b/src/include/utils/relmapper.h @@ -35,6 +35,10 @@ typedef struct xl_relmap_update #define MinSizeOfRelmapUpdate offsetof(xl_relmap_update, data) +typedef struct RelMapFile RelMapFile; +/* Hook for plugins to get control in load_relmap_file */ +typedef void (*LoadRelMap_hook_type)(bool shared, bool lock_held, RelMapFile *map); +extern PGDLLIMPORT LoadRelMap_hook_type LoadRelMap_hook; extern RelFileNodeId RelationMapOidToFilenode(Oid relationId, bool shared); diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 6b959fa04fa..eaaddc0c7ea 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -162,6 +162,11 @@ extern TransactionId GlobalVisTestNonRemovableHorizon(GlobalVisState *state); extern bool GlobalVisCheckRemovableXid(Relation rel, TransactionId xid); extern bool GlobalVisCheckRemovableFullXid(Relation rel, FullTransactionId fxid); +extern GlobalVisState GlobalVisSharedRels; +extern GlobalVisState GlobalVisCatalogRels; +extern GlobalVisState GlobalVisDataRels; +extern GlobalVisState GlobalVisTempRels; + /* * Utility functions for implementing visibility routines in table AMs. */