Skip to content

Commit c289829

Browse files
committed
all: introduce transport.Transport interface and Transport plugin mechanism
Updates go-zeromq/zmq4#87.
1 parent 061cd68 commit c289829

File tree

7 files changed

+256
-53
lines changed

7 files changed

+256
-53
lines changed

internal/inproc/transport.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2020 The go-zeromq Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package inproc
6+
7+
import (
8+
"context"
9+
"net"
10+
11+
"github.com/go-zeromq/zmq4/transport"
12+
)
13+
14+
// Transport implements the zmq4 Transport interface for the inproc transport.
15+
type Transport struct{}
16+
17+
// Dial connects to the address on the named network using the provided
18+
// context.
19+
func (Transport) Dial(ctx context.Context, dialer transport.Dialer, addr string) (net.Conn, error) {
20+
return Dial(addr)
21+
}
22+
23+
// Listen announces on the provided network address.
24+
func (Transport) Listen(ctx context.Context, addr string) (net.Listener, error) {
25+
return Listen(addr)
26+
}
27+
28+
// Addr returns the end-point address.
29+
func (Transport) Addr(ep string) (addr string, err error) {
30+
return ep, nil
31+
}
32+
33+
var (
34+
_ transport.Transport = (*Transport)(nil)
35+
)

socket.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"strings"
1616
"sync"
1717
"time"
18-
19-
"github.com/go-zeromq/zmq4/internal/inproc"
2018
)
2119

