
Support schema property client connect to the schema server#979

Merged
mrproliu merged 10 commits into apache:main from mrproliu:property-client
Mar 2, 2026

Conversation

@mrproliu
Contributor

@mrproliu mrproliu added this to the 0.10.0 milestone Feb 24, 2026
@mrproliu mrproliu added the enhancement New feature or request label Feb 24, 2026

Copilot AI left a comment


Pull request overview

This PR adds support for a property-based schema server and client so components can connect to a dedicated schema server (instead of relying solely on etcd), and wires this into node discovery and service startup ordering.

Changes:

  • Introduce a property-based schema registry client (with sync/repair logic, TLS support, and caching) and a standalone schema server service.
  • Extend metadata/node discovery and command setup to advertise/connect to the schema server endpoint and support schema registry mode selection.
  • Adjust observability/native meter initialization and property DB metrics scoping to work cleanly with the new startup lifecycle.

Reviewed changes

Copilot reviewed 46 out of 46 changed files in this pull request and generated 7 comments.

Show a summary per file

| File | Description |
| --- | --- |
| scripts/build/lint.mk | Runs golangci-lint with auto-fix enabled in lint target. |
| pkg/test/setup/setup.go | Adds schema server root path flag in test server/node startup. |
| pkg/meter/native/provider.go | Defers schema creation until metadata service is ready; queues measures. |
| pkg/grpchelper/connmanager.go | Adds configurable health check timeout for gRPC connections. |
| pkg/cmdsetup/standalone.go | Reorders unit startup to ensure metadata service ordering. |
| pkg/cmdsetup/liaison.go | Reorders unit startup to ensure metadata service ordering. |
| pkg/cmdsetup/data.go | Adds schema server unit and advertises its port in generated node info. |
| banyand/property/service.go | Sets metrics scope name for property DB instance. |
| banyand/property/db/test_helper.go | Sets metrics scope name for property DB tests. |
| banyand/property/db/shard_test.go | Sets metrics scope name for property DB tests. |
| banyand/property/db/shard.go | Uses DB-level metrics scope (instead of package-level) for shard labels. |
| banyand/property/db/repair_test.go | Sets metrics scope name for property DB tests. |
| banyand/property/db/repair_gossip_test.go | Sets metrics scope name for gossip repair tests. |
| banyand/property/db/repair.go | Threads metrics scope into repair scheduler metrics creation. |
| banyand/property/db/db.go | Adds required MetricsScopeName and standardizes error wrapping aliasing. |
| banyand/observability/services/service.go | Initializes native schema creation during Serve when native mode is enabled. |
| banyand/observability/services/meter_native.go | Tracks native providers and initializes schemas later. |
| banyand/metadata/service/server.go | Adds node discovery mode plumbing and context enrichment for schema address. |
| banyand/metadata/schema/watcher_test.go | Updates tests to use exported test helpers for watcher/compact/node ops. |
| banyand/metadata/schema/schemaserver/service.go | Starts schema-server gRPC listener during PreRun; adds metrics scope name. |
| banyand/metadata/schema/schemaserver/grpc_test.go | Uses schema.SchemaGroup constant in tests. |
| banyand/metadata/schema/schemaserver/grpc.go | Uses schema.SchemaGroup constant in implementation. |
| banyand/metadata/schema/schema.go | Exposes SchemaGroup, exports ErrUnsupportedEntityType, splits NodeDiscovery interface, adds Start(ctx) to Registry. |
| banyand/metadata/schema/register_test.go | Updates node registration tests to use exported helpers / NodeDiscovery casting. |
| banyand/metadata/schema/property/testdata/certs/server.key | Adds TLS test key material for property schema client/server tests. |
| banyand/metadata/schema/property/testdata/certs/server.crt | Adds TLS test certificate for property schema client/server tests. |
| banyand/metadata/schema/property/testdata/certs/cert.conf.template | Adds cert template for generating SANs in tests. |
| banyand/metadata/schema/property/testdata/certs/ca.key | Adds TLS test CA key material. |
| banyand/metadata/schema/property/testdata/certs/ca.crt | Adds TLS test CA certificate. |
| banyand/metadata/schema/property/schema_client.go | Implements ConnManager client/handler for schema server gRPC clients (TLS optional). |
| banyand/metadata/schema/property/converter_test.go | Adds conversion unit tests between schema protos and property records. |
| banyand/metadata/schema/property/converter.go | Implements schema↔property conversion, IDs, queries, tag parsing, CRC32 IDs. |
| banyand/metadata/schema/property/client_test.go | Adds comprehensive integration tests for property schema registry client and TLS. |
| banyand/metadata/schema/property/client.go | Implements property-based schema registry client with broadcast, repair, sync loop, and handler notification. |
| banyand/metadata/schema/property/cache_test.go | Adds tests for schema cache behavior (update/delete/revisions/kinds). |
| banyand/metadata/schema/property/cache.go | Adds in-memory cache keyed by property ID with revision tracking. |
| banyand/metadata/schema/kind.go | Exports Kind key prefix via Key() and adds AllKinds() / AllKeys(). |
| banyand/metadata/schema/group.go | Updates group deletion logic to use AllKeys(). |
| banyand/metadata/schema/export_test.go | Exposes test helpers for watcher/compact/register against etcd registry. |
| banyand/metadata/schema/etcd.go | Makes watcher start/close idempotent and implements new Start(ctx) contract; updates key prefix usage. |
| banyand/metadata/discovery/file/file_test.go | Updates handler registration signature to include kind. |
| banyand/metadata/discovery/file/file.go | Adds optional metrics wiring and replays cached nodes to handlers on Start. |
| banyand/metadata/discovery/dns/dns_test.go | Updates handler registration signature to include kind. |
| banyand/metadata/discovery/dns/dns.go | Adds optional metrics wiring, replays cached nodes to handlers on Start, and always refreshes DNS on ListNode. |
| banyand/metadata/discovery/common/cache.go | Adds StartForNotification replay and updates RegisterHandler signature. |
| banyand/metadata/client.go | Adds schema registry mode (etcd/property), property client flags, refactors node discovery plumbing, and starts/stops registry/discovery via new interfaces. |


