Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion agent/exec/dockerapi/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/docker/go-connections/nat"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/genericresource"
"github.com/docker/swarmkit/api/naming"
"github.com/docker/swarmkit/template"
gogotypes "github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -138,12 +139,15 @@ func (c *containerConfig) exposedPorts() map[nat.Port]struct{} {
}

func (c *containerConfig) config() *enginecontainer.Config {
genericEnvs := genericresource.EnvFormat(c.task.AssignedGenericResources, "DOCKER_RESOURCE")
env := append(c.spec().Env, genericEnvs...)

config := &enginecontainer.Config{
Labels: c.labels(),
StopSignal: c.spec().StopSignal,
User: c.spec().User,
Hostname: c.spec().Hostname,
Env: c.spec().Env,
Env: env,
WorkingDir: c.spec().Dir,
Tty: c.spec().TTY,
OpenStdin: c.spec().OpenStdin,
Expand Down
12 changes: 10 additions & 2 deletions agent/exec/dockerapi/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
engineapi "github.com/docker/docker/client"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/genericresource"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ func TestControllerFlowIntegration(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, client)

available := genericresource.NewSet("apple", "blue", "red")
available = append(available, genericresource.NewDiscrete("orange", 3))

task := &api.Task{
ID: "dockerexec-integration-task-id",
ServiceID: "dockerexec-integration-service-id",
Expand All @@ -50,20 +54,24 @@ func TestControllerFlowIntegration(t *testing.T) {
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Command: []string{"sh", "-c", "sleep 5; echo hello; echo stderr >&2"},
Command: []string{"sh", "-c", "sleep 5; echo $apple $orange; echo stderr >&2"},
Image: "alpine",
},
},
},
AssignedGenericResources: available,
}

var receivedLogs bool
publisher := exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
receivedLogs = true
v1 := genericresource.Value(available[0])
v2 := genericresource.Value(available[1])
genericResourceString := v1 + " " + v2 + "\n"

switch message.Stream {
case api.LogStreamStdout:
assert.Equal(t, "hello\n", string(message.Data))
assert.Equal(t, genericResourceString, string(message.Data))
case api.LogStreamStderr:
assert.Equal(t, "stderr\n", string(message.Data))
}
Expand Down
13 changes: 8 additions & 5 deletions agent/exec/dockerapi/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ import (
)

type executor struct {
client engineapi.APIClient
secrets exec.SecretsManager
client engineapi.APIClient
secrets exec.SecretsManager
genericResources []*api.GenericResource
}

// NewExecutor returns an executor from the docker client.
func NewExecutor(client engineapi.APIClient) exec.Executor {
func NewExecutor(client engineapi.APIClient, genericResources []*api.GenericResource) exec.Executor {
return &executor{
client: client,
secrets: secrets.NewManager(),
client: client,
secrets: secrets.NewManager(),
genericResources: genericResources,
}
}

Expand Down Expand Up @@ -105,6 +107,7 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
Resources: &api.Resources{
NanoCPUs: int64(info.NCPU) * 1e9,
MemoryBytes: info.MemTotal,
Generic: e.genericResources,
},
}

Expand Down
111 changes: 111 additions & 0 deletions api/genericresource/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package genericresource
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to expose these functions outside of genericresource?

Copy link
Copy Markdown
Contributor Author

@RenaudWasTaken RenaudWasTaken Apr 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NewError isn't used anywhere, the others are as they mask the type generated by the oneof


import (
"github.com/docker/swarmkit/api"
)

// NewSet creates a set object
func NewSet(key string, vals ...string) []*api.GenericResource {
rs := make([]*api.GenericResource, 0, len(vals))

for _, v := range vals {
rs = append(rs, NewString(key, v))
}

return rs
}

// NewString creates a String resource
func NewString(key, val string) *api.GenericResource {
return &api.GenericResource{
Resource: &api.GenericResource_Str{
Str: &api.GenericString{
Kind: key,
Value: val,
},
},
}
}

// NewDiscrete creates a Discrete resource
func NewDiscrete(key string, val int64) *api.GenericResource {
return &api.GenericResource{
Resource: &api.GenericResource_Discrete{
Discrete: &api.GenericDiscrete{
Kind: key,
Value: val,
},
},
}
}

