Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions controllers/druid/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ under the License.
*/
package druid

import "github.com/apache/druid-operator/apis/druid/v1alpha1"
import (
"sort"

"github.com/apache/druid-operator/apis/druid/v1alpha1"
)

var (
druidServicesOrder = []string{historical, overlord, middleManager, indexer, broker, coordinator, router}
Expand All @@ -29,8 +33,10 @@ type ServiceGroup struct {
spec v1alpha1.DruidNodeSpec
}

// getNodeSpecsByOrder returns all NodeSpecs f a given Druid object.
// Recommended order is described at http://druid.io/docs/latest/operations/rolling-updates.html
// getNodeSpecsByOrder returns all NodeSpecs of a given Druid object in the
// recommended rolling-update order (see http://druid.io/docs/latest/operations/rolling-updates.html).
// Specs sharing a NodeType are sorted by map key so rollingDeploy stays
// deterministic across reconciles instead of flapping with map iteration.
func getNodeSpecsByOrder(m *v1alpha1.Druid) []*ServiceGroup {

scaledServiceSpecsByNodeType := map[string][]*ServiceGroup{}
Expand All @@ -46,7 +52,11 @@ func getNodeSpecsByOrder(m *v1alpha1.Druid) []*ServiceGroup {
allScaledServiceSpecs := make([]*ServiceGroup, 0, len(m.Spec.Nodes))

for _, t := range druidServicesOrder {
allScaledServiceSpecs = append(allScaledServiceSpecs, scaledServiceSpecsByNodeType[t]...)
specs := scaledServiceSpecsByNodeType[t]
sort.Slice(specs, func(i, j int) bool {
return specs[i].key < specs[j].key
})
allScaledServiceSpecs = append(allScaledServiceSpecs, specs...)
}

return allScaledServiceSpecs
Expand Down
142 changes: 133 additions & 9 deletions controllers/druid/ordering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ under the License.
package druid

import (
"reflect"
"sort"
"testing"
"time"

druidv1alpha1 "github.com/apache/druid-operator/apis/druid/v1alpha1"
Expand Down Expand Up @@ -56,16 +59,137 @@ var _ = Describe("Test ordering logic", func() {
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should return an ordered list of nodes", func() {
It("Should return a deterministic, lexicographically ordered list of nodes within each NodeType", func() {
orderedServiceGroups := getNodeSpecsByOrder(druid)
Expect(orderedServiceGroups[0].key).Should(MatchRegexp("historicals"))
Expect(orderedServiceGroups[1].key).Should(MatchRegexp("historicals"))
Expect(orderedServiceGroups[2].key).Should(Equal("overlords"))
Expect(orderedServiceGroups[3].key).Should(Equal("middle-managers"))
Expect(orderedServiceGroups[4].key).Should(Equal("indexers"))
Expect(orderedServiceGroups[5].key).Should(Equal("brokers"))
Expect(orderedServiceGroups[6].key).Should(Equal("coordinators"))
Expect(orderedServiceGroups[7].key).Should(Equal("routers"))
// Three historical tiers (historicalstier1–3) all share NodeType
// "historical" and must come back sorted by key so rollingDeploy
// can never roll two of them out at the same time.
Expect(orderedServiceGroups[0].key).Should(Equal("historicalstier1"))
Expect(orderedServiceGroups[1].key).Should(Equal("historicalstier2"))
Expect(orderedServiceGroups[2].key).Should(Equal("historicalstier3"))
Expect(orderedServiceGroups[3].key).Should(Equal("overlords"))
Expect(orderedServiceGroups[4].key).Should(Equal("middle-managers"))
Expect(orderedServiceGroups[5].key).Should(Equal("indexers"))
Expect(orderedServiceGroups[6].key).Should(Equal("brokers"))
Expect(orderedServiceGroups[7].key).Should(Equal("coordinators"))
Expect(orderedServiceGroups[8].key).Should(Equal("routers"))
})
})
})

// determinismCallCount is the number of times getNodeSpecsByOrder is invoked
// per test to surface map-iteration non-determinism. Without the sort step in
// getNodeSpecsByOrder, randomized map iteration over m.Spec.Nodes makes the
// intra-NodeType order flap. With many specs sharing one NodeType and many
// repeated calls, the probability of observing at least one differing order
// approaches 1, which is exactly what we want for a regression test.
const determinismCallCount = 200

// makeMultiHistoricalDruid returns a Druid CR with several node specs sharing
// the same "historical" NodeType, plus one spec per other NodeType. This is
// the shape that triggered the bug fixed here: multiple StatefulSets/
// Deployments belonging to a single NodeType.
func makeMultiHistoricalDruid() *druidv1alpha1.Druid {
return &druidv1alpha1.Druid{
Spec: druidv1alpha1.DruidSpec{
Nodes: map[string]druidv1alpha1.DruidNodeSpec{
"historicalstier1": {NodeType: historical},
"historicalstier2": {NodeType: historical},
"historicalstier3": {NodeType: historical},
"historicalstier4": {NodeType: historical},
"brokers": {NodeType: broker},
"coordinators": {NodeType: coordinator},
"overlords": {NodeType: overlord},
"middle-managers": {NodeType: middleManager},
"indexers": {NodeType: indexer},
"routers": {NodeType: router},
},
},
}
}

// keysOfServiceGroups extracts the ordered list of keys from a slice of
// *ServiceGroup so test assertions can compare orderings as plain strings.
func keysOfServiceGroups(specs []*ServiceGroup) []string {
out := make([]string, len(specs))
for i, s := range specs {
out[i] = s.key
}
return out
}

// TestGetNodeSpecsByOrder_DeterministicAcrossCalls invokes getNodeSpecsByOrder
// many times on the same Druid CR and asserts every call returns the exact
// same ordering. Before the fix, Go's randomized map iteration over
// m.Spec.Nodes causes the order of specs sharing a NodeType (e.g. the four
// "historical" entries) to flap between calls, so this test fails. With the
// sort.Slice fix, all calls return the same ordering and the test passes.
func TestGetNodeSpecsByOrder_DeterministicAcrossCalls(t *testing.T) {
druid := makeMultiHistoricalDruid()

first := keysOfServiceGroups(getNodeSpecsByOrder(druid))

for i := 1; i < determinismCallCount; i++ {
got := keysOfServiceGroups(getNodeSpecsByOrder(druid))
if !reflect.DeepEqual(first, got) {
t.Fatalf(
"getNodeSpecsByOrder is non-deterministic: call 0 returned %v, call %d returned %v",
first, i, got,
)
}
}
}

// TestGetNodeSpecsByOrder_LexicographicWithinNodeType asserts the contractual
// intra-NodeType ordering: ascending by spec key. This pins down the exact
// behavior the operator relies on for sequential rolling deploy.
func TestGetNodeSpecsByOrder_LexicographicWithinNodeType(t *testing.T) {
druid := makeMultiHistoricalDruid()

got := keysOfServiceGroups(getNodeSpecsByOrder(druid))

wantHistoricals := []string{"historicalstier1", "historicalstier2", "historicalstier3", "historicalstier4"}
gotHistoricals := got[:len(wantHistoricals)]

if !sort.StringsAreSorted(gotHistoricals) {
t.Errorf("historical specs must be sorted ascending by key, got %v", gotHistoricals)
}
if !reflect.DeepEqual(gotHistoricals, wantHistoricals) {
t.Errorf("historical block ordering mismatch: want %v, got %v", wantHistoricals, gotHistoricals)
}

want := []string{
"historicalstier1", "historicalstier2", "historicalstier3", "historicalstier4",
"overlords",
"middle-managers",
"indexers",
"brokers",
"coordinators",
"routers",
}
if !reflect.DeepEqual(got, want) {
t.Errorf("full ordering mismatch:\nwant %v\n got %v", want, got)
}
}

// TestGetNodeSpecsByOrder_NodeTypeOrderPreserved guards against a regression
// in the cross-NodeType ordering defined by druidServicesOrder.
func TestGetNodeSpecsByOrder_NodeTypeOrderPreserved(t *testing.T) {
druid := &druidv1alpha1.Druid{
Spec: druidv1alpha1.DruidSpec{
Nodes: map[string]druidv1alpha1.DruidNodeSpec{
"routers": {NodeType: router},
"coordinators": {NodeType: coordinator},
"brokers": {NodeType: broker},
"historicals": {NodeType: historical},
"overlords": {NodeType: overlord},
},
},
}

got := keysOfServiceGroups(getNodeSpecsByOrder(druid))
want := []string{"historicals", "overlords", "brokers", "coordinators", "routers"}
if !reflect.DeepEqual(got, want) {
t.Errorf("NodeType ordering broken: want %v, got %v", want, got)
}
}
13 changes: 11 additions & 2 deletions controllers/druid/testdata/ordering.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,16 @@ spec:
- /bin/sh echo hello
containerName: node-level
image: hello-world
historicals2:
historicalstier2:
nodeType: "historical"
druid.port: 8080
nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
replicas: 1
runtime.properties: |-
druid.service=druid/historical
druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":10737418240}]
druid.server.maxSize=10737418240
historicalstier3:
nodeType: "historical"
druid.port: 8080
nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
Expand Down Expand Up @@ -119,7 +128,7 @@ spec:
druid.realtime.cache.populateCache=true
druid.indexer.runner.javaOptsArray=["-server","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager","--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED","--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED","--add-opens=java.base/java.lang=ALL-UNNAMED","--add-opens=java.base/java.io=ALL-UNNAMED","--add-opens=java.base/java.nio=ALL-UNNAMED","--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED","--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"]
druid.indexer.task.restoreTasksOnRestart=true
historicals:
historicalstier1:
nodeType: "historical"
druid.port: 8080
nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
Expand Down
Loading
Loading