Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ enable_gpcloud
enable_mapreduce
enable_orca
enable_catalog_ext
enable_serverless
autodepend
PKG_CONFIG_LIBDIR
PKG_CONFIG_PATH
Expand Down Expand Up @@ -894,6 +895,7 @@ enable_depend
enable_cassert
enable_orca
enable_catalog_ext
enable_serverless
enable_mapreduce
enable_gpcloud
enable_external_fts
Expand Down Expand Up @@ -1604,6 +1606,7 @@ Optional Features:
--enable-cassert enable assertion checks (for debugging)
--disable-orca disable ORCA optimizer
--enable-catalog-ext enable CloudberryDB catalog extension
--enable-serverless use CloudberryDB serverless architecture
--enable-mapreduce enable CloudberryDB Mapreduce support
--enable-gpcloud enable gpcloud support
--enable-external-fts enable external fts support
Expand Down Expand Up @@ -8340,6 +8343,37 @@ fi

{ $as_echo "$as_me:${as_lineno-$LINENO}: result: checking whether to build with CloudberryDB catalog extension... $enable_catalog_ext" >&5
$as_echo "checking whether to build with CloudberryDB catalog extension ... $enable_catalog_ext" >&6; }
#
# Enable serverless architecture
#


# Check whether --enable-serverless was given.
if test "${enable_serverless+set}" = set; then :
enableval=$enable_serverless;
case $enableval in
yes)

$as_echo "#define SERVERLESS 1" >>confdefs.h

;;
no)
:
;;
*)
as_fn_error $? "no argument expected for --enable-serverless option" "$LINENO" 5
;;
esac

else
enable_serverless=no

fi


{ $as_echo "$as_me:${as_lineno-$LINENO}: result: checking whether to use serverless architecture of Cloudberry ... $enable_serverless" >&5
$as_echo "checking whether to use serverless architecture of Cloudberry ... $enable_serverless" >&6; }


#
# --enable-mapreduce enables GPMapreduce support
Expand Down
10 changes: 10 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,16 @@ PGAC_ARG_BOOL(enable, catalog-ext, no, [enable Cloudberry catalog extension],
AC_MSG_RESULT([checking whether to build with catalog extension... $enable_catalog_ext])
AC_SUBST(enable_catalog_ext)

#
#
# --enable-serverless uses serverless architecture of Cloudberry
#
PGAC_ARG_BOOL(enable, serverless, no, [use serverless architecture of Cloudberry],
[AC_DEFINE([SERVERLESS], 1,
[Define to 1 to use serverless architecture of Cloudberry. (--enable-serverless)])])
AC_MSG_RESULT([checking whether to use serverless architecture of Cloudberry... $enable_serverless])
AC_SUBST(enable_serverless)

#
# --enable-mapreduce enables GPMapreduce support
#
Expand Down
31 changes: 27 additions & 4 deletions contrib/interconnect/udp/ic_udpifc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,14 @@ initConnHashTable(ConnHashTable *ht, MemoryContext cxt)

ht->cxt = cxt;
ht->size = Gp_role == GP_ROLE_DISPATCH ? (getgpsegmentCount() * 2) : ic_htab_size;

/*
* In serverless architecture, the cluster may have only one QD, skip Initialization.
* Initialization will be done later.
*/
if (enable_serverless && Gp_role == GP_ROLE_DISPATCH && ht->size == 0)
return true;

Assert(ht->size > 0);

if (ht->cxt)
Expand Down Expand Up @@ -1635,6 +1643,18 @@ connAddHash(ConnHashTable *ht, MotionConn *mConn)
MemoryContext old = NULL;
MotionConnUDP *conn = NULL;

/*
* Initialize connection hash table if needed.
*/
if (enable_serverless && Gp_role == GP_ROLE_DISPATCH && ht->size == 0)
{
old = MemoryContextSwitchTo(ht->cxt);
initConnHashTable(ht, ht->cxt);
MemoryContextSwitchTo(old);

Assert(ht->size > 0);
}

conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);

hashcode = CONN_HASH_VALUE(&conn->conn_info) % ht->size;
Expand Down Expand Up @@ -1805,10 +1825,13 @@ destroyConnHashTable(ConnHashTable *ht)
}
}

if (ht->cxt)
pfree(ht->table);
else
free(ht->table);
if (ht->size > 0)
{
if (ht->cxt)
pfree(ht->table);
else
free(ht->table);
}

ht->table = NULL;
ht->size = 0;
Expand Down
1 change: 1 addition & 0 deletions src/Makefile.global.in
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ enable_strong_random = @enable_strong_random@
enable_largefile = @enable_largefile@
enable_orca = @enable_orca@
enable_catalog_ext = @enable_catalog_ext@
enable_serverless = @enable_serverless@
enable_gpfdist = @enable_gpfdist@
enable_pxf = @enable_pxf@
enable_debug_extensions = @enable_debug_extensions@
Expand Down
3 changes: 3 additions & 0 deletions src/backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ ifeq ($(enable_catalog_ext),yes)
SUBDIRS += catalog-extension
LDFLAGS += -lprotobuf -lstdc++
endif
ifeq ($(enable_serverless),yes)
LDFLAGS += -lprotobuf -lstdc++ -ljansson
Comment thread
my-ship-it marked this conversation as resolved.
endif

include $(srcdir)/common.mk

