Mocking Up Aaron Turon's Reagents
=================================

A woefully incomplete reimplementation of Aaron Turon's Reagents
(http://dl.acm.org/citation.cfm?id=2254084).

Mostly a test of "do I understand what the paper is doing" rather than
anything meant to be serious.

This page's raw RST source is a literate Haskell file.

.. highlight:: haskell

.. TODO --------------------------------------------------------------- {{{

There are a number of disclaimers I feel it necessary to state up front:

* This misses the "dissolve" from 3.5
* It uses a singularly... poor coding for references
* It lacks postCommit handling
* It lacks any sane form of backoff
* Endpoints are untested
* Choice is untested
* Swap is unimplemented
* It lacks the higher-level combinators and any instances you'd wish for
* It has not been benchmarked with Criterion
* None of the real data structures in the paper have been implemented
* It uses an initial, tagged representation of Reagents
* There is essentially no documentation

It would be interesting to contrast this with an acquire-as-you-go locking
scheme à la Coyotos, complete with inter-thread "get out of my way".  It
already has distinct "read" and "commit" phases; the change would make that
"read and acquire" and "commit".

On the other hand, "testRacers 10 10000 addOneRea" works.

.. -------------------------------------------------------------------- }}}

.. Prelude ------------------------------------------------------------ {{{

We start off with the usual pile of LANGUAGE pragmas and module imports::

> {-# LANGUAGE ExistentialQuantification #-}
> {-# LANGUAGE GADTs #-}
> {-# LANGUAGE Rank2Types #-}
> {-# LANGUAGE ScopedTypeVariables #-}
>
> module Reagents where
>
> import Control.Concurrent
> import Control.Monad
> import Data.Function
> import Data.IORef
> import qualified Data.Map as M
> import Data.Maybe
> import Debug.Trace
> import qualified Data.Set as S
> import System.Mem.StableName
> -- import System.Mem.Weak
> import System.IO.Unsafe
> import Unsafe.Coerce

.. -------------------------------------------------------------------- }}}

.. References --------------------------------------------------------- {{{

We need some way to make a reference object::

> newtype Ref a = Ref { unRef :: Eq a => IORef (Maybe a) }
>
> newRef :: a -> IO (Ref a)
> newRef a = do
>   ior <- newIORef (Just a)
>   return $ Ref ior
>
> casRef (Ref ior) ex nv = atomicModifyIORef ior cas
>   where cas iorv | iorv == (Just ex) = (Just nv,True)
>         cas iorv | otherwise         = (iorv,False)

We moreover need to maintain logs of reference objects, existentially hiding
their data type::

> data ExRef = forall a . ExRef (Ref a)
> data RefLogEnt = forall a . (Eq a) => RLE
>   { rle_ref :: Ref a
>   , rle_val :: !a
>   }
>
> -- Man I do not feel good about these.
> -- ("Your code is bad and you should feel bad")
> mkersn :: ExRef -> StableName (Ref b)
> mkersn r = case r of
>   (ExRef r') -> r' `seq` unsafeCoerce $ unsafePerformIO $ makeStableName r'
> instance Eq ExRef where
>   (==) = (==) `on` mkersn
> instance Ord ExRef where
>   compare = compare `on` (hashStableName . mkersn)

.. -------------------------------------------------------------------- }}}
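
The StableName coding of reference identity above could be avoided by giving
each reference an explicit identity when it is created.  What follows is only
a sketch of that alternative, not what this module does: the names TaggedRef,
newTaggedRef, ExTagged and exId are hypothetical, and the identity comes from
Data.Unique in base.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Data.IORef (IORef, newIORef)
import Data.Unique (Unique, newUnique)

-- Hypothetical alternative to Ref: the identity is an ordinary field,
-- so no StableName, unsafeCoerce or unsafePerformIO is involved.
data TaggedRef a = TaggedRef
  { trId   :: Unique
  , trCell :: IORef (Maybe a)
  }

newTaggedRef :: a -> IO (TaggedRef a)
newTaggedRef a = TaggedRef <$> newUnique <*> newIORef (Just a)

-- Existential wrapper playing the role of ExRef.
data ExTagged = forall a . ExTagged (TaggedRef a)

exId :: ExTagged -> Unique
exId (ExTagged r) = trId r

-- Eq and Ord agree with each other because both go through the Unique.
instance Eq ExTagged where
  x == y = exId x == exId y

instance Ord ExTagged where
  compare x y = compare (exId x) (exId y)
```

With keys like these, a log could be indexed by Unique rather than by the Int
from hashStableName, which is not guaranteed to be distinct for distinct
stable names.
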

.. Offers ------------------------------------------------------------- {{{

An Offer is a reference cell that is pending, rescinded, or completed with a
final value::

> data OfferD a = Pending
>               | Rescinded
>               | Final a
> -- Additionally, make sure that Pending objects are not discarded
> -- when seen (e.g. use p@Pending, not just Pending, in match),
> -- as we might want to use Weak magic to hook Pending objects
>
> instance Eq (OfferD a) where
>   Pending   == Pending   = True
>   Rescinded == Rescinded = True
>   (Final _) == (Final _) = error "Equality of Final Offer"
>   _         == _         = False
>
> newtype Offer a = Offer (Ref (OfferD a))
>
> newOffer :: IO (Offer a)
> newOffer = do
>   ior <- newIORef (Just Pending)
>   return $ Offer $ Ref ior
>
> tryRescindOffer :: Offer a -> IO (Maybe a)
> tryRescindOffer (Offer (Ref ior)) = atomicModifyIORef ior cas
>   where
>     cas r@(Just (Final a)) = (r, Just a)
>     cas x                  = (x, Nothing)

.. -------------------------------------------------------------------- }}}

.. Endpoints ---------------------------------------------------------- {{{

Endpoints come in dual pairs, each holding a list of pending messages::

> data Failure = Retry | Block
> newtype Result a = Result { unResult :: Either Failure a }
>
> data Message a b = forall c . Message
>   { m_payload :: a
>   , m_sendrx  :: Reaction
>   , m_sendk   :: Reagent b c
>   , m_offer   :: Offer c
>   }
>
> data Endpoint a b = Endpoint
>   { e_chan :: IORef [Message a b]
>   , e_dual :: Endpoint b a
>   }
>
> mkChan :: IO (Endpoint a b, Endpoint b a)
> mkChan = do
>   ior1 <- newIORef []
>   ior2 <- newIORef []
>   let r@(e1, e2) = (Endpoint ior1 e2, Endpoint ior2 e1)
>   return r

.. -------------------------------------------------------------------- }}}
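
mkChan relies on a lazy, mutually recursive let so that each endpoint can
refer to its dual.  The same knot-tying can be seen in isolation in the
following stripped-down sketch, where the hypothetical End type carries a
label in place of the message list (End, endLabel, endDual and mkEnds are
illustrative names only):

```haskell
-- Hypothetical miniature of Endpoint: a label plus a pointer to the dual.
data End = End
  { endLabel :: String
  , endDual  :: End
  }

-- Each end is defined in terms of the other.  This terminates because the
-- End constructor is lazy in its fields, so building e1 does not force e2.
mkEnds :: (End, End)
mkEnds =
  let e1 = End "left" e2
      e2 = End "right" e1
  in (e1, e2)
```

Following endDual twice from either end leads back to where one started,
which is the property the Endpoint pair is after.
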

.. Reactions ---------------------------------------------------------- {{{

A Reaction logs the CASes to perform at commit time: for each reference, the
desired new value and the expected old value, keyed by the hash of the
reference's stable name::

> data Reaction = Reaction
>   { rx_redoLog :: M.Map Int RefLogEnt -- desired values of refs
>   , rx_undoLog :: M.Map Int RefLogEnt -- expected values of refs
>   }
>
> rx_empty :: Reaction
> rx_empty = Reaction M.empty M.empty
>
> rx_withcas :: (Eq a) => Reaction -> Ref a -> a -> a -> Reaction
> rx_withcas (Reaction re un) ra ex ne =
>   let ix = hashStableName $ mkersn $ ExRef ra in
>   Reaction (M.insert ix (RLE ra ne) re)
>            (M.insert ix (RLE ra ex) un)
>
> rx_commit :: Reaction -> IO Bool
> rx_commit (Reaction re un) = do
>   -- traceShow ("RXC") $ return ()
>   ok <- acquireAll [] (map snd $ M.toAscList un)
>   if ok
>     then setAll (map snd $ M.toList re) >> return True
>     else return False
>   where
>     acquireAll _  []     = return True
>     acquireAll us (p:ps) = do
>       case p of
>         RLE ref val -> do
>           -- traceShow ("AcqAll", hashStableName $ mkersn (ExRef ref)) $ return ()
>           ok <- atomicModifyIORef (unRef ref)
>                   (\a -> if a == Just val
>                            then (Nothing, True)
>                            else (a, False))
>           if ok
>             then acquireAll (p:us) ps
>             else setAll us >> return False
>
>     setAll :: [RefLogEnt] -> IO ()
>     setAll = mapM_ (\rle -> case rle of RLE ref val -> writeIORef (unRef ref) (Just val))
>
> -- True when the reaction has at least one logged CAS.
> rx_hascas :: Reaction -> Bool
> rx_hascas (Reaction r _) = not (M.null r)
>
> offerFulfill :: Reaction -> Offer a -> a -> Reaction
> offerFulfill r (Offer o) a = rx_withcas r o Pending (Final a)

.. -------------------------------------------------------------------- }}}

.. Reagents ----------------------------------------------------------- {{{

Ah, finally we can start getting close to the thing we're after.  A Reagent
is just a thing in one of a few states::

> data Reagent a b where
>   -- The paper gets this definition wrong, I think; they pass
>   -- () to the tryReact sub-call at the end of the CAS case
>   -- of figure 6, but claim that the continuation is
>   -- "Reagent a r".
>   Cas      :: (Eq a) => Ref a -> a -> a -> Reagent () r -> Reagent () r
>   Choice   :: Reagent a b -> Reagent a b -> Reagent a b
>   Commit   :: Reagent a a
>   Computed :: (a -> Maybe (Reagent () b)) -> Reagent a b
>   -- Not given in figure 6, but is hinted at in figure 3
>   Read     :: (Eq a) => Ref a -> Reagent a r -> Reagent () r
>   Swap     :: Endpoint a b -> Reagent b r -> Reagent a r

And we need some tests on those states::

> re_hascas :: Reagent a b -> Bool
> re_hascas (Cas _ _ _ _)  = True
> re_hascas (Choice r1 r2) = re_hascas r1 || re_hascas r2
> re_hascas Commit         = False
> re_hascas (Computed _)   = True -- incorrect but safe
> re_hascas (Read _ r)     = re_hascas r
> re_hascas (Swap _ r)     = re_hascas r
>
> re_maysync :: Reagent a b -> Bool
> re_maysync (Cas _ _ _ _)  = False
> re_maysync (Choice r1 r2) = re_maysync r1 || re_maysync r2
> re_maysync Commit         = False
> re_maysync (Computed _)   = True -- incorrect but safe
> re_maysync (Read _ r)     = re_maysync r
> re_maysync (Swap _ _)     = True

And now, the meat of the paper (figure 6)::

> tryReact :: Reagent a b -> a -> Reaction -> (Maybe (Offer b)) -> IO (Result b)
>
> tryReact Commit a rx Nothing = do
>   rxok <- rx_commit rx
>   return . Result $ if rxok then Right a else Left Retry
> tryReact Commit a rx (Just off) = do
>   rescinded <- tryRescindOffer off
>   case rescinded of
>     Just a' -> return $ Result $ Right a'
>     Nothing -> tryReact Commit a rx Nothing
>
> tryReact (Cas ref ov nv k) () rx off = do
>   -- traceShow ("TR Cas") $ return ()
>   if (not (rx_hascas rx)) && (not (re_hascas k))
>     then do
>       casok <- casRef ref ov nv
>       if casok
>         then tryReact k () rx off
>         else return $ Result $ Left Retry
>     else tryReact k () (rx_withcas rx ref ov nv) off
>
> tryReact (Choice r1 r2) a rx off = do
>   tr1 <- tryReact r1 a rx off
>   case unResult tr1 of
>     Left Retry -> do
>       tr2 <- tryReact r2 a rx off
>       case unResult tr2 of
>         Left _       -> return $ Result $ Left $ Retry
>         a'@(Right _) -> return $ Result a'
>     Left Block   -> tryReact r2 a rx off
>     a'@(Right _) -> return $ Result a'
>
> tryReact (Computed c) a rx off = do
>   case c a of
>     Nothing -> return $ Result $ Left Block
>     Just r  -> tryReact r () rx off
>
> tryReact (Read ref k) () rx off = do
>   val <- readIORef $ unRef ref
>   case val of
>     Nothing -> return $ Result $ Left Retry
>     Just v' -> tryReact k v' rx off
>
> -- tryReact (Swap e k) a rx Nothing = do
> --   XXX The paper has a typo here and starts tryFrom with
> --   a failureMode of Retry; tryFrom never transitions to
> --   Block.  The prose of the paper says that tryFrom Blocks
> --   only if all observed offers Block, so I believe this
> --   to be the correct fix.
>
> -- tryReact (Swap e k) a rx (Just off) = do
> --   atomicModifyIORef (e_chan e) (\ms -> ((Message a rx k off):ms, ms))
> --   ...
>
>
> -- "tryFrom" but with arguments flipped
> tryReactFrom :: a -> Failure -> [Message a b] -> IO (Result b)
>
> tryReactFrom a fm [] = return $ Result $ Left fm
> tryReactFrom a fm ((Message b rx' k' off'):ms) = do
>   undefined -- forkIO $ tryReact k' rx'

.. > {-
.. >
.. > upd :: Ref a -> ((a,b) -> (a,c)) -> Reagent b c
.. > upd = undefined
.. > -}
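
The Choice case of tryReact combines the outcomes of its two branches, and the
rule is easier to see as a pure function over results.  The sketch below is
only a reading aid: orElse is a hypothetical name, and Failure is redeclared
(with derived instances) so that the fragment stands alone.

```haskell
-- Redeclared here so the fragment is self-contained.
data Failure = Retry | Block
  deriving (Eq, Show)

-- First argument: outcome of the left branch; second: outcome of the right.
-- Laziness means the right outcome is only inspected if the left one failed,
-- mirroring the fact that tryReact only runs r2 after r1 fails.
orElse :: Either Failure a -> Either Failure a -> Either Failure a
orElse (Right a)    _         = Right a
orElse (Left Retry) (Right b) = Right b
orElse (Left Retry) (Left _)  = Left Retry   -- a Retry on the left wins
orElse (Left Block) second    = second       -- defer entirely to the right
```

In particular, the combined result is Block only when both branches block,
and any Retry on the left turns a failing choice into a Retry.
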

.. -------------------------------------------------------------------- }}}

.. Bang --------------------------------------------------------------- {{{

Here's the core of the whole thing::

> bang :: Reagent a b -> a -> IO b
> bang r a = withoutOffer
>   where
>     withoutOffer = do
>       trace (show ("WithoutOffer")) $ return ()
>       res <- tryReact r a rx_empty Nothing
>       case unResult res of
>         Left Block -> withOffer
>         Left Retry -> do
>           backoffOnce
>           if re_maysync r then withOffer else withoutOffer
>         Right ans -> return ans
>
>     withOffer = do
>       traceShow ("WithOffer") $ return ()
>       off <- newOffer
>       res <- tryReact r a rx_empty (Just off)
>       case unResult res of
>         Left Block -> backoffOnce {- XXX -} >> retry off
>         Left Retry -> backoffOnce >> retry off
>         Right b    -> return b
>
>     retry off = do
>       res <- tryRescindOffer off
>       case res of
>         Nothing -> withOffer
>         Just b  -> traceShow ("Offer met") $ return b

.. -------------------------------------------------------------------- }}}

.. Backoff ------------------------------------------------------------ {{{

Well, this is one way to do it::

> backoffOnce :: IO ()
> backoffOnce = threadDelay 10

.. -------------------------------------------------------------------- }}}

.. Tests -------------------------------------------------------------- {{{

Wouldn't be complete without test cases::

> oneCas ref = Cas ref 0 1 Commit
> twoCas ref = Cas ref 1 2 (Cas ref 0 1 Commit)
>
> testIntZero rea = do
>   ref <- newRef (0 :: Int)
>   bang (rea ref) ()
>   res <- readIORef $ unRef $ ref
>   return $ fromJust res
>
> addOneRea ref = Read ref (Computed (\a -> Just $ Cas ref a (a+1) Commit))
>
> testRacers :: Int -> Int -> (Ref Int -> Reagent () ()) -> IO Int
> testRacers n m rea = do
>   ref <- newRef (0 :: Int)
>
>   mvs <- replicateM n newEmptyMVar
>   mapM_ (\mv -> forkIO $ racer ref >> putMVar mv ()) mvs
>
>   mapM_ takeMVar mvs
>
>   res <- readIORef $ unRef $ ref
>   return $ fromJust res
>
>   where
>     racer ref = mapM_ (const $ bang (rea ref) ()) [1..m]
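
For comparison, here is what testRacers is checking, stripped of reagents
entirely: n threads each perform m increments through a read and
compare-and-set retry loop, and the final count should be n*m.  This is a
minimal sketch over a plain IORef Int, not part of the module; casInt, incr
and raceCount are hypothetical names.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM, replicateM_, unless)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Compare-and-set on an Int cell: succeeds only if the cell still holds
-- the expected value.
casInt :: IORef Int -> Int -> Int -> IO Bool
casInt ref expected new = atomicModifyIORef' ref step
  where
    step cur
      | cur == expected = (new, True)
      | otherwise       = (cur, False)

-- Read, then try to install cur + 1; start over if another thread won.
incr :: IORef Int -> IO ()
incr ref = do
  cur <- readIORef ref
  ok  <- casInt ref cur (cur + 1)
  unless ok (incr ref)

-- Fork n threads doing m increments each, wait for all, report the total.
raceCount :: Int -> Int -> IO Int
raceCount n m = do
  ref   <- newIORef 0
  dones <- replicateM n newEmptyMVar
  forM_ dones $ \done ->
    forkIO (replicateM_ m (incr ref) >> putMVar done ())
  mapM_ takeMVar dones
  readIORef ref
```

If no increment is lost, raceCount 10 10000 yields 100000, which is also the
figure to look for from "testRacers 10 10000 addOneRea".
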

.. -------------------------------------------------------------------- }}}

.. vim: set foldmethod=marker:ts=4