// GetResource returns resources from the "resources" parameter matching the kind key
func GetResource(kind string, resources []*api.GenericResource) []*api.GenericResource {
var res []*api.GenericResource

for _, r := range resources {
if Kind(r) != kind {
continue
}

res = append(res, r)
}

return res
}

// ConsumeNodeResources removes "res" from nodeAvailableResources
func ConsumeNodeResources(nodeAvailableResources *[]*api.GenericResource, res []*api.GenericResource) {
if nodeAvailableResources == nil {
return
}

w := 0

loop:
for _, na := range *nodeAvailableResources {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the number of generic resources is high, this double loop may become heavy. Should we change the data structure to map to avoid double loops?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented it as list because it kept the code clean.
I was thinking of addressing performance issue in another PR if it became a real problem (as in people complaining about it).

What do you think ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense. However this data structure is a key piece in this implementation and I want to get it right from start. Performance issue may be hidden and we may not realize it.

I also think map shouldn't be more complicated than slice in implementation. But I might miss something. Let me know.

Copy link
Copy Markdown
Contributor Author

@RenaudWasTaken RenaudWasTaken May 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well to implement a map we have two ways to do it:

  • Convert at the "genericresource module level" (in the Claim, Reclaim, HasResource functions) which:

    • will likely increase the cost of calling these functions with a small amount of resource
    • Will however be better for a large amount of resource
  • Convert at the scheduler level, which implies changing the whole interface of the genericresource module and looks weird because it wouldn't match the api definition then

Copy link
Copy Markdown
Contributor Author

@RenaudWasTaken RenaudWasTaken May 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a side note these functions are really called once per task (once the node has been selected)

The function that would be the bottleneck if we have performance issue is HasResource and is of order O(n)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you described makes sense. I prefer the manager(leader) keeps a map of resources and everything is done by lookup. But I don't want this to hold this long PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert at the scheduler level, which implies changing the whole interface of the genericresource module and looks weird because it wouldn't match the api definition then

This is my ideal solution, but it's a big change and might make sense as a followup. I don't want to block the PR on it.

for _, r := range res {
if Kind(na) != Kind(r) {
continue
}

if remove(na, r) {
continue loop
}
// If this wasn't the right element then
// we need to continue
}

(*nodeAvailableResources)[w] = na
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

na.Copy() so these aren't referencing the same memory?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal here is to move valid elements on top of deleted elements and at the end slice the array.
This is a simple deletion algorithm from an array so no need to copy the element :)

w++
}

*nodeAvailableResources = (*nodeAvailableResources)[:w]
}

// Returns true if the element is to be removed from the list
func remove(na, r *api.GenericResource) bool {
switch tr := r.Resource.(type) {
case *api.GenericResource_Discrete:
if na.GetDiscrete() == nil {
return false // Type change, ignore
}

na.GetDiscrete().Value -= tr.Discrete.Value
if na.GetDiscrete().Value <= 0 {
return true
}
case *api.GenericResource_Str:
if na.GetStr() == nil {
return false // Type change, ignore
}

if tr.Str.Value != na.GetStr().Value {
return false // not the right item, ignore
}

return true
}

return false
}
66 changes: 66 additions & 0 deletions api/genericresource/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package genericresource

import (
"testing"

"github.com/docker/swarmkit/api"
"github.com/stretchr/testify/assert"
)

func TestConsumeResourcesSingle(t *testing.T) {
nodeAvailableResources := NewSet("apple", "red", "orange", "blue")
res := NewSet("apple", "red")

ConsumeNodeResources(&nodeAvailableResources, res)
assert.Len(t, nodeAvailableResources, 2)

nodeAvailableResources = append(nodeAvailableResources, NewDiscrete("apple", 1))
res = []*api.GenericResource{NewDiscrete("apple", 1)}

ConsumeNodeResources(&nodeAvailableResources, res)
assert.Len(t, nodeAvailableResources, 2)

nodeAvailableResources = append(nodeAvailableResources, NewDiscrete("apple", 4))
res = []*api.GenericResource{NewDiscrete("apple", 1)}

ConsumeNodeResources(&nodeAvailableResources, res)
assert.Len(t, nodeAvailableResources, 3)
assert.Equal(t, int64(3), nodeAvailableResources[2].GetDiscrete().Value)
}