Expand Down
7 changes: 7 additions & 0 deletions src/backend/access/transam/clog.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
Expand Down Expand Up @@ -167,6 +168,12 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
int pageno = TransactionIdToPage(xid); /* get page of parent */
int i;

/*
* Only master can set transaction status
*/
if (enable_serverless && (Gp_role != GP_ROLE_DISPATCH && GpIdentity.segindex != MASTER_CONTENT_ID))
return;

Assert(status == TRANSACTION_STATUS_COMMITTED ||
status == TRANSACTION_STATUS_ABORTED);

Expand Down
45 changes: 34 additions & 11 deletions src/backend/access/transam/slru.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,16 @@ typedef enum
static SlruErrorCause slru_errcause;
static int slru_errno;

/*
* Hooks for plugins to get control in SlruPhysicalReadPage/SlruPhysicalWritePage/SimpleLruReadPage
*/
SlruPhysicalReadPage_hook_type SlruPhysicalReadPage_hook = NULL;
SlruPhysicalWritePage_hook_type SlruPhysicalWritePage_hook = NULL;
SimpleLruReadPage_hook_type SimpleLruReadPage_hook = NULL;

static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata);
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
SlruWriteAll fdata);
static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
static int SlruSelectLRUPage(SlruCtl ctl, int pageno);

static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
int segpage, void *data);
static void SlruInternalDeleteSegment(SlruCtl ctl, int segno);
Expand Down Expand Up @@ -319,7 +319,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
*
* This assumes that InvalidXLogRecPtr is bitwise-all-0.
*/
static void
void
SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
{
SlruShared shared = ctl->shared;
Expand All @@ -336,7 +336,7 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
*
* Control lock must be held at entry, and will be held at exit.
*/
static void
void
SimpleLruWaitIO(SlruCtl ctl, int slotno)
{
SlruShared shared = ctl->shared;
Expand Down Expand Up @@ -396,6 +396,9 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
{
SlruShared shared = ctl->shared;

if (SimpleLruReadPage_hook)
return (*SimpleLruReadPage_hook) (ctl, pageno, write_ok, xid);

/* Outer loop handles restart if we must wait for someone else's I/O */
for (;;)
{
Expand Down Expand Up @@ -496,6 +499,12 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
SlruShared shared = ctl->shared;
int slotno;

if (SimpleLruReadPage_hook)
{
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
return (*SimpleLruReadPage_hook) (ctl, pageno, true, xid);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where release lock ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock released by the caller.

}

/* Try to find the page while holding only shared lock */
LWLockAcquire(shared->ControlLock, LW_SHARED);

Expand Down Expand Up @@ -679,7 +688,7 @@ SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
* For now, assume it's not worth keeping a file pointer open across
* read/write operations. We could cache one virtual file pointer ...
*/
static bool
bool
SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
SlruShared shared = ctl->shared;
Expand All @@ -688,6 +697,13 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
off_t offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd;
bool result;

if (SlruPhysicalReadPage_hook &&
SlruPhysicalReadPage_hook(ctl, pageno, slotno, &result))
{
return result;
}

SlruFileName(ctl, path, segno);

Expand Down Expand Up @@ -760,6 +776,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata)
off_t offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd = -1;
bool result;

/* update the stats counter of written pages */
pgstat_count_slru_page_written(shared->slru_stats_idx);
Expand Down Expand Up @@ -806,6 +823,12 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata)
}
}

if (SlruPhysicalWritePage_hook &&
SlruPhysicalWritePage_hook(ctl, pageno, slotno, &result))
{
return result;
}

/*
* During a WriteAll, we may already have the desired file open.
*/
Expand Down Expand Up @@ -926,7 +949,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata)
* Issue the error message after failure of SlruPhysicalReadPage or
* SlruPhysicalWritePage. Call this after cleaning up shared-memory state.
*/
static void
void
SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
{
int segno = pageno / SLRU_PAGES_PER_SEGMENT;
Expand Down Expand Up @@ -1011,7 +1034,7 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
*
* Control lock must be held at entry, and will be held at exit.
*/
static int
int
SlruSelectLRUPage(SlruCtl ctl, int pageno)
{
SlruShared shared = ctl->shared;
Expand Down
9 changes: 9 additions & 0 deletions src/backend/access/transam/subtrans.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,12 @@ SubTransPagePrecedes(int page1, int page2)
return (TransactionIdPrecedes(xid1, xid2) &&
TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1));
}

/*
* Get SUBTRANS control data
*/
SlruCtl
SUBTRANS_Ctl(void)
{
return SubTransCtl;
}
8 changes: 8 additions & 0 deletions src/backend/access/transam/varsup.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ int xid_warn_limit;

NewSegRelfilenode_assign_hook_type NewSegRelfilenode_assign_hook = NULL;

/*
* Hook for plugins to get control in GetNewTransactionId.
*/
GetNewTransactionId_hook_type GetNewTransactionId_hook = NULL;

/*
* Allocate the next FullTransactionId for a new transaction or
* subtransaction.
Expand All @@ -61,6 +66,9 @@ GetNewTransactionId(bool isSubXact)
FullTransactionId full_xid;
TransactionId xid;

if (GetNewTransactionId_hook)
return (*GetNewTransactionId_hook) (isSubXact);

/*
* Workers synchronize transaction state at the beginning of each parallel
* operation, so we can't account for new XIDs after that point.
Expand Down
Loading