Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libnetwork
//go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto

import (
"context"
"encoding/json"
"fmt"
"net"
Expand All @@ -26,8 +27,11 @@ const (
subsysGossip = "networking:gossip"
subsysIPSec = "networking:ipsec"
keyringSize = 3
callerCtxKey = callerCtx("caller")
)

type callerCtx string

// ByTime implements sort.Interface for []*types.EncryptionKey based on
// the LamportTime field.
type ByTime []*types.EncryptionKey
Expand Down Expand Up @@ -597,6 +601,8 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
defer sb.Service.Unlock()
logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())

ctx := context.WithValue(context.Background(), callerCtxKey, "addServiceInfoToCluster")

// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
// This is to handle a race between the EnableService and the sbLeave
// It is possible that the EnableService starts, fetches the list of the endpoints and
Expand Down Expand Up @@ -628,12 +634,12 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
if err := c.addServiceBinding(ctx, ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
if err := c.addContainerNameResolution(ctx, n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP); err != nil {
return err
}
}
Expand Down Expand Up @@ -664,7 +670,7 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
return nil
}

func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
func (ep *endpoint) deleteServiceInfoFromCluster(ctx context.Context, sb *sandbox) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
Expand All @@ -676,7 +682,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", getCaller(ctx), ep.svcName, ep.ID())

c := n.getController()
agent := c.getAgent()
Expand All @@ -685,6 +691,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}
newCtx := context.WithValue(ctx, callerCtxKey, "deleteServiceInfoFromCluster")

if agent != nil {
// First delete from networkDB then locally
Expand All @@ -700,18 +707,18 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
if err := c.rmServiceBinding(newCtx, ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, true); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
if err := c.delContainerNameResolution(newCtx, n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP); err != nil {
return err
}
}
}

logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", getCaller(ctx), ep.svcName, ep.ID())

return nil
}
Expand Down Expand Up @@ -883,33 +890,42 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
return
}

ctx := context.WithValue(context.Background(), callerCtxKey, "handleEpTableEvent")

if isAdd {
logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
if err := c.addServiceBinding(ctx, svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
if err := c.addContainerNameResolution(ctx, nid, eid, containerName, taskAliases, ip); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
if err := c.rmServiceBinding(ctx, svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
if err := c.delContainerNameResolution(ctx, nid, eid, containerName, taskAliases, ip); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
}
}

func getCaller(ctx context.Context) string {
if value := ctx.Value(callerCtxKey); value != nil {
return value.(string)
}
return "unknown"
}
10 changes: 7 additions & 3 deletions endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package libnetwork

import (
"container/heap"
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -603,8 +604,10 @@ func (ep *endpoint) rename(name string) error {
return nil
}

ctx := context.WithValue(context.Background(), callerCtxKey, "rename")

if c.isAgent() {
if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil {
if err = ep.deleteServiceInfoFromCluster(ctx, sb); err != nil {
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
}
} else {
Expand All @@ -628,7 +631,7 @@ func (ep *endpoint) rename(name string) error {
}
defer func() {
if err != nil {
ep.deleteServiceInfoFromCluster(sb, "rename")
ep.deleteServiceInfoFromCluster(ctx, sb)
ep.name = oldName
ep.anonymous = oldAnonymous
ep.addServiceInfoToCluster(sb)
Expand Down Expand Up @@ -751,8 +754,9 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
if err := n.getController().updateToStore(ep); err != nil {
return err
}
ctx := context.WithValue(context.Background(), callerCtxKey, "sbleave")

if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil {
if e := ep.deleteServiceInfoFromCluster(ctx, sb); e != nil {
logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e)
}

Expand Down
12 changes: 7 additions & 5 deletions libnetwork_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package libnetwork

import (
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -479,8 +480,9 @@ func TestServiceVIPReuse(t *testing.T) {
}

// Add 2 services with same name but different service ID to share the same VIP
n.(*network).addSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
n.(*network).addSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
ctx := context.WithValue(context.Background(), callerCtxKey, "test")
n.(*network).addSvcRecords(ctx, "ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true)
n.(*network).addSvcRecords(ctx, "ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true)

ipToResolve := netutils.ReverseIP("192.168.0.1")

Expand All @@ -503,7 +505,7 @@ func TestServiceVIPReuse(t *testing.T) {
}

// Delete service record for one of the services, the IP should remain because one service is still associated with it
n.(*network).deleteSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
n.(*network).deleteSvcRecords(ctx, "ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true)
ipList, _ = n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) == 0 {
t.Fatal("There must be the VIP")
Expand All @@ -523,7 +525,7 @@ func TestServiceVIPReuse(t *testing.T) {
}

// Delete again the service using the previous service ID, nothing should happen
n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
n.(*network).deleteSvcRecords(ctx, "ep2", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true)
ipList, _ = n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) == 0 {
t.Fatal("There must be the VIP")
Expand All @@ -543,7 +545,7 @@ func TestServiceVIPReuse(t *testing.T) {
}

// Delete now using the second service ID, now all the entries should be gone
n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
n.(*network).deleteSvcRecords(ctx, "ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true)
ipList, _ = n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) != 0 {
t.Fatal("All the VIPs should be gone now")
Expand Down
24 changes: 14 additions & 10 deletions network.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package libnetwork

import (
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -1240,30 +1241,33 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
if serviceID == "" {
serviceID = ep.ID()
}

ctx := context.WithValue(context.Background(), callerCtxKey, "updateSvcRecord")

if isAdd {
// If anonymous endpoint has an alias use the first alias
// for ip->name mapping. Not having the reverse mapping
// breaks some apps
if ep.isAnonymous() {
if len(myAliases) > 0 {
n.addSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
n.addSvcRecords(ctx, ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true)
}
} else {
n.addSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
n.addSvcRecords(ctx, ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true)
}
for _, alias := range myAliases {
n.addSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord")
n.addSvcRecords(ctx, ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false)
}
} else {
if ep.isAnonymous() {
if len(myAliases) > 0 {
n.deleteSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
n.deleteSvcRecords(ctx, ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true)
}
} else {
n.deleteSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
n.deleteSvcRecords(ctx, ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true)
}
for _, alias := range myAliases {
n.deleteSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord")
n.deleteSvcRecords(ctx, ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false)
}
}
}
Expand Down Expand Up @@ -1299,14 +1303,14 @@ func delNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) {
})
}

func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP, ipMapUpdate bool, method string) {
func (n *network) addSvcRecords(ctx context.Context, eID, name, serviceID string, epIP, epIPv6 net.IP, ipMapUpdate bool) {
// Do not add service names for ingress network as this is a
// routing only network
if n.ingress {
return
}

logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID)
logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, getCaller(ctx), serviceID)

c := n.getController()
c.Lock()
Expand Down Expand Up @@ -1335,14 +1339,14 @@ func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP
}
}

func (n *network) deleteSvcRecords(eID, name, serviceID string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
func (n *network) deleteSvcRecords(ctx context.Context, eID, name, serviceID string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
// Do not delete service names from ingress network as this is a
// routing only network
if n.ingress {
return
}

logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID)
logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, getCaller(ctx), serviceID)

c := n.getController()
c.Lock()
Expand Down
Loading