diff --git a/.gitignore b/.gitignore index fa4e5f0..c81c401 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ cabal.project.local # datasets *.csv + +# profiling artifacts +*.ps diff --git a/app/LoadBalancer.hs b/app/LoadBalancer.hs new file mode 100644 index 0000000..d04dafa --- /dev/null +++ b/app/LoadBalancer.hs @@ -0,0 +1,178 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE FlexibleContexts #-} + +module Main where + +import Control.Monad (replicateM, forM_, forever) +import Control.Monad.Loops (iterateM_) +import Control.Monad.Random.Class (getRandomR) +import Data.Coerce (coerce) +import Data.List (elemIndex, foldl1') +import Data.Maybe (fromMaybe) +import Data.Random.Source.PureMT (newPureMT) +import Deli (Channel, Deli, JobTiming(..)) +import Deli.Printer (printResults) +import System.Random +import qualified Data.PQueue.Min as PQueue +import qualified Deli +import qualified Deli.Random + +createWorker + :: Deli JobTiming (Channel JobTiming) +createWorker = do + workerChannel <- Deli.newChannel Nothing + Deli.fork $ forever $ do + job <- Deli.readChannel workerChannel + Deli.runJob job + return workerChannel + +roundRobinWorkers + :: Int + -> Channel JobTiming + -> Deli JobTiming () +roundRobinWorkers num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + -- create an infinite list of all channels, repeated, + -- then for each one, read from main queue, and write + -- to the worker's queue + let roundRobinList = cycle chans + forM_ roundRobinList $ \worker -> do + job <- Deli.readChannel jobChannel + Deli.writeChannel worker job + +randomWorkers + :: Int + -> Channel JobTiming + -> Deli JobTiming () +randomWorkers num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + forever $ do + randomWorkerIndex <- getRandomR (0, length chans - 1) + let workerQueue = chans !! randomWorkerIndex + job <- Deli.readChannel jobChannel + Deli.writeChannel workerQueue job + +leastConn + :: Int + -> Channel JobTiming + -> Deli JobTiming () +leastConn num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + forever $ do + job <- Deli.readChannel jobChannel + + conns <- mapM Deli.channelLength chans + let minIndex = fromMaybe 0 $ elemIndex (foldl1' min conns) conns + + Deli.writeChannel (chans !! minIndex) job + +twoRandomChoices + :: Int + -> Channel JobTiming + -> Deli JobTiming () +twoRandomChoices num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + forever $ do + job <- Deli.readChannel jobChannel + + randomWorkerIndexA <- getRandomR (0, length chans - 1) + randomWorkerIndexB <- getRandomR (0, length chans - 1) + + aLength <- Deli.channelLength (chans !! randomWorkerIndexA) + bLength <- Deli.channelLength (chans !! randomWorkerIndexB) + + if aLength < bLength + then Deli.writeChannel (chans !! randomWorkerIndexA) job + else Deli.writeChannel (chans !! randomWorkerIndexB) job + +data PriorityChannel = PriorityChannel + { _pduration :: !Deli.Duration + , _pchannel :: !(Deli.Channel JobTiming) + } deriving (Eq, Ord, Show) + +lwlDispatcher + :: Deli.Channel JobTiming + -> PQueue.MinQueue PriorityChannel + -> Deli JobTiming () +lwlDispatcher !readChan !queue = do + now <- Deli.now + iterateM_ (dispatch readChan) (queue, now) + +dispatch + :: Deli.Channel JobTiming + -> (PQueue.MinQueue PriorityChannel, Deli.Time) + -> Deli JobTiming (PQueue.MinQueue PriorityChannel, Deli.Time) +dispatch readChan (queue, prevTime) = do + job <- Deli.readChannel readChan + newTime <- Deli.now + + durationMultiplier <- fromRational . toRational <$> getRandomR (0.7, 1.3 :: Float) + + + let mFun lastTime nowTime (PriorityChannel d c) = + PriorityChannel (max 0 (d - coerce (nowTime - lastTime))) c + !adjustedQueue = PQueue.map (mFun prevTime newTime) queue + (PriorityChannel shortestPrevDuration shortestQueue, deletedMin) = PQueue.deleteFindMin adjustedQueue + + approxJobDuration = durationMultiplier * _jobDuration job + newPriorityChannel = PriorityChannel (shortestPrevDuration + approxJobDuration) shortestQueue + !addedBack = PQueue.insert newPriorityChannel deletedMin + + Deli.writeChannel shortestQueue job + return (addedBack, newTime) + +leastWorkLeft + :: Int + -> Channel JobTiming + -> Deli JobTiming () +leastWorkLeft num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + let workQueue :: PQueue.MinQueue PriorityChannel + startingTimes = take num [0.00001, 0.00002..] + queueList = [PriorityChannel d c | (d, c) <- zip startingTimes chans] + workQueue = PQueue.fromAscList queueList + lwlDispatcher jobChannel workQueue + +loadBalancerExample :: IO () +loadBalancerExample = do + simulationGen <- newStdGen + inputGen <- newPureMT + let arrivals = Deli.Random.arrivalTimePoissonDistribution 1500 + serviceTimes = Deli.Random.durationExponentialDistribution 0.025 + numTests = 1000 * 1000 * 1 + jobsA = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + jobsB = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + jobsC = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + jobsD = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + jobsE = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + roundRobinRes = Deli.simulate simulationGen jobsA (roundRobinWorkers 48) + randomRes = Deli.simulate simulationGen jobsB (randomWorkers 48) + leastWorkLeftRes = Deli.simulate simulationGen jobsC (leastWorkLeft 48) + twoRandomChoicesRes = Deli.simulate simulationGen jobsD (twoRandomChoices 48) + leastConnRes = Deli.simulate simulationGen jobsE (leastConn 48) + + putStrLn "## Round Robin ##" + printResults roundRobinRes + newline + putStrLn "## Random ##" + printResults randomRes + newline + putStrLn "## LeastWorkLeft ##" + printResults leastWorkLeftRes + newline + putStrLn "## TwoRandomChoices ##" + printResults twoRandomChoicesRes + newline + putStrLn "## LeastConn ##" + printResults leastConnRes + newline + + where newline = putStrLn "\n" + +main :: IO () +main = do + loadBalancerExample + newline + + where newline = putStrLn "\n" diff --git a/app/Performance.hs b/app/Performance.hs new file mode 100644 index 0000000..dd1d8f7 --- /dev/null +++ b/app/Performance.hs @@ -0,0 +1,113 @@ +import qualified Control.Monad.Concurrent as C +import Control.Monad (forever, replicateM_) +import Deli (Channel, Deli, JobTiming(..)) +import Deli.Printer (printResults) +import System.Random +import qualified Deli + +singleQueue + :: Channel JobTiming + -> Deli JobTiming () +singleQueue queue = + forever $ do + job <- Deli.readChannel queue + Deli.runJob job + +singleQueueExample :: IO () +singleQueueExample = do + gen <- newStdGen + let durations = repeat 0.5 + count = 1000 * 100 + times = [0,1..(count - 1)] + jobs = zipWith JobTiming times durations + res = Deli.simulate gen jobs singleQueue + printResults res + +chainedQueues + :: Channel JobTiming + -> Deli JobTiming () +chainedQueues queue = do + middleChan <- Deli.newChannel Nothing + Deli.fork $ forever $ do + job <- Deli.readChannel middleChan + Deli.runJob job + forever $ do + job <- Deli.readChannel queue + Deli.writeChannel middleChan job + +chainedQueueExample :: IO () +chainedQueueExample = do + gen <- newStdGen + let durations = repeat 0.5 + count = 1000 * 100 + times = [0,1..(count - 1)] + jobs = zipWith JobTiming times durations + res = Deli.simulate gen jobs chainedQueues + printResults res + +oneThread + :: Channel JobTiming + -> Deli JobTiming () +oneThread queue = do + middleChan <- Deli.newChannel (Just 1) + forever $ do + jobA <- Deli.readChannel queue + Deli.writeChannel middleChan jobA + jobB <- Deli.readChannel middleChan + Deli.runJob jobB + +oneThreadExample :: IO () +oneThreadExample = do + gen <- newStdGen + let durations = repeat 0.5 + count = 1000 * 1000 + times = [0,1..(count - 1)] + jobs = zipWith JobTiming times durations + res = Deli.simulate gen jobs oneThread + printResults res + +concurrentSingleExample + :: IO () +concurrentSingleExample = + C.runConcurrentT $ do + chan <- C.newChannel (Just 1) + C.fork $ forever $ + C.readChannel chan >> return () + replicateM_ (1000 * 100 * 10) $ do + C.writeChannel chan True + +concurrentChainedExample + :: IO () +concurrentChainedExample = + C.runConcurrentT $ do + chanOne <- C.newChannel (Just 1) + chanTwo <- C.newChannel (Just 1) + C.fork $ forever $ do + val <- C.readChannel chanOne + C.writeChannel chanTwo val + C.fork $ forever $ + C.readChannel chanTwo >> return () + replicateM_ (1000 * 100 * 10) $ do + C.writeChannel chanOne True + +main :: IO () +main = do + newline + putStrLn "## singleQueueExample ##" + singleQueueExample + newline + + newline + putStrLn "## chainedQueueExample ##" + chainedQueueExample + newline + + newline + putStrLn "## oneThreadExample ##" + oneThreadExample + newline + + concurrentSingleExample + concurrentChainedExample + + where newline = putStrLn "\n" diff --git a/deli.cabal b/deli.cabal index 1923e11..0129868 100644 --- a/deli.cabal +++ b/deli.cabal @@ -19,12 +19,13 @@ library , Deli , Deli.Printer , Deli.Random - build-depends: base >= 4.7 && < 5 + build-depends: base , MonadRandom , bytestring , containers , dlist , lens + , monad-loops , mtl , pqueue , random @@ -33,8 +34,9 @@ library , tdigest , time , transformers + , strict default-language: Haskell2010 - ghc-options: -Wall + ghc-options: -Wall -O2 executable tutorial hs-source-dirs: app @@ -56,14 +58,38 @@ executable tutorial default-language: Haskell2010 ghc-options: -threaded -rtsopts -with-rtsopts=-N -O1 -test-suite deli-test - type: exitcode-stdio-1.0 - hs-source-dirs: test - main-is: Spec.hs +executable performance + hs-source-dirs: app + main-is: Performance.hs + build-depends: base + , deli + , random + ghc-options: -rtsopts -O2 + default-language: Haskell2010 + +executable load-balancer + hs-source-dirs: app + main-is: LoadBalancer.hs + ghc-options: -O1 build-depends: base + , MonadRandom + , bytestring + , containers , deli - ghc-options: -threaded -rtsopts -with-rtsopts=-N + , deepseq + , lens + , monad-loops + , mtl + , parallel + , pqueue + , random + , random-fu + , random-source + , tdigest + , time default-language: Haskell2010 + ghc-options: -Wall -O2 + source-repository head type: git diff --git a/src/Control/Monad/Concurrent.hs b/src/Control/Monad/Concurrent.hs index 1b670e4..21e0fc0 100644 --- a/src/Control/Monad/Concurrent.hs +++ b/src/Control/Monad/Concurrent.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -29,14 +30,20 @@ module Control.Monad.Concurrent , writeChannelNonblocking , readChannel , readChannelNonblocking + , channelLength , runConcurrentT ) where import Control.Lens (at, ix, makeLenses, to, use, (^?), (.=), (+=), (%=), (?~)) -import Control.Monad.State.Strict +import Control.Monad (void, join, unless) +import Control.Monad.State.CPS (StateT, evalStateT) import Control.Monad.Reader (MonadReader, ReaderT, ask, local, runReaderT) +import Control.Monad.IO.Class (MonadIO) import Control.Monad.Trans.Cont (ContT, evalContT, resetT, shiftT) +import Control.Monad.Trans.Class (MonadTrans, lift) +import Control.Monad.State.Class (MonadState, get, state, put) import Data.Map.Strict +import Data.Strict.Tuple (Pair((:!:))) import Data.Maybe import Data.PQueue.Min as PQueue import Data.Sequence @@ -82,8 +89,8 @@ data Channel a = Channel data ChanAndWaiters chanState m = ChanAndWaiters { _contents :: !(Seq chanState) - , _readers :: Queue (ThreadId, IConcurrentT chanState m ()) - , _writers :: Queue (ThreadId, IConcurrentT chanState m ()) + , _readers :: !(Queue (Pair ThreadId (IConcurrentT chanState m ()))) + , _writers :: !(Queue (Pair ThreadId (IConcurrentT chanState m ()))) } newtype Time = Time DiffTime @@ -140,7 +147,7 @@ subtractTime (Time end) (Time start) = Duration (end - start) data PriorityCoroutine chanState m = PriorityCoroutine - { _routine :: IConcurrentT chanState m () + { _routine :: !(IConcurrentT chanState m ()) , _pId :: !ThreadId , _priority :: !Time } @@ -161,19 +168,13 @@ data ConcurrentState chanState m = ConcurrentState , _channels :: !(Map (Channel chanState) (ChanAndWaiters chanState m)) , _nextChannelIdent :: !Integer , _nowTime :: !Time + , _done :: Bool } newtype IConcurrentT chanState m a = IConcurrentT { runIConcurrentT' :: ContT () (ReaderT ThreadId (StateT (ConcurrentState chanState m) m)) a - } deriving (Functor, Monad, MonadIO, MonadReader ThreadId, MonadState (ConcurrentState chanState m)) - -instance Applicative (IConcurrentT chanState m) where - pure = IConcurrentT . pure - - (IConcurrentT a) <*> (IConcurrentT b) = IConcurrentT (a <*> b) - - (IConcurrentT a) *> (IConcurrentT b) = IConcurrentT $ a >>= const b + } deriving (Functor, Applicative, Monad, MonadIO, MonadReader ThreadId, MonadState (ConcurrentState chanState m)) instance MonadTrans (IConcurrentT chanState) where lift = IConcurrentT . lift . lift . lift @@ -210,6 +211,7 @@ freshState = ConcurrentState , _channels = Data.Map.Strict.empty , _nextChannelIdent = 0 , _nowTime = 0 + , _done = False } register @@ -218,7 +220,7 @@ register -> IConcurrentT chanState m () register callback = IConcurrentT $ shiftT $ \k -> do - let routine = IConcurrentT (lift (k ())) + let !routine = IConcurrentT (lift (k ())) runIConcurrentT' (callback routine) getCCs @@ -240,6 +242,14 @@ updateNow updateNow time = nowTime .= time +dequeue' + :: Monad m + => IConcurrentT chanState m () +dequeue' = do + IConcurrentT (resetT (runIConcurrentT' dequeue)) + flag <- use done + unless flag dequeue' + dequeue :: Monad m => IConcurrentT chanState m () @@ -249,29 +259,25 @@ dequeue = do let mMin = PQueue.minView queue case (mMin, scheduled) of (Nothing, []) -> - return () + done .= True (Just (PriorityCoroutine nextCoroutine pId priority, modifiedQueue), []) -> do putCCs modifiedQueue updateNow priority - IConcurrentT (resetT (runIConcurrentT' (local (const pId) nextCoroutine))) - dequeue + local (const pId) nextCoroutine (Nothing, (priority, nextCoroutine): tl) -> do scheduledRoutines .= tl updateNow priority - IConcurrentT (resetT (runIConcurrentT' nextCoroutine)) - dequeue + nextCoroutine (Just (PriorityCoroutine nextCoroutineQ pId priorityQ, modifiedQueue), (priorityL, nextCoroutineL): tl) -> if priorityL <= priorityQ then do scheduledRoutines .= tl updateNow priorityL - IConcurrentT (resetT (runIConcurrentT' (local (const pId) nextCoroutineL))) - dequeue + local (const pId) nextCoroutineL else do putCCs modifiedQueue updateNow priorityQ - IConcurrentT (resetT (runIConcurrentT' nextCoroutineQ)) - dequeue + nextCoroutineQ ischeduleDuration :: Monad m @@ -339,8 +345,8 @@ ischedule time pId routine = do -- time. Effectively this immediately schedules the process if it were -- to otherwise have been scheduled for the past. let scheduleTime = max time currentNow - newRoutines = insertBehind (PriorityCoroutine routine pId scheduleTime) currentRoutines - putCCs newRoutines + newRoutines = PQueue.insert (PriorityCoroutine routine pId scheduleTime) currentRoutines + putCCs (PQueue.seqSpine newRoutines newRoutines) now :: Monad m @@ -427,7 +433,7 @@ iwriteChannel chan@(Channel _ident mMaxSize) item = do case mMaxSize of Just maxSize | chanCurrentSize >= maxSize -> register $ \routine -> - channels . ix chan . writers %= flip writeQueue (myId, routine) + channels . ix chan . writers %= flip writeQueue (myId :!: routine) _ -> return () @@ -446,7 +452,7 @@ iwriteChannel chan@(Channel _ident mMaxSize) item = do Nothing -> return () -- there is a reader, call the reader - Just ((readerId, nextReader), newReaders) -> do + Just (readerId :!: nextReader, newReaders) -> do channels . ix chan . readers .= newReaders local (const readerId) nextReader @@ -465,7 +471,6 @@ iwriteChannelNonblocking -> IConcurrentT chanState m (Maybe chanState) iwriteChannelNonblocking chan@(Channel _ident mMaxSize) item = do chanMap <- use channels - myId <- ithreadId let chanContents = chanMap ^? (ix chan . contents) chanCurrentSize = maybe 0 Data.Sequence.length chanContents @@ -483,14 +488,12 @@ iwriteChannelNonblocking chan@(Channel _ident mMaxSize) item = do case readerView of -- there are no readers Nothing -> - return (Just item) + return () -- there is a reader, call the reader - Just ((readerId, nextReader), newReaders) -> do + Just (readerId :!: nextReader, newReaders) -> do channels . ix chan . readers .= newReaders - --local (const readerId) nextReader - ischeduleDuration 0 readerId nextReader - register (ischeduleDuration 0 myId) - return (Just item) + IConcurrentT (resetT (runIConcurrentT' (local (const readerId) nextReader))) + return (Just item) readChannel :: Monad m @@ -512,7 +515,7 @@ ireadChannel chan = do EmptyL -> do -- nothing to read, so we add ourselves to the queue register $ \routine -> - channels . ix chan . readers %= flip writeQueue (myId, routine) + channels . ix chan . readers %= flip writeQueue (myId :!: routine) -- we can actually just recur here to read the value, since now -- that we're running again, the queue will have a value for us to -- read @@ -526,11 +529,11 @@ ireadChannel chan = do let writerView = join $ (readQueue . _writers) <$> Data.Map.Strict.lookup chan chanMap2 case writerView of Nothing -> - return val - Just ((writerId, nextWriter), newWriters) -> do + return () + Just (writerId :!: nextWriter, newWriters) -> do channels . ix chan . writers .= newWriters - local (const writerId) nextWriter - return val + IConcurrentT (resetT (runIConcurrentT' (local (const writerId) nextWriter))) + return val readChannelNonblocking :: Monad m @@ -557,11 +560,26 @@ ireadChannelNonblocking chan = do let writerView = join $ (readQueue . _writers) <$> Data.Map.Strict.lookup chan chanMap2 case writerView of Nothing -> - return (Just val) - Just ((writerId, nextWriter), newWriters) -> do + return () + Just (writerId :!: nextWriter, newWriters) -> do channels . ix chan . writers .= newWriters - local (const writerId) nextWriter - return (Just val) + IConcurrentT (resetT (runIConcurrentT' (local (const writerId) nextWriter))) + return (Just val) + +channelLength + :: Monad m + => Channel chanState + -> ConcurrentT chanState m Int +channelLength = ConcurrentT . iChannelLength + +iChannelLength + :: Monad m + => Channel chanState + -> IConcurrentT chanState m Int +iChannelLength chan = do + chanMap <- use channels + let chanContents = fromMaybe Data.Sequence.empty $ chanMap ^? (ix chan . contents) + return (Data.Sequence.length chanContents) runConcurrentT :: Monad m @@ -577,7 +595,7 @@ runIConcurrentT runIConcurrentT routine = let resetAction = do resetT (runIConcurrentT' routine) - runIConcurrentT' dequeue + runIConcurrentT' dequeue' in void $ flip evalStateT freshState $ flip runReaderT (ThreadId 0) $ evalContT resetAction diff --git a/src/Control/Monad/State/CPS.hs b/src/Control/Monad/State/CPS.hs new file mode 100644 index 0000000..60ebed1 --- /dev/null +++ b/src/Control/Monad/State/CPS.hs @@ -0,0 +1,138 @@ +{- +Copyright (c) 2013, Fumiaki Kinoshita + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of Fumiaki Kinoshita nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-} + +{-# LANGUAGE Trustworthy, Rank2Types, FlexibleInstances, FlexibleContexts, MultiParamTypeClasses, BangPatterns, + UndecidableInstances #-} +module Control.Monad.State.CPS (StateT(..) + , runStateT + , evalStateT + , execStateT + , mapStateT + , State + , runState + , evalState + , execState + , module Control.Monad.State.Class) where +import Control.Monad.State.Class +import Control.Applicative +import Control.Monad.Identity +import Control.Monad.Trans +import Control.Monad.IO.Class +import Control.Monad +import Control.Monad.Cont.Class +import Control.Monad.Reader.Class + +newtype StateT s m a = StateT { unStateT :: forall r. s -> (a -> s -> m r) -> m r } + +runStateT :: Monad m => StateT s m a -> s -> m (a, s) +runStateT m s = unStateT m s (\a s -> return (a, s)) +{-# INLINABLE runStateT #-} + +evalStateT :: Monad m => StateT s m a -> s -> m a +evalStateT m s = unStateT m s $ \a _ -> return a +{-# INLINABLE evalStateT #-} + +execStateT :: Monad m => StateT s m a -> s -> m s +execStateT m s = unStateT m s $ \_ s -> return s +{-# INLINABLE execStateT #-} + +mapStateT :: (Monad m, Monad n) => (m (a, s) -> n (b, s)) -> StateT s m a -> StateT s n b +-- This used to be implemented directly, but doing it this way produces identical +-- Core and is considerably simpler. +mapStateT t m = stateT $ \s -> t (runStateT m s) + +instance Functor (StateT s m) where + fmap f m = StateT $ \s c -> unStateT m s (c . f) + {-# INLINABLE fmap #-} + +instance Applicative (StateT s m) where + pure x = StateT $ \s c -> c x s + {-# INLINABLE pure #-} + mf <*> ma = StateT $ \s c -> unStateT mf s $ \f s' -> unStateT ma s' (c . f) + {-# INLINABLE (<*>) #-} + m *> n = StateT $ \s c -> unStateT m s $ \_ s' -> unStateT n s' c + {-# INLINABLE (*>) #-} + +instance Monad (StateT s m) where + return x = StateT $ \s c -> c x s + m >>= k = StateT $ \s c -> unStateT m s $ \a s' -> unStateT (k a) s' c + {-# INLINABLE (>>=) #-} + (>>) = (*>) + +instance MonadState s (StateT s m) where + get = StateT $ \s c -> c s s + {-# INLINABLE get #-} + put s = StateT $ \_ c -> c () s + {-# INLINABLE put #-} + state f = StateT $ \s c -> uncurry c (f s) + {-# INLINABLE state #-} + +instance MonadTrans (StateT s) where + lift m = StateT $ \s c -> m >>= \a -> c a s + {-# INLINABLE lift #-} + +instance MonadIO m => MonadIO (StateT s m) where + liftIO = lift . liftIO + +instance MonadReader e m => MonadReader e (StateT s m) where + ask = lift ask + + local f m = stateT $ \s -> local f (runStateT m s) + +instance MonadFix m => MonadFix (StateT s m) where + mfix f = stateT $ \s -> mfix $ \ ~(a, _) -> runStateT (f a) s + +instance MonadCont m => MonadCont (StateT s m) where + callCC f = stateT $ \s -> callCC $ \c -> runStateT (f (\a -> stateT $ \s' -> c (a, s'))) s + +-- A stricter version of 'state'. The latter uses the +-- lazy 'uncurry' function for some reason. +stateT :: Monad m => (s -> m (a, s)) -> StateT s m a +stateT f = StateT $ \s c -> do + (a, s') <- f s + c a s' +{-# INLINE stateT #-} + +type State s = StateT s Identity + +runState :: State s a -> s -> (a, s) +runState m s = runIdentity $ runStateT m s +{-# INLINE runState #-} + +evalState :: State s a -> s -> a +evalState m s = runIdentity $ evalStateT m s +{-# INLINE evalState #-} + +execState :: State s a -> s -> s +execState m s = runIdentity $ execStateT m s +{-# INLINE execState #-} diff --git a/src/Deli.hs b/src/Deli.hs index 61f9a03..c57b291 100644 --- a/src/Deli.hs +++ b/src/Deli.hs @@ -32,6 +32,7 @@ module Deli , writeChannelNonblocking , readChannel , readChannelNonblocking + , channelLength , runDeli , runJob , priority @@ -39,8 +40,10 @@ module Deli ) where import Control.Lens (Getter, makeLenses, to, use, (%~), (+~), (.~), (^.)) -import Control.Monad.Random.Strict -import Control.Monad.State.Strict (State, execState, modify') +import Control.Monad.Random.Strict (MonadRandom(..), RandT, evalRandT) +import Control.Monad.State.CPS (State, execState) +import Control.Monad.State.Class (modify') +import Control.Monad.Trans.Class (lift) import Data.Function ((&)) import Data.Map.Strict import Data.Maybe (fromJust) @@ -166,6 +169,11 @@ readChannelNonblocking -> Deli chanState (Maybe chanState) readChannelNonblocking = Deli . Concurrent.readChannelNonblocking +channelLength + :: Concurrent.Channel chanState + -> Deli chanState Int +channelLength = Deli . Concurrent.channelLength + ------------------------------------------------------------------------------ -- ## Time Conversion ------------------------------------------------------------------------------ diff --git a/stack.yaml b/stack.yaml index d2eb91c..07c6759 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: nightly-2017-10-19 +resolver: nightly-2019-05-01 # User packages to be built. # Various formats can be used as shown in the example below. @@ -40,13 +40,9 @@ packages: # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) extra-deps: - - transformers-0.5.5.0 - - random-fu-0.2.7.0 - - rvar-0.2.0.3 - - MonadPrompt-1.0.0.5 - - log-domain-0.12 - - random-source-0.3.0.6 - - flexible-defaults-0.0.1.2 + - dump-core-0.1.3.2 + - monadLib-3.9 + - mtl-c-0.1.1 # Override default flag values for local packages and extra-deps flags: {} @@ -54,6 +50,8 @@ flags: {} # Extra package databases containing global packages extra-package-dbs: [] +allow-newer: true + # Control whether we use the GHC we find on the path # system-ghc: true # diff --git a/test/Spec.hs b/test/Spec.hs deleted file mode 100644 index cd4753f..0000000 --- a/test/Spec.hs +++ /dev/null @@ -1,2 +0,0 @@ -main :: IO () -main = putStrLn "Test suite not yet implemented"