Haskell - actor-based volatility

I am working on a haskell network application and I am using the actor template to control a multi-threaded process. One thing I came across is storing, for example, a set of client sockets / descriptors. Which, of course, should be available for all threads and may change when the client enters / exits.

Since I came from the imperative world, I thought of some kind of mechanism lock, but when I noticed how ugly it was, I thought of โ€œpureโ€ mutability, well, actually it's kind of clean:

import Control.Concurrent import Control.Monad import Network import System.IO import Data.List import Data.Maybe import System.Environment import Control.Exception newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a)) newStorage = do q <- newChan forkIO $ storage [] q return q newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle)) newHandleStorage = newStorage storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO () storage sq = do let loop = (`storage` q) (req, reply, d) <- readChan q print ("processing " ++ show(d)) case req of "add" -> loop ((fromJust d) : s) "remove" -> loop (delete (fromJust d) s) "get" -> do writeChan (fromJust reply) s loop s store sd = writeChan s ("add", Nothing, Just d) unstore sd = writeChan s ("remove", Nothing, Just d) request s = do chan <- newChan writeChan s ("get", Just chan, Nothing) readChan chan 

The fact is that the thread (actor) manages the list of elements and changes the list in accordance with incoming requests. Since the stream is really cheap, I thought it could be a really good functional alternative.

Of course, this is just a prototype (quick dirty proof of concept). So my question is:

  • Is this a โ€œgoodโ€ way of managing common mutable variables (in the actors world)?
  • Is there already a library for this template? (I already searched, but didn't find anything)

Regards, Chris

+6
source share
2 answers

Here is a quick and dirty example using stm and pipes-network . This will create a simple server that allows clients to connect and increase or decrease the counter. It will display a very simple status bar showing the current counts of all connected clients and remove the client counters from the panel when they are disconnected.

First, I will start from the server, and I generously commented on the code to explain how it works:

 import Control.Concurrent.STM (STM, atomically) import Control.Concurrent.STM.TVar import qualified Data.HashMap.Strict as H import Data.Foldable (forM_) import Control.Concurrent (forkIO, threadDelay) import Control.Monad (unless) import Control.Monad.Trans.State.Strict import qualified Data.ByteString.Char8 as B import Control.Proxy import Control.Proxy.TCP import System.IO main = do hSetBuffering stdout NoBuffering {- These are the internal data structures. They should be an implementation detail and you should never expose these references to the "business logic" part of the application. -} -- I use nRef to keep track of creating fresh Ints (which identify users) nRef <- newTVarIO 0 :: IO (TVar Int) {- hMap associates every user (ie Int) with a counter Notice how I've "striped" the hash map by storing STM references to the values instead of storing the values directly. This means that I only actually write the hashmap when adding or removing users, which reduces contention for the hash map. Since each user gets their own unique STM reference for their counter, modifying counters does not cause contention with other counters or contention with the hash map. -} hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int))) {- The following code makes heavy use of Haskell pure closures. Each 'let' binding closes over its current environment, which is safe since Haskell is pure. -} let {- 'getCounters' is the only server-facing command in our STM API. The only permitted operation is retrieving the current set of user counters. 'getCounters' closes over the 'hMap' reference currently in scope so that the server never needs to be aware about our internal implementation. -} getCounters :: STM [Int] getCounters = do refs <- fmap H.elems (readTVar hMap) mapM readTVar refs {- 'init' is the only client-facing command in our STM API. It initializes the client entry in the hash map and returns two commands: the first command is what the client calls to 'increment' their counter and the second command is what the client calls to log off and delete 'delete' command. Notice that those two returned commands each close over the client's unique STM reference so the client never needs to be aware of how exactly 'init' is implemented under the hood. -} init :: STM (STM (), STM ()) init = do n <- readTVar nRef writeTVar nRef $! n + 1 ref <- newTVar 0 modifyTVar' hMap (H.insert n ref) let incrementRef :: STM () incrementRef = do mRef <- fmap (H.lookup n) (readTVar hMap) forM_ mRef $ \ref -> modifyTVar' ref (+ 1) deleteRef :: STM () deleteRef = modifyTVar' hMap (H.delete n) return (incrementRef, deleteRef) {- Now for the actual program logic. Everything past this point only uses the approved STM API (ie 'getCounters' and 'init'). If I wanted I could factor the above approved STM API into a separate module to enforce the encapsulation boundary, but I am lazy. -} {- Fork a thread which polls the current state of the counters and displays it to the console. There is a way to implement this without polling but this gets the job done for now. Most of what it is doing is just some simple tricks to reuse the same console line instead of outputting a stream of lines. Otherwise it would be just: forkIO $ forever $ do ns <- atomically getCounters print ns -} forkIO $ (`evalStateT` 0) $ forever $ do del <- get lift $ do putStr (replicate del '\b') putStr (replicate del ' ' ) putStr (replicate del '\b') ns <- lift $ atomically getCounters let str = show ns lift $ putStr str put $! length str lift $ threadDelay 10000 {- Fork a thread for each incoming connection, which listens to the client's commands and translates them into 'STM' actions -} serve HostAny "8080" $ \(socket, _) -> do (increment, delete) <- atomically init {- Right now, just do the dumb thing and convert all keypresses into increment commands, with the exception of the 'q' key, which will quit -} let handler :: (Proxy p) => () -> Consumer p Char IO () handler () = runIdentityP loop where loop = do c <- request () unless (c == 'q') $ do lift $ atomically increment loop {- This uses my 'pipes' library. It basically is a high-level way to say: * Read binary packets from the socket no bigger than 4096 bytes * Get the first character from each packet and discard the rest * Handle the character using the above 'handler' function -} runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler {- The above pipeline finishes either when the socket closes or 'handler' stops looping because it received a 'q'. Either case means that the client is done so we log them out using 'delete'. -} atomically delete 

