{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Data.Conduit.Process
(
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
, FlushInput(..)
, BuilderInput(..)
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
-- | Feed a process's input stream from a conduit: a pipe is requested
-- ('CreatePipe'), the pipe's handle is switched to 'NoBuffering', and a sink
-- that writes every incoming 'ByteString' to that handle is returned.
--
-- This instance supplies no action for closing the handle; the tuple instance
-- pairs the sink with one.
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    isStdStream =
        ( \(Just h) -> hSetBuffering h NoBuffering $> sinkHandle h
        , Just CreatePipe
        )
-- | Like the plain conduit input instance, but the sink is paired with an
-- action (runnable in any 'MonadIO') that closes the underlying pipe handle
-- with 'hClose'.
--
-- The handle is switched to 'NoBuffering' before the pair is returned.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    isStdStream =
        ( \(Just h) -> hSetBuffering h NoBuffering $> (sinkHandle h, liftIO $ hClose h)
        , Just CreatePipe
        )
-- | Wrapper around a conduit that consumes 'Builder' values.
--
-- The 'InputSource' instances in this module construct it from
-- 'sinkHandleBuilder' applied to the process's piped handle. The single-value
-- instance provides no close action; use the tuple instance to also obtain an
-- action that calls 'hClose' on the handle.
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
-- | Wrapper around a conduit that consumes @'Flush' 'ByteString'@ values.
--
-- The 'InputSource' instances in this module construct it from
-- 'sinkHandleFlush' applied to the process's piped handle. The single-value
-- instance provides no close action; use the tuple instance to also obtain an
-- action that calls 'hClose' on the handle.
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
-- | Feed a process's input stream with 'Builder's: a pipe is requested
-- ('CreatePipe') and its handle is wrapped with 'sinkHandleBuilder'.
--
-- Unlike the 'ByteString' conduit instances, the handle's buffering mode is
-- left untouched here, and no close action is provided.
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    isStdStream =
        ( \(Just h) -> return $ BuilderInput $ sinkHandleBuilder h
        , Just CreatePipe
        )
-- | Like the plain 'BuilderInput' instance, but the wrapped sink is paired
-- with an action (runnable in any 'MonadIO') that closes the underlying pipe
-- handle with 'hClose'.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    isStdStream =
        ( \(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h)
        , Just CreatePipe
        )
-- | Feed a process's input stream with @'Flush' 'ByteString'@ values: a pipe
-- is requested ('CreatePipe') and its handle is wrapped with
-- 'sinkHandleFlush'.
--
-- The handle's buffering mode is left untouched here, and no close action is
-- provided.
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    isStdStream =
        ( \(Just h) -> return $ FlushInput $ sinkHandleFlush h
        , Just CreatePipe
        )
-- | Like the plain 'FlushInput' instance, but the wrapped sink is paired with
-- an action (runnable in any 'MonadIO') that closes the underlying pipe
-- handle with 'hClose'.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    isStdStream =
        ( \(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h)
        , Just CreatePipe
        )
-- | Read a process's output stream as a conduit: a pipe is requested
-- ('CreatePipe'), the pipe's handle is switched to 'NoBuffering', and a
-- source yielding the 'ByteString's read from that handle is returned.
--
-- This instance supplies no action for closing the handle; the tuple instance
-- pairs the source with one.
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    osStdStream =
        ( \(Just h) -> hSetBuffering h NoBuffering $> sourceHandle h
        , Just CreatePipe
        )
-- | Like the plain conduit output instance, but the source is paired with an
-- action (runnable in any 'MonadIO') that closes the underlying pipe handle
-- with 'hClose'.
--
-- The handle is switched to 'NoBuffering' before the pair is returned.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
    -- Only 'Just' is matched: presumably a handle is always supplied when
    -- 'CreatePipe' is requested (TODO confirm against 'streamingProcess').
    osStdStream =
        ( \(Just h) -> hSetBuffering h NoBuffering $> (sourceHandle h, liftIO $ hClose h)
        , Just CreatePipe
        )
-- | Start the process described by the given 'CreateProcess' and run the
-- supplied consumer over everything the process writes to its stdout.
--
-- The process's stdin and stderr are both requested as 'ClosedStream'. Once
-- the consumer has finished, the stdout handle is closed and the function
-- waits for the process to exit.
--
-- Returns the process's 'ExitCode' together with the consumer's result.
--
-- Note: the steps run strictly in sequence with no exception handler, so if
-- the consumer throws, neither the close action nor the wait is executed.
sourceProcessWithConsumer
    :: MonadIO m
    => CreateProcess
    -> ConduitT ByteString Void m a -- ^ stdout consumer
    -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
    (ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
    res <- runConduit $ source .| consumer
    close
    ec <- waitForStreamingProcess cph
    return (ec, res)
-- | Variant of 'sourceProcessWithConsumer' that takes a command line: the
-- string is turned into a 'CreateProcess' with 'shell' and everything else is
-- delegated to 'sourceProcessWithConsumer'.
sourceCmdWithConsumer
    :: MonadIO m
    => String                       -- ^ command, passed to 'shell'
    -> ConduitT ByteString Void m a -- ^ stdout consumer
    -> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)
-- | Start the process described by the given 'CreateProcess' and drive all
-- three of its standard streams concurrently:
--
-- * the producer is piped into the process's stdin, and the stdin handle is
--   closed as soon as that pipeline finishes (normally or with an exception);
--
-- * the process's stdout is piped into the first consumer;
--
-- * the process's stderr is piped into the second consumer.
--
-- After the three pipelines are done (or one of them failed), the stdout and
-- stderr handles are closed. If anything up to that point throws, the process
-- is sent a termination request via 'terminateStreamingProcess' and the
-- exception is rethrown. Otherwise the function waits for the process to exit.
--
-- Returns the 'ExitCode' and the results of the stdout and stderr consumers.
sourceProcessWithStreams
    :: MonadUnliftIO m
    => CreateProcess
    -> ConduitT () ByteString m ()  -- ^ producer feeding stdin
    -> ConduitT ByteString Void m a -- ^ stdout consumer
    -> ConduitT ByteString Void m b -- ^ stderr consumer
    -> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
    withUnliftIO $ \u -> do
        ( (sinkStdin, closeStdin)
          , (sourceStdout, closeStdout)
          , (sourceStderr, closeStderr)
          , sph ) <- streamingProcess cp
        (_, resStdout, resStderr) <-
            runConcurrently (
                (,,)
                -- Close stdin right after the producer is done so the child
                -- can observe end-of-input while its output is still drained.
                <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
                <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
                <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
            -- Nested 'finally' so stderr is still closed when closing stdout
            -- itself throws.
            `finally` (closeStdout `finally` closeStderr)
            `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        return (ec, resStdout, resStderr)
-- | Variant of 'sourceProcessWithStreams' that takes a command line: the
-- string is turned into a 'CreateProcess' with 'shell' and everything else is
-- delegated to 'sourceProcessWithStreams'.
sourceCmdWithStreams
    :: MonadUnliftIO m
    => String                       -- ^ command, passed to 'shell'
    -> ConduitT () ByteString m ()  -- ^ producer feeding stdin
    -> ConduitT ByteString Void m a -- ^ stdout consumer
    -> ConduitT ByteString Void m b -- ^ stderr consumer
    -> m (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)
-- | Start the process described by the given 'CreateProcess', hand its three
-- standard streams to the callback, and check the exit code afterwards.
--
-- * If the callback throws, the process is sent a termination request via
--   'terminateStreamingProcess' and the exception is rethrown.
--
-- * After the callback returns, the function waits for the process to exit.
--   Any exit code other than 'ExitSuccess' is reported by throwing
--   'ProcessExitedUnsuccessfully' carrying the 'CreateProcess' and the code.
--
-- * In every case 'closeStreamingProcessHandle' is run on the process handle
--   on the way out (it is the release action of a 'bracket').
--
-- Returns the callback's result when the process exits successfully.
withCheckedProcessCleanup
    :: ( InputSource stdin
       , OutputSink stderr
       , OutputSink stdout
       , MonadUnliftIO m
       )
    => CreateProcess
    -> (stdin -> stdout -> stderr -> m b)
    -> m b
withCheckedProcessCleanup cp f = withRunInIO $ \run -> bracket
    (streamingProcess cp)
    (\(_, _, _, sph) -> closeStreamingProcessHandle sph)
    $ \(x, y, z, sph) -> do
        res <- run (f x y z) `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        case ec of
            ExitSuccess -> return res
            _ -> throwIO $ ProcessExitedUnsuccessfully cp ec
-- | Extract the raw 'ProcessHandle' from a 'StreamingProcessHandle' and call
-- 'terminateProcess' on it, lifted into any 'MonadIO'.
terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess = liftIO . terminateProcess . streamingProcessHandleRaw