Skip to content

Add UNIX socket to serve RPC #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions msgpack-aeson/msgpack-aeson.cabal
Original file line number Diff line number Diff line change
@@ -23,16 +23,15 @@ library
hs-source-dirs: src
exposed-modules: Data.MessagePack.Aeson

build-depends: base >= 4.7 && < 4.14
, aeson >= 0.8.0.2 && < 0.12
|| >= 1.0 && < 1.5
, bytestring >= 0.10.4 && < 0.11
, msgpack >= 1.1.0 && < 1.2
, scientific >= 0.3.2 && < 0.4
, text >= 1.2.3 && < 1.3
, unordered-containers >= 0.2.5 && < 0.3
, vector >= 0.10.11 && < 0.13
, deepseq >= 1.3 && < 1.5
build-depends: base == 4.14.*
, aeson == 1.5.*
, bytestring == 0.10.*
, msgpack == 1.2.*
, scientific == 0.3.*
, text == 1.2.*
, unordered-containers == 0.2.*
, vector == 0.12.*
, deepseq == 1.4.*

default-language: Haskell2010

@@ -48,7 +47,7 @@ test-suite msgpack-aeson-test
, aeson
, msgpack
-- test-specific dependencies
, tasty == 1.2.*
, tasty-hunit == 0.10.*
, tasty
, tasty-hunit

default-language: Haskell2010
35 changes: 18 additions & 17 deletions msgpack-rpc/msgpack-rpc.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 1.12
name: msgpack-rpc
version: 1.0.0
version: 1.1.0

synopsis: A MessagePack-RPC Implementation
description: A MessagePack-RPC Implementation <http://msgpack.org/>
@@ -26,19 +26,19 @@ library
exposed-modules: Network.MessagePack.Server
Network.MessagePack.Client

build-depends: base >= 4.5 && < 4.13
, bytestring >= 0.10.4 && < 0.11
, text >= 1.2.3 && < 1.3
, network >= 2.6 && < 2.9
|| >= 3.0 && < 3.1
, mtl >= 2.2.1 && < 2.3
, monad-control >= 1.0.0.0 && < 1.1
, conduit >= 1.2.3.1 && < 1.3
, conduit-extra >= 1.1.3.4 && < 1.3
, binary-conduit >= 1.2.3 && < 1.3
, exceptions >= 0.8 && < 0.11
, binary >= 0.7.1 && < 0.9
, msgpack >= 1.1.0 && < 1.2
build-depends: base == 4.14.*
, binary == 0.8.*
, bytestring == 0.10.*
, binary-conduit == 1.3.*
, conduit == 1.3.*
, conduit-extra == 1.3.*
, exceptions == 0.10.*
, msgpack == 1.2.*
, mtl == 2.2.*
, monad-control == 1.0.*
, network == 3.1.*
, streaming-commons == 0.2.*
, text == 1.2.*

test-suite msgpack-rpc-test
default-language: Haskell2010
@@ -49,9 +49,10 @@ test-suite msgpack-rpc-test
build-depends: msgpack-rpc
-- inherited constraints via `msgpack-rpc`
, base
, conduit-extra == 1.3.*
, mtl
, network
-- test-specific dependencies
, async == 2.2.*
, tasty == 1.2.*
, tasty-hunit == 0.10.*
, async
, tasty
, tasty-hunit
40 changes: 33 additions & 7 deletions msgpack-rpc/src/Network/MessagePack/Client.hs
Original file line number Diff line number Diff line change
@@ -30,13 +30,28 @@

module Network.MessagePack.Client (
-- * MessagePack Client type
Client, execClient,
Client, execClient, execClientUnix,

-- * Call RPC method
call,

-- * RPC error
RpcError(..),

-- * Settings
ClientSettings,
clientSettings,
U.ClientSettingsUnix,
SN.clientSettingsUnix,

-- * Getters & setters
SN.serverSettingsUnix,
SN.getReadBufferSize,
SN.setReadBufferSize,
getAfterBind,
setAfterBind,
getPort,
setPort,
) where

import Control.Applicative
@@ -49,25 +64,36 @@ import qualified Data.ByteString as S
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Network
import qualified Data.Conduit.Network.Unix as U
import Data.Conduit.Serialization.Binary
import Data.MessagePack
import qualified Data.Streaming.Network as SN
import Data.Typeable
import System.IO

clientSettingsUnix :: FilePath -> U.ClientSettingsUnix
clientSettingsUnix = U.clientSettings

newtype Client a
= ClientT { runClient :: StateT Connection IO a }
deriving (Functor, Applicative, Monad, MonadIO, MonadThrow)