Comment on lines +87 to +91

```go
np.initialized.Store(true)
groupErr := np.createNativeObservabilityGroup(ctx)
if groupErr != nil && !errors.Is(groupErr, schema.ErrGRPCAlreadyExists) {
    log.Error().Err(groupErr).Msg("Failed to create native observability group")
}
```

Copilot AI Feb 24, 2026


InitSchema sets initialized to true before createNativeObservabilityGroup and before flushing pendingMeasures. If a metric is registered concurrently after initialized flips, registerOrDefer may try to create measures before the observability group exists, causing avoidable failures. Consider setting initialized only after the group is created (and/or after pending measures are processed).

Comment on lines 84 to 86

```go
fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode", metadata.NodeDiscoveryModeEtcd,
    "Node discovery mode: 'etcd' for etcd-based, 'dns' for DNS-based, 'file' for file-based")
if s.propServer != nil {
```

Copilot AI Feb 24, 2026


FlagSet() defines --node-discovery-mode on this server FlagSet, but the embedded s.Service.FlagSet() (metadata client) also defines the same flag. Adding both flagsets will likely cause a duplicate-flag error/panic at startup. Consider removing this flag from the outer server FlagSet and relying on the embedded service flag (or rename one of them).

@codecov-commenter

codecov-commenter commented Feb 24, 2026

Codecov Report

❌ Patch coverage is 69.25746% with 443 lines in your changes missing coverage. Please review.
✅ Project coverage is 47.21%. Comparing base (3530dd9) to head (f8696b3).
⚠️ Report is 139 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| banyand/metadata/schema/property/client.go | 74.93% | 122 Missing and 79 partials ⚠️ |
| banyand/metadata/service/server.go | 0.00% | 101 Missing ⚠️ |
| banyand/metadata/client.go | 29.83% | 75 Missing and 12 partials ⚠️ |
| banyand/metadata/schema/property/converter.go | 95.39% | 8 Missing and 3 partials ⚠️ |
| banyand/metadata/schema/etcd.go | 50.00% | 8 Missing and 1 partial ⚠️ |
| banyand/metadata/schema/schemaserver/service.go | 35.71% | 8 Missing and 1 partial ⚠️ |
| banyand/metadata/discovery/file/file.go | 33.33% | 4 Missing and 2 partials ⚠️ |
| banyand/metadata/schema/property/schema_client.go | 78.26% | 4 Missing and 1 partial ⚠️ |
| banyand/property/db/db.go | 37.50% | 4 Missing and 1 partial ⚠️ |
| banyand/metadata/discovery/dns/dns.go | 42.85% | 3 Missing and 1 partial ⚠️ |

... and 4 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #979      +/-   ##
==========================================
+ Coverage   45.97%   47.21%   +1.23%     
==========================================
  Files         328      394      +66     
  Lines       55505    63158    +7653     
