From fac95631812f0b310c52fd9ee92238b81b8a6d2d Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Wed, 13 Jul 2022 10:54:13 +0800 Subject: [PATCH 1/2] init shard module --- hstream/hstream.cabal | 2 + hstream/src/HStream/Server/Core/Shard.hs | 108 +++++++++++++++++++ hstream/src/HStream/Server/Core/Stream.hs | 93 ++-------------- hstream/src/HStream/Server/Handler.hs | 4 +- hstream/src/HStream/Server/Handler/Shard.hs | 58 ++++++++++ hstream/src/HStream/Server/Handler/Stream.hs | 25 ----- 6 files changed, 178 insertions(+), 112 deletions(-) create mode 100644 hstream/src/HStream/Server/Core/Shard.hs create mode 100644 hstream/src/HStream/Server/Handler/Shard.hs diff --git a/hstream/hstream.cabal b/hstream/hstream.cabal index 04eda3ff9..d731bf49d 100644 --- a/hstream/hstream.cabal +++ b/hstream/hstream.cabal @@ -67,6 +67,7 @@ library HStream.Server.Core.Stream HStream.Server.Core.Subscription HStream.Server.Core.View + HStream.Server.Core.Shard HStream.Server.Handler.Admin HStream.Server.Handler.Cluster HStream.Server.Handler.Connector @@ -76,6 +77,7 @@ library HStream.Server.Handler.Stream HStream.Server.Handler.Subscription HStream.Server.Handler.View + HStream.Server.Handler.Shard HStream.Server.Persistence.Common HStream.Server.Persistence.MemoryStore HStream.Server.Persistence.Object diff --git a/hstream/src/HStream/Server/Core/Shard.hs b/hstream/src/HStream/Server/Core/Shard.hs new file mode 100644 index 000000000..e1d2329fb --- /dev/null +++ b/hstream/src/HStream/Server/Core/Shard.hs @@ -0,0 +1,108 @@ +module HStream.Server.Core.Shard + ( readShard, + listShards, + splitShards, + ) +where + +import Control.Exception (bracket) +import Control.Monad (void) +import Data.Foldable (foldl') +import qualified Data.Map.Strict as M +import qualified Data.Vector as V +import GHC.Stack (HasCallStack) +import Network.GRPC.HighLevel.Generated +import qualified Z.Data.CBytes as CB + +import Data.Maybe (fromJust) +import HStream.Connector.HStore (transToStreamName) +import HStream.Server.Exception (InvalidArgument (..), + StreamNotExist (..)) +import HStream.Server.Handler.Common (decodeRecordBatch) +import qualified HStream.Server.HStreamApi as API +import HStream.Server.ReaderPool (getReader, putReader) +import HStream.Server.Types (ServerContext (..)) +import qualified HStream.Store as S +import HStream.ThirdParty.Protobuf as PB +import HStream.Utils +import Proto3.Suite (Enumerated (Enumerated)) + +------------------------------------------------------------------------------- + +listShards + :: HasCallStack + => ServerContext + -> API.ListShardsRequest + -> IO (V.Vector API.Shard) +listShards ServerContext{..} API.ListShardsRequest{..} = do + shards <- M.elems <$> S.listStreamPartitions scLDClient streamId + V.foldM' getShardInfo V.empty $ V.fromList shards + where + streamId = transToStreamName listShardsRequestStreamName + startKey = CB.pack "startKey" + endKey = CB.pack "endKey" + epoch = CB.pack "epoch" + + getShardInfo shards logId = do + attr <- S.getStreamPartitionExtraAttrs scLDClient logId + case getInfo attr of + Nothing -> return . V.snoc shards $ + API.Shard { API.shardStreamName = listShardsRequestStreamName + , API.shardShardId = logId + , API.shardIsActive = True + } + Just(sKey, eKey, ep) -> return . V.snoc shards $ + API.Shard { API.shardStreamName = listShardsRequestStreamName + , API.shardShardId = logId + , API.shardStartHashRangeKey = sKey + , API.shardEndHashRangeKey = eKey + , API.shardEpoch = ep + -- FIXME: neet a way to find if this shard is active + , API.shardIsActive = True + } + + getInfo mp = do + startHashRangeKey <- cBytesToText <$> M.lookup startKey mp + endHashRangeKey <- cBytesToText <$> M.lookup endKey mp + shardEpoch <- read . CB.unpack <$> M.lookup epoch mp + return (startHashRangeKey, endHashRangeKey, shardEpoch) + +readShard + :: HasCallStack + => ServerContext + -> API.ReadShardRequest + -> IO (V.Vector API.ReceivedRecord) +readShard ServerContext{..} API.ReadShardRequest{..} = do + logId <- S.getUnderlyingLogId scLDClient streamId (Just shard) + startLSN <- getStartLSN logId + + bracket + (getReader readerPool) + (flip S.readerStopReading logId >> putReader readerPool) + (\reader -> readData reader logId startLSN) + where + streamId = transToStreamName readShardRequestStreamName + shard = textToCBytes readShardRequestShardId + + getStartLSN :: S.C_LogID -> IO S.LSN + getStartLSN logId = + case fromJust . API.shardOffsetOffset . fromJust $ readShardRequestOffset of + API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> return S.LSN_MIN + API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> (+ 1) <$> S.getTailLSN scLDClient logId + API.ShardOffsetOffsetRecordOffset API.RecordId{..} -> return recordIdBatchId + _ -> error "wrong shard offset" + + readData :: S.LDReader -> S.C_LogID -> S.LSN -> IO (V.Vector API.ReceivedRecord) + readData reader logId startLSN = do + void $ S.readerStartReading reader logId startLSN (startLSN + fromIntegral readShardRequestMaxRead) + S.readerSetTimeout reader 0 + records <- S.readerRead reader (fromIntegral readShardRequestMaxRead) + let receivedRecordsVecs = decodeRecordBatch <$> records + return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs + +splitShards + :: HasCallStack + => ServerContext + -> API.SplitShardsRequest + -> IO (V.Vector API.Shard) +splitShards = error "" diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index f0a1c4fa0..24653bc9f 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -9,20 +9,15 @@ module HStream.Server.Core.Stream , append , appendStream , append0Stream - , readShard - , listShards , FoundSubscription (..) , StreamExists (..) , RecordTooBig (..) ) where -import Control.Concurrent (MVar, modifyMVar, - modifyMVar_) import Control.Concurrent.STM (readTVarIO) import Control.Exception (Exception (displayException), - bracket, catch, throwIO) -import Control.Monad (foldM, forM, unless, void, - when) + catch, throwIO) +import Control.Monad (foldM, forM, unless, when) import qualified Data.ByteString as BS import Data.Foldable (foldl') import qualified Data.HashMap.Strict as HM @@ -32,9 +27,13 @@ import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Vector as V import GHC.Stack (HasCallStack) +import Network.GRPC.HighLevel.Generated import Proto3.Suite (Enumerated (Enumerated)) import qualified Z.Data.CBytes as CB +import Control.Concurrent (MVar, modifyMVar, + modifyMVar_) +import HStream.Connector.HStore (transToStreamName) import HStream.Common.ConsistentHashing (getAllocatedNodeId) import qualified HStream.Logger as Log import HStream.Server.Core.Common (decodeRecordBatch) @@ -44,7 +43,6 @@ import HStream.Server.Exception (InvalidArgument (..), import qualified HStream.Server.HStreamApi as API import HStream.Server.Persistence.Object (getSubscriptionWithStream, updateSubscription) -import HStream.Server.ReaderPool (getReader, putReader) import HStream.Server.Shard (Shard (..), cBytesToKey, createShard, devideKeySpace, hashShardKey, @@ -56,6 +54,7 @@ import HStream.Server.Types (ServerContext (..), transToStreamName) import qualified HStream.Stats as Stats import qualified HStream.Store as S +import HStream.ThirdParty.Protobuf as PB import HStream.Utils ------------------------------------------------------------------------------- @@ -163,39 +162,6 @@ appendStream ServerContext{..} API.AppendRequest {appendRequestStreamName = sNam where streamName = textToCBytes sName -readShard - :: HasCallStack - => ServerContext - -> API.ReadShardRequest - -> IO (V.Vector API.ReceivedRecord) -readShard ServerContext{..} API.ReadShardRequest{..} = do - logId <- S.getUnderlyingLogId scLDClient streamId (Just shard) - startLSN <- getStartLSN logId - - bracket - (getReader readerPool) - (flip S.readerStopReading logId >> putReader readerPool) - (\reader -> readData reader logId startLSN) - where - streamId = transToStreamName readShardRequestStreamName - shard = textToCBytes readShardRequestShardId - - getStartLSN :: S.C_LogID -> IO S.LSN - getStartLSN logId = - case fromJust . API.shardOffsetOffset . fromJust $ readShardRequestOffset of - API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> return S.LSN_MIN - API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> (+ 1) <$> S.getTailLSN scLDClient logId - API.ShardOffsetOffsetRecordOffset API.RecordId{..} -> return recordIdBatchId - _ -> error "wrong shard offset" - - readData :: S.LDReader -> S.C_LogID -> S.LSN -> IO (V.Vector API.ReceivedRecord) - readData reader logId startLSN = do - void $ S.readerStartReading reader logId startLSN (startLSN + fromIntegral readShardRequestMaxRead) - S.readerSetTimeout reader 0 - records <- S.readerRead reader (fromIntegral readShardRequestMaxRead) - let receivedRecordsVecs = decodeRecordBatch <$> records - return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs - -------------------------------------------------------------------------------- append0Stream :: ServerContext -> API.AppendRequest -> Maybe Text -> IO API.AppendResponse @@ -249,46 +215,6 @@ getShardId client shardTable sName partitionKey = do -------------------------------------------------------------------------------- -listShards - :: HasCallStack - => ServerContext - -> API.ListShardsRequest - -> IO (V.Vector API.Shard) -listShards ServerContext{..} API.ListShardsRequest{..} = do - shards <- M.elems <$> S.listStreamPartitions scLDClient streamId - V.foldM' getShardInfo V.empty $ V.fromList shards - where - streamId = transToStreamName listShardsRequestStreamName - startKey = CB.pack "startKey" - endKey = CB.pack "endKey" - epoch = CB.pack "epoch" - - getShardInfo shards logId = do - attr <- S.getStreamPartitionExtraAttrs scLDClient logId - case getInfo attr of - Nothing -> return . V.snoc shards $ - API.Shard { API.shardStreamName = listShardsRequestStreamName - , API.shardShardId = logId - , API.shardIsActive = True - } - Just(sKey, eKey, ep) -> return . V.snoc shards $ - API.Shard { API.shardStreamName = listShardsRequestStreamName - , API.shardShardId = logId - , API.shardStartHashRangeKey = sKey - , API.shardEndHashRangeKey = eKey - , API.shardEpoch = ep - -- FIXME: neet a way to find if this shard is active - , API.shardIsActive = True - } - - getInfo mp = do - startHashRangeKey <- cBytesToText <$> M.lookup startKey mp - endHashRangeKey <- cBytesToText <$> M.lookup endKey mp - shardEpoch <- read . CB.unpack <$> M.lookup epoch mp - return (startHashRangeKey, endHashRangeKey, shardEpoch) - --------------------------------------------------------------------------------- - data FoundSubscription = FoundSubscription deriving (Show) instance Exception FoundSubscription @@ -301,8 +227,3 @@ data StreamExists = StreamExists deriving (Show) instance Exception StreamExists where displayException StreamExists = "StreamExists: Stream has been created" - -newtype ShardKeyNotFound = ShardKeyNotFound S.C_LogID - deriving (Show) -instance Exception ShardKeyNotFound where - displayException (ShardKeyNotFound shardId) = "Can't get shardKey for shard " <> show shardId diff --git a/hstream/src/HStream/Server/Handler.hs b/hstream/src/HStream/Server/Handler.hs index 8f2d70a0a..d4f495759 100644 --- a/hstream/src/HStream/Server/Handler.hs +++ b/hstream/src/HStream/Server/Handler.hs @@ -17,6 +17,7 @@ import HStream.Server.Handler.Admin import HStream.Server.Handler.Cluster import HStream.Server.Handler.Connector import HStream.Server.Handler.Query +import HStream.Server.Handler.Shard import qualified HStream.Server.Handler.Stats as H import HStream.Server.Handler.StoreAdmin import HStream.Server.Handler.Stream @@ -50,9 +51,10 @@ handlers serverContext@ServerContext{..} = hstreamApiStreamingFetch = streamingFetchHandler serverContext, - hstreamApiReadShard = readShardHandler serverContext, -- Shards + hstreamApiReadShard = readShardHandler serverContext, hstreamApiListShards = listShardsHandler serverContext, + hstreamApiSplitShards = splitShardsHandler serverContext, -- Stats hstreamApiPerStreamTimeSeriesStats = H.perStreamTimeSeriesStats scStatsHolder, diff --git a/hstream/src/HStream/Server/Handler/Shard.hs b/hstream/src/HStream/Server/Handler/Shard.hs new file mode 100644 index 000000000..dac12e26a --- /dev/null +++ b/hstream/src/HStream/Server/Handler/Shard.hs @@ -0,0 +1,58 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module HStream.Server.Handler.Shard + ( listShardsHandler + , readShardHandler + , splitShardsHandler + ) +where + +import Control.Exception +import Network.GRPC.HighLevel.Generated + +import qualified HStream.Logger as Log +import qualified HStream.Server.Core.Shard as C +import HStream.Server.Exception +import HStream.Server.HStreamApi +import HStream.Server.Types (ServerContext (..)) +import qualified HStream.Store as Store +import HStream.Utils + +----------------------------------------------------------------------------------- + +listShardsHandler + :: ServerContext + -> ServerRequest 'Normal ListShardsRequest ListShardsResponse + -> IO (ServerResponse 'Normal ListShardsResponse) +listShardsHandler sc (ServerNormalRequest _metadata request) = do + Log.debug "Receive List Shards Request" + C.listShards sc request >>= returnResp . ListShardsResponse + +readShardHandler + :: ServerContext + -> ServerRequest 'Normal ReadShardRequest ReadShardResponse + -> IO (ServerResponse 'Normal ReadShardResponse) +readShardHandler sc (ServerNormalRequest _metadata request) = readShardExceptionHandle $ do + Log.debug $ "Receive read shard Request: " <> Log.buildString (show request) + C.readShard sc request >>= returnResp . ReadShardResponse + +splitShardsHandler + :: ServerContext + -> ServerRequest 'Normal SplitShardsRequest SplitShardsResponse + -> IO (ServerResponse 'Normal SplitShardsResponse) +splitShardsHandler sc (ServerNormalRequest _metadata request) = do + Log.debug $ "Receive Split Shards Request: " <> Log.buildString' (show request) + C.splitShards sc request >>= returnResp . SplitShardsResponse + +----------------------------------------------------------------------------------- + +readShardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a) +readShardExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $ + [ Handler (\(err :: Store.NOTFOUND) -> + return (StatusUnavailable, mkStatusDetails err)) + ] ++ defaultHandlers diff --git a/hstream/src/HStream/Server/Handler/Stream.hs b/hstream/src/HStream/Server/Handler/Stream.hs index c2c0473dd..de829935c 100644 --- a/hstream/src/HStream/Server/Handler/Stream.hs +++ b/hstream/src/HStream/Server/Handler/Stream.hs @@ -9,10 +9,8 @@ module HStream.Server.Handler.Stream ( createStreamHandler , deleteStreamHandler , listStreamsHandler - , listShardsHandler , appendHandler , append0Handler - , readShardHandler ) where @@ -104,23 +102,6 @@ append0Handler sc@ServerContext{..} (ServerNormalRequest _metadata request@Appen inc_failed = Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1 cStreamName = textToCBytes appendRequestStreamName - -listShardsHandler - :: ServerContext - -> ServerRequest 'Normal ListShardsRequest ListShardsResponse - -> IO (ServerResponse 'Normal ListShardsResponse) -listShardsHandler sc (ServerNormalRequest _metadata request) = do - Log.debug "Receive List Shards Request" - C.listShards sc request >>= returnResp . ListShardsResponse - -readShardHandler - :: ServerContext - -> ServerRequest 'Normal ReadShardRequest ReadShardResponse - -> IO (ServerResponse 'Normal ReadShardResponse) -readShardHandler sc (ServerNormalRequest _metadata request) = readShardExceptionHandle $ do - Log.debug $ "Receive read shard Request: " <> Log.buildString (show request) - C.readShard sc request >>= returnResp . ReadShardResponse - -------------------------------------------------------------------------------- -- Exception Handlers @@ -163,9 +144,3 @@ deleteStreamExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $ Log.warning $ Log.buildString' err return (StatusFailedPrecondition, "Stream still has subscription")) ] - -readShardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a) -readShardExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $ - [ Handler (\(err :: Store.NOTFOUND) -> - return (StatusUnavailable, mkStatusDetails err)) - ] ++ defaultHandlers From 002bc55c4d4eedd904522115c6aad3a058853b77 Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Wed, 13 Jul 2022 17:21:33 +0800 Subject: [PATCH 2/2] implement split/merge shard --- hstream/src/HStream/Server/Core/Shard.hs | 166 +++++++++++++++----- hstream/src/HStream/Server/Core/Stream.hs | 8 +- hstream/src/HStream/Server/Exception.hs | 17 +- hstream/src/HStream/Server/Handler.hs | 1 + hstream/src/HStream/Server/Handler/Shard.hs | 28 +++- hstream/src/HStream/Server/Shard.hs | 18 ++- 6 files changed, 183 insertions(+), 55 deletions(-) diff --git a/hstream/src/HStream/Server/Core/Shard.hs b/hstream/src/HStream/Server/Core/Shard.hs index e1d2329fb..3ce495b0d 100644 --- a/hstream/src/HStream/Server/Core/Shard.hs +++ b/hstream/src/HStream/Server/Core/Shard.hs @@ -1,31 +1,42 @@ +{-# LANGUAGE LambdaCase #-} + module HStream.Server.Core.Shard - ( readShard, - listShards, - splitShards, - ) + ( readShard + , listShards + , splitShards + , mergeShards + ) where -import Control.Exception (bracket) -import Control.Monad (void) -import Data.Foldable (foldl') -import qualified Data.Map.Strict as M -import qualified Data.Vector as V -import GHC.Stack (HasCallStack) -import Network.GRPC.HighLevel.Generated -import qualified Z.Data.CBytes as CB - -import Data.Maybe (fromJust) -import HStream.Connector.HStore (transToStreamName) -import HStream.Server.Exception (InvalidArgument (..), - StreamNotExist (..)) -import HStream.Server.Handler.Common (decodeRecordBatch) -import qualified HStream.Server.HStreamApi as API -import HStream.Server.ReaderPool (getReader, putReader) -import HStream.Server.Types (ServerContext (..)) -import qualified HStream.Store as S -import HStream.ThirdParty.Protobuf as PB +import Control.Exception (bracket) +import Control.Monad (foldM, void) +import Data.Foldable (foldl') +import qualified Data.HashMap.Strict as HM +import qualified Data.Map.Strict as M +import qualified Data.Vector as V +import GHC.Stack (HasCallStack) +import qualified Z.Data.CBytes as CB + +import Control.Concurrent (MVar, modifyMVar, modifyMVar_, + readMVar) +import Data.Maybe (fromJust, fromMaybe) +import Data.Text (Text) +import Data.Word (Word64) +import HStream.Server.Core.Common (decodeRecordBatch) +import qualified HStream.Server.HStreamApi as API +import HStream.Server.ReaderPool (getReader, putReader) +import HStream.Server.Shard (Shard (..), ShardKey (..), + SharedShardMap, cBytesToKey, + mergeTwoShard, mkShard, + mkSharedShardMapWithShards, + shardKeyToText, splitByKey, + splitHalf, textToShardKey) +import HStream.Server.Types (ServerContext (..), + transToStreamName) +import qualified HStream.Store as S import HStream.Utils -import Proto3.Suite (Enumerated (Enumerated)) +import Proto3.Suite (Enumerated (Enumerated)) +import Z.Data.CBytes (CBytes) ------------------------------------------------------------------------------- @@ -36,16 +47,13 @@ listShards -> IO (V.Vector API.Shard) listShards ServerContext{..} API.ListShardsRequest{..} = do shards <- M.elems <$> S.listStreamPartitions scLDClient streamId - V.foldM' getShardInfo V.empty $ V.fromList shards + V.foldM' constructShard V.empty $ V.fromList shards where streamId = transToStreamName listShardsRequestStreamName - startKey = CB.pack "startKey" - endKey = CB.pack "endKey" - epoch = CB.pack "epoch" - getShardInfo shards logId = do + constructShard shards logId = do attr <- S.getStreamPartitionExtraAttrs scLDClient logId - case getInfo attr of + case getShardInfo attr of Nothing -> return . V.snoc shards $ API.Shard { API.shardStreamName = listShardsRequestStreamName , API.shardShardId = logId @@ -54,19 +62,13 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do Just(sKey, eKey, ep) -> return . V.snoc shards $ API.Shard { API.shardStreamName = listShardsRequestStreamName , API.shardShardId = logId - , API.shardStartHashRangeKey = sKey - , API.shardEndHashRangeKey = eKey + , API.shardStartHashRangeKey = shardKeyToText sKey + , API.shardEndHashRangeKey = shardKeyToText eKey , API.shardEpoch = ep -- FIXME: neet a way to find if this shard is active , API.shardIsActive = True } - getInfo mp = do - startHashRangeKey <- cBytesToText <$> M.lookup startKey mp - endHashRangeKey <- cBytesToText <$> M.lookup endKey mp - shardEpoch <- read . CB.unpack <$> M.lookup epoch mp - return (startHashRangeKey, endHashRangeKey, shardEpoch) - readShard :: HasCallStack => ServerContext @@ -101,8 +103,90 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do return $ foldl' (\acc (_, _, _, record) -> acc <> record) V.empty receivedRecordsVecs splitShards - :: HasCallStack - => ServerContext + :: ServerContext -> API.SplitShardsRequest -> IO (V.Vector API.Shard) -splitShards = error "" +splitShards ServerContext{..} API.SplitShardsRequest{..} = do + sharedShardMp <- getShardMap scLDClient shardInfo splitShardsRequestStreamName + newShards <- splitShard sharedShardMp + updateShardTable newShards + return . V.map (shardToPb splitShardsRequestStreamName) $ V.fromList newShards + where + splitKey = textToShardKey splitShardsRequestSplitKey + + split :: Bool -> ShardKey -> SharedShardMap -> IO (Shard, Shard) + split True key mps = splitHalf scLDClient mps key + split False key mps = splitByKey scLDClient mps key + + splitShard sharedShardMp = + modifyMVar shardInfo $ \info -> do + (s1, s2) <- split splitShardsRequestHalfSplit splitKey sharedShardMp + return (HM.insert splitShardsRequestStreamName sharedShardMp info, [s1, s2]) + + updateShardTable newShards = + modifyMVar_ shardTable $ \mp -> do + let dict = fromMaybe M.empty $ HM.lookup splitShardsRequestStreamName mp + dict' = foldl' (\acc Shard{startKey=sKey, shardId=sId} -> M.insert sKey sId acc) dict newShards + return $ HM.insert splitShardsRequestStreamName dict' mp + +mergeShards + :: ServerContext + -> API.MergeShardsRequest + -> IO API.Shard +mergeShards ServerContext{..} API.MergeShardsRequest{..} = do + sharedShardMp <- getShardMap scLDClient shardInfo mergeShardsRequestStreamName + (newShard, removedKey) <- mergeShard sharedShardMp + updateShardTable newShard removedKey + return . shardToPb mergeShardsRequestStreamName $ newShard + where + mergeShard sharedShardMp = do + modifyMVar shardInfo $ \info -> do + let [shardKey1, shardKey2] = V.toList . V.map textToShardKey $ mergeShardsRequestShardKeys + res <- mergeTwoShard scLDClient sharedShardMp shardKey1 shardKey2 + return (HM.insert mergeShardsRequestStreamName sharedShardMp info, res) + + updateShardTable Shard{startKey=sKey, shardId=sId} removedKey = + modifyMVar_ shardTable $ \mp -> do + let dict = fromMaybe M.empty $ HM.lookup mergeShardsRequestStreamName mp + dict' = M.insert sKey sId dict + dict'' = M.delete removedKey dict' + return $ HM.insert mergeShardsRequestStreamName dict'' mp + +getShardMap :: S.LDClient -> MVar (HM.HashMap Text SharedShardMap) -> Text -> IO SharedShardMap +getShardMap client shardInfo streamName = do + let streamId = transToStreamName streamName + readMVar shardInfo >>= pure <$> HM.lookup streamName >>= \case + Just mps -> return mps + Nothing -> loadSharedShardMap client streamId + +loadSharedShardMap :: S.LDClient -> S.StreamId -> IO SharedShardMap +loadSharedShardMap client streamId = do + shardIds <- M.elems <$> S.listStreamPartitions client streamId + mkSharedShardMapWithShards =<< foldM createShard [] shardIds + where + createShard acc shardId = do + attrs <- S.getStreamPartitionExtraAttrs client shardId + case getShardInfo attrs of + Nothing -> return acc + Just (sKey, eKey, epoch) -> return $ mkShard shardId streamId sKey eKey epoch : acc + +getShardInfo :: M.Map CBytes CBytes -> Maybe (ShardKey, ShardKey, Word64) +getShardInfo mp = do + startHashRangeKey <- cBytesToKey <$> M.lookup startKey mp + endHashRangeKey <- cBytesToKey <$> M.lookup endKey mp + shardEpoch <- read . CB.unpack <$> M.lookup epoch mp + return (startHashRangeKey, endHashRangeKey, shardEpoch) + where + startKey = CB.pack "startKey" + endKey = CB.pack "endKey" + epoch = CB.pack "epoch" + +shardToPb :: Text -> Shard -> API.Shard +shardToPb sName Shard{..} = API.Shard + { API.shardShardId = shardId + , API.shardStreamName = sName + , API.shardStartHashRangeKey = shardKeyToText startKey + , API.shardEndHashRangeKey = shardKeyToText endKey + , API.shardEpoch = epoch + , API.shardIsActive = True + } diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index 24653bc9f..954a54348 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -27,16 +27,11 @@ import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Vector as V import GHC.Stack (HasCallStack) -import Network.GRPC.HighLevel.Generated -import Proto3.Suite (Enumerated (Enumerated)) -import qualified Z.Data.CBytes as CB import Control.Concurrent (MVar, modifyMVar, modifyMVar_) -import HStream.Connector.HStore (transToStreamName) import HStream.Common.ConsistentHashing (getAllocatedNodeId) import qualified HStream.Logger as Log -import HStream.Server.Core.Common (decodeRecordBatch) import HStream.Server.Exception (InvalidArgument (..), StreamNotExist (..), WrongServer (..)) @@ -54,13 +49,12 @@ import HStream.Server.Types (ServerContext (..), transToStreamName) import qualified HStream.Stats as Stats import qualified HStream.Store as S -import HStream.ThirdParty.Protobuf as PB import HStream.Utils ------------------------------------------------------------------------------- createStream :: HasCallStack => ServerContext -> API.Stream -> IO () -createStream ServerContext{..} stream@API.Stream{ +createStream ServerContext{..} API.Stream{ streamBacklogDuration = backlogSec, streamShardCount = shardCount, ..} = do when (streamReplicationFactor == 0) $ throwIO (InvalidArgument "Stream replicationFactor cannot be zero") diff --git a/hstream/src/HStream/Server/Exception.hs b/hstream/src/HStream/Server/Exception.hs index 1f6af143f..14d91b87a 100644 --- a/hstream/src/HStream/Server/Exception.hs +++ b/hstream/src/HStream/Server/Exception.hs @@ -21,6 +21,8 @@ import ZooKeeper.Exception import qualified HStream.Logger as Log import HStream.Server.Persistence.Exception (PersistenceException) +import HStream.Server.Shard (CanNotMerge, CanNotSplit, + ShardNotExist) import qualified HStream.Store as Store import HStream.Utils (mkServerErrResp) @@ -106,7 +108,7 @@ finalExceptionHandlers = [ Log.fatal $ Log.buildString' err return (StatusInternal, mkStatusDetails err) , - Handler $ \(err :: AsyncCancelled) -> do + Handler $ \(_ :: AsyncCancelled) -> do return (StatusOk, "") , Handler $ \(err :: SomeException) -> do @@ -144,6 +146,19 @@ zooKeeperExceptionHandler = [ Handler $ \(e :: ZooException ) -> handleZKException e StatusInternal ] +shardExceptionHandler :: Handlers (StatusCode, StatusDetails) +shardExceptionHandler = [ + Handler $ \(e :: CanNotSplit) -> do + Log.fatal $ Log.buildString' e + return (StatusFailedPrecondition, mkStatusDetails e), + Handler $ \(e :: CanNotMerge) -> do + Log.fatal $ Log.buildString' e + return (StatusFailedPrecondition, mkStatusDetails e), + Handler $ \(e :: ShardNotExist) -> do + Log.fatal $ Log.buildString' e + return (StatusNotFound, mkStatusDetails e) + ] + defaultHandlers :: Handlers (StatusCode, StatusDetails) defaultHandlers = serverExceptionHandlers ++ storeExceptionHandlers diff --git a/hstream/src/HStream/Server/Handler.hs b/hstream/src/HStream/Server/Handler.hs index d4f495759..1fb341138 100644 --- a/hstream/src/HStream/Server/Handler.hs +++ b/hstream/src/HStream/Server/Handler.hs @@ -55,6 +55,7 @@ handlers serverContext@ServerContext{..} = hstreamApiReadShard = readShardHandler serverContext, hstreamApiListShards = listShardsHandler serverContext, hstreamApiSplitShards = splitShardsHandler serverContext, + hstreamApiMergeShards = mergeShardsHandler serverContext, -- Stats hstreamApiPerStreamTimeSeriesStats = H.perStreamTimeSeriesStats scStatsHolder, diff --git a/hstream/src/HStream/Server/Handler/Shard.hs b/hstream/src/HStream/Server/Handler/Shard.hs index dac12e26a..02c9b01d3 100644 --- a/hstream/src/HStream/Server/Handler/Shard.hs +++ b/hstream/src/HStream/Server/Handler/Shard.hs @@ -9,12 +9,15 @@ module HStream.Server.Handler.Shard ( listShardsHandler , readShardHandler , splitShardsHandler + , mergeShardsHandler ) where import Control.Exception import Network.GRPC.HighLevel.Generated +import Control.Monad (when) +import qualified Data.Vector as V import qualified HStream.Logger as Log import qualified HStream.Server.Core.Shard as C import HStream.Server.Exception @@ -29,7 +32,7 @@ listShardsHandler :: ServerContext -> ServerRequest 'Normal ListShardsRequest ListShardsResponse -> IO (ServerResponse 'Normal ListShardsResponse) -listShardsHandler sc (ServerNormalRequest _metadata request) = do +listShardsHandler sc (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do Log.debug "Receive List Shards Request" C.listShards sc request >>= returnResp . ListShardsResponse @@ -45,10 +48,19 @@ splitShardsHandler :: ServerContext -> ServerRequest 'Normal SplitShardsRequest SplitShardsResponse -> IO (ServerResponse 'Normal SplitShardsResponse) -splitShardsHandler sc (ServerNormalRequest _metadata request) = do +splitShardsHandler sc (ServerNormalRequest _metadata request) = shardExceptionHandle $ do Log.debug $ "Receive Split Shards Request: " <> Log.buildString' (show request) C.splitShards sc request >>= returnResp . SplitShardsResponse +mergeShardsHandler + :: ServerContext + -> ServerRequest 'Normal MergeShardsRequest MergeShardsResponse + -> IO (ServerResponse 'Normal MergeShardsResponse) +mergeShardsHandler sc (ServerNormalRequest _metadata request@MergeShardsRequest{..}) = shardExceptionHandle $ do + Log.debug $ "Receive Merge Shards Request: " <> Log.buildString' (show request) + when (V.length mergeShardsRequestShardKeys /= 2) $ throwIO WrongShardCnt + C.mergeShards sc request >>= returnResp . MergeShardsResponse . Just + ----------------------------------------------------------------------------------- readShardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a) @@ -56,3 +68,15 @@ readShardExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $ [ Handler (\(err :: Store.NOTFOUND) -> return (StatusUnavailable, mkStatusDetails err)) ] ++ defaultHandlers + +shardExceptionHandle :: ExceptionHandle (ServerResponse 'Normal a) +shardExceptionHandle = mkExceptionHandle . setRespType mkServerErrResp $ + [ Handler (\(err :: WrongShardCnt) -> + return (StatusInvalidArgument, mkStatusDetails err)) + ] ++ shardExceptionHandler ++ defaultHandlers + +data WrongShardCnt = WrongShardCnt +instance Show WrongShardCnt where + show _ = "Only two shards can be merged at a time" + +instance Exception WrongShardCnt where diff --git a/hstream/src/HStream/Server/Shard.hs b/hstream/src/HStream/Server/Shard.hs index d0ee1eca9..d4d98d934 100644 --- a/hstream/src/HStream/Server/Shard.hs +++ b/hstream/src/HStream/Server/Shard.hs @@ -39,6 +39,8 @@ module HStream.Server.Shard( hashShardKey, keyToCBytes, cBytesToKey, + shardKeyToText, + textToShardKey, shardStartKey, shardEndKey, shardEpoch @@ -79,6 +81,12 @@ hashShardKey key = let w8KeyList = BA.unpack (CH.hash . encodeUtf8 $ key :: CH.Digest CH.MD5) in ShardKey $ foldl' (\acc c -> (.|.) (acc `shiftL` 8) (fromIntegral c)) (0 :: Integer) w8KeyList +shardKeyToText :: ShardKey -> T.Text +shardKeyToText (ShardKey key) = T.pack (show key) + +textToShardKey :: T.Text -> ShardKey +textToShardKey = ShardKey . read . T.unpack + keyToCBytes :: ShardKey -> CB.CBytes keyToCBytes (ShardKey key) = CB.pack . show $ key @@ -234,14 +242,14 @@ getShardByKey mp key = do type SplitStrategies = ShardMap -> ShardKey -> Either ShardException (Shard, Shard) -- | Split Shard with specific ShardKey -splitByKey :: S.LDClient -> SharedShardMap -> ShardKey -> IO () +splitByKey :: S.LDClient -> SharedShardMap -> ShardKey -> IO (Shard, Shard) splitByKey = splitShardInternal getSplitedShard -- | Split Shard by half -splitHalf :: S.LDClient -> SharedShardMap -> ShardKey -> IO () +splitHalf :: S.LDClient -> SharedShardMap -> ShardKey -> IO (Shard, Shard) splitHalf = splitShardInternal getHalfSplitedShard -splitShardInternal :: SplitStrategies -> S.LDClient -> SharedShardMap -> ShardKey -> IO () +splitShardInternal :: SplitStrategies -> S.LDClient -> SharedShardMap -> ShardKey -> IO (Shard, Shard) splitShardInternal stratege client sharedMp key = do let hash1 = getShardMapIdx key bracket @@ -278,9 +286,10 @@ splitShardInternal stratege client sharedMp key = do atomically $ do putShardMap sharedMp hash1' newMp1 putShardMap sharedMp hash2' newMp2 + return (s1', s2') ) -mergeTwoShard :: S.LDClient -> SharedShardMap -> ShardKey -> ShardKey -> IO () +mergeTwoShard :: S.LDClient -> SharedShardMap -> ShardKey -> ShardKey -> IO (Shard, ShardKey) mergeTwoShard client mp key1 key2 = do let hash1 = getShardMapIdx key1 let hash2 = getShardMapIdx key2 @@ -303,6 +312,7 @@ mergeTwoShard client mp key1 key2 = do atomically $ do putShardMap mp (getShardMapIdx removedKey) removedShardMp putShardMap mp (getShardMapIdx startKey) updateShardMp + return (newShard, removedKey) ) where getShards hash1 hash2