diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7eaa9cad9b1..bd61286e042 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -10881,8 +10881,9 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
the supported resource managers are heap,
heap2, btree, hash,
gin, gist, sequence,
- spgist, brin, and generic. Only
- superusers can change this setting.
+ spgist, brin, and generic.
+ Extensions may define additional resource managers. Only superusers and users with
+ the appropriate SET privilege can change this setting.
diff --git a/doc/src/sgml/custom-rmgr.sgml b/doc/src/sgml/custom-rmgr.sgml
new file mode 100644
index 00000000000..17a4f1dfbde
--- /dev/null
+++ b/doc/src/sgml/custom-rmgr.sgml
@@ -0,0 +1,98 @@
+
+
+
+ Custom WAL Resource Managers
+
+
+ This chapter explains the interface between the core
+ PostgreSQL system and custom WAL resource
+ managers, which enable extensions to integrate directly with the WAL.
+
+
+ An extension, especially a Table Access
+ Method or Index Access Method, may
+ need to use WAL for recovery, replication, and/or Logical Decoding. Custom resource managers
+ are a more flexible alternative to Generic
+ WAL (which does not support logical decoding), but more complex for
+ an extension to implement.
+
+
+ To create a new custom WAL resouce manager, first define an
+ RmgrData structure with implementations for the
+ resource manager methods. Refer to
+ src/backend/access/transam/README and
+ src/include/access/xlog_internal.h in the
+ PostgreSQL source.
+
+/*
+ * Method table for resource managers.
+ *
+ * This struct must be kept in sync with the PG_RMGR definition in
+ * rmgr.c.
+ *
+ * rm_identify must return a name for the record based on xl_info (without
+ * reference to the rmid). For example, XLOG_BTREE_VACUUM would be named
+ * "VACUUM". rm_desc can then be called to obtain additional detail for the
+ * record, if available (e.g. the last block).
+ *
+ * rm_mask takes as input a page modified by the resource manager and masks
+ * out bits that shouldn't be flagged by wal_consistency_checking.
+ *
+ * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
+ * NULL, the corresponding RmgrTable entry is considered invalid.
+ */
+typedef struct RmgrData
+{
+ const char *rm_name;
+ void (*rm_redo) (XLogReaderState *record);
+ void (*rm_desc) (StringInfo buf, XLogReaderState *record);
+ const char *(*rm_identify) (uint8 info);
+ void (*rm_startup) (void);
+ void (*rm_cleanup) (void);
+ void (*rm_mask) (char *pagedata, BlockNumber blkno);
+ void (*rm_decode) (struct LogicalDecodingContext *ctx,
+ struct XLogRecordBuffer *buf);
+} RmgrData;
+
+
+
+ Then, register your new resource
+ manager.
+
+
+/*
+ * Register a new custom WAL resource manager.
+ *
+ * Resource manager IDs must be globally unique across all extensions. Refer
+ * to https://wiki.postgresql.org/wiki/CustomWALResourceManager to reserve a
+ * unique RmgrId for your extension, to avoid conflicts with other extension
+ * developers. During development, use RM_EXPERIMENTAL_ID to avoid needlessly
+ * reserving a new ID.
+ */
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
+ RegisterCustomRmgr must be called from the
+ extension module's _PG_init function.
+ While developing a new extension, use RM_EXPERIMENTAL_ID
+ for rmid. When you ready to release the extension to
+ users, reserve a new resource manager ID at the Custom WAL
+ Resource Manager page.
+
+
+
+ Place the extension module implementing the custom resource manager in so that it will be loaded early
+ during PostgreSQL startup.
+
+
+
+ The extension must remain in shared_preload_libraries as long as any
+ custom WAL records may exist in the system. Otherwise
+ PostgreSQL will not be able to apply or decode
+ the custom WAL records, which may prevent the server from starting.
+
+
+
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index ac4c60c46de..04d8b235752 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -105,6 +105,7 @@
+
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 9a177a4e8c8..8266615ea47 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -25559,6 +25559,25 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
without recovery, the function returns NULL.
+
+
+
+
+ pg_get_wal_resource_managers
+
+ pg_get_wal_resource_managers ()
+ setof record
+ ( rm_idinteger,
+ rm_nametext,
+ rm_builtinboolean )
+
+
+ Returns the currently-loaded WAL resource managers in the system. The
+ column rm_builtin indicates whether it's a
+ built-in resource manager, or a custom resource manager loaded by an
+ extension.
+
+
diff --git a/doc/src/sgml/generic-wal.sgml b/doc/src/sgml/generic-wal.sgml
new file mode 100644
index 00000000000..a028856d2eb
--- /dev/null
+++ b/doc/src/sgml/generic-wal.sgml
@@ -0,0 +1,174 @@
+
+
+
+ Generic WAL Records
+
+
+ Although all built-in WAL-logged modules have their own types of WAL
+ records, there is also a generic WAL record type, which describes changes
+ to pages in a generic way. This is useful for extensions that provide
+ custom access methods.
+
+
+
+ In comparison with Custom WAL Resource
+ Managers, Generic WAL is simpler for an extension to implement and
+ does not require the extension library to be loaded in order to apply the
+ records.
+
+
+
+
+ Generic WAL records are ignored during Logical Decoding. If logical decoding is
+ required for your extension, consider a Custom WAL Resource Manager.
+
+
+
+
+ The API for constructing generic WAL records is defined in
+ access/generic_xlog.h and implemented
+ in access/transam/generic_xlog.c.
+
+
+
+ To perform a WAL-logged data update using the generic WAL record
+ facility, follow these steps:
+
+
+
+
+ state = GenericXLogStart(relation) — start
+ construction of a generic WAL record for the given relation.
+
+
+
+
+
+ page = GenericXLogRegisterBuffer(state, buffer, flags)
+ — register a buffer to be modified within the current generic WAL
+ record. This function returns a pointer to a temporary copy of the
+ buffer's page, where modifications should be made. (Do not modify the
+ buffer's contents directly.) The third argument is a bit mask of flags
+ applicable to the operation. Currently the only such flag is
+ GENERIC_XLOG_FULL_IMAGE, which indicates that a full-page
+ image rather than a delta update should be included in the WAL record.
+ Typically this flag would be set if the page is new or has been
+ rewritten completely.
+ GenericXLogRegisterBuffer can be repeated if the
+ WAL-logged action needs to modify multiple pages.
+
+
+
+
+
+ Apply modifications to the page images obtained in the previous step.
+
+
+
+
+
+ GenericXLogFinish(state) — apply the changes to
+ the buffers and emit the generic WAL record.
+
+
+
+
+
+
+ WAL record construction can be canceled between any of the above steps by
+ calling GenericXLogAbort(state). This will discard all
+ changes to the page image copies.
+
+
+
+ Please note the following points when using the generic WAL record
+ facility:
+
+
+
+
+ No direct modifications of buffers are allowed! All modifications must
+ be done in copies acquired from GenericXLogRegisterBuffer().
+ In other words, code that makes generic WAL records should never call
+ BufferGetPage() for itself. However, it remains the
+ caller's responsibility to pin/unpin and lock/unlock the buffers at
+ appropriate times. Exclusive lock must be held on each target buffer
+ from before GenericXLogRegisterBuffer() until after
+ GenericXLogFinish().
+
+
+
+
+
+ Registrations of buffers (step 2) and modifications of page images
+ (step 3) can be mixed freely, i.e., both steps may be repeated in any
+ sequence. Keep in mind that buffers should be registered in the same
+ order in which locks are to be obtained on them during replay.
+
+
+
+
+
+ The maximum number of buffers that can be registered for a generic WAL
+ record is MAX_GENERIC_XLOG_PAGES. An error will be thrown
+ if this limit is exceeded.
+
+
+
+
+
+ Generic WAL assumes that the pages to be modified have standard
+ layout, and in particular that there is no useful data between
+ pd_lower and pd_upper.
+
+
+
+
+
+ Since you are modifying copies of buffer
+ pages, GenericXLogStart() does not start a critical
+ section. Thus, you can safely do memory allocation, error throwing,
+ etc. between GenericXLogStart() and
+ GenericXLogFinish(). The only actual critical section is
+ present inside GenericXLogFinish(). There is no need to
+ worry about calling GenericXLogAbort() during an error
+ exit, either.
+
+
+
+
+
+ GenericXLogFinish() takes care of marking buffers dirty
+ and setting their LSNs. You do not need to do this explicitly.
+
+
+
+
+
+ For unlogged relations, everything works the same except that no
+ actual WAL record is emitted. Thus, you typically do not need to do
+ any explicit checks for unlogged relations.
+
+
+
+
+
+ The generic WAL redo function will acquire exclusive locks to buffers
+ in the same order as they were registered. After redoing all changes,
+ the locks will be released in the same order.
+
+
+
+
+
+ If GENERIC_XLOG_FULL_IMAGE is not specified for a
+ registered buffer, the generic WAL record contains a delta between
+ the old and the new page images. This delta is based on byte-by-byte
+ comparison. This is not very compact for the case of moving data
+ within a page, and might be improved in the future.
+
+
+
+
+
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 2a3b8f29d4b..e7260e573c1 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -262,6 +262,7 @@ break is not needed in a wider output rendering.
&tableam;
&indexam;
&generic-wal;
+ &custom-rmgr;
&btree;
&gist;
&spgist;
diff --git a/doc/src/sgml/ref/pg_waldump.sgml b/doc/src/sgml/ref/pg_waldump.sgml
index 5fcdfe210ac..43085ac9ac8 100644
--- a/doc/src/sgml/ref/pg_waldump.sgml
+++ b/doc/src/sgml/ref/pg_waldump.sgml
@@ -146,6 +146,14 @@ PostgreSQL documentation
If list is passed as name, print a list of valid resource manager
names, and exit.
+
+ Extensions may define custom resource managers, but pg_waldump does
+ not load the extension module and therefore does not recognize custom
+ resource managers by name. Instead, you can specify the custom
+ resource managers as custom### where
+ "###" is the three-digit resource manager ID. Names
+ of this form will always be considered valid.
+
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 180615873cd..1a6cdb2de57 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,9 +24,14 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "replication/decode.h"
#include "replication/message.h"
#include "replication/origin.h"
#include "storage/standby.h"
+#include "utils/builtins.h"
#include "utils/relmapper.h"
#include "access/bitmap_xlog.h"
@@ -35,9 +40,127 @@
/* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
- { name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+ { name, redo, desc, identify, startup, cleanup, mask, decode },
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+RmgrData RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};
+
+/*
+ * Start up all resource managers.
+ */
+void
+RmgrStartup(void)
+{
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+
+ if (RmgrTable[rmid].rm_startup != NULL)
+ RmgrTable[rmid].rm_startup();
+ }
+}
+
+/*
+ * Clean up all resource managers.
+ */
+void
+RmgrCleanup(void)
+{
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+
+ if (RmgrTable[rmid].rm_cleanup != NULL)
+ RmgrTable[rmid].rm_cleanup();
+ }
+}
+
+/*
+ * Emit ERROR when we encounter a record with an RmgrId we don't
+ * recognize.
+ */
+void
+RmgrNotFound(RmgrId rmid)
+{
+ ereport(ERROR, (errmsg("resource manager with ID %d not registered", rmid),
+ errhint("Include the extension module that implements this resource manager in shared_preload_libraries.")));
+}
+
+/*
+ * Register a new custom WAL resource manager.
+ *
+ * Resource manager IDs must be globally unique across all extensions. Refer
+ * to https://wiki.postgresql.org/wiki/CustomWALResourceManager to reserve a
+ * unique RmgrId for your extension, to avoid conflicts with other extension
+ * developers. During development, use RM_EXPERIMENTAL_ID to avoid needlessly
+ * reserving a new ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+ if (rmgr->rm_name == NULL || strlen(rmgr->rm_name) == 0)
+ ereport(ERROR, (errmsg("custom resource manager name is invalid"),
+ errhint("Provide a non-empty name for the custom resource manager.")));
+
+ if (!RmgrIdIsCustom(rmid))
+ ereport(ERROR, (errmsg("custom resource manager ID %d is out of range", rmid),
+ errhint("Provide a custom resource manager ID between %d and %d.",
+ RM_MIN_CUSTOM_ID, RM_MAX_CUSTOM_ID)));
+
+ if (!process_shared_preload_libraries_in_progress)
+ ereport(ERROR,
+ (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Custom resource manager must be registered while initializing modules in shared_preload_libraries.")));
+
+ if (RmgrTable[rmid].rm_name != NULL)
+ ereport(ERROR,
+ (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Custom resource manager \"%s\" already registered with the same ID.",
+ RmgrTable[rmid].rm_name)));
+
+ /* check for existing rmgr with the same name */
+ for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++)
+ {
+ if (!RmgrIdExists(existing_rmid))
+ continue;
+
+ if (!pg_strcasecmp(RmgrTable[existing_rmid].rm_name, rmgr->rm_name))
+ ereport(ERROR,
+ (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+ errdetail("Existing resource manager with ID %d has the same name.", existing_rmid)));
+ }
+
+ /* register it */
+ RmgrTable[rmid] = *rmgr;
+ ereport(LOG,
+ (errmsg("registered custom resource manager \"%s\" with ID %d",
+ rmgr->rm_name, rmid)));
+}
+
+/* SQL SRF showing loaded resource managers */
+Datum
+pg_get_wal_resource_managers(PG_FUNCTION_ARGS)
+{
+#define PG_GET_RESOURCE_MANAGERS_COLS 3
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Datum values[PG_GET_RESOURCE_MANAGERS_COLS];
+ bool nulls[PG_GET_RESOURCE_MANAGERS_COLS] = {0};
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+ values[0] = Int32GetDatum(rmid);
+ values[1] = CStringGetTextDatum(GetRmgr(rmid).rm_name);
+ values[2] = BoolGetDatum(RmgrIdIsBuiltin(rmid));
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ }
+
+ return (Datum) 0;
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 93feefef2ff..ba34d5ca6fa 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -743,7 +743,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
(uint32) SizeOfXLogRecord, record->xl_tot_len);
return false;
}
- if (record->xl_rmid > RM_MAX_ID)
+ if (!RmgrIdIsValid(record->xl_rmid))
{
report_invalid_record(state,
"invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/mock.mk b/src/backend/mock.mk
index 001c0372bae..11b02242aab 100644
--- a/src/backend/mock.mk
+++ b/src/backend/mock.mk
@@ -23,7 +23,6 @@ MOCK_LIBS := -ldl $(filter-out -ledit, $(LIBS)) $(LDAP_LIBS_BE) $(ICU_LIBS) $(ZS
# These files are not linked into test programs.
EXCL_OBJS=\
src/backend/main/main.o \
- src/backend/access/transam/rmgr.o \
src/backend/utils/fmgrtab.o \
src/backend/gpopt/%.o \
src/backend/gpopt/config/%.o \
@@ -87,7 +86,6 @@ EXCL_OBJS+=\
# These files are linked into every test program.
MOCK_OBJS=\
$(top_srcdir)/src/test/unit/mock/fmgrtab_mock.o \
- $(top_srcdir)/src/test/unit/mock/rmgr_mock.o \
$(top_srcdir)/src/test/unit/mock/main_mock.o
# No test programs currently exercise the ORCA translator library, so
# mock that instead of linking with the real library.
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3d9a199c8cb..2cd37aa5837 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1249,6 +1249,12 @@ PostmasterMain(int argc, char *argv[])
*/
InitializeMaxBackends();
+ /*
+ * Now that modules have been loaded, we can process any custom resource
+ * managers specified in the wal_consistency_checking GUC.
+ */
+ InitializeWalConsistencyChecking();
+
/*
* Set up shared memory and semaphores.
*/
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 891282bcf85..1a835983222 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
#include "replication/snapbuild.h"
#include "storage/standby.h"
-typedef struct XLogRecordBuffer
-{
- XLogRecPtr origptr;
- XLogRecPtr endptr;
- XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{
XLogRecordBuffer buf;
TransactionId txid;
+ RmgrData rmgr;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
@@ -127,82 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
buf.origptr);
}
- /* cast so we get a warning when new rmgrs are added */
- switch ((RmgrId) XLogRecGetRmid(record))
- {
- /*
- * Rmgrs we care about for logical decoding. Add new rmgrs in
- * rmgrlist.h's order.
- */
- case RM_XLOG_ID:
- DecodeXLogOp(ctx, &buf);
- break;
-
- case RM_XACT_ID:
- DecodeXactOp(ctx, &buf);
- break;
-
- case RM_STANDBY_ID:
- DecodeStandbyOp(ctx, &buf);
- break;
-
- case RM_HEAP2_ID:
- DecodeHeap2Op(ctx, &buf);
- break;
+ rmgr = GetRmgr(XLogRecGetRmid(record));
- case RM_HEAP_ID:
- DecodeHeapOp(ctx, &buf);
- break;
-
- case RM_LOGICALMSG_ID:
- DecodeLogicalMsgOp(ctx, &buf);
- break;
-
- /*
- * Rmgrs irrelevant for logical decoding; they describe stuff not
- * represented in logical decoding. Add new rmgrs in rmgrlist.h's
- * order.
- */
- case RM_SMGR_ID:
- case RM_CLOG_ID:
- case RM_DBASE_ID:
- case RM_TBLSPC_ID:
- case RM_MULTIXACT_ID:
- case RM_RELMAP_ID:
- case RM_BTREE_ID:
- case RM_HASH_ID:
- case RM_GIN_ID:
- case RM_GIST_ID:
- case RM_SEQ_ID:
- case RM_SPGIST_ID:
- case RM_BRIN_ID:
- case RM_COMMIT_TS_ID:
- case RM_REPLORIGIN_ID:
- case RM_GENERIC_ID:
- case RM_BITMAP_ID:
- case RM_DISTRIBUTEDLOG_ID:
- /* just deal with xid, and done */
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
- buf.origptr);
- break;
-
- case RM_APPEND_ONLY_ID:
- /*
- * GPDB_94_MERGE_FIXME: logical decoding hasn't been implemented for
- * append-only tables yet.
- */
- break;
-
- case RM_NEXT_ID:
- elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+ if (rmgr.rm_decode != NULL)
+ rmgr.rm_decode(ctx, &buf);
+ else
+ {
+ /* just deal with xid, and done */
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+ buf.origptr);
}
}
/*
* Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -248,8 +175,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
@@ -406,8 +333,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
@@ -452,8 +379,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
@@ -512,8 +439,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
@@ -634,8 +561,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
@@ -1328,3 +1255,13 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
+
+void
+appendonly_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ /*
+ * GPDB_94_MERGE_FIXME: logical decoding hasn't been implemented for
+ * append-only tables yet.
+ */
+
+}
diff --git a/src/backend/utils/fmgr/funcapi.c b/src/backend/utils/fmgr/funcapi.c
index 8510b62c94a..487a46c30f7 100644
--- a/src/backend/utils/fmgr/funcapi.c
+++ b/src/backend/utils/fmgr/funcapi.c
@@ -55,6 +55,73 @@ static bool resolve_polymorphic_tupdesc(TupleDesc tupdesc,
static TypeFuncClass get_type_func_class(Oid typid, Oid *base_typeid);
+/*
+ * InitMaterializedSRF
+ *
+ * Helper function to build the state of a set-returning function used
+ * in the context of a single call with materialize mode. This code
+ * includes sanity checks on ReturnSetInfo, creates the Tuplestore and
+ * the TupleDesc used with the function and stores them into the
+ * function's ReturnSetInfo.
+ *
+ * "flags" can be set to MAT_SRF_USE_EXPECTED_DESC, to use the tuple
+ * descriptor coming from expectedDesc, which is the tuple descriptor
+ * expected by the caller. MAT_SRF_BLESS can be set to complete the
+ * information associated to the tuple descriptor, which is necessary
+ * in some cases where the tuple descriptor comes from a transient
+ * RECORD datatype.
+ */
+void
+InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
+{
+ bool random_access;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Tuplestorestate *tupstore;
+ MemoryContext old_context,
+ per_query_ctx;
+ TupleDesc stored_tupdesc;
+
+ /* check to see if caller supports returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+ ((flags & MAT_SRF_USE_EXPECTED_DESC) != 0 && rsinfo->expectedDesc == NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+
+ /*
+ * Store the tuplestore and the tuple descriptor in ReturnSetInfo. This
+ * must be done in the per-query memory context.
+ */
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ old_context = MemoryContextSwitchTo(per_query_ctx);
+
+ /* build a tuple descriptor for our result type */
+ if ((flags & MAT_SRF_USE_EXPECTED_DESC) != 0)
+ stored_tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+ else
+ {
+ if (get_call_result_type(fcinfo, NULL, &stored_tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+ }
+
+ /* If requested, bless the tuple descriptor */
+ if ((flags & MAT_SRF_BLESS) != 0)
+ BlessTupleDesc(stored_tupdesc);
+
+ random_access = (rsinfo->allowedModes & SFRM_Materialize_Random) != 0;
+
+ tupstore = tuplestore_begin_heap(random_access, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = stored_tupdesc;
+ MemoryContextSwitchTo(old_context);
+}
+
+
/*
* init_MultiFuncCall
* Create an empty FuncCallContext data structure
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index a4214ad0662..f3044a983f2 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -1705,6 +1705,7 @@ char *local_preload_libraries_string = NULL;
/* Flag telling that we are loading shared_preload_libraries */
bool process_shared_preload_libraries_in_progress = false;
+bool process_shared_preload_libraries_done = false;
/*
* load the shared libraries listed in 'libraries'
@@ -1780,6 +1781,7 @@ process_shared_preload_libraries(void)
#endif
process_shared_preload_libraries_in_progress = false;
+ process_shared_preload_libraries_done = true;
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e9881d2bc7d..e120e661ea2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -252,6 +252,11 @@ static double defunct_double = 0;
static ConfigVariable *ProcessConfigFileInternal(GucContext context,
bool applySettings, int elevel);
+/*
+ * Track whether there were any deferred checks for custom resource managers
+ * specified in wal_consistency_checking.
+ */
+static bool check_wal_consistency_checking_deferred = false;
/*
* Options for enum values defined in this module.
@@ -5845,6 +5850,36 @@ InitializeGUCOptions(void)
InitializeGUCOptionsFromEnvironment();
}
+/*
+ * If any custom resource managers were specified in the
+ * wal_consistency_checking GUC, processing was deferred. Now that
+ * shared_preload_libraries have been loaded, process wal_consistency_checking
+ * again.
+ */
+void
+InitializeWalConsistencyChecking(void)
+{
+ Assert(process_shared_preload_libraries_done);
+
+ if (check_wal_consistency_checking_deferred)
+ {
+ struct config_generic *guc;
+
+ guc = find_option("wal_consistency_checking", false, false, ERROR);
+
+ check_wal_consistency_checking_deferred = false;
+
+ set_config_option("wal_consistency_checking",
+ wal_consistency_checking_string,
+ PGC_POSTMASTER, guc->source,
+ GUC_ACTION_SET, true, ERROR, false);
+
+ /* checking should not be deferred again */
+ Assert(!check_wal_consistency_checking_deferred);
+ }
+
+}
+
/*
* Assign any GUC values that can come from the server's environment.
*
@@ -12084,13 +12119,13 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
{
char *tok = (char *) lfirst(l);
bool found = false;
- RmgrId rmid;
+ int rmid;
/* Check for 'all'. */
if (pg_strcasecmp(tok, "all") == 0)
{
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- if (RmgrTable[rmid].rm_mask != NULL)
+ if (RmgrIdExists(rmid) && GetRmgr(rmid).rm_mask != NULL)
newwalconsistency[rmid] = true;
found = true;
}
@@ -12102,8 +12137,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
*/
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
{
- if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
- RmgrTable[rmid].rm_mask != NULL)
+ if (RmgrIdExists(rmid) && GetRmgr(rmid).rm_mask != NULL &&
+ pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0)
{
newwalconsistency[rmid] = true;
found = true;
@@ -12114,10 +12149,21 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
/* If a valid resource manager is found, check for the next one. */
if (!found)
{
- GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
- pfree(rawstring);
- list_free(elemlist);
- return false;
+ /*
+ * Perhaps it's a custom resource manager. If so, defer checking
+ * until InitializeWalConsistencyChecking().
+ */
+ if (!process_shared_preload_libraries_done)
+ {
+ check_wal_consistency_checking_deferred = true;
+ }
+ else
+ {
+ GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
+ pfree(rawstring);
+ list_free(elemlist);
+ return false;
+ }
}
}
@@ -12133,7 +12179,20 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
static void
assign_wal_consistency_checking(const char *newval, void *extra)
{
- wal_consistency_checking = (bool *) extra;
+ /*
+ * If some checks were deferred, it's possible that the checks will fail
+ * later during InitializeWalConsistencyChecking(). But in that case, the
+ * postmaster will exit anyway, so it's safe to proceed with the
+ * assignment.
+ *
+ * Any built-in resource managers specified are assigned immediately,
+ * which affects WAL created before shared_preload_libraries are
+ * processed. Any custom resource managers specified won't be assigned
+ * until after shared_preload_libraries are processed, but that's OK
+ * because WAL for a custom resource manager can't be written before the
+ * module is loaded anyway.
+ */
+ wal_consistency_checking = extra;
}
static bool
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 4ee8d416bf3..ce2b608a02f 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,16 +28,19 @@
#include "cdb/cdbappendonlyxlog.h"
/*
- * RmgrNames is an array of resource manager names, to make error messages
- * a bit nicer.
+ * RmgrNames is an array of the built-in resource manager names, to make error
+ * messages a bit nicer.
*/
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
name,
static const char *RmgrNames[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};
+#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \
+ RmgrNames[rmid] : "custom")
+
static void extractPageInfo(XLogReaderState *record);
static int xlogreadfd = -1;
@@ -432,9 +435,9 @@ extractPageInfo(XLogReaderState *record)
* track that change.
*/
pg_fatal("WAL record modifies a relation, but record type is not recognized: "
- "lsn: %X/%X, rmgr: %s, info: %02X",
+ "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X",
LSN_FORMAT_ARGS(record->ReadRecPtr),
- RmgrNames[rmid], info);
+ rmid, RmgrName(rmid), info);
}
else if (rmid == RM_APPEND_ONLY_ID)
{
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 162d1d1f2c6..c30af01112d 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -66,8 +66,8 @@ typedef struct Stats
typedef struct XLogDumpStats
{
uint64 count;
- Stats rmgr_stats[RM_NEXT_ID];
- Stats record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES];
+ Stats rmgr_stats[RM_MAX_ID + 1];
+ Stats record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES];
} XLogDumpStats;
#define fatal_error(...) do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0)
@@ -77,9 +77,9 @@ print_rmgr_list(void)
{
int i;
- for (i = 0; i <= RM_MAX_ID; i++)
+ for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
{
- printf("%s\n", RmgrDescTable[i].rm_name);
+ printf("%s\n", GetRmgrDesc(i)->rm_name);
}
}
@@ -456,7 +456,7 @@ static void
XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
{
const char *id;
- const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+ const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record));
uint32 rec_len;
uint32 fpi_len;
RelFileNode rnode;
@@ -622,7 +622,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
* calculate column totals.
*/
- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri < RM_MAX_ID; ri++)
{
total_count += stats->rmgr_stats[ri].count;
total_rec_len += stats->rmgr_stats[ri].rec_len;
@@ -640,13 +640,18 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
"Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)",
"----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---");
- for (ri = 0; ri < RM_NEXT_ID; ri++)
+ for (ri = 0; ri <= RM_MAX_ID; ri++)
{
uint64 count,
rec_len,
fpi_len,
tot_len;
- const RmgrDescData *desc = &RmgrDescTable[ri];
+ const RmgrDescData *desc;
+
+ if (!RmgrIdIsValid(ri))
+ continue;
+
+ desc = GetRmgrDesc(ri);
if (!config->stats_per_record)
{
@@ -655,6 +660,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
fpi_len = stats->rmgr_stats[ri].fpi_len;
tot_len = rec_len + fpi_len;
+ if (RmgrIdIsCustom(ri) && count == 0)
+ continue;
+
XLogDumpStatsRow(desc->rm_name,
count, total_count, rec_len, total_rec_len,
fpi_len, total_fpi_len, tot_len, total_len);
@@ -858,7 +866,7 @@ main(int argc, char **argv)
break;
case 'r':
{
- int i;
+ int rmid;
if (pg_strcasecmp(optarg, "list") == 0)
{
@@ -866,20 +874,40 @@ main(int argc, char **argv)
exit(EXIT_SUCCESS);
}
- for (i = 0; i <= RM_MAX_ID; i++)
+ /*
+ * First look for the generated name of a custom rmgr, of
+ * the form "custom###". We accept this form, because the
+ * custom rmgr module is not loaded, so there's no way to
+ * know the real name. This convention should be
+ * consistent with that in rmgrdesc.c.
+ */
+ if (sscanf(optarg, "custom%03d", &rmid) == 1)
{
- if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0)
+ if (!RmgrIdIsCustom(rmid))
{
- config.filter_by_rmgr = i;
- break;
+ pg_log_error("custom resource manager \"%s\" does not exist",
+ optarg);
+ goto bad_argument;
}
+ config.filter_by_rmgr = rmid;
}
-
- if (config.filter_by_rmgr == -1)
+ else
{
- pg_log_error("resource manager \"%s\" does not exist",
- optarg);
- goto bad_argument;
+ /* then look for builtin rmgrs */
+ for (rmid = 0; rmid <= RM_MAX_BUILTIN_ID; rmid++)
+ {
+ if (pg_strcasecmp(optarg, GetRmgrDesc(rmid)->rm_name) == 0)
+ {
+ config.filter_by_rmgr = rmid;
+ break;
+ }
+ }
+ if (rmid > RM_MAX_BUILTIN_ID)
+ {
+ pg_log_error("resource manager \"%s\" does not exist",
+ optarg);
+ goto bad_argument;
+ }
}
}
break;
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index da4e44e4328..79506960b4b 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -36,9 +36,68 @@
#include "access/distributedlog.h"
#include "cdb/cdbappendonlyxlog.h"
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
{ name, desc, identify},
-const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
+static const RmgrDescData RmgrDescTable[RM_N_BUILTIN_IDS] = {
#include "access/rmgrlist.h"
};
+
+#define CUSTOM_NUMERIC_NAME_LEN sizeof("custom###")
+
+static char CustomNumericNames[RM_N_CUSTOM_IDS][CUSTOM_NUMERIC_NAME_LEN] = {0};
+static RmgrDescData CustomRmgrDesc[RM_N_CUSTOM_IDS] = {0};
+static bool CustomRmgrDescInitialized = false;
+
+/*
+ * No information on custom resource managers; just print the ID.
+ */
+static void
+default_desc(StringInfo buf, XLogReaderState *record)
+{
+ appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record));
+}
+
+/*
+ * No information on custom resource managers; just return NULL and let the
+ * caller handle it.
+ */
+static const char *
+default_identify(uint8 info)
+{
+ return NULL;
+}
+
+/*
+ * We are unable to get the real name of a custom rmgr because the module is
+ * not loaded. Generate a table of rmgrs with numeric names of the form
+ * "custom###", where "###" is the 3-digit resource manager ID.
+ */
+static void
+initialize_custom_rmgrs(void)
+{
+ for (int i = 0; i < RM_N_CUSTOM_IDS; i++)
+ {
+ snprintf(CustomNumericNames[i], CUSTOM_NUMERIC_NAME_LEN,
+ "custom%03d", i + RM_MIN_CUSTOM_ID);
+ CustomRmgrDesc[i].rm_name = CustomNumericNames[i];
+ CustomRmgrDesc[i].rm_desc = default_desc;
+ CustomRmgrDesc[i].rm_identify = default_identify;
+ }
+ CustomRmgrDescInitialized = true;
+}
+
+const RmgrDescData *
+GetRmgrDesc(RmgrId rmid)
+{
+ Assert(RmgrIdIsValid(rmid));
+
+ if (RmgrIdIsBuiltin(rmid))
+ return &RmgrDescTable[rmid];
+ else
+ {
+ if (!CustomRmgrDescInitialized)
+ initialize_custom_rmgrs();
+ return &CustomRmgrDesc[rmid - RM_MIN_CUSTOM_ID];
+ }
+}
diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h
index 42f8483b482..f733cd467d5 100644
--- a/src/bin/pg_waldump/rmgrdesc.h
+++ b/src/bin/pg_waldump/rmgrdesc.h
@@ -18,6 +18,6 @@ typedef struct RmgrDescData
const char *(*rm_identify) (uint8 info);
} RmgrDescData;
-extern const RmgrDescData RmgrDescTable[];
+extern const RmgrDescData *GetRmgrDesc(RmgrId rmid);
#endif /* RMGRDESC_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index c9b5c56a4c6..e465800e445 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
* Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
* file format.
*/
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
symname,
typedef enum RmgrIds
@@ -30,6 +30,33 @@ typedef enum RmgrIds
#undef PG_RMGR
-#define RM_MAX_ID (RM_NEXT_ID - 1)
+#define RM_MAX_ID UINT8_MAX
+#define RM_MAX_BUILTIN_ID (RM_NEXT_ID - 1)
+#define RM_MIN_CUSTOM_ID 128
+#define RM_MAX_CUSTOM_ID UINT8_MAX
+#define RM_N_IDS (UINT8_MAX + 1)
+#define RM_N_BUILTIN_IDS (RM_MAX_BUILTIN_ID + 1)
+#define RM_N_CUSTOM_IDS (RM_MAX_CUSTOM_ID - RM_MIN_CUSTOM_ID + 1)
+
+static inline bool
+RmgrIdIsBuiltin(int rmid)
+{
+ return rmid <= RM_MAX_BUILTIN_ID;
+}
+
+static inline bool
+RmgrIdIsCustom(int rmid)
+{
+ return rmid >= RM_MIN_CUSTOM_ID && rmid <= RM_MAX_CUSTOM_ID;
+}
+
+#define RmgrIdIsValid(rmid) (RmgrIdIsBuiltin((rmid)) || RmgrIdIsCustom((rmid)))
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/CustomWALResourceManagers
+ */
+#define RM_EXPERIMENTAL_ID 128
#endif /* RMGR_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 2fcf7aef278..a8c07b295b4 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -23,31 +23,31 @@
*
* Changes to this list possibly need an XLOG_PAGE_MAGIC bump.
*/
-/* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+/* symbol name, textual name, redo, desc, identify, startup, cleanup, decode */
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
/* Cloudberry-specific resource managers */
-PG_RMGR(RM_BITMAP_ID, "Bitmap", bitmap_redo, bitmap_desc, bitmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DISTRIBUTEDLOG_ID, "DistributedLog", DistributedLog_redo, DistributedLog_desc, DistributedLog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_APPEND_ONLY_ID, "Appendonly", appendonly_redo, appendonly_desc, appendonly_identify, NULL, NULL, NULL)
+PG_RMGR(RM_BITMAP_ID, "Bitmap", bitmap_redo, bitmap_desc, bitmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DISTRIBUTEDLOG_ID, "DistributedLog", DistributedLog_redo, DistributedLog_desc, DistributedLog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_APPEND_ONLY_ID, "Appendonly", appendonly_redo, appendonly_desc, appendonly_identify, NULL, NULL, NULL, appendonly_decode)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 5ccedaf4db2..4ccdfef94fc 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -299,6 +299,9 @@ typedef enum
RECOVERY_TARGET_ACTION_SHUTDOWN
} RecoveryTargetAction;
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
/*
* Method table for resource managers.
*
@@ -313,7 +316,8 @@ typedef enum
* rm_mask takes as input a page modified by the resource manager and masks
* out bits that shouldn't be flagged by wal_consistency_checking.
*
- * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h).
+ * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
+ * NULL, the corresponding RmgrTable entry is considered invalid.
*/
typedef struct RmgrData
{
@@ -324,9 +328,29 @@ typedef struct RmgrData
void (*rm_startup) (void);
void (*rm_cleanup) (void);
void (*rm_mask) (char *pagedata, BlockNumber blkno);
+ void (*rm_decode) (struct LogicalDecodingContext *ctx,
+ struct XLogRecordBuffer *buf);
} RmgrData;
-extern const RmgrData RmgrTable[];
+extern RmgrData RmgrTable[];
+extern void RmgrStartup(void);
+extern void RmgrCleanup(void);
+extern void RmgrNotFound(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
+static inline bool
+RmgrIdExists(RmgrId rmid)
+{
+ return RmgrTable[rmid].rm_name != NULL;
+}
+
+static inline RmgrData
+GetRmgr(RmgrId rmid)
+{
+ if (unlikely(!RmgrIdExists(rmid)))
+ RmgrNotFound(rmid);
+ return RmgrTable[rmid];
+}
/*
* Exported to support xlog switching from checkpointer
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e72142f0af7..b3e46448b4c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6347,6 +6347,13 @@
prorettype => 'text', proargtypes => '',
prosrc => 'pg_get_wal_replay_pause_state' },
+{ oid => '8189', descr => 'get resource managers loaded in system',
+ proname => 'pg_get_wal_resource_managers', prorows => '50', proretset => 't',
+ provolatile => 'v', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{int4,text,bool}', proargmodes => '{o,o,o}',
+ proargnames => '{rm_id, rm_name, rm_builtin}',
+ prosrc => 'pg_get_wal_resource_managers' },
+
{ oid => '2621', descr => 'reload configuration files',
proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/include/funcapi.h b/src/include/funcapi.h
index 92015f21852..2e6c5320a44 100644
--- a/src/include/funcapi.h
+++ b/src/include/funcapi.h
@@ -287,6 +287,13 @@ extern Datum HeapTupleHeaderGetDatum(HeapTupleHeader tuple);
*/
/* from funcapi.c */
+
+/* flag bits for InitMaterializedSRF() */
+#define MAT_SRF_USE_EXPECTED_DESC 0x01 /* use expectedDesc as tupdesc. */
+#define MAT_SRF_BLESS 0x02 /* "Bless" a tuple descriptor with
+ * BlessTupleDesc(). */
+extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
+
extern FuncCallContext *init_MultiFuncCall(PG_FUNCTION_ARGS);
extern FuncCallContext *per_MultiFuncCall(PG_FUNCTION_ARGS);
extern void end_MultiFuncCall(PG_FUNCTION_ARGS, FuncCallContext *funcctx);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 07e375dfea0..5271e22a471 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -565,6 +565,7 @@ extern void BaseInit(void);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
extern PGDLLIMPORT bool process_shared_preload_libraries_in_progress;
+extern bool process_shared_preload_libraries_done;
extern char *session_preload_libraries_string;
extern char *shared_preload_libraries_string;
extern char *local_preload_libraries_string;
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 69918080bb5..c129fb5e001 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -14,7 +14,22 @@
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
-void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer
+{
+ XLogRecPtr origptr;
+ XLogRecPtr endptr;
+ XLogReaderState *record;
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void appendonly_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
#endif
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0e1c15bcf64..fdaef0c4de5 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -716,6 +716,7 @@ extern const char *GetConfigOptionResetString(const char *name);
extern int GetConfigOptionFlags(const char *name, bool missing_ok);
extern void ProcessConfigFile(GucContext context);
extern void InitializeGUCOptions(void);
+extern void InitializeWalConsistencyChecking(void);
extern bool SelectConfigFiles(const char *userDoption, const char *progname);
extern void ResetAllOptions(void);
extern void AtStart_GUC(void);
diff --git a/src/test/unit/mock/gpopt_mock.c b/src/test/unit/mock/gpopt_mock.c
index e9b16001a6f..39bf2c49e82 100644
--- a/src/test/unit/mock/gpopt_mock.c
+++ b/src/test/unit/mock/gpopt_mock.c
@@ -22,21 +22,21 @@ Datum
LibraryVersion(void)
{
elog(ERROR, "mock implementation of LibraryVersion called");
- return NULL;
+ PG_RETURN_VOID();
}
Datum
EnableXform(PG_FUNCTION_ARGS)
{
elog(ERROR, "mock implementation of EnableXform called");
- return (Datum) 0;
+ PG_RETURN_VOID();
}
Datum
DisableXform(PG_FUNCTION_ARGS)
{
elog(ERROR, "mock implementation of EnableXform called");
- return (Datum) 0;
+ PG_RETURN_VOID();
}
void
diff --git a/src/test/unit/mock/rmgr_mock.c b/src/test/unit/mock/rmgr_mock.c
deleted file mode 100644
index 0de0ed113b3..00000000000
--- a/src/test/unit/mock/rmgr_mock.c
+++ /dev/null
@@ -1,12 +0,0 @@
-/*
- * This is a mock version of src/backend/access/transam/rmgr.c. The real
- * rmgr.c contains a table of the WAL redo/desc functions for all the WAL
- * record types. Leave out the table, so that we can leave out the AM
- * object files, which helps to cut down the size of the test programs.
- */
-#include "postgres.h"
-
-#include "access/rmgr.h"
-#include "access/xlog_internal.h"
-
-const RmgrData RmgrTable[RM_MAX_ID + 1];