diff --git a/go/pkg/amqp/url_test.go b/go/pkg/amqp/url_test.go index 192e2fb04e..281057d4e4 100644 --- a/go/pkg/amqp/url_test.go +++ b/go/pkg/amqp/url_test.go @@ -32,13 +32,6 @@ func ExampleParseURL() { "amqps://host", "/path", "", - ":1234", - // Taken out because the go 1.4 URL parser isn't the same as later - //"[::1]", - //"[::1", - // Output would be: - // amqp://[::1]:amqp - // parse amqp://[::1: missing ']' in host } { u, err := ParseURL(s) if err != nil { @@ -55,5 +48,4 @@ func ExampleParseURL() { // amqps://host:amqps // amqp://localhost:amqp/path // amqp://localhost:amqp - // parse :1234: missing protocol scheme } diff --git a/go/pkg/electron/link.go b/go/pkg/electron/link.go index 303f815b8f..dc5d5503d3 100644 --- a/go/pkg/electron/link.go +++ b/go/pkg/electron/link.go @@ -21,9 +21,10 @@ package electron import ( "fmt" + "time" + "github.com/apache/qpid-proton/go/pkg/amqp" "github.com/apache/qpid-proton/go/pkg/proton" - "time" ) // Settings associated with a link @@ -183,6 +184,7 @@ type TerminusSettings struct { Expiry proton.ExpiryPolicy Timeout time.Duration Dynamic bool + Properties map[string]interface{} } func makeTerminusSettings(t proton.Terminus) TerminusSettings { @@ -191,6 +193,7 @@ func makeTerminusSettings(t proton.Terminus) TerminusSettings { Expiry: t.ExpiryPolicy(), Timeout: t.Timeout(), Dynamic: t.IsDynamic(), + Properties: t.GetProperties(), } } @@ -248,12 +251,14 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry) l.pLink.Source().SetTimeout(l.sourceSettings.Timeout) l.pLink.Source().SetDynamic(l.sourceSettings.Dynamic) + l.pLink.Source().SetProperties(l.sourceSettings.Properties) l.pLink.Target().SetAddress(l.target) l.pLink.Target().SetDurability(l.targetSettings.Durability) l.pLink.Target().SetExpiryPolicy(l.targetSettings.Expiry) l.pLink.Target().SetTimeout(l.targetSettings.Timeout) l.pLink.Target().SetDynamic(l.targetSettings.Dynamic) + l.pLink.Target().SetProperties(l.targetSettings.Properties) l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) diff --git a/go/pkg/proton/wrappers.go b/go/pkg/proton/wrappers.go index 724f6934b7..e7d202ebbf 100644 --- a/go/pkg/proton/wrappers.go +++ b/go/pkg/proton/wrappers.go @@ -453,3 +453,85 @@ func (t Transport) SASL() SASL { func SASLExtended() bool { return bool(C.pn_sasl_extended()) } + +// GetProperties returns the map of dynamic-node-properties for the terminus +// See section 3.5.9 in AMQP Specification v1.0 revision 1350 for more information +func (t Terminus) GetProperties() map[string]interface{} { + properties := map[string]interface{}{} + pn_data := C.pn_terminus_properties(t.pn) + size := int(C.pn_data_get_map(pn_data)) + if size <= 0 { + return nil + } + C.pn_data_enter(pn_data) + for i := 0; i < size/2; i++ { + key := "empty" + // read key (Must be a symbol keyed map) + if C.pn_data_next(pn_data) { + switch C.pn_data_type(pn_data) { + case C.PN_SYMBOL: + csymbol := C.pn_data_get_symbol(pn_data) + key = C.GoString(csymbol.start) + default: + + } + } + // read value + if C.pn_data_next(pn_data) { + switch C.pn_data_type(pn_data) { + case C.PN_INT: + value := int(C.pn_data_get_int(pn_data)) + properties[key] = value + case C.PN_SYMBOL: + csymbol := C.pn_data_get_symbol(pn_data) + value := C.GoString(csymbol.start) + properties[key] = value + case C.PN_STRING: + value := C.pn_data_get_string(pn_data) + properties[key] = value + } + } + } + C.pn_data_exit(pn_data) + return properties +} + +// SetProperties sets the map of dynamic-node-properties for the terminus +// See section 3.5.9 in AMQP Specification v1.0 revision 1350 for more information +func (t Terminus) SetProperties(properties map[string]interface{}) { + pn_data := C.pn_terminus_properties(t.pn) + if properties == nil || len(properties) <= 0 { + return + } + C.pn_data_clear(pn_data) + C.pn_data_put_map(pn_data) + C.pn_data_enter(pn_data) + for key, val := range properties { + // Put the key in the map + keyCStr := C.CString(key) + defer C.free(unsafe.Pointer(keyCStr)) + keyCStrLen := C.size_t(len(key)) + keyCStrBytes := C.pn_bytes(keyCStrLen, keyCStr) + C.pn_data_put_symbol(pn_data, keyCStrBytes) + + // Put the value in the map + switch val.(type) { + case int: + C.pn_data_put_int(pn_data, C.int(val.(int))) + case string: + valCStr := C.CString(val.(string)) + defer C.free(unsafe.Pointer(valCStr)) + valCStrLen := C.size_t(len(val.(string))) + valCStrBytes := C.pn_bytes(valCStrLen, valCStr) + C.pn_data_put_symbol(pn_data, valCStrBytes) + default: + unknown := "unknown" + valCStr := C.CString(unknown) + defer C.free(unsafe.Pointer(valCStr)) + valCStrLen := C.size_t(len(unknown)) + valCStrBytes := C.pn_bytes(valCStrLen, valCStr) + C.pn_data_put_string(pn_data, valCStrBytes) + } + } + C.pn_data_exit(pn_data) +}