Skip to content

Commit d329406

Browse files
committed
init
0 parents  commit d329406

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2684
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.idea
2+
.env
3+
config.yaml

Dockerfile

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
## 1. 构建应用 ##
2+
FROM golang:1.15-buster as builder
3+
4+
ARG APP_MODULE="github.com/Gopusher/gateway"
5+
ARG APP_NAME="gateway"
6+
7+
RUN export GO111MODULE=on \
8+
&& export GOPROXY=https://goproxy.io \
9+
&& mkdir -p /go/src/${APP_MODULE}
10+
11+
COPY . /go/src/${APP_MODULE}
12+
13+
# RUN echo /go/src/${APP_MODULE}
14+
# RUN echo /go/bin/${APP_NAME} ${APP_NAME}.go
15+
16+
RUN cd /go/src/${APP_MODULE} \
17+
&& CGO_ENABLED=0 go build -ldflags '-s -w' -o /go/bin/${APP_NAME} ./app/${APP_NAME}/app/cmd/main.go
18+
19+
## 2. 应用 ##
20+
FROM debian:buster
21+
22+
ARG OLD_APP_NAME="gateway"
23+
ARG APP_NAME="gopusher"
24+
25+
26+
# 增加根证书
27+
RUN apt-get update \
28+
&& apt-get install ca-certificates -y
29+
30+
COPY --from=builder /go/bin/${OLD_APP_NAME} /usr/local/bin/${APP_NAME}
31+
32+
# .env => /app/.env
33+
WORKDIR /app
34+
VOLUME /app
35+
36+
EXPOSE 8900 8901
37+
38+
COPY docker-entrypoint.sh /usr/local/bin/
39+
ENTRYPOINT ["docker-entrypoint.sh"]
40+
41+
CMD ["${APP_NAME}", "start", "-c", "config.yaml"]

app/gateway/app/api/anycall.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package api
2+
3+
import (
4+
"encoding/json"
5+
)
6+
7+
type AnyCallMessage struct {
8+
TokenMessage
9+
Method string `json:"method"`
10+
Args json.RawMessage `json:"args"`
11+
}
12+
13+
//获取部分 order book
14+
func (s *Server) AnyCall(message *AnyCallMessage, reply *Response) error {
15+
if errResp := s.checkToken(message.Token); errResp != nil {
16+
*reply = *errResp
17+
return nil
18+
}
19+
//log.Debug("AnyCall method: " + message.Method + ", args: " + string(message.Args))
20+
21+
data, err := s.server.AnyCall(message.Method, message.Args)
22+
if err != nil {
23+
*reply = s.failure(ServerErrorCode, err.Error())
24+
return nil
25+
}
26+
27+
*reply = s.success(data)
28+
return nil
29+
}

app/gateway/app/api/api.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package api
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"net/rpc"
7+
"net/rpc/jsonrpc"
8+
9+
"github.com/gopusher/gateway/app/gateway/app/protocols"
10+
"github.com/gopusher/gateway/pkg/log"
11+
"go.uber.org/zap"
12+
)
13+
14+
type Config struct {
15+
Address string `mapstructure:"address" validate:"required"`
16+
Token string `mapstructure:"token"`
17+
}
18+
19+
//Server is api server
20+
type Server struct {
21+
node string
22+
server protocols.Server
23+
config *Config
24+
}
25+
26+
//InitRpcServer init rpc server
27+
func InitRpcServer(node string, server protocols.Server, config *Config) {
28+
if err := rpc.Register(&Server{
29+
node: node,
30+
server: server,
31+
config: config,
32+
}); err != nil {
33+
log.Panic("Gateway api server run failed, error: "+err.Error(), zap.Error(err))
34+
}
35+
36+
listener, err := net.Listen("tcp", config.Address)
37+
if err != nil {
38+
log.Panic("Gateway api server run failed, error: "+err.Error(), zap.Error(err))
39+
}
40+
41+
log.Info(fmt.Sprintf(
42+
"Gateway api server start running, Node: %s, gateway api address: %s, token: %s",
43+
node, config.Address, config.Token,
44+
))
45+
defer func() {
46+
if err := listener.Close(); err != nil {
47+
log.Panic("Gateway api server run failed, error: "+err.Error(), zap.Error(err))
48+
}
49+
}()
50+
for {
51+
conn, err := listener.Accept()
52+
if err != nil {
53+
continue
54+
}
55+
56+
go jsonrpc.ServeConn(conn)
57+
}
58+
}
59+
60+
//TokenMessage is token type message
61+
type TokenMessage struct {
62+
Token string `json:"token"` //作为消息发送鉴权
63+
}
64+
65+
//ConnectionsMessage is a connection type message
66+
type ConnectionsMessage struct {
67+
Connections []string `json:"connections"` //消息接受者
68+
TokenMessage
69+
}
70+
71+
//Response is api response
72+
type Response struct {
73+
Code int `json:"code"`
74+
Data interface{} `json:"data"`
75+
Error string `json:"error"`
76+
}
77+
78+
const (
79+
SuccessCode = 20000
80+
81+
ParametersError = 40000
82+
83+
TokenErrorCode = 40100
84+
85+
ServerErrorCode = 50000
86+
87+
SendToConnectionsErrorCode = 50001
88+
)
89+
90+
func (s *Server) success(data interface{}) Response {
91+
return Response{
92+
Code: SuccessCode,
93+
Data: data,
94+
Error: "",
95+
}
96+
}
97+
98+
func (s *Server) failure(code int, err string) Response {
99+
return Response{
100+
Code: code,
101+
Data: "",
102+
Error: err,
103+
}
104+
}
105+
106+
func (s *Server) failureWithData(code int, data interface{}, err string) Response {
107+
return Response{
108+
Code: code,
109+
Data: data,
110+
Error: err,
111+
}
112+
}
113+
114+
func (s *Server) checkToken(token string) *Response {
115+
if token != s.config.Token {
116+
resp := s.failure(TokenErrorCode, "error rpc token")
117+
return &resp
118+
}
119+
120+
return nil
121+
}

