Storing arbitrary function calls by threads

I am trying to write a library to reproduce the semantics of Qt streaming: signals can be connected to slots, and all slots are executed in a known stream, so that slots tied to the same stream are thread safe relative to each other.

I have the following API:

data Signal a = Signal Unique a data Slot a = Slot Unique ThreadId (a -> IO ()) mkSignal :: IO (Signal a) mkSlot :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a) connect :: Signal a -> Slot a -> IO () -- callable from any thread emit :: Signal a -> a -> IO () -- runs in Slot thread as a result of `emit` execute :: Slot a -> a -> IO () execute (Slot _ _ f) arg = f arg 

The problem arises from emit to execute . The argument must be saved at runtime in some way, and then the I / O action is executed, but I can't get past the type checking.

That's what I need:

  • Security Type: Signals should not connect to connectors that expect a different type.
  • Independence type: there can be more than one slot for any given type (perhaps this can be weakened with newtype and / or TH).
  • Ease of use: since this is a library, signals and slots should be easily created.

What I tried:

  • Data.Dynamic : all this is really fragile, and I have not found a way to execute the correctly entered I / O action on a Dynamic . There is dynApply , but it is clean.
  • Existential types : I need to execute the function passed to mkSlot , as opposed to an arbitrary function based on the type.
  • Data.HList : I'm not smart enough to figure this out.

What am I missing?

+4
source share
1 answer

First, are you sure that the slots really want to execute in a specific thread? It’s easy to write thread-safe code in Haskell, and threads are very easy in GHC, so you don’t type much by binding all the actions of the event handler to a specific Haskell thread.

In addition, the slot itself does not have to be provided for the mkSlot callback: you can use the recursive do-notation to bind the slot in your callback without adding the problem of binding the node to mkSlot .

In any case, you do not need anything complicated like these solutions. I expect when you talk about existential types, you are thinking of sending something like (a -> IO (), a) via TChan (which you mentioned in the comments) and apply it on the other end, but you want TChan accept values ​​of this type for any a, and not just for one particular a. The key understanding here is that if you have (a -> IO (), a) and you don't know what a is, the only thing you can do is apply the function to the value, giving you IO () , so we can just send them through instead!

Here is an example:

 import Data.Unique import Control.Applicative import Control.Monad import Control.Concurrent import Control.Concurrent.STM newtype SlotGroup = SlotGroup (IO () -> IO ()) data Signal a = Signal Unique (TVar [Slot a]) data Slot a = Slot Unique SlotGroup (a -> IO ()) -- When executed, this produces a function taking an IO action and returning -- an IO action that writes that action to the internal TChan. The advantage -- of this approach is that it impossible for clients of newSlotGroup to -- misuse the internals by reading the TChan or similar, and the interface is -- kept abstract. newSlotGroup :: IO SlotGroup newSlotGroup = do chan <- newTChanIO _ <- forkIO . forever . join . atomically . readTChan $ chan return $ SlotGroup (atomically . writeTChan chan) mkSignal :: IO (Signal a) mkSignal = Signal <$> newUnique <*> newTVarIO [] mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a) mkSlot group f = Slot <$> newUnique <*> pure group <*> pure f connect :: Signal a -> Slot a -> IO () connect (Signal _ v) slot = atomically $ do slots <- readTVar v writeTVar v (slot:slots) emit :: Signal a -> a -> IO () emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a) execute :: Slot a -> a -> IO () execute (Slot _ (SlotGroup send) f) a = send (fa) 

This uses TChan to send actions to the workflow to which each slot is bound.

Please note that I am not very familiar with Qt, so maybe I missed some of the subtleties of the model. You can also disable slots with this:

 disconnect :: Signal a -> Slot a -> IO () disconnect (Signal _ v) (Slot u _ _) = atomically $ do slots <- readTVar v writeTVar v $ filter keep slots where keep (Slot u' _) = u' /= u 

You may need something like Map Unique (Slot a) instead of [Slot a] if this is likely to be a bottleneck.

So, the solution here is to (a) recognize that you have something that is based on a volatile state, and use a mutable variable to structure it; (b) understand that the functions and input-output operations are first-class, like everything else, so you do not need to do anything to create them at runtime :)

By the way, I suggest saving the implementation of the Signal and Slot tags without exporting their constructors from the module that defines them; There are many ways to solve this approach without changing the API.

+3
source

Source: https://habr.com/ru/post/1388698/


All Articles