Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 91 additions & 2 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
}
}

func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
for _, previousImage := range previousVersionImages {
t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) {
runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage)
})
}
}

func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -92,20 +100,101 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
// stopped, which means the transfer to ingester-2 is completed.
require.NoError(t, s.Stop(ingester1))

checkQueries(t, consul, distributor,
expectedVector,
previousImage,
flagsForOldImage, ChunksStorageFlags,
now,
s,
1,
)
}

// Check for issues like https://github.com/cortexproject/cortex/issues/2356
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
dynamo := e2edb.NewDynamoDB()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(dynamo, consul))

flagsForOldImage := mergeFlags(ChunksStorageFlags, map[string]string{
"-schema-config-file": "",
"-config-yaml": ChunksStorageFlags["-schema-config-file"],
"-distributor.replication-factor": "3",
})

flagsForNewImage := mergeFlags(ChunksStorageFlags, map[string]string{
"-distributor.replication-factor": "3",
})

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

// Start Cortex table-manager (running on current version since the backward compatibility
// test is about testing a rolling update of other services).
tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
// sure the tables have been created.
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

// Start other Cortex components (ingester running on previous version).
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flagsForNewImage, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))

// Wait until the distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(1536), "cortex_ring_tokens_total"))

// Push some series to Cortex.
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

checkQueries(t, consul, distributor,
expectedVector,
previousImage,
flagsForOldImage, flagsForNewImage,
now,
s,
3,
)
}

func checkQueries(t *testing.T, consul *e2e.HTTPService, distributor *e2ecortex.CortexService,
expectedVector model.Vector,
previousImage string,
flagsForOldImage, flagsForNewImage map[string]string,
now time.Time,
s *e2e.Scenario,
numIngesters int,
) {
// Query the new ingester both with the old and the new querier.
for _, image := range []string{previousImage, ""} {
var querier *e2ecortex.CortexService

if image == previousImage {
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForOldImage, image)
} else {
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, image)
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForNewImage, image)
}

require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(numIngesters*512)), "cortex_ring_tokens_total"))

// Query the series
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
Expand Down
8 changes: 5 additions & 3 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,13 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
if _, ok := distinctHosts[token.Ingester]; ok {
continue
}
if _, ok := distinctZones[token.Zone]; ok {
continue
if token.Zone != "" { // Ignore if the ingesters don't have a zone set.
if _, ok := distinctZones[token.Zone]; ok {
continue
}
distinctZones[token.Zone] = struct{}{}
}
distinctHosts[token.Ingester] = struct{}{}
distinctZones[token.Zone] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

// We do not want to Write to Ingesters that are not ACTIVE, but we do want
Expand Down