Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ runHGEServer
, MonadExecuteQuery m
, Tracing.HasReporter m
, MonadQueryInstrumentation m
, MonadMetadataStorage m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> ServeOptions impl
Expand Down Expand Up @@ -685,7 +685,7 @@ runTxInMetadataStorage tx = do
liftEitherM $ liftIO $ runExceptT $
Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) tx

instance MonadMetadataStorage ServerAppM where
instance MonadMetadataStorage (MetadataStorageT ServerAppM) where

getDeprivedCronTriggerStats = runTxInMetadataStorage getDeprivedCronTriggerStatsTx
getScheduledEventsForDelivery = runTxInMetadataStorage getScheduledEventsForDeliveryTx
Expand Down
83 changes: 42 additions & 41 deletions server/src-lib/Hasura/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.Prelude
import Hasura.RQL.Types

import Control.Monad.Morph (MFunctor, hoist)
import Control.Monad.Morph (MFunctor)

import qualified Hasura.Tracing as Tracing

Expand All @@ -21,6 +21,7 @@ newtype MetadataStorageT m a
, MFunctor
, MonadTrans
, MonadIO
, Tracing.HasReporter
)

runMetadataStorageT
Expand All @@ -29,56 +30,56 @@ runMetadataStorageT =
runExceptT . unMetadataStorageT


class (Monad m) => MonadMetadataStorage m where
class (MonadError QErr m) => MonadMetadataStorage m where

-- Scheduled triggers
getDeprivedCronTriggerStats :: MetadataStorageT m [CronTriggerStats]
getScheduledEventsForDelivery :: MetadataStorageT m ([CronEvent], [OneOffScheduledEvent])
insertScheduledEvent :: ScheduledEventSeed -> MetadataStorageT m ()
getDeprivedCronTriggerStats :: m [CronTriggerStats]
getScheduledEventsForDelivery :: m ([CronEvent], [OneOffScheduledEvent])
insertScheduledEvent :: ScheduledEventSeed -> m ()
insertScheduledEventInvocation
:: Invocation 'ScheduledType -> ScheduledEventType -> MetadataStorageT m ()
:: Invocation 'ScheduledType -> ScheduledEventType -> m ()
setScheduledEventOp
:: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> MetadataStorageT m ()
:: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
unlockScheduledEvents
:: ScheduledEventType -> [ScheduledEventId] -> MetadataStorageT m Int
unlockAllLockedScheduledEvents :: MetadataStorageT m ()
:: ScheduledEventType -> [ScheduledEventId] -> m Int
unlockAllLockedScheduledEvents :: m ()

instance (MonadMetadataStorage m) => MonadMetadataStorage (ReaderT r m) where

getDeprivedCronTriggerStats = (hoist lift) getDeprivedCronTriggerStats
getScheduledEventsForDelivery = (hoist lift) getScheduledEventsForDelivery
insertScheduledEvent = (hoist lift) . insertScheduledEvent
insertScheduledEventInvocation a b = (hoist lift) $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = (hoist lift) $ setScheduledEventOp a b c
unlockScheduledEvents a b = (hoist lift) $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = (hoist lift) unlockAllLockedScheduledEvents
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents

instance (MonadMetadataStorage m) => MonadMetadataStorage (ExceptT e m) where
-- instance (MonadMetadataStorage m) => MonadMetadataStorage (ExceptT e m) where

getDeprivedCronTriggerStats = (hoist lift) getDeprivedCronTriggerStats
getScheduledEventsForDelivery = (hoist lift) getScheduledEventsForDelivery
insertScheduledEvent = (hoist lift) . insertScheduledEvent
insertScheduledEventInvocation a b = (hoist lift) $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = (hoist lift) $ setScheduledEventOp a b c
unlockScheduledEvents a b = (hoist lift) $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = (hoist lift) unlockAllLockedScheduledEvents
-- getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
-- getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
-- insertScheduledEvent = lift . insertScheduledEvent
-- insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
-- setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
-- unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
-- unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents

instance (MonadMetadataStorage m) => MonadMetadataStorage (Tracing.TraceT m) where

getDeprivedCronTriggerStats = (hoist lift) getDeprivedCronTriggerStats
getScheduledEventsForDelivery = (hoist lift) getScheduledEventsForDelivery
insertScheduledEvent = (hoist lift) . insertScheduledEvent
insertScheduledEventInvocation a b = (hoist lift) $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = (hoist lift) $ setScheduledEventOp a b c
unlockScheduledEvents a b = (hoist lift) $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = (hoist lift) unlockAllLockedScheduledEvents

instance (MonadMetadataStorage m) => MonadMetadataStorage (LazyTxT e m) where

getDeprivedCronTriggerStats = (hoist lift) getDeprivedCronTriggerStats
getScheduledEventsForDelivery = (hoist lift) getScheduledEventsForDelivery
insertScheduledEvent = (hoist lift) . insertScheduledEvent
insertScheduledEventInvocation a b = (hoist lift) $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = (hoist lift) $ setScheduledEventOp a b c
unlockScheduledEvents a b = (hoist lift) $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = (hoist lift) unlockAllLockedScheduledEvents
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents

-- instance (MonadMetadataStorage m) => MonadMetadataStorage (LazyTxT e m) where

-- getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
-- getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
-- insertScheduledEvent = lift . insertScheduledEvent
-- insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
-- setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
-- unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
-- unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents
35 changes: 15 additions & 20 deletions server/src-lib/Hasura/Eventing/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ import Hasura.SQL.Types
-- have an adequate buffer of cron events.
runCronEventsGenerator
:: ( MonadIO m
, MonadMetadataStorage m
, MonadMetadataStorage (MetadataStorageT m)
)
=> L.Logger L.Hasura
-> IO SchemaCache
Expand Down Expand Up @@ -160,7 +160,7 @@ runCronEventsGenerator logger getSC = do
insertCronEventsFor
:: (MonadMetadataStorage m)
=> [(CronTriggerInfo, CronTriggerStats)]
-> MetadataStorageT m ()
-> m ()
insertCronEventsFor cronTriggersWithStats = do
let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
generateCronEventsFrom (ctsMaxScheduledTime stats) cti
Expand All @@ -183,7 +183,7 @@ processCronEvents
:: ( HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage m
, MonadMetadataStorage (MetadataStorageT m)
)
=> L.Logger L.Hasura
-> LogEnvHeaders
Expand All @@ -209,7 +209,7 @@ processCronEvents logger logEnv httpMgr cronEvents getSC lockedCronEvents = do
(fromMaybe J.Null ctiPayload) ctiComment
Nothing
retryCtx = RetryContext tries ctiRetryConf
finally <- runExceptT $ flip runReaderT (logger, httpMgr) $
finally <- runMetadataStorageT $ flip runReaderT (logger, httpMgr) $
processScheduledEvent logEnv id' ctiHeaders retryCtx
payload webhookUrl Cron
removeEventFromLockedEvents id' lockedCronEvents
Expand All @@ -221,7 +221,7 @@ processOneOffScheduledEvents
:: ( HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> L.Logger L.Hasura
Expand All @@ -236,7 +236,7 @@ processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents lockedOneOff
-- graceful shutdown is initiated in midst of processing these events
saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
for_ oneOffEvents $ \OneOffScheduledEvent{..} -> do
(either logInternalError pure) =<< runExceptT do
(either logInternalError pure) =<< runMetadataStorageT do
webhookInfo <- resolveWebhook env _ooseWebhookConf
headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf
let webhookUrl = unResolvedWebhook webhookInfo
Expand All @@ -255,7 +255,7 @@ processScheduledTriggers
:: ( HasVersion
, MonadIO m
, Tracing.HasReporter m
, MonadMetadataStorage m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> L.Logger L.Hasura
Expand All @@ -282,7 +282,6 @@ processScheduledEvent
, Has (L.Logger L.Hasura) r
, HasVersion
, MonadIO m
, MonadError QErr m
, Tracing.HasReporter m
, MonadMetadataStorage m
)
Expand Down Expand Up @@ -322,7 +321,6 @@ processScheduledEvent logEnv eventId eventHeaders retryCtx payload webhookUrl ty

processError
:: ( MonadIO m
, MonadError QErr m
, MonadMetadataStorage m
)
=> ScheduledEventId
Expand All @@ -348,17 +346,16 @@ processError eventId retryCtx decodedHeaders type' reqJson err = do
HOther detail -> do
let errMsg = (TBS.fromLBS $ J.encode detail)
mkInvocation eventId 500 decodedHeaders errMsg [] reqJson
liftEitherM $ runMetadataStorageT $ do
insertScheduledEventInvocation invocation type'
retryOrMarkError eventId retryCtx err type'
insertScheduledEventInvocation invocation type'
retryOrMarkError eventId retryCtx err type'

retryOrMarkError
:: (MonadIO m, MonadMetadataStorage m)
=> ScheduledEventId
-> RetryContext
-> HTTPErr a
-> ScheduledEventType
-> MetadataStorageT m ()
-> m ()
retryOrMarkError eventId retryCtx err type' = do
let RetryContext tries retryConf = retryCtx
mRetryHeader = getRetryAfterHeaderFromHTTPErr err
Expand Down Expand Up @@ -409,7 +406,7 @@ and it can transition to other states in the following ways:
-}

processSuccess
:: (MonadIO m, MonadError QErr m, MonadMetadataStorage m)
:: (MonadMetadataStorage m)
=> ScheduledEventId
-> [HeaderConf]
-> ScheduledEventType
Expand All @@ -421,16 +418,14 @@ processSuccess eventId decodedHeaders type' reqBodyJson resp = do
respHeaders = hrsHeaders resp
respStatus = hrsStatus resp
invocation = mkInvocation eventId respStatus decodedHeaders respBody respHeaders reqBodyJson
liftEitherM $ runMetadataStorageT $ do
insertScheduledEventInvocation invocation type'
setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
insertScheduledEventInvocation invocation type'
setScheduledEventOp eventId (SEOpStatus SESDelivered) type'

processDead
:: (MonadError QErr m, MonadMetadataStorage m)
:: (MonadMetadataStorage m)
=> ScheduledEventId -> ScheduledEventType -> m ()
processDead eventId type' =
liftEitherM $ runMetadataStorageT $
setScheduledEventOp eventId (SEOpStatus SESDead) type'
setScheduledEventOp eventId (SEOpStatus SESDead) type'

mkInvocation
:: ScheduledEventId
Expand Down