-- | RPC connection type
data Connection
= Connection
!(ResumableSource IO S.ByteString)
!(Sink S.ByteString IO ())
!(SealedConduitT () S.ByteString IO ())
!(ConduitT S.ByteString Void IO ())
!Int

execClient :: S.ByteString -> Int -> Client a -> IO ()
execClient host port m =
runTCPClient (clientSettings port host) $ \ad -> do
execClient :: ClientSettings -> Client a -> IO ()
execClient settings m =
runTCPClient settings $ \ad -> do
(rsrc, _) <- appSource ad $$+ return ()
void $ evalStateT (runClient m) (Connection rsrc (appSink ad) 0)

execClientUnix :: U.ClientSettingsUnix -> Client a -> IO ()
execClientUnix settings m =
U.runUnixClient settings $ \ad -> do
(rsrc, _) <- appSource ad $$+ return ()
void $ evalStateT (runClient m) (Connection rsrc (appSink ad) 0)

@@ -97,7 +123,7 @@ rpcCall :: String -> [Object] -> Client Object
rpcCall methodName args = ClientT $ do
Connection rsrc sink msgid <- CMS.get
(rsrc', res) <- lift $ do
CB.sourceLbs (pack (0 :: Int, msgid, methodName, args)) $$ sink
runConduit $ CB.sourceLbs (pack (0 :: Int, msgid, methodName, args)) .| sink
rsrc $$++ sinkGet Binary.get
CMS.put $ Connection rsrc' sink (msgid + 1)

66 changes: 51 additions & 15 deletions msgpack-rpc/src/Network/MessagePack/Server.hs
Original file line number Diff line number Diff line change
@@ -39,20 +39,40 @@ module Network.MessagePack.Server (
method,
-- * Start RPC server
serve,
serveUnix,

-- * RPC server settings
ServerSettings,
serverSettings,
U.ServerSettingsUnix,

-- * Getters & setters
SN.serverSettingsUnix,
SN.getReadBufferSize,
SN.setReadBufferSize,
getAfterBind,
setAfterBind,
getPort,
setPort,
) where

import Conduit (MonadUnliftIO)
import Control.Applicative
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Binary
import Data.ByteString (ByteString)
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Network
import qualified Data.Conduit.Network.Unix as U
import Data.Conduit.Serialization.Binary
import Data.List
import Data.MessagePack
import Data.MessagePack.Result
import qualified Data.Streaming.Network as SN
import Data.Typeable

-- ^ MessagePack RPC method
@@ -100,25 +120,41 @@ method :: MethodType m f
-> Method m
method name body = Method name $ toBody body

-- | Start RPC server with a set of RPC methods.
serve :: (MonadBaseControl IO m, MonadIO m, MonadCatch m, MonadThrow m)
=> Int -- ^ Port number
-> [Method m] -- ^ list of methods
-- | Start an RPC server with a set of RPC methods on a TCP socket.
serve :: (MonadBaseControl IO m, MonadUnliftIO m, MonadIO m, MonadCatch m, MonadThrow m)
=> ServerSettings -- ^ settings
-> [Method m] -- ^ list of methods
-> m ()
serve port methods = runGeneralTCPServer (serverSettings port "*") $ \ad -> do
serve settings methods = runGeneralTCPServer settings $ \ad -> do
(rsrc, _) <- appSource ad $$+ return ()
(_ :: Either ParseError ()) <- try $ processRequests rsrc (appSink ad)
(_ :: Either ParseError ()) <- try $ processRequests methods rsrc (appSink ad)
return ()
where
processRequests rsrc sink = do
(rsrc', res) <- rsrc $$++ do
obj <- sinkGet get
case fromObject obj of
Error e -> throwM $ ServerError e
Success req -> lift $ getResponse (req :: Request)
_ <- CB.sourceLbs (pack res) $$ sink
processRequests rsrc' sink

-- | Start an RPC server with a set of RPC methods on a Unix domain socket.
serveUnix :: (MonadBaseControl IO m, MonadIO m, MonadCatch m, MonadThrow m)
=> U.ServerSettingsUnix
-> [Method m] -- ^ list of methods
-> m ()
serveUnix settings methods = liftBaseWith $ \run ->
U.runUnixServer settings $ \ad -> void . run $ do
(rsrc, _) <- appSource ad $$+ return ()
(_ :: Either ParseError ()) <- try $ processRequests methods rsrc (appSink ad)
return ()

processRequests :: (MonadThrow m)
=> [Method m] -- ^ list of methods
-> SealedConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> m b
processRequests methods rsrc sink = do
(rsrc', res) <- rsrc $$++ do
obj <- sinkGet get
case fromObject obj of
Error err -> throwM $ ServerError $ "invalid request: " ++ err
Success req -> lift $ getResponse (req :: Request)
_ <- runConduit $ CB.sourceLbs (pack res) .| sink
processRequests methods rsrc' sink
where
getResponse (rtype, msgid, methodName, args) = do
when (rtype /= 0) $
throwM $ ServerError $ "request type is not 0, got " ++ show rtype
61 changes: 51 additions & 10 deletions msgpack-rpc/test/test.hs
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Chan
import Control.Monad.Trans
import Test.Tasty
import Test.Tasty.HUnit

import Network.MessagePack.Client
import Network.MessagePack.Server
import Network.Socket (withSocketsDo)
import Network.Socket (Socket, withSocketsDo)

import System.IO (openTempFile)

port :: Int
port = 5000

main :: IO ()
main = withSocketsDo $ defaultMain $
testGroup "simple service"
[ testCase "test" $ server `race_` (threadDelay 1000 >> client) ]
main = do
(f, _) <- openTempFile "/tmp" "socket.sock"
withSocketsDo $ defaultMain $
testGroup "simple service"
[ testCase "test TCP" $ testClientServer (clientTCP port) (serverTCP port)
, testCase "test Unix" $ testClientServer (clientUnix f) (serverUnix f) ]

testClientServer :: IO () -> ((Socket -> IO ()) -> IO ()) -> IO ()
testClientServer client server = do
(okChan :: Chan ()) <- newChan
forkIO $ server (const $ writeChan okChan ())
readChan okChan
client

serverTCP :: Int -> (Socket -> IO ()) -> IO ()
serverTCP port afterBind =
serve (setAfterBind afterBind $ serverSettings port "*")
[ method "add" add
, method "echo" echo
]
where
add :: Int -> Int -> Server Int
add x y = return $ x + y

echo :: String -> Server String
echo s = return $ "***" ++ s ++ "***"

server :: IO ()
server =
serve port
clientTCP :: Int -> IO ()
clientTCP port = execClient (clientSettings port "localhost") $ do
r1 <- add 123 456
liftIO $ r1 @?= 123 + 456
r2 <- echo "hello"
liftIO $ r2 @?= "***hello***"
where
add :: Int -> Int -> Client Int
add = call "add"

echo :: String -> Client String
echo = call "echo"

serverUnix :: FilePath -> (Socket -> IO ()) -> IO ()
serverUnix path afterBind =
serveUnix (setAfterBind afterBind $ serverSettingsUnix path)
[ method "add" add
, method "echo" echo
]
@@ -31,8 +71,8 @@ server =
echo :: String -> Server String
echo s = return $ "***" ++ s ++ "***"

client :: IO ()
client = execClient "localhost" port $ do
clientUnix :: FilePath -> IO ()
clientUnix path = execClientUnix (clientSettingsUnix path) $ do
r1 <- add 123 456
liftIO $ r1 @?= 123 + 456
r2 <- echo "hello"
@@ -43,3 +83,4 @@ client = execClient "localhost" port $ do

echo :: String -> Client String
echo = call "echo"

41 changes: 20 additions & 21 deletions msgpack/msgpack.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 1.12
name: msgpack
version: 1.1.0.0
version: 1.2.0.0

synopsis: A Haskell implementation of MessagePack
description:
@@ -64,27 +64,26 @@ library
Data.MessagePack.Object
Data.MessagePack.Get
Data.MessagePack.Put
Data.MessagePack.Result

other-modules: Data.MessagePack.Tags
Data.MessagePack.Result
Data.MessagePack.Get.Internal
Compat.Binary
Compat.Prelude

build-depends: base >= 4.7 && < 4.14
, mtl >= 2.2.1 && < 2.3
, bytestring >= 0.10.4 && < 0.11
, text >= 1.2.3 && < 1.3
, containers >= 0.5.5 && < 0.7
, unordered-containers >= 0.2.5 && < 0.3
, hashable >= 1.1.2.4 && < 1.4
, vector >= 0.10.11 && < 0.13
, deepseq >= 1.3 && < 1.5
, binary >= 0.7.1 && < 0.9
, semigroups >= 0.5.0 && < 0.20
, time >= 1.4.2 && < 1.10
, int-cast >= 0.1.1 && < 0.3
, array >= 0.5.0 && < 0.6
build-depends: base == 4.14.*
, mtl == 2.2.*
, bytestring == 0.10.*
, text == 1.2.*
, containers == 0.6.*
, unordered-containers == 0.2.*
, hashable == 1.3.*
, vector == 0.12.*
, deepseq == 1.4.*
, binary == 0.8.*
, time == 1.9.*
, int-cast == 0.2.*
, array == 0.5.*

if !impl(ghc > 8.0)
build-depends: fail == 4.9.*
@@ -117,8 +116,8 @@ test-suite msgpack-tests
-- test-specific dependencies
, async == 2.2.*
, filepath == 1.3.* || == 1.4.*
, HsYAML >= 0.1.1 && < 0.2
, tasty == 1.2.*
, tasty-quickcheck == 0.10.*
, tasty-hunit == 0.10.*
, QuickCheck == 2.13.*
, HsYAML >= 0.1.1
, tasty
, tasty-quickcheck
, tasty-hunit
, QuickCheck
20 changes: 9 additions & 11 deletions msgpack/test/DataCases.hs
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ genDataCases fns = testGroup "Reference Tests" <$> forM fns doFile
forM_ (zip [0..] (dcMsgPack tc)) $ \(j,b) -> do
let Right decoded = unpack (L.fromStrict b)

packLbl = "pack #" ++ (show (j::Int))
packLbl = "pack #" ++ show (j::Int)
unpackLbl = "un" ++ packLbl

-- the `number` test-cases conflate integers and floats
@@ -62,8 +62,6 @@ genDataCases fns = testGroup "Reference Tests" <$> forM fns doFile

_ -> assertEqual unpackLbl obj decoded

pure ()

pure (testGroup fn tcs)


@@ -76,26 +74,26 @@ instance FromYAML DataCase where
parseYAML = Y.withMap "DataCase" $ \m -> do
msgpack <- m .: "msgpack"

obj <- do { Just (Y.Scalar Y.SNull) <- m .:! "nil" ; pure ObjectNil }
obj <- do { Just (Y.Scalar _ Y.SNull) <- m .:! "nil" ; pure ObjectNil }
<|> do { Just b <- m .:! "bool" ; pure (ObjectBool b) }
<|> do { Just i <- m .:! "number" ; pure (ObjectInt (fromInteger i)) }
<|> do { Just s <- m .:! "bignum" ; pure (ObjectInt (read . T.unpack $ s)) }
<|> do { Just d <- m .:! "number" ; pure (ObjectDouble d) }
<|> do { Just t <- m .:! "string" ; pure (ObjectStr t) }
<|> do { Just t <- m .:! "binary" ; pure (ObjectBin (hex2bin t)) }
<|> do { Just v@(Y.Sequence _ _) <- m .:! "array" ; pure (nodeToObj v) }
<|> do { Just m'@(Y.Mapping _ _) <- m .:! "map" ; pure (nodeToObj m') }
<|> do { Just v@(Y.Sequence _ _ _) <- m .:! "array" ; pure (nodeToObj v) }
<|> do { Just m'@(Y.Mapping _ _ _) <- m .:! "map" ; pure (nodeToObj m') }
<|> do { Just (n,t) <- m .:! "ext" ; pure (ObjectExt n (hex2bin t)) }
<|> do { Just (s,ns) <- m .:! "timestamp"; pure (toObject $ mptsFromPosixSeconds2 s ns) }

pure (DataCase { dcMsgPack = map hex2bin msgpack, dcObject = obj })


nodeToObj :: Y.Node -> Object
nodeToObj (Y.Scalar sca) = scalarToObj sca
nodeToObj (Y.Sequence _ ns) = ObjectArray (Lst.fromList (map nodeToObj ns))
nodeToObj (Y.Mapping _ ns) = ObjectMap (Lst.fromList $ map (\(k,v) -> (nodeToObj k, nodeToObj v)) $ Map.toList ns)
nodeToObj (Y.Anchor _ n) = nodeToObj n
nodeToObj :: Y.Node loc -> Object
nodeToObj (Y.Scalar _ sca) = scalarToObj sca
nodeToObj (Y.Sequence _ _ ns) = ObjectArray (Lst.fromList (map nodeToObj ns))
nodeToObj (Y.Mapping _ _ ns) = ObjectMap (Lst.fromList $ map (\(k,v) -> (nodeToObj k, nodeToObj v)) $ Map.toList ns)
nodeToObj (Y.Anchor _ _ n) = nodeToObj n

scalarToObj :: Y.Scalar -> Object
scalarToObj Y.SNull = ObjectNil