Skip to content

Commit 002bc55

Browse files
committed
implement split/merge shard
1 parent fac9563 commit 002bc55

File tree

6 files changed

+183
-55
lines changed

6 files changed

+183
-55
lines changed
Lines changed: 125 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,42 @@
1+
{-# LANGUAGE LambdaCase #-}
2+
13
module HStream.Server.Core.Shard
2-
( readShard,
3-
listShards,
4-
splitShards,
5-
)
4+
( readShard
5+
, listShards
6+
, splitShards
7+
, mergeShards
8+
)
69
where
710

8-
import Control.Exception (bracket)
9-
import Control.Monad (void)
10-
import Data.Foldable (foldl')
11-
import qualified Data.Map.Strict as M
12-
import qualified Data.Vector as V
13-
import GHC.Stack (HasCallStack)
14-
import Network.GRPC.HighLevel.Generated
15-
import qualified Z.Data.CBytes as CB
16-
17-
import Data.Maybe (fromJust)
18-
import HStream.Connector.HStore (transToStreamName)
19-
import HStream.Server.Exception (InvalidArgument (..),
20-
StreamNotExist (..))
21-
import HStream.Server.Handler.Common (decodeRecordBatch)
22-
import qualified HStream.Server.HStreamApi as API
23-
import HStream.Server.ReaderPool (getReader, putReader)
24-
import HStream.Server.Types (ServerContext (..))
25-
import qualified HStream.Store as S
26-
import HStream.ThirdParty.Protobuf as PB
11+
import Control.Exception (bracket)
12+
import Control.Monad (foldM, void)
13+
import Data.Foldable (foldl')
14+
import qualified Data.HashMap.Strict as HM
15+
import qualified Data.Map.Strict as M
16+
import qualified Data.Vector as V
17+
import GHC.Stack (HasCallStack)
18+
import qualified Z.Data.CBytes as CB
19+
20+
import Control.Concurrent (MVar, modifyMVar, modifyMVar_,
21+
readMVar)
22+
import Data.Maybe (fromJust, fromMaybe)
23+
import Data.Text (Text)
24+
import Data.Word (Word64)
25+
import HStream.Server.Core.Common (decodeRecordBatch)
26+
import qualified HStream.Server.HStreamApi as API
27+
import HStream.Server.ReaderPool (getReader, putReader)
28+
import HStream.Server.Shard (Shard (..), ShardKey (..),
29+
SharedShardMap, cBytesToKey,
30+
mergeTwoShard, mkShard,
31+
mkSharedShardMapWithShards,
32+
shardKeyToText, splitByKey,
33+
splitHalf, textToShardKey)
34+
import HStream.Server.Types (ServerContext (..),
35+
transToStreamName)
36+
import qualified HStream.Store as S
2737
import HStream.Utils
28-
import Proto3.Suite (Enumerated (Enumerated))
38+
import Proto3.Suite (Enumerated (Enumerated))
39+
import Z.Data.CBytes (CBytes)
2940

3041
-------------------------------------------------------------------------------
3142

@@ -36,16 +47,13 @@ listShards
3647
-> IO (V.Vector API.Shard)
3748
listShards ServerContext{..} API.ListShardsRequest{..} = do
3849
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
39-
V.foldM' getShardInfo V.empty $ V.fromList shards
50+
V.foldM' constructShard V.empty $ V.fromList shards
4051
where
4152
streamId = transToStreamName listShardsRequestStreamName
42-
startKey = CB.pack "startKey"
43-
endKey = CB.pack "endKey"
44-
epoch = CB.pack "epoch"
4553

46-
getShardInfo shards logId = do
54+
constructShard shards logId = do
4755
attr <- S.getStreamPartitionExtraAttrs scLDClient logId
48-
case getInfo attr of
56+
case getShardInfo attr of
4957
Nothing -> return . V.snoc shards $
5058
API.Shard { API.shardStreamName = listShardsRequestStreamName
5159
, API.shardShardId = logId
@@ -54,19 +62,13 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do
5462
Just(sKey, eKey, ep) -> return . V.snoc shards $
5563
API.Shard { API.shardStreamName = listShardsRequestStreamName
5664
, API.shardShardId = logId
57-
, API.shardStartHashRangeKey = sKey
58-
, API.shardEndHashRangeKey = eKey
65+
, API.shardStartHashRangeKey = shardKeyToText sKey
66+
, API.shardEndHashRangeKey = shardKeyToText eKey
5967
, API.shardEpoch = ep
6068
-- FIXME: neet a way to find if this shard is active
6169
, API.shardIsActive = True
6270
}
6371

64-
getInfo mp = do
65-
startHashRangeKey <- cBytesToText <$> M.lookup startKey mp
66-
endHashRangeKey <- cBytesToText <$> M.lookup endKey mp
67-
shardEpoch <- read . CB.unpack <$> M.lookup epoch mp
68-
return (startHashRangeKey, endHashRangeKey, shardEpoch)
69-
7072
readShard
7173
:: HasCallStack
7274
=> ServerContext
@@ -101,8 +103,90 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do
101103
return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs
102104

103105
splitShards
104-
:: HasCallStack
105-
=> ServerContext
106+
:: ServerContext
106107
-> API.SplitShardsRequest
107108
-> IO (V.Vector API.Shard)
108-
splitShards = error ""
109+
splitShards ServerContext{..} API.SplitShardsRequest{..} = do
110+
sharedShardMp <- getShardMap scLDClient shardInfo splitShardsRequestStreamName
111+
newShards <- splitShard sharedShardMp
112+
updateShardTable newShards
113+
return . V.map (shardToPb splitShardsRequestStreamName) $ V.fromList newShards
114+
where
115+
splitKey = textToShardKey splitShardsRequestSplitKey
116+
117+
split :: Bool -> ShardKey -> SharedShardMap -> IO (Shard, Shard)
118+
split True key mps = splitHalf scLDClient mps key
119+
split False key mps = splitByKey scLDClient mps key
120+
121+
splitShard sharedShardMp =
122+
modifyMVar shardInfo $ \info -> do
123+
(s1, s2) <- split splitShardsRequestHalfSplit splitKey sharedShardMp
124+
return (HM.insert splitShardsRequestStreamName sharedShardMp info, [s1, s2])
125+
126+
updateShardTable newShards =
127+
modifyMVar_ shardTable $ \mp -> do
128+
let dict = fromMaybe M.empty $ HM.lookup splitShardsRequestStreamName mp
129+
dict' = foldl' (\acc Shard{startKey=sKey, shardId=sId} -> M.insert sKey sId acc) dict newShards
130+
return $ HM.insert splitShardsRequestStreamName dict' mp
131+
132+
mergeShards
133+
:: ServerContext
134+
-> API.MergeShardsRequest
135+
-> IO API.Shard
136+
mergeShards ServerContext{..} API.MergeShardsRequest{..} = do
137+
sharedShardMp <- getShardMap scLDClient shardInfo mergeShardsRequestStreamName
138+
(newShard, removedKey) <- mergeShard sharedShardMp
139+
updateShardTable newShard removedKey
140+
return . shardToPb mergeShardsRequestStreamName $ newShard
141+
where
142+
mergeShard sharedShardMp = do
143+
modifyMVar shardInfo $ \info -> do
144+
let [shardKey1, shardKey2] = V.toList . V.map textToShardKey $ mergeShardsRequestShardKeys
145+
res <- mergeTwoShard scLDClient sharedShardMp shardKey1 shardKey2
146+
return (HM.insert mergeShardsRequestStreamName sharedShardMp info, res)
147+
148+
updateShardTable Shard{startKey=sKey, shardId=sId} removedKey =
149+
modifyMVar_ shardTable $ \mp -> do
150+
let dict = fromMaybe M.empty $ HM.lookup mergeShardsRequestStreamName mp
151+
dict' = M.insert sKey sId dict
152+
dict'' = M.delete removedKey dict'
153+
return $ HM.insert mergeShardsRequestStreamName dict'' mp
154+
155+
getShardMap :: S.LDClient -> MVar (HM.HashMap Text SharedShardMap) -> Text -> IO SharedShardMap
156+
getShardMap client shardInfo streamName = do
157+
let streamId = transToStreamName streamName
158+
readMVar shardInfo >>= pure <$> HM.lookup streamName >>= \case
159+
Just mps -> return mps
160+
Nothing -> loadSharedShardMap client streamId
161+
162+
loadSharedShardMap :: S.LDClient -> S.StreamId -> IO SharedShardMap
163+
loadSharedShardMap client streamId = do
164+
shardIds <- M.elems <$> S.listStreamPartitions client streamId
165+
mkSharedShardMapWithShards =<< foldM createShard [] shardIds
166+
where
167+
createShard acc shardId = do
168+
attrs <- S.getStreamPartitionExtraAttrs client shardId
169+
case getShardInfo attrs of
170+
Nothing -> return acc
171+
Just (sKey, eKey, epoch) -> return $ mkShard shardId streamId sKey eKey epoch : acc
172+
173+
getShardInfo :: M.Map CBytes CBytes -> Maybe (ShardKey, ShardKey, Word64)
174+
getShardInfo mp = do
175+
startHashRangeKey <- cBytesToKey <$> M.lookup startKey mp
176+
endHashRangeKey <- cBytesToKey <$> M.lookup endKey mp
177+
shardEpoch <- read . CB.unpack <$> M.lookup epoch mp
178+
return (startHashRangeKey, endHashRangeKey, shardEpoch)
179+
where
180+
startKey = CB.pack "startKey"
181+
endKey = CB.pack "endKey"
182+
epoch = CB.pack "epoch"
183+
184+
shardToPb :: Text -> Shard -> API.Shard
185+
shardToPb sName Shard{..} = API.Shard
186+
{ API.shardShardId = shardId
187+
, API.shardStreamName = sName
188+
, API.shardStartHashRangeKey = shardKeyToText startKey
189+
, API.shardEndHashRangeKey = shardKeyToText endKey
190+
, API.shardEpoch = epoch
191+
, API.shardIsActive = True
192+
}

hstream/src/HStream/Server/Core/Stream.hs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,11 @@ import Data.Text (Text)
2727
import qualified Data.Text as Text
2828
import qualified Data.Vector as V
2929
import GHC.Stack (HasCallStack)
30-
import Network.GRPC.HighLevel.Generated
31-
import Proto3.Suite (Enumerated (Enumerated))
32-
import qualified Z.Data.CBytes as CB
3330

3431
import Control.Concurrent (MVar, modifyMVar,
3532
modifyMVar_)
36-
import HStream.Connector.HStore (transToStreamName)
3733
import HStream.Common.ConsistentHashing (getAllocatedNodeId)
3834
import qualified HStream.Logger as Log
39-
import HStream.Server.Core.Common (decodeRecordBatch)
4035
import HStream.Server.Exception (InvalidArgument (..),
4136
StreamNotExist (..),
4237
WrongServer (..))
@@ -54,13 +49,12 @@ import HStream.Server.Types (ServerContext (..),
5449
transToStreamName)
5550
import qualified HStream.Stats as Stats
5651
import qualified HStream.Store as S
57-
import HStream.ThirdParty.Protobuf as PB
5852
import HStream.Utils
5953

6054
-------------------------------------------------------------------------------
6155

6256
createStream :: HasCallStack => ServerContext -> API.Stream -> IO ()
63-
createStream ServerContext{..} stream@API.Stream{
57+
createStream ServerContext{..} API.Stream{
6458
streamBacklogDuration = backlogSec, streamShardCount = shardCount, ..} = do
6559

6660
when (streamReplicationFactor == 0) $ throwIO (InvalidArgument "Stream replicationFactor cannot be zero")

hstream/src/HStream/Server/Exception.hs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import ZooKeeper.Exception
2121

2222
import qualified HStream.Logger as Log
2323
import HStream.Server.Persistence.Exception (PersistenceException)
24+
import HStream.Server.Shard (CanNotMerge, CanNotSplit,
25+
ShardNotExist)
2426
import qualified HStream.Store as Store
2527
import HStream.Utils (mkServerErrResp)
2628

@@ -106,7 +108,7 @@ finalExceptionHandlers = [
106108
Log.fatal $ Log.buildString' err
107109
return (StatusInternal, mkStatusDetails err)
108110
,
109-
Handler $ \(err :: AsyncCancelled) -> do
111+
Handler $ \(_ :: AsyncCancelled) -> do
110112
return (StatusOk, "")
111113
,
112114
Handler $ \(err :: SomeException) -> do
@@ -144,6 +146,19 @@ zooKeeperExceptionHandler = [
144146
Handler $ \(e :: ZooException ) -> handleZKException e StatusInternal
145147
]
146148

149+
shardExceptionHandler :: Handlers (StatusCode, StatusDetails)
150+
shardExceptionHandler = [
151+
Handler $ \(e :: CanNotSplit) -> do
152+
Log.fatal $ Log.buildString' e
153+
return (StatusFailedPrecondition, mkStatusDetails e),
154+
Handler $ \(e :: CanNotMerge) -> do
155+
Log.fatal $ Log.buildString' e
156+
return (StatusFailedPrecondition, mkStatusDetails e),
157+
Handler $ \(e :: ShardNotExist) -> do
158+
Log.fatal $ Log.buildString' e
159+
return (StatusNotFound, mkStatusDetails e)
160+
]
161+
147162
defaultHandlers :: Handlers (StatusCode, StatusDetails)
148163
defaultHandlers = serverExceptionHandlers
149164
++ storeExceptionHandlers

hstream/src/HStream/Server/Handler.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ handlers serverContext@ServerContext{..} =
5555
hstreamApiReadShard = readShardHandler serverContext,
5656
hstreamApiListShards = listShardsHandler serverContext,
5757
hstreamApiSplitShards = splitShardsHandler serverContext,
58+
hstreamApiMergeShards = mergeShardsHandler serverContext,
5859

5960
-- Stats
6061
hstreamApiPerStreamTimeSeriesStats = H.perStreamTimeSeriesStats scStatsHolder,

hstream/src/HStream/Server/Handler/Shard.hs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ module HStream.Server.Handler.Shard
99
( listShardsHandler
1010
, readShardHandler
1111
, splitShardsHandler
12+
, mergeShardsHandler
1213
)
1314
where
1415

1516
import Control.Exception
1617
import Network.GRPC.HighLevel.Generated
1718

19+
import Control.Monad (when)
20+
import qualified Data.Vector as V
1821
import qualified HStream.Logger as Log
1922
import qualified HStream.Server.Core.Shard as C
2023
import HStream.Server.Exception
@@ -29,7 +32,7 @@ listShardsHandler
2932
:: ServerContext
3033
-> ServerRequest 'Normal ListShardsRequest ListShardsResponse
3134
-> IO (ServerResponse 'Normal ListShardsResponse)
32-
listShardsHandler sc (ServerNormalRequest _metadata request) = do
35+
listShardsHandler sc (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
3336
Log.debug "Receive List Shards Request"
3437
C.listShards sc request >>= returnResp . ListShardsResponse
3538

@@ -45,14 +48,35 @@ splitShardsHandler
4548
:: ServerContext
4649
-> ServerRequest 'Normal SplitShardsRequest SplitShardsResponse
4750
-> IO (ServerResponse 'Normal SplitShardsResponse)
48-
splitShardsHandler sc (ServerNormalRequest _metadata request) = do
51+
splitShardsHandler sc (ServerNormalRequest _metadata request) = shardExceptionHandle $ do
4952
Log.debug $ "Receive Split Shards Request: " <> Log.buildString' (show request)
5053
C.splitShards sc request >>= returnResp . SplitShardsResponse
5154

55+
mergeShardsHandler
56+
:: ServerContext
57+
-> ServerRequest 'Normal MergeShardsRequest MergeShardsResponse
58+
-> IO (ServerResponse 'Normal MergeShardsResponse)
59+
mergeShardsHandler sc (ServerNormalRequest _metadata request@MergeShardsRequest{..}) = shardExceptionHandle $ do
60+
Log.debug $ "Receive Merge Shards Request: " <> Log.buildString' (show request)
61+
when (V.length mergeShardsRequestShardKeys /= 2) $ throwIO WrongShardCnt
62+
C.mergeShards sc request >>= returnResp . MergeShardsResponse . Just
63+
5264
-----------------------------------------------------------------------------------
5365

5466
readShardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a)
5567
readShardExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $
5668
[ Handler (\(err :: Store.NOTFOUND) ->
5769
return (StatusUnavailable, mkStatusDetails err))
5870
] ++ defaultHandlers
71+
72+
shardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a)
73+
shardExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $
74+
[ Handler (\(err :: WrongShardCnt) ->
75+
return (StatusInvalidArgument, mkStatusDetails err))
76+
] ++ shardExceptionHandler ++ defaultHandlers
77+
78+
data WrongShardCnt = WrongShardCnt
79+
instance Show WrongShardCnt where
80+
show _ = "Only two shards can be merged at a time"
81+
82+
instance Exception WrongShardCnt where

0 commit comments

Comments
 (0)