The next client is a client that simply opens connections and forwards all keystrokes as separate packets:

 import Control.Monad import Control.Proxy import Control.Proxy.Safe import Control.Proxy.TCP.Safe import Data.ByteString.Char8 (pack) import System.IO main = do hSetBuffering stdin NoBuffering hSetEcho stdin False {- Again, this uses my 'pipes' library. It basically says: * Read characters from the console using 'commands' * Pack them into a binary format * send them to a server running at 127.0.0.1:8080 This finishes looping when the user types a 'q' or the connection is closed for whatever reason. -} runSafeIO $ runProxy $ runEitherK $ try . commands >-> mapD (\c -> pack [c]) >-> connectWriteD Nothing "127.0.0.1" "8080" commands :: (Proxy p) => () -> Producer p Char IO () commands () = runIdentityP loop where loop = do c <- lift getChar respond c unless (c == 'q') loop 

It's quite simple: commands generates a Char s stream, which is then converted to a ByteString and then sent as packets to the server.

If you start the server and several clients and you have each type in several keys, a list will appear on the screen of your server showing how many keys are dialed to each client:

 [1,6,4] 

... and if some of the clients disconnect, they will be removed from the list:

 [1,4] 

Note that the pipes component of these examples will be greatly simplified in the upcoming version of pipes-4.0.0 , but the existing pipes ecosystem is still doing the job as-is.

+6
source

First, I definitely recommend using your own data type to represent commands. When using (String, Maybe (Chan [a]), Maybe a) an improper client can crash your actor simply by sending an unknown command or sending ("add", Nothing, Nothing) , etc. I would suggest something like

 data Command a = Add a | Remove a | Get (Chan [a]) 

Then you can match pattern matching in storage commands in economy mode.

Actors have their advantages, but I also feel that they have some disadvantages. For example, receiving a response from an actor requires sending him a command, and then waiting for a response. And the client cannot be completely sure that he will receive the answer and that the answer will be of a certain type - you cannot say that I only need answers of this type (and how many of them) for this particular team.

So, as an example, I will give a simple STM solution. It would be better to use a hash table or set (balanced tree), but since Handle does not implement either Ord or Hashable , we cannot use these data structures, so I will continue to use lists.

 module ThreadSet ( TSet, add, remove, get ) where import Control.Monad import Control.Monad.STM import Control.Concurrent.STM.TVar import Data.List (delete) newtype TSet a = TSet (TVar [a]) add :: (Eq a) => a -> TSet a -> STM () add x (TSet v) = readTVar v >>= writeTVar v . (x :) remove :: (Eq a) => a -> TSet a -> STM () remove x (TSet v) = readTVar v >>= writeTVar v . delete x get :: (Eq a) => TSet a -> STM [a] get (TSet v) = readTVar v 

This module implements a set of arbitrary elements based on STM . You can have several such sets and use them together in the same STM transaction, which immediately failed or worked. for instance

 -- | Ensures that there is exactly one element `x` in the set. add1 :: (Eq a) => a -> TSet a -> STM () add1 xv = remove xv >> add xv 

It would be difficult with the actors, you would need to add it as another team for the actor, you cannot compose it from existing actions and still have atomicity.

Update: There is an interesting article explaining why Clojure designers decided not to use actors. For example, using participants, even if you have a lot of readings and only very little is written in a mutable structure, they are all serialized, which can greatly affect performance.

+3
source

Source: https://habr.com/ru/post/949736/


All Articles