{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
module System.IO.Streams.Concurrent
(
inputToChan
, chanToInput
, chanToOutput
, concurrentMerge
, makeChanPipe
) where
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>), (<*>))
#endif
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask, throwIO, try)
import Control.Monad (forM_)
import Prelude hiding (read)
import System.IO.Streams.Internal (InputStream, OutputStream, makeInputStream, makeOutputStream, nullInput, read)
-- | Drains an 'InputStream' into a 'Chan'.
--
-- Every result of 'read' is written to the channel unchanged, including the
-- terminating 'Nothing'; once that 'Nothing' has been forwarded the loop
-- stops. No thread is forked here: the loop runs in the caller's thread.
inputToChan :: InputStream a -> Chan (Maybe a) -> IO ()
inputToChan is ch = go
  where
    go :: IO ()
    go = do
        mb <- read is
        writeChan ch mb
        -- 'Nothing' has just been forwarded as the end-of-stream marker, so
        -- stop; any 'Just' means there may be more input to copy.
        maybe (return $! ()) (const go) mb
-- | Exposes a 'Chan' as an 'InputStream'.
--
-- The channel's 'readChan' action is handed to 'makeInputStream' as the
-- stream's producer, so the 'Maybe' values taken from the channel are passed
-- on as-is (a 'Nothing' in the channel is what the stream sees as its
-- end-of-stream result).
chanToInput :: Chan (Maybe a) -> IO (InputStream a)
chanToInput ch = makeInputStream $! readChan ch
-- | Exposes a 'Chan' as an 'OutputStream'.
--
-- The channel's 'writeChan' action is handed to 'makeOutputStream' as the
-- stream's consumer, so each 'Maybe' value given to the stream (including a
-- 'Nothing') is written to the channel unchanged.
chanToOutput :: Chan (Maybe a) -> IO (OutputStream a)
chanToOutput = makeOutputStream . writeChan
-- | Merges a list of 'InputStream's into a single stream, reading all of
-- them concurrently.
--
-- One worker thread is forked per input stream. Each worker repeatedly reads
-- its stream and hands every result to the consumer through a shared 'MVar',
-- so values appear on the merged stream in the order the workers deliver
-- them.
--
-- * An end-of-stream from one input does not end the merged stream: the
--   merged stream yields 'Nothing' only after every worker has reported
--   end-of-stream.
--
-- * An exception raised while a worker is reading is caught in the worker
--   and re-thrown (with 'throwIO') from the read on the merged stream. The
--   worker then goes back to reading its input.
--
-- * For an empty list the result is 'nullInput'.
concurrentMerge :: [InputStream a] -> IO (InputStream a)
concurrentMerge [] = nullInput
concurrentMerge iss = do
    -- Hand-off slot shared by all workers and the consumer.
    mv    <- newEmptyMVar
    -- Number of workers that have not yet reported end-of-stream.
    nleft <- newMVar $! length iss
    -- Workers are forked with asynchronous exceptions masked; only the
    -- 'read' on the underlying stream runs with the caller's masking state
    -- restored.
    mask $ \restore -> forM_ iss $ \is -> forkIO $ do
        let producer :: IO ()
            producer = do
                emb <- try $ restore $ read is
                case emb of
                    -- Forward the failure to the consumer, then keep reading.
                    Left exc -> do
                        putMVar mv (Left (exc :: SomeException))
                        producer
                    -- This input is exhausted: report it and stop the worker.
                    Right Nothing -> putMVar mv $! Right Nothing
                    Right x       -> putMVar mv (Right x) >> producer
        producer
    makeInputStream $ chunk mv nleft
  where
    -- Producer action of the merged stream: take the next message from the
    -- workers and interpret it.
    chunk mv nleft = do
        emb <- takeMVar mv
        case emb of
            Left exc -> throwIO exc
            Right Nothing -> do
                -- One worker finished; decrement the live-worker count and
                -- get the new value back.
                x <- modifyMVar nleft $ \n ->
                         let !n' = n - 1
                         in return $! (n', n')
                if x > 0
                    then chunk mv nleft   -- others still running: keep waiting
                    else return Nothing   -- last worker done: end of stream
            Right x -> return x
-- | Creates a connected stream pair backed by a fresh 'Chan'.
--
-- A new channel is allocated and wrapped twice: as an 'InputStream' via
-- 'chanToInput' and as an 'OutputStream' via 'chanToOutput'. Both streams
-- share that one channel, so what is written to the output side is what the
-- input side reads from.
makeChanPipe :: IO (InputStream a, OutputStream a)
makeChanPipe = do
    chan <- newChan
    (,) <$> chanToInput chan <*> chanToOutput chan