==========================================
+ Hits        25520    29821    +4301     
- Misses      27909    30518    +2609     
- Partials     2076     2819     +743     
| Flag | Coverage Δ |
| --- | --- |
| banyand | 49.79% <69.04%> (?) |
| bydbctl | 81.91% <ø> (?) |
| fodc | 67.65% <ø> (?) |
| integration-distributed | 80.00% <ø> (?) |
| pkg | 30.03% <92.30%> (?) |

Flags with carried forward coverage won't be shown.


```go
cache          *schemaCache
caCertReloader *pkgtls.Reloader
handlers       map[schema.Kind][]schema.EventHandler
timeout        time.Duration
```

Contributor


The timeout field is never used.

```go
}
delete(c.entries, propID)
if revision > c.latestUpdateAt {
    c.latestUpdateAt = revision
```

Contributor


This bumps latestUpdateAt to the current wall-clock time in nanoseconds, but server-side UpdatedAt values are also nanosecond timestamps taken from the servers' own clocks. After a synthetic deletion, latestUpdateAt jumps to "now". The next incremental sync queries for schemas with UpdatedAt > latestUpdateAt; since latestUpdateAt now reflects the time of the deletion, any schema created or updated between the last real sync and that moment is missed, because its UpdatedAt is less than the inflated latestUpdateAt. Such schemas are only picked up on the next full reconcile (every 5 rounds, roughly 2.5 minutes by default).

Comment on lines +299 to +322

```go
func (c *syncCollector) add(serverName string, kind schema.Kind, schemas []*schemaWithDeleteTime) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.entries[kind] == nil {
        c.entries[kind] = make(map[string]*schemaWithDeleteTime)
    }
    if c.queriedServers[kind] == nil {
        c.queriedServers[kind] = make(map[string]bool)
    }
    c.queriedServers[kind][serverName] = true
    for _, s := range schemas {
        propID := s.property.GetId()
        existing, exists := c.entries[kind][propID]
        if !exists {
            c.entries[kind][propID] = s
            continue
        }
        existingRev := ParseTags(existing.property.GetTags()).UpdatedAt
        newRev := ParseTags(s.property.GetTags()).UpdatedAt
        if newRev > existingRev {
            c.entries[kind][propID] = s
        }
    }
}
```

Contributor


When two servers report the same property with equal UpdatedAt but different deleteTime values (e.g., one server has deleteTime=0 meaning alive, another has deleteTime>0 meaning deleted), the first entry processed wins, because newRev > existingRev is false for equal revisions.
Since broadcastAll runs its goroutines concurrently, the arrival order into syncCollector.add is non-deterministic, so the outcome of the sync depends on goroutine scheduling: the schema can appear live or deleted depending on which server's data arrives first.
The tie-break needs a deterministic rule, for example preferring the entry with the higher deleteTime (deletion wins), or preferring the non-deleted entry.

Comment on lines +1076 to +1101

```go
func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector *syncCollector) {
    broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) error {
        updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx, sc.update, r.cache.GetMaxRevision())
        if queryErr != nil {
            return queryErr
        }
        for _, kindName := range updatedKindNames {
            kind, kindErr := KindFromString(kindName)
            if kindErr != nil {
                r.l.Warn().Str("kindName", kindName).Msg("unknown kind from aggregate update")
                continue
            }
            query := buildSchemaQuery(kind, "", "")
            schemas, schemasErr := r.querySchemasFromClient(ctx, sc.management, query)
            if schemasErr != nil {
                r.l.Warn().Err(schemasErr).Stringer("kind", kind).Str("node", nodeName).
                    Msg("failed to query schemas for incremental sync")
                continue
            }
            collector.add(nodeName, kind, schemas)
        }
        return nil
    })
    if broadcastErr != nil {
        r.l.Error().Err(broadcastErr).Msg("failed to collect incremental sync from some nodes")
    }
```

Contributor


When any schema of kind X is updated, the client fetches all schemas of kind X from every server. If there are 10,000 measures and one is updated, the incremental sync fetches all 10,000 from every server. This defeats the purpose of incremental sync for large schema sets.
A more efficient approach would be to fetch only schemas with UpdatedAt > sinceRevision for the given kind, rather than the entire kind.

```go
    }
    return nil
})
if broadcastErr != nil {
```

Contributor


The inner function passed to broadcastAll in collectFullSync always returns nil, so broadcastErr is always nil and this error branch can never be taken.

@mrproliu mrproliu merged commit 66c9bdc into apache:main Mar 2, 2026
39 of 41 checks passed
@mrproliu mrproliu deleted the property-client branch March 2, 2026 03:29

Labels

enhancement New feature or request

4 participants