Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,17 @@ public int compare(
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
)
{
return -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.

if(retVal == 0) {
retVal = zkWorker2.getWorker().getVersion().compareTo(zkWorker.getWorker().getVersion());
}

return retVal;
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ public int compare(
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.

if (retVal == 0) {
retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost());
retVal = zkWorker.getWorker().getVersion().compareTo(zkWorker2.getWorker().getVersion());
}

return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,70 @@ public String getDataSource()
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}

@Test
public void testOneDisableWorkerDifferentUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();

Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}

@Test
public void testOneDisableWorkerSameUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();

Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
}