Skip to content

readTQueueN #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions Control/Concurrent/STM/TQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ module Control.Concurrent.STM.TQueue (
newTQueue,
newTQueueIO,
readTQueue,
readTQueueN,
tryReadTQueue,
flushTQueue,
peekTQueue,
Expand Down Expand Up @@ -103,6 +104,59 @@ readTQueue (TQueue read write) = do
writeTVar read zs
return z


-- Logic of `readTQueueN`:
-- +-----------+--------------- +-----------------+
-- | write = 0 | write < N-read | write >= N-read |
-- +--------------+-----------+--------------- +-----------------+
-- | read == 0 | retry | retry | case 2 |
-- | 0 < read < N | retry | retry | case 3 |
-- +--------------+-----------+--------------- +-----------------+
-- | read >= N | . . . . . . . case 1 . . . . . . . . . |
-- +----=--------------------------------------------------------+

-- case 1a: More than N: splitAt N read -> put suffix in read and return prefix
-- case 1b: Exactly N: Reverse write into read, and return all of the old read
-- case 2: Reverse write -> splitAt N, put suffix in read and return prefix
-- case 3: Like case 2 but prepend read onto return value

-- |Reads N values, blocking until enough are available.
-- This is likely never to return if another thread is
-- blocking on `readTQueue`. It has quadratic complexity
-- in N due to each write triggering `readTQueueN` to calculate
-- the length of the write side as <N items pile up there.
--
-- @since 2.5.4
readTQueueN :: TQueue a -> Int -> STM [a]
readTQueueN (TQueue read write) n = do
xs <- readTVar read
let xl = length xs
if xl > n then do -- case 1a
let (as,bs) = splitAt n xs
writeTVar read bs
pure as
else if xl == n then do -- case 1b
ys <- readTVar write
case ys of
[] -> do
writeTVar read []
retry
_ -> do
let zs = reverse ys
writeTVar write []
writeTVar read zs
pure xs
else do
ys <- readTVar write
let yl = length ys
if yl == 0 then
retry
else if yl < n - xl then retry
else do -- cases 2 and 3
let (as,bs) = splitAt (n-xl) (reverse ys)
writeTVar read bs
pure $ xs <> as

-- | A version of 'readTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryReadTQueue :: TQueue a -> STM (Maybe a)
Expand Down
2 changes: 1 addition & 1 deletion stm.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: >=1.10
name: stm
version: 2.5.3.1
version: 2.5.4
-- don't forget to update changelog.md file!

license: BSD3
Expand Down
2 changes: 2 additions & 0 deletions testsuite/src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import qualified Issue17
import qualified Stm052
import qualified Stm064
import qualified Stm065
import qualified Stm066

main :: IO ()
main = do
Expand All @@ -23,6 +24,7 @@ main = do
, testCase "stm052" Stm052.main
, testCase "stm064" Stm064.main
, testCase "stm065" Stm065.main
, testCase "stm066" Stm066.main
]
]

30 changes: 30 additions & 0 deletions testsuite/src/Stm066.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{-# LANGUAGE CPP #-}

{- NB: This one fails for GHC < 7.6 which had a bug exposed via
nested uses of `orElse` in `stmCommitNestedTransaction`

This was fixed in GHC via
f184d9caffa09750ef6a374a7987b9213d6db28e
-}

module Stm066 (main) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Monad (unless)

main :: IO ()
main = do
q <- atomically $ newTQueue
_ <- forkIO $ atomically $ do
writeTQueue q (1::Int)
writeTQueue q 2
writeTQueue q 3
writeTQueue q 4
l <- atomically $ do
_ <- readTQueueN q 1
readTQueueN q 3

unless (l == [2,3,4]) $
fail (show l)
1 change: 1 addition & 0 deletions testsuite/testsuite.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ test-suite stm
Stm052
Stm064
Stm065
Stm066

type: exitcode-stdio-1.0

Expand Down
Loading