func TestConsumeResourcesMultiple(t *testing.T) {
nodeAvailableResources := NewSet("apple", "red", "orange", "blue", "green", "yellow")
nodeAvailableResources = append(nodeAvailableResources, NewDiscrete("orange", 5))
nodeAvailableResources = append(nodeAvailableResources, NewDiscrete("banana", 3))
nodeAvailableResources = append(nodeAvailableResources, NewSet("grape", "red", "orange", "blue", "green", "yellow")...)
nodeAvailableResources = append(nodeAvailableResources, NewDiscrete("cakes", 3))

res := NewSet("apple", "red")
res = append(res, NewDiscrete("banana", 2))
res = append(res, NewSet("apple", "green", "blue", "red")...)
res = append(res, NewSet("grape", "red", "blue", "red")...)
res = append(res, NewDiscrete("cakes", 3))

ConsumeNodeResources(&nodeAvailableResources, res)
assert.Len(t, nodeAvailableResources, 7)

apples := GetResource("apple", nodeAvailableResources)
oranges := GetResource("orange", nodeAvailableResources)
bananas := GetResource("banana", nodeAvailableResources)
grapes := GetResource("grape", nodeAvailableResources)
assert.Len(t, apples, 2)
assert.Len(t, oranges, 1)
assert.Len(t, bananas, 1)
assert.Len(t, grapes, 3)

for _, k := range []string{"yellow", "orange"} {
assert.True(t, HasResource(NewString("apple", k), apples))
}

for _, k := range []string{"yellow", "orange", "green"} {
assert.True(t, HasResource(NewString("grape", k), grapes))
}

assert.Equal(t, int64(5), oranges[0].GetDiscrete().Value)
assert.Equal(t, int64(1), bananas[0].GetDiscrete().Value)
}
47 changes: 47 additions & 0 deletions api/genericresource/parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package genericresource

import (
"fmt"
"strconv"
"strings"

"github.com/docker/swarmkit/api"
)

func newParseError(format string, args ...interface{}) error {
return fmt.Errorf("could not parse GenericResource: "+format, args...)
}

// Parse parses the GenericResource resources given by the arguments
func Parse(cmd string) ([]*api.GenericResource, error) {
var rs []*api.GenericResource

for _, term := range strings.Split(cmd, ";") {
kva := strings.Split(term, "=")
if len(kva) != 2 {
return nil, newParseError("incorrect term %s, missing '=' or malformed expr", term)
}

key := strings.TrimSpace(kva[0])
val := strings.TrimSpace(kva[1])

u, err := strconv.ParseInt(val, 10, 64)
if err == nil {
if u < 0 {
return nil, newParseError("cannot ask for negative resource %s", key)
}
rs = append(rs, NewDiscrete(key, u))
continue
}

if len(val) > 2 && val[0] == '{' && val[len(val)-1] == '}' {
val = val[1 : len(val)-1]
rs = append(rs, NewSet(key, strings.Split(val, ",")...)...)
continue
}

return nil, newParseError("could not parse expression '%s'", term)
}

return rs, nil
}
45 changes: 45 additions & 0 deletions api/genericresource/parse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package genericresource

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseDiscrete(t *testing.T) {
res, err := Parse("apple=3")
assert.NoError(t, err)
assert.Equal(t, len(res), 1)

apples := GetResource("apple", res)
assert.Equal(t, len(apples), 1)
assert.Equal(t, apples[0].GetDiscrete().Value, int64(3))
}

func TestParseStr(t *testing.T) {
res, err := Parse("orange={red,green,blue}")
assert.NoError(t, err)
assert.Equal(t, len(res), 3)

oranges := GetResource("orange", res)
assert.Equal(t, len(oranges), 3)
for _, k := range []string{"red", "green", "blue"} {
assert.True(t, HasResource(NewString("orange", k), oranges))
}
}

func TestParseDiscreteAndStr(t *testing.T) {
res, err := Parse("orange={red,green,blue};apple=3")
assert.NoError(t, err)
assert.Equal(t, len(res), 4)

oranges := GetResource("orange", res)
assert.Equal(t, len(oranges), 3)
for _, k := range []string{"red", "green", "blue"} {
assert.True(t, HasResource(NewString("orange", k), oranges))
}

apples := GetResource("apple", res)
assert.Equal(t, len(apples), 1)
assert.Equal(t, apples[0].GetDiscrete().Value, int64(3))
}
Loading