{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE CPP #-}
-- | Conduit wrappers around TCP sockets, plus server and client runners.
--
-- Most of the settings types, accessors and runners are re-exported from
-- "Data.Streaming.Network" (imported here as @SN@); this module adds the
-- conduit sources and sinks and the 'MonadUnliftIO'-generalised runners.
module Data.Conduit.Network
(
-- * Socket-level conduits
sourceSocket
, sinkSocket
-- * Application data and its conduits
, SN.AppData
, appSource
, appSink
, SN.appSockAddr
, SN.appLocalAddr
-- * Server
, SN.ServerSettings
, serverSettings
, SN.runTCPServer
, SN.runTCPServerWithHandle
, forkTCPServer
, runGeneralTCPServer
-- * Client
, SN.ClientSettings
, clientSettings
, SN.runTCPClient
, runGeneralTCPClient
-- * Settings getters
, SN.getPort
, SN.getHost
, SN.getAfterBind
, SN.getNeedLocalAddr
-- * Settings setters
, SN.setPort
, SN.setHost
, SN.setAfterBind
, SN.setNeedLocalAddr
-- * Types
, SN.HostPreference
) where
import Prelude
import Data.Conduit
import Network.Socket (Socket)
import Network.Socket.ByteString (sendAll)
import Data.ByteString (ByteString)
import qualified GHC.Conc as Conc (yield)
import qualified Data.ByteString as S
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad (unless)
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar, MVar, ThreadId)
import qualified Data.Streaming.Network as SN
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO)
-- | Stream the data received on a socket.
--
-- Repeatedly reads chunks of at most 4096 bytes with 'SN.safeRecv' and yields
-- every non-empty chunk downstream. An empty chunk is treated as end of
-- stream and terminates the conduit. Nothing in this function closes the
-- socket; that remains the caller's responsibility.
sourceSocket :: MonadIO m => Socket -> ConduitT i ByteString m ()
sourceSocket socket =
    loop
  where
    loop = do
        -- 'ConduitT' is itself 'MonadIO' when @m@ is, so no explicit 'lift'
        -- is needed (consistent with 'appSource').
        bs <- liftIO $ SN.safeRecv socket 4096
        if S.null bs
            then return ()
            else yield bs >> loop
-- | Send every chunk received from upstream over a socket.
--
-- Each awaited 'ByteString' is written with 'sendAll'; the conduit finishes
-- when upstream is exhausted. Nothing in this function closes the socket.
sinkSocket :: MonadIO m => Socket -> ConduitT ByteString o m ()
sinkSocket socket =
    loop
  where
    -- 'await' yields 'Nothing' once upstream is done, which ends the loop.
    loop = await >>= maybe (return ()) (\bs -> liftIO (sendAll socket bs) >> loop)
-- | Construct TCP 'SN.ServerSettings'.
--
-- This is a direct alias for 'SN.serverSettingsTCP'; see that function for
-- the meaning of the arguments.
serverSettings :: Int -> SN.HostPreference -> SN.ServerSettings
serverSettings = SN.serverSettingsTCP
-- | Construct TCP 'SN.ClientSettings'.
--
-- This is a direct alias for 'SN.clientSettingsTCP'; see that function for
-- the meaning of the arguments.
clientSettings :: Int -> ByteString -> SN.ClientSettings
clientSettings = SN.clientSettingsTCP
-- | Stream the data read from an application-data value.
--
-- Repeatedly runs the read action obtained from 'SN.appRead' and yields each
-- non-empty chunk downstream. The conduit finishes on the first empty chunk.
appSource :: (SN.HasReadWrite ad, MonadIO m) => ad -> ConduitT i ByteString m ()
appSource ad =
    loop
  where
    -- Look the read action up once rather than on every iteration.
    read' = SN.appRead ad
    loop = do
        bs <- liftIO read'
        unless (S.null bs) $ do
            yield bs
            loop
-- | Write every chunk received from upstream to an application-data value.
--
-- Each chunk is written with 'SN.appWrite'. After every write the current
-- thread calls 'Conc.yield' ("GHC.Conc"), giving other threads a chance to
-- run before the next chunk is awaited.
appSink :: (SN.HasReadWrite ad, MonadIO m) => ad -> ConduitT ByteString o m ()
appSink ad = awaitForever $ \d -> liftIO $ SN.appWrite ad d >> Conc.yield
-- | Extend the after-bind hook of some server settings so that it also
-- signals an 'MVar'.
--
-- The returned settings run the hook already stored in @set@ first and then
-- put @()@ into @isBound@. Because 'putMVar' blocks on a full 'MVar', the
-- 'MVar' is expected to be empty when the hook fires.
addBoundSignal :: MVar () -> SN.ServerSettings -> SN.ServerSettings
addBoundSignal isBound set =
    SN.setAfterBind (\socket -> originalAfterBind socket >> signalBound socket) set
  where
    -- The hook that was configured before we wrapped it.
    originalAfterBind :: Socket -> IO ()
    originalAfterBind = SN.getAfterBind set

    -- The socket itself is irrelevant; only the fact that the hook ran matters.
    signalBound :: Socket -> IO ()
    signalBound _socket = putMVar isBound ()
-- | Run a TCP server in a freshly forked thread and return that thread's id.
--
-- The server is started with 'runGeneralTCPServer' using settings whose
-- after-bind hook has been extended by 'addBoundSignal'. The calling thread
-- waits on the signalling 'MVar' before returning, so by the time the
-- 'ThreadId' is handed back the after-bind hook has run.
--
-- NOTE(review): if the server thread fails before the after-bind hook runs
-- (for example because binding fails), nothing fills the 'MVar' and the
-- 'takeMVar' below cannot complete normally -- confirm whether callers rely
-- on this.
forkTCPServer
    :: MonadUnliftIO m
    => SN.ServerSettings
    -> (SN.AppData -> m ())
    -> m ThreadId
forkTCPServer set f =
    withRunInIO $ \run -> do
        isBound <- newEmptyMVar
        let setWithWaitForBind = addBoundSignal isBound set
        threadId <- forkIO . run $ runGeneralTCPServer setWithWaitForBind f
        -- Block until the after-bind hook has signalled.
        takeMVar isBound
        return threadId
-- | Run a TCP server whose per-connection handler lives in any
-- 'MonadUnliftIO' monad.
--
-- This is 'SN.runTCPServer' with the handler unlifted to 'IO' through
-- 'withRunInIO'. The result type is fully polymorphic, mirroring
-- 'SN.runTCPServer'.
runGeneralTCPServer
    :: MonadUnliftIO m
    => SN.ServerSettings
    -> (SN.AppData -> m ())
    -> m a
runGeneralTCPServer set f = withRunInIO $ \run ->
    SN.runTCPServer set $ run . f
-- | Run a TCP client whose handler lives in any 'MonadUnliftIO' monad.
--
-- This is 'SN.runTCPClient' with the handler unlifted to 'IO' through
-- 'withRunInIO'; the handler's result is returned.
runGeneralTCPClient
    :: MonadUnliftIO m
    => SN.ClientSettings
    -> (SN.AppData -> m a)
    -> m a
runGeneralTCPClient set f = withRunInIO $ \run ->
    SN.runTCPClient set $ run . f