2220
const (
@@ -180,15 +178,10 @@ func (sck *socket) Listen(endpoint string) error {
180178

181179
var l net.Listener
182180

183-
switch network {
184-
case "ipc":
185-
l, err = net.Listen("unix", addr)
186-
case "tcp":
187-
l, err = net.Listen("tcp", addr)
188-
case "udp":
189-
l, err = net.Listen("udp", addr)
190-
case "inproc":
191-
l, err = inproc.Listen(addr)
181+
trans, ok := drivers.get(network)
182+
switch {
183+
case ok:
184+
l, err = trans.Listen(sck.ctx, addr)
192185
default:
193186
panic("zmq4: unknown protocol " + network)
194187
}
@@ -240,18 +233,15 @@ func (sck *socket) Dial(endpoint string) error {
240233
return err
241234
}
242235

243-
retries := 0
244-
var conn net.Conn
236+
var (
237+
conn net.Conn
238+
trans, ok = drivers.get(network)
239+
retries = 0
240+
)
245241
connect:
246-
switch network {
247-
case "ipc":
248-
conn, err = sck.dialer.DialContext(sck.ctx, "unix", addr)
249-
case "tcp":
250-
conn, err = sck.dialer.DialContext(sck.ctx, "tcp", addr)
251-
case "udp":
252-
conn, err = sck.dialer.DialContext(sck.ctx, "udp", addr)
253-
case "inproc":
254-
conn, err = inproc.Dial(addr)
242+
switch {
243+
case ok:
244+
conn, err = trans.Dial(sck.ctx, &sck.dialer, addr)
255245
default:
256246
panic("zmq4: unknown protocol " + network)
257247
}
@@ -262,7 +252,7 @@ connect:
262252
time.Sleep(sck.retry)
263253
goto connect
264254
}
265-
return fmt.Errorf("zmq4: could not dial to %q: %w", endpoint, err)
255+
return fmt.Errorf("zmq4: could not dial to %q (retry=%v): %w", endpoint, sck.retry, err)
266256
}
267257

268258
if conn == nil {

transport.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2018 The go-zeromq Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package zmq4
6+
7+
import (
8+
"fmt"
9+
"sort"
10+
"sync"
11+
12+
"github.com/go-zeromq/zmq4/internal/inproc"
13+
"github.com/go-zeromq/zmq4/transport"
14+
)
15+
16+
// Transports returns the sorted list of currently registered transports.
17+
func Transports() []string {
18+
return drivers.names()
19+
}
20+
21+
// RegisterTransport registers a new transport with the zmq4 package.
22+
func RegisterTransport(name string, trans transport.Transport) error {
23+
return drivers.add(name, trans)
24+
}
25+
26+
type transports struct {
27+
sync.RWMutex
28+
db map[string]transport.Transport
29+
}
30+
31+
func (ts *transports) get(name string) (transport.Transport, bool) {
32+
ts.RLock()
33+
defer ts.RUnlock()
34+
35+
v, ok := ts.db[name]
36+
return v, ok
37+
}
38+
39+
func (ts *transports) add(name string, trans transport.Transport) error {
40+
ts.Lock()
41+
defer ts.Unlock()
42+
43+
if old, dup := ts.db[name]; dup {
44+
return fmt.Errorf("zmq4: duplicate transport %q (%T)", name, old)
45+
}
46+
47+
ts.db[name] = trans
48+
return nil
49+
}
50+
51+
func (ts *transports) names() []string {
52+
ts.RLock()
53+
defer ts.RUnlock()
54+
55+
o := make([]string, 0, len(ts.db))
56+
for k := range ts.db {
57+
o = append(o, k)
58+
}
59+
sort.Strings(o)
60+
return o
61+
}
62+
63+
var drivers = transports{
64+
db: make(map[string]transport.Transport),
65+
}
66+
67+
func init() {
68+
RegisterTransport("ipc", transport.New("unix"))
69+
RegisterTransport("tcp", transport.New("tcp"))
70+
RegisterTransport("udp", transport.New("udp"))
71+
RegisterTransport("inproc", inproc.Transport{})
72+
}

transport/transport.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2020 The go-zeromq Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Package transport defines the Transport interface and provides a net-based
6+
// implementation that can be used by zmq4 sockets to exchange messages.
7+
package transport // import "github.com/go-zeromq/zmq4/transport"
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"net"
13+
)
14+
15+
// Dialer is the interface that wraps the DialContext method.
16+
type Dialer interface {
17+
DialContext(ctx context.Context, network, address string) (net.Conn, error)
18+
}
19+
20+
// Transport is the zmq4 transport interface that wraps
21+
// the Dial and Listen methods.
22+
type Transport interface {
23+
// Dial connects to the address on the named network using the provided
24+
// context.
25+
Dial(ctx context.Context, dialer Dialer, addr string) (net.Conn, error)
26+
27+
// Listen announces on the provided network address.
28+
Listen(ctx context.Context, addr string) (net.Listener, error)
29+
30+
// Addr returns the end-point address.
31+
Addr(ep string) (addr string, err error)
32+
}
33+
34+
// netTransport implements the Transport interface using the net package.
35+
type netTransport struct {
36+
prot string
37+
}
38+
39+
// New returns a new net-based transport with the given network (e.g "tcp").
40+
func New(network string) Transport {
41+
return netTransport{prot: network}
42+
}
43+
44+
// Dial connects to the address on the named network using the provided
45+
// context.
46+
func (trans netTransport) Dial(ctx context.Context, dialer Dialer, addr string) (net.Conn, error) {
47+
return dialer.DialContext(ctx, trans.prot, addr)
48+
}
49+
50+
// Listen announces on the provided network address.
51+
func (trans netTransport) Listen(ctx context.Context, addr string) (net.Listener, error) {
52+
return net.Listen(trans.prot, addr)
53+
}
54+
55+
// Addr returns the end-point address.
56+
func (trans netTransport) Addr(ep string) (addr string, err error) {
57+
switch trans.prot {
58+
case "tcp", "udp":
59+
host, port, err := net.SplitHostPort(ep)
60+
if err != nil {
61+
return addr, err
62+
}
63+
switch port {
64+
case "0", "*", "":
65+
port = "0"
66+
}
67+
switch host {
68+
case "", "*":
69+
host = "0.0.0.0"
70+
}
71+
addr = net.JoinHostPort(host, port)
72+
return addr, err
73+
74+
case "unix":
75+
return ep, nil
76+
77+
default:
78+
err = fmt.Errorf("zmq4: unknown protocol %q", trans.prot)
79+
}
80+
81+
return addr, err
82+
}
83+
84+
var (
85+
_ Transport = (*netTransport)(nil)
86+
)

transport_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2018 The go-zeromq Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package zmq4
6+
7+
import (
8+
"reflect"
9+
"testing"
10+
11+
"github.com/go-zeromq/zmq4/internal/inproc"
12+
)
13+
14+
func TestTransport(t *testing.T) {
15+
if got, want := Transports(), []string{"inproc", "ipc", "tcp", "udp"}; !reflect.DeepEqual(got, want) {
16+
t.Fatalf("invalid list of transports.\ngot= %q\nwant=%q", got, want)
17+
}
18+
19+
err := RegisterTransport("tcp", inproc.Transport{})
20+
if err == nil {
21+
t.Fatalf("expected a duplicate-registration error")
22+
}
23+
if got, want := err.Error(), "zmq4: duplicate transport \"tcp\" (transport.netTransport)"; got != want {
24+
t.Fatalf("invalid duplicate registration error:\ngot= %s\nwant=%s", got, want)
25+
}
26+
27+
err = RegisterTransport("inproc2", inproc.Transport{})
28+
if err != nil {
29+
t.Fatalf("could not register 'inproc2': %+v", err)
30+
}
31+
}

utils.go

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"fmt"
1010
"io"
1111
"log"
12-
"net"
1312
"strings"
1413
)
1514

@@ -20,39 +19,15 @@ func splitAddr(v string) (network, addr string, err error) {
2019
err = errInvalidAddress
2120
return network, addr, err
2221
}
23-
var (
24-
host string
25-
port string
26-
)
2722
network = ep[0]
28-
switch network {
29-
case "tcp", "udp":
30-
host, port, err = net.SplitHostPort(ep[1])
31-
if err != nil {
32-
return network, addr, err
33-
}
34-
switch port {
35-
case "0", "*", "":
36-
port = "0"
37-
}
38-
switch host {
39-
case "", "*":
40-
host = "0.0.0.0"
41-
}
42-
addr = net.JoinHostPort(host, port)
43-
return network, addr, err
4423

45-
case "ipc":
46-
host = ep[1]
47-
port = ""
48-
return network, host, nil
49-
case "inproc":
50-
host = ep[1]
51-
return "inproc", host, nil
52-
default:
53-
err = fmt.Errorf("zmq4: unknown protocol %q", network)
24+
trans, ok := drivers.get(network)
25+
if !ok {
26+
err = fmt.Errorf("zmq4: unknown transport %q", network)
27+
return network, addr, err
5428
}
5529

30+
addr, err = trans.Addr(ep[1])
5631
return network, addr, err
5732
}
5833

utils_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,20 @@ func TestSplitAddr(t *testing.T) {
3838
addr: "[::1]:7000",
3939
err: nil,
4040
},
41+
{
42+
desc: "ipc",
43+
v: "ipc://some-ep",
44+
network: "ipc",
45+
addr: "some-ep",
46+
err: nil,
47+
},
48+
{
49+
desc: "inproc",
50+
v: "inproc://some-ep",
51+
network: "inproc",
52+
addr: "some-ep",
53+
err: nil,
54+
},
4155
}
4256
for _, tc := range testCases {
4357
t.Run(tc.desc, func(t *testing.T) {

0 commit comments

Comments
 (0)