app/gateway/app/api/check_online.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package api
2+
3+
//CheckConnectionsOnline check connections is online
4+
func (s *Server) CheckConnectionsOnline(message *ConnectionsMessage, reply *Response) error {
5+
if errResp := s.checkToken(message.Token); errResp != nil {
6+
*reply = *errResp
7+
return nil
8+
}
9+
10+
var onlineConnections []string
11+
if len(message.Connections) > 0 {
12+
onlineConnections = s.server.CheckConnectionsOnline(message.Connections)
13+
}
14+
15+
*reply = s.success(onlineConnections)
16+
return nil
17+
}
18+
19+
//GetAllConnections returns all connections
20+
func (s *Server) GetAllConnections(message *TokenMessage, reply *Response) error {
21+
if errResp := s.checkToken(message.Token); errResp != nil {
22+
*reply = *errResp
23+
return nil
24+
}
25+
26+
onlineConnections := s.server.GetAllConnections()
27+
28+
*reply = s.success(onlineConnections)
29+
return nil
30+
}

app/gateway/app/api/client/example.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net/rpc/jsonrpc"
7+
"os"
8+
)
9+
10+
type message struct {
11+
Connections []string `json:"connections"` //消息接受者
12+
Msg string `json:"msg"` //为一个json,里边包含 type 消息类型
13+
Token string `json:"token"` //作为消息发送鉴权
14+
}
15+
16+
type kickMessage struct {
17+
Connections []string `json:"connections"` //消息接受者
18+
Token string `json:"token"` //作为消息发送鉴权
19+
}
20+
21+
func main() {
22+
if len(os.Args) < 2 {
23+
log.Fatal("消息发送参数格式: go run ./example.go msg ...to")
24+
}
25+
26+
//连接远程rpc服务
27+
//这里使用jsonrpc.Dial
28+
//todo 这里的 ip 要注意
29+
rpc, err := jsonrpc.Dial("tcp", "192.168.3.165:8901")
30+
if err != nil {
31+
log.Fatal(err)
32+
}
33+
//response 为 json 字符串
34+
var response string
35+
//调用远程方法
36+
//注意第三个参数是指针类型
37+
38+
//发送消息
39+
err2 := rpc.Call("Server.SendToConnections", &message{
40+
Connections: os.Args[2:],
41+
Msg: os.Args[1],
42+
Token: "token",
43+
}, &response)
44+
45+
//kick conns
46+
//err2 := rpc.Call("Server.KickConnections", &kickMessage{
47+
// Connections: os.Args[1:],
48+
// Token: "token",
49+
//}, &response)
50+
51+
if err2 != nil {
52+
log.Fatal(err2)
53+
}
54+
55+
fmt.Println(response)
56+
}

app/gateway/app/api/client/example.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# -*- coding: UTF-8 -*-
2+
import json
3+
import socket
4+
5+
6+
class Rpc(object):
7+
host: str
8+
port: int
9+
token: str
10+
conn: socket.socket
11+
12+
def __init__(self, host, port, token):
13+
self.host = host
14+
self.port = port
15+
self.token = token
16+
# self.conn = self.get_connect()
17+
18+
def connect(self):
19+
if self.conn is None:
20+
self.conn = socket.create_connection((self.host, self.port), timeout=1.5)
21+
return self.conn
22+
23+
def __getattr__(self, method):
24+
if method in ["conn"]:
25+
return None
26+
27+
def func(**kwargs):
28+
# args = kwargs if len(kwargs) else args
29+
params = {
30+
'token': self.token,
31+
}
32+
if kwargs is not None:
33+
params.update(kwargs)
34+
35+
data = {
36+
'method': "Server." + method,
37+
'params': [params],
38+
}
39+
40+
return self.execute(data)
41+
42+
return func
43+
44+
def execute(self, data):
45+
self.connect()
46+
47+
msg_id = 0
48+
data['id'] = msg_id
49+
msg = json.dumps(data)
50+
self.conn.sendall(msg.encode())
51+
52+
resp = self.read_line()
53+
if not resp:
54+
self.close()
55+
raise Exception("rpc 获取数据失败, Not resp, server gone")
56+
57+
resp = json.loads(resp)
58+
59+
if resp.get('id') != msg_id:
60+
raise Exception("expected id=%s, received id=%s: %s"
61+
% (msg_id, resp.get('id'), resp.get('error')))
62+
63+
if resp.get('error') is not None:
64+
raise Exception(resp.get('error'))
65+
66+
data = json.loads(resp.get('result'))
67+
if data['code'] != '0':
68+
raise Exception("rpc 获取数据失败: %s" % data['error'])
69+
70+
return data
71+
72+
def read_line(self):
73+
# return self.conn.makefile().readline()
74+
75+
ret = b''
76+
while True:
77+
c = self.conn.recv(1)
78+
if c == b'\n' or c == b'':
79+
break
80+
else:
81+
ret += c
82+
83+
return ret.decode("utf-8")
84+
85+
def close(self):
86+
self.conn.close()
87+
self.conn = None
88+
89+
90+
if __name__ == '__main__':
91+
rpc = Rpc('127.0.0.1', 8901, 'token')
92+
print(rpc.SendToConnections(connections=['a', 'b']))

0 commit comments

Comments
 (0)