Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ library
HStream.Server.Core.Stream
HStream.Server.Core.Subscription
HStream.Server.Core.View
HStream.Server.Core.Shard
HStream.Server.Handler.Admin
HStream.Server.Handler.Cluster
HStream.Server.Handler.Connector
Expand All @@ -76,6 +77,7 @@ library
HStream.Server.Handler.Stream
HStream.Server.Handler.Subscription
HStream.Server.Handler.View
HStream.Server.Handler.Shard
HStream.Server.Persistence.Common
HStream.Server.Persistence.MemoryStore
HStream.Server.Persistence.Object
Expand Down
192 changes: 192 additions & 0 deletions hstream/src/HStream/Server/Core/Shard.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
{-# LANGUAGE LambdaCase #-}

module HStream.Server.Core.Shard
( readShard
, listShards
, splitShards
, mergeShards
)
where

import Control.Exception (bracket)
import Control.Monad (foldM, void)
import Data.Foldable (foldl')
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as M
import qualified Data.Vector as V
import GHC.Stack (HasCallStack)
import qualified Z.Data.CBytes as CB

import Control.Concurrent (MVar, modifyMVar, modifyMVar_,
readMVar)
import Data.Maybe (fromJust, fromMaybe)
import Data.Text (Text)
import Data.Word (Word64)
import HStream.Server.Core.Common (decodeRecordBatch)
import qualified HStream.Server.HStreamApi as API
import HStream.Server.ReaderPool (getReader, putReader)
import HStream.Server.Shard (Shard (..), ShardKey (..),
SharedShardMap, cBytesToKey,
mergeTwoShard, mkShard,
mkSharedShardMapWithShards,
shardKeyToText, splitByKey,
splitHalf, textToShardKey)
import HStream.Server.Types (ServerContext (..),
transToStreamName)
import qualified HStream.Store as S
import HStream.Utils
import Proto3.Suite (Enumerated (Enumerated))
import Z.Data.CBytes (CBytes)

-------------------------------------------------------------------------------

listShards
:: HasCallStack
=> ServerContext
-> API.ListShardsRequest
-> IO (V.Vector API.Shard)
listShards ServerContext{..} API.ListShardsRequest{..} = do
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
V.foldM' constructShard V.empty $ V.fromList shards
where
streamId = transToStreamName listShardsRequestStreamName

constructShard shards logId = do
attr <- S.getStreamPartitionExtraAttrs scLDClient logId
case getShardInfo attr of
Nothing -> return . V.snoc shards $
API.Shard { API.shardStreamName = listShardsRequestStreamName
, API.shardShardId = logId
, API.shardIsActive = True
}
Just(sKey, eKey, ep) -> return . V.snoc shards $
API.Shard { API.shardStreamName = listShardsRequestStreamName
, API.shardShardId = logId
, API.shardStartHashRangeKey = shardKeyToText sKey
, API.shardEndHashRangeKey = shardKeyToText eKey
, API.shardEpoch = ep
-- FIXME: neet a way to find if this shard is active
, API.shardIsActive = True
}

readShard
:: HasCallStack
=> ServerContext
-> API.ReadShardRequest
-> IO (V.Vector API.ReceivedRecord)
readShard ServerContext{..} API.ReadShardRequest{..} = do
logId <- S.getUnderlyingLogId scLDClient streamId (Just shard)
startLSN <- getStartLSN logId

bracket
(getReader readerPool)
(flip S.readerStopReading logId >> putReader readerPool)
(\reader -> readData reader logId startLSN)
where
streamId = transToStreamName readShardRequestStreamName
shard = textToCBytes readShardRequestShardId

getStartLSN :: S.C_LogID -> IO S.LSN
getStartLSN logId =
case fromJust . API.shardOffsetOffset . fromJust $ readShardRequestOffset of
API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> return S.LSN_MIN
API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> (+ 1) <$> S.getTailLSN scLDClient logId
API.ShardOffsetOffsetRecordOffset API.RecordId{..} -> return recordIdBatchId
_ -> error "wrong shard offset"

readData :: S.LDReader -> S.C_LogID -> S.LSN -> IO (V.Vector API.ReceivedRecord)
readData reader logId startLSN = do
void $ S.readerStartReading reader logId startLSN (startLSN + fromIntegral readShardRequestMaxRead)
S.readerSetTimeout reader 0
records <- S.readerRead reader (fromIntegral readShardRequestMaxRead)
let receivedRecordsVecs = decodeRecordBatch <$> records
return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs

splitShards
:: ServerContext
-> API.SplitShardsRequest
-> IO (V.Vector API.Shard)
splitShards ServerContext{..} API.SplitShardsRequest{..} = do
sharedShardMp <- getShardMap scLDClient shardInfo splitShardsRequestStreamName
newShards <- splitShard sharedShardMp
updateShardTable newShards
return . V.map (shardToPb splitShardsRequestStreamName) $ V.fromList newShards
where
splitKey = textToShardKey splitShardsRequestSplitKey

split :: Bool -> ShardKey -> SharedShardMap -> IO (Shard, Shard)
split True key mps = splitHalf scLDClient mps key
split False key mps = splitByKey scLDClient mps key

splitShard sharedShardMp =
modifyMVar shardInfo $ \info -> do
(s1, s2) <- split splitShardsRequestHalfSplit splitKey sharedShardMp
return (HM.insert splitShardsRequestStreamName sharedShardMp info, [s1, s2])

updateShardTable newShards =
modifyMVar_ shardTable $ \mp -> do
let dict = fromMaybe M.empty $ HM.lookup splitShardsRequestStreamName mp
dict' = foldl' (\acc Shard{startKey=sKey, shardId=sId} -> M.insert sKey sId acc) dict newShards
return $ HM.insert splitShardsRequestStreamName dict' mp

mergeShards
:: ServerContext
-> API.MergeShardsRequest
-> IO API.Shard
mergeShards ServerContext{..} API.MergeShardsRequest{..} = do
sharedShardMp <- getShardMap scLDClient shardInfo mergeShardsRequestStreamName
(newShard, removedKey) <- mergeShard sharedShardMp
updateShardTable newShard removedKey
return . shardToPb mergeShardsRequestStreamName $ newShard
where
mergeShard sharedShardMp = do
modifyMVar shardInfo $ \info -> do
let [shardKey1, shardKey2] = V.toList . V.map textToShardKey $ mergeShardsRequestShardKeys
res <- mergeTwoShard scLDClient sharedShardMp shardKey1 shardKey2
return (HM.insert mergeShardsRequestStreamName sharedShardMp info, res)

updateShardTable Shard{startKey=sKey, shardId=sId} removedKey =
modifyMVar_ shardTable $ \mp -> do
let dict = fromMaybe M.empty $ HM.lookup mergeShardsRequestStreamName mp
dict' = M.insert sKey sId dict
dict'' = M.delete removedKey dict'
return $ HM.insert mergeShardsRequestStreamName dict'' mp

getShardMap :: S.LDClient -> MVar (HM.HashMap Text SharedShardMap) -> Text -> IO SharedShardMap
getShardMap client shardInfo streamName = do
let streamId = transToStreamName streamName
readMVar shardInfo >>= pure <$> HM.lookup streamName >>= \case
Just mps -> return mps
Nothing -> loadSharedShardMap client streamId

loadSharedShardMap :: S.LDClient -> S.StreamId -> IO SharedShardMap
loadSharedShardMap client streamId = do
shardIds <- M.elems <$> S.listStreamPartitions client streamId
mkSharedShardMapWithShards =<< foldM createShard [] shardIds
where
createShard acc shardId = do
attrs <- S.getStreamPartitionExtraAttrs client shardId
case getShardInfo attrs of
Nothing -> return acc
Just (sKey, eKey, epoch) -> return $ mkShard shardId streamId sKey eKey epoch : acc

getShardInfo :: M.Map CBytes CBytes -> Maybe (ShardKey, ShardKey, Word64)
getShardInfo mp = do
startHashRangeKey <- cBytesToKey <$> M.lookup startKey mp
endHashRangeKey <- cBytesToKey <$> M.lookup endKey mp
shardEpoch <- read . CB.unpack <$> M.lookup epoch mp
return (startHashRangeKey, endHashRangeKey, shardEpoch)
where
startKey = CB.pack "startKey"
endKey = CB.pack "endKey"
epoch = CB.pack "epoch"

shardToPb :: Text -> Shard -> API.Shard
shardToPb sName Shard{..} = API.Shard
{ API.shardShardId = shardId
, API.shardStreamName = sName
, API.shardStartHashRangeKey = shardKeyToText startKey
, API.shardEndHashRangeKey = shardKeyToText endKey
, API.shardEpoch = epoch
, API.shardIsActive = True
}
95 changes: 5 additions & 90 deletions hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,15 @@ module HStream.Server.Core.Stream
, append
, appendStream
, append0Stream
, readShard
, listShards
, FoundSubscription (..)
, StreamExists (..)
, RecordTooBig (..)
) where

import Control.Concurrent (MVar, modifyMVar,
modifyMVar_)
import Control.Concurrent.STM (readTVarIO)
import Control.Exception (Exception (displayException),
bracket, catch, throwIO)
import Control.Monad (foldM, forM, unless, void,
when)
catch, throwIO)
import Control.Monad (foldM, forM, unless, when)
import qualified Data.ByteString as BS
import Data.Foldable (foldl')
import qualified Data.HashMap.Strict as HM
Expand All @@ -32,19 +27,17 @@ import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Vector as V
import GHC.Stack (HasCallStack)
import Proto3.Suite (Enumerated (Enumerated))
import qualified Z.Data.CBytes as CB

import Control.Concurrent (MVar, modifyMVar,
modifyMVar_)
import HStream.Common.ConsistentHashing (getAllocatedNodeId)
import qualified HStream.Logger as Log
import HStream.Server.Core.Common (decodeRecordBatch)
import HStream.Server.Exception (InvalidArgument (..),
StreamNotExist (..),
WrongServer (..))
import qualified HStream.Server.HStreamApi as API
import HStream.Server.Persistence.Object (getSubscriptionWithStream,
updateSubscription)
import HStream.Server.ReaderPool (getReader, putReader)
import HStream.Server.Shard (Shard (..), cBytesToKey,
createShard, devideKeySpace,
hashShardKey,
Expand All @@ -61,7 +54,7 @@ import HStream.Utils
-------------------------------------------------------------------------------

createStream :: HasCallStack => ServerContext -> API.Stream -> IO ()
createStream ServerContext{..} stream@API.Stream{
createStream ServerContext{..} API.Stream{
streamBacklogDuration = backlogSec, streamShardCount = shardCount, ..} = do

when (streamReplicationFactor == 0) $ throwIO (InvalidArgument "Stream replicationFactor cannot be zero")
Expand Down Expand Up @@ -163,39 +156,6 @@ appendStream ServerContext{..} API.AppendRequest {appendRequestStreamName = sNam
where
streamName = textToCBytes sName

readShard
:: HasCallStack
=> ServerContext
-> API.ReadShardRequest
-> IO (V.Vector API.ReceivedRecord)
readShard ServerContext{..} API.ReadShardRequest{..} = do
logId <- S.getUnderlyingLogId scLDClient streamId (Just shard)
startLSN <- getStartLSN logId

bracket
(getReader readerPool)
(flip S.readerStopReading logId >> putReader readerPool)
(\reader -> readData reader logId startLSN)
where
streamId = transToStreamName readShardRequestStreamName
shard = textToCBytes readShardRequestShardId

getStartLSN :: S.C_LogID -> IO S.LSN
getStartLSN logId =
case fromJust . API.shardOffsetOffset . fromJust $ readShardRequestOffset of
API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> return S.LSN_MIN
API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> (+ 1) <$> S.getTailLSN scLDClient logId
API.ShardOffsetOffsetRecordOffset API.RecordId{..} -> return recordIdBatchId
_ -> error "wrong shard offset"

readData :: S.LDReader -> S.C_LogID -> S.LSN -> IO (V.Vector API.ReceivedRecord)
readData reader logId startLSN = do
void $ S.readerStartReading reader logId startLSN (startLSN + fromIntegral readShardRequestMaxRead)
S.readerSetTimeout reader 0
records <- S.readerRead reader (fromIntegral readShardRequestMaxRead)
let receivedRecordsVecs = decodeRecordBatch <$> records
return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs

--------------------------------------------------------------------------------

append0Stream :: ServerContext -> API.AppendRequest -> Maybe Text -> IO API.AppendResponse
Expand Down Expand Up @@ -249,46 +209,6 @@ getShardId client shardTable sName partitionKey = do

--------------------------------------------------------------------------------

listShards
:: HasCallStack
=> ServerContext
-> API.ListShardsRequest
-> IO (V.Vector API.Shard)
listShards ServerContext{..} API.ListShardsRequest{..} = do
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
V.foldM' getShardInfo V.empty $ V.fromList shards
where
streamId = transToStreamName listShardsRequestStreamName
startKey = CB.pack "startKey"
endKey = CB.pack "endKey"
epoch = CB.pack "epoch"

getShardInfo shards logId = do
attr <- S.getStreamPartitionExtraAttrs scLDClient logId
case getInfo attr of
Nothing -> return . V.snoc shards $
API.Shard { API.shardStreamName = listShardsRequestStreamName
, API.shardShardId = logId
, API.shardIsActive = True
}
Just(sKey, eKey, ep) -> return . V.snoc shards $
API.Shard { API.shardStreamName = listShardsRequestStreamName
, API.shardShardId = logId
, API.shardStartHashRangeKey = sKey
, API.shardEndHashRangeKey = eKey
, API.shardEpoch = ep
-- FIXME: neet a way to find if this shard is active
, API.shardIsActive = True
}

getInfo mp = do
startHashRangeKey <- cBytesToText <$> M.lookup startKey mp
endHashRangeKey <- cBytesToText <$> M.lookup endKey mp
shardEpoch <- read . CB.unpack <$> M.lookup epoch mp
return (startHashRangeKey, endHashRangeKey, shardEpoch)

--------------------------------------------------------------------------------

data FoundSubscription = FoundSubscription
deriving (Show)
instance Exception FoundSubscription
Expand All @@ -301,8 +221,3 @@ data StreamExists = StreamExists
deriving (Show)
instance Exception StreamExists where
displayException StreamExists = "StreamExists: Stream has been created"

newtype ShardKeyNotFound = ShardKeyNotFound S.C_LogID
deriving (Show)
instance Exception ShardKeyNotFound where
displayException (ShardKeyNotFound shardId) = "Can't get shardKey for shard " <> show shardId
17 changes: 16 additions & 1 deletion hstream/src/HStream/Server/Exception.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import ZooKeeper.Exception

import qualified HStream.Logger as Log
import HStream.Server.Persistence.Exception (PersistenceException)
import HStream.Server.Shard (CanNotMerge, CanNotSplit,
ShardNotExist)
import qualified HStream.Store as Store
import HStream.Utils (mkServerErrResp)

Expand Down Expand Up @@ -106,7 +108,7 @@ finalExceptionHandlers = [
Log.fatal $ Log.buildString' err
return (StatusInternal, mkStatusDetails err)
,
Handler $ \(err :: AsyncCancelled) -> do
Handler $ \(_ :: AsyncCancelled) -> do
return (StatusOk, "")
,
Handler $ \(err :: SomeException) -> do
Expand Down Expand Up @@ -144,6 +146,19 @@ zooKeeperExceptionHandler = [
Handler $ \(e :: ZooException ) -> handleZKException e StatusInternal
]

shardExceptionHandler :: Handlers (StatusCode, StatusDetails)
shardExceptionHandler = [
Handler $ \(e :: CanNotSplit) -> do
Log.fatal $ Log.buildString' e
return (StatusFailedPrecondition, mkStatusDetails e),
Handler $ \(e :: CanNotMerge) -> do
Log.fatal $ Log.buildString' e
return (StatusFailedPrecondition, mkStatusDetails e),
Handler $ \(e :: ShardNotExist) -> do
Log.fatal $ Log.buildString' e
return (StatusNotFound, mkStatusDetails e)
]

defaultHandlers :: Handlers (StatusCode, StatusDetails)
defaultHandlers = serverExceptionHandlers
++ storeExceptionHandlers
Expand Down
Loading