Skip to content

Commit f229e13

Browse files
authored
agent: reduce subscription transactions (#1259)
* agent: reduce subscription transactions * nub * remove commented
1 parent 0dd52dc commit f229e13

File tree

3 files changed

+10
-12
lines changed

3 files changed

+10
-12
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ import Data.Bifunctor (bimap, first)
131131
import Data.ByteString.Char8 (ByteString)
132132
import qualified Data.ByteString.Char8 as B
133133
import Data.Composition ((.:), (.:.), (.::), (.::.))
134+
import Data.Containers.ListUtils (nubOrd)
134135
import Data.Either (isRight, rights)
135136
import Data.Foldable (foldl', toList)
136137
import Data.Functor (($>))
@@ -959,7 +960,7 @@ subscribeConnections' c connIds = do
959960
errs' = M.map (Left . storeError) errs
960961
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
961962
mapM_ (mapM_ (\(cData, sqs) -> mapM_ (lift . resumeMsgDelivery c cData) sqs) . sndQueue) cs
962-
mapM_ (resumeConnCmds c) $ M.keys cs
963+
lift $ resumeConnCmds c $ M.keys cs
963964
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs)
964965
ns <- asks ntfSupervisor
965966
tkn <- readTVarIO (ntfTkn ns)
@@ -1118,13 +1119,10 @@ resumeSrvCmds :: AgentClient -> Maybe SMPServer -> AM' ()
11181119
resumeSrvCmds = void .: getAsyncCmdWorker False
11191120
{-# INLINE resumeSrvCmds #-}
11201121

1121-
resumeConnCmds :: AgentClient -> ConnId -> AM ()
1122-
resumeConnCmds c connId =
1123-
unlessM connQueued $
1124-
withStore' c (`getPendingCommandServers` connId)
1125-
>>= mapM_ (lift . resumeSrvCmds c)
1126-
where
1127-
connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c)
1122+
resumeConnCmds :: AgentClient -> [ConnId] -> AM' ()
1123+
resumeConnCmds c connIds = do
1124+
srvs <- nubOrd . concat . rights <$> withStoreBatch' c (\db -> fmap (getPendingCommandServers db) connIds)
1125+
mapM_ (resumeSrvCmds c) srvs
11281126

11291127
getAsyncCmdWorker :: Bool -> AgentClient -> Maybe SMPServer -> AM' Worker
11301128
getAsyncCmdWorker hasWork c server =

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,6 @@ data AgentClient = AgentClient
313313
workerSeq :: TVar Int,
314314
smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()),
315315
asyncCmdWorkers :: TMap (Maybe SMPServer) Worker,
316-
connCmdsQueued :: TMap ConnId Bool,
317316
ntfNetworkOp :: TVar AgentOpState,
318317
rcvNetworkOp :: TVar AgentOpState,
319318
msgDeliveryOp :: TVar AgentOpState,
@@ -480,7 +479,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a
480479
workerSeq <- newTVarIO 0
481480
smpDeliveryWorkers <- TM.emptyIO
482481
asyncCmdWorkers <- TM.emptyIO
483-
connCmdsQueued <- TM.emptyIO
484482
ntfNetworkOp <- newTVarIO $ AgentOpState False 0
485483
rcvNetworkOp <- newTVarIO $ AgentOpState False 0
486484
msgDeliveryOp <- newTVarIO $ AgentOpState False 0
@@ -519,7 +517,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a
519517
workerSeq,
520518
smpDeliveryWorkers,
521519
asyncCmdWorkers,
522-
connCmdsQueued,
523520
ntfNetworkOp,
524521
rcvNetworkOp,
525522
msgDeliveryOp,
@@ -893,7 +890,6 @@ closeAgentClient c = do
893890
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
894891
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
895892
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
896-
clear connCmdsQueued
897893
atomically . RQ.clear $ activeSubs c
898894
atomically . RQ.clear $ pendingSubs c
899895
clear subscrConns

src/Simplex/Messaging/Agent/Store/SQLite.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1909,9 +1909,11 @@ newQueueId_ (Only maxId : _) = DBQueueId (maxId + 1)
19091909

19101910
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
19111911
getConn = getAnyConn False
1912+
{-# INLINE getConn #-}
19121913

19131914
getDeletedConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
19141915
getDeletedConn = getAnyConn True
1916+
{-# INLINE getDeletedConn #-}
19151917

19161918
getAnyConn :: Bool -> DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
19171919
getAnyConn deleted' dbConn connId =
@@ -1932,9 +1934,11 @@ getAnyConn deleted' dbConn connId =
19321934

19331935
getConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
19341936
getConns = getAnyConns_ False
1937+
{-# INLINE getConns #-}
19351938

19361939
getDeletedConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
19371940
getDeletedConns = getAnyConns_ True
1941+
{-# INLINE getDeletedConns #-}
19381942

19391943
getAnyConns_ :: Bool -> DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
19401944
getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAnyConn deleted' db

0 commit comments

Comments
 (0)