Message Broadcasting in Gaff/Shibboleth

Hello all! Today I’m writing another technical post about how message broadcasting is done in Gaff/Shibboleth. The high level idea isn’t very complicated:

  • Listeners are registered as functions, member functions, or functors.
  • When a listener registers, it is returned a “receipt”. When this receipt is released, it automatically unregisters the listener.
    • While this adds some extra memory usage from storing all these receipts, the end user no longer needs to remember to unregister manually!
  • ListenerLists are stored in a hash map, using the message type’s hash code as the key. (More on this later.)
  • When someone “broadcasts” a message, the message is added to a queue.
  • Later, the engine calls update(), which has single- and multi-threaded versions.
    • The single-threaded version iterates over each message and calls the registered function for each listener.
    • The multi-threaded version takes in a thread pool and generates a task for each message in the queue. When the thread pool runs a task, the task iterates over each listener and calls the registered function.

This is the general gist of how the message broadcaster works. Let’s get into the nitty-gritty, starting from the top of the list.
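Before digging into the real classes, here is a toy version of the whole flow in standard C++. This is not Gaff’s actual code: std::function, std::shared_ptr, and size_t type ids stand in for IFunction, RefPtr, and HashString, but the register/receipt/queue/update shape is the same.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <unordered_map>
#include <utility>

// Toy broadcaster illustrating the steps above. Gaff's real version is
// templated on an allocator and type-erases listeners through IFunction.
struct ToyBroadcaster
{
    using Listener = std::function<void(const void*)>;
    using Receipt  = std::shared_ptr<void>; // releasing it unregisters

    // Register a listener for a message type id. The returned receipt's
    // deleter removes the listener when the last reference is released.
    // (Sketch only: it assumes the broadcaster outlives every receipt.)
    Receipt listen(size_t type, Listener fn)
    {
        size_t id = _next_id++;
        _listeners[type][id] = std::move(fn);
        return Receipt(nullptr, [this, type, id](void*) {
            _listeners[type].erase(id);
        });
    }

    // Broadcasting only queues the message...
    void broadcast(size_t type, const void* msg)
    {
        _queue.push({type, msg});
    }

    // ...and update() dispatches everything queued so far
    // (this is the single-threaded flavor).
    void update(void)
    {
        while (!_queue.empty()) {
            auto [type, msg] = _queue.front();
            _queue.pop();
            for (auto& [id, fn] : _listeners[type])
                fn(msg);
        }
    }

    std::unordered_map<size_t, std::unordered_map<size_t, Listener>> _listeners;
    std::queue<std::pair<size_t, const void*>> _queue;
    size_t _next_id = 0;
};
```

Once the receipt goes out of scope, further broadcasts of that type simply find no listener, which is exactly the “no manual unregister” behavior described above.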

Function Binding

So, MessageBroadcaster’s function binding works very similarly to the function binding in Gaff’s Function class.

// Base function class for function binders to inherit from
class IFunction
{
public:
    virtual ~IFunction(void) {}

    virtual void call(void* message) = 0;
};

// Binder for member functions
template <class Message, class Object>
class MemberFunction : public IFunction
{
public:
    typedef void (Object::*FunctionType)(const Message&);

    MemberFunction(Object* object, FunctionType function);
    void call(void* message);

private:
    FunctionType _function;
    Object* _object;
};

// Binder for normal functions
template <class Message>
class Function : public IFunction
{
public:
    typedef void (*FunctionType)(const Message&);

    Function(FunctionType function);
    void call(void* message);

private:
    FunctionType _function;
};

// Binder for functors
template <class Message, class FunctorT>
class Functor : public IFunction
{
public:
    Functor(const FunctorT& functor);
    void call(void* message);

private:
    FunctorT _functor;
};

As you can see, we have a base IFunction interface that takes a void pointer. For every listener that gets registered, we instantiate one of these three classes (allocated on the heap) and add it to the list of listeners for that message. The implementation is then responsible for casting the void pointer to the message type before passing it along to the listener. I won’t bother posting the implementation, because the code is exactly what I just described, and it would take up a ton of space.
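To make the “cast the void pointer” step concrete, here is a minimal sketch of what a MemberFunction::call() implementation might look like. The message and listener types below (PlaySound, AudioSystem) are made up for illustration; only the binder shape comes from the post.

```cpp
#include <cassert>
#include <string>

// Base interface from the post: dispatch through a type-erased pointer.
class IFunction
{
public:
    virtual ~IFunction(void) {}
    virtual void call(void* message) = 0;
};

template <class Message, class Object>
class MemberFunction : public IFunction
{
public:
    typedef void (Object::*FunctionType)(const Message&);

    MemberFunction(Object* object, FunctionType function):
        _function(function), _object(object) {}

    // Cast the type-erased pointer back to the concrete message type
    // and forward it to the bound member function.
    void call(void* message)
    {
        (_object->*_function)(*static_cast<Message*>(message));
    }

private:
    FunctionType _function;
    Object* _object;
};

// Hypothetical message and listener used only to exercise the binder.
struct PlaySound { std::string name; };

struct AudioSystem
{
    std::string last_played;
    void onPlaySound(const PlaySound& msg) { last_played = msg.name; }
};
```

The Function and Functor binders are the same pattern minus the object pointer.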

Message Receipts

When a listener registers for a message, it is given a receipt. This receipt is a reference counted data structure that will unregister the listener when the reference count becomes zero.

class Remover : public IRefCounted
{
public:
    Remover(MessageBroadcaster<Allocator>* broadcaster,
            const AHashString<Allocator>* message_type,
            const IFunction* listener);
    ~Remover(void);

    void addRef(void) const;
    void release(void) const;

    unsigned int getRefCount(void) const;

private:
    MessageBroadcaster<Allocator>* _broadcaster;
    const AHashString<Allocator>* _message_type;
    const IFunction* _listener;

    mutable volatile unsigned int _ref_count;
};

typedef RefPtr<IRefCounted> MessageReceipt;

Again, this interface is pretty simple and the implementation is trivial. You’ll notice that _ref_count is marked volatile; this is to support multi-threading and the use of atomic operations. MessageReceipt is just a typedef for a RefPtr, and the register function returns a RefPtr to the Remover class shown above.
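The reference-counting side of Remover can be sketched like so. Gaff wraps platform atomic intrinsics around its volatile counter; std::atomic is used here as the portable equivalent, and a bool flag stands in for the actual “unregister the listener and delete myself” work.

```cpp
#include <atomic>
#include <cassert>

// Simplified Remover: the real one calls back into the broadcaster to
// unregister the listener when the count hits zero; here we just record
// that it happened via a flag.
class Remover
{
public:
    explicit Remover(bool* removed_flag):
        _removed_flag(removed_flag), _ref_count(0) {}

    void addRef(void) const
    {
        // Atomic increment, safe across threads.
        ++_ref_count;
    }

    void release(void) const
    {
        // fetch_sub returns the previous value, so a result of 1 means
        // we dropped the last reference and should unregister.
        if (_ref_count.fetch_sub(1) == 1) {
            *_removed_flag = true; // stand-in for unregister + delete
        }
    }

    unsigned int getRefCount(void) const { return _ref_count.load(); }

private:
    bool* _removed_flag;
    mutable std::atomic<unsigned int> _ref_count;
};
```

RefPtr then just calls addRef() on copy and release() on destruction, which is what makes the receipt “automatic”.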

Message Structure

As mentioned above, I am using a HashMap of listener lists, keyed by HashString. My implementation of MessageBroadcaster assumes that the message class you are listening for has a static HashString called g_Hash, which is used as the key into the HashMap of listener lists.
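In practice the g_Hash convention might look like this. EntitySpawned is an invented example message, and a plain size_t produced by std::hash stands in for Gaff’s HashString type.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Stand-in for Gaff's HashString: just a hash of the type name here.
static size_t hash_name(const std::string& name)
{
    return std::hash<std::string>()(name);
}

// The broadcaster assumes every message class exposes a static hash.
// A message type following that convention might look like this:
struct EntitySpawned
{
    static const size_t g_Hash;
    int entity_id;
};

const size_t EntitySpawned::g_Hash = hash_name("EntitySpawned");

// Registration and broadcast can then key the listener-list lookup
// on Message::g_Hash without any extra plumbing from the caller.
template <class Message>
size_t messageKey(void)
{
    return Message::g_Hash;
}
```
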

Broadcasting A Message

As mentioned earlier, messages aren’t broadcast immediately. They are first added to a queue and then sent when the engine is ready to dispatch them. The single-threaded dispatch is trivial: loop over all the messages, look up the listeners, and send each message to its listeners. The multi-threaded version is the same idea, but in parallel. This is achieved using a thread pool and the BroadcastTask class.
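The “one task per queued message” idea can be sketched as follows. This is not the engine’s code: std::async stands in for Gaff’s thread pool, and the listener map is taken as read-only during dispatch (the real version guards each list with a lock, shown in the next section).

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <future>
#include <unordered_map>
#include <vector>

using Listener = std::function<void(const void*)>;

struct QueuedMessage
{
    size_t type;
    const void* msg;
};

// Simplified multi-threaded dispatch: generate one task per message in
// the queue; each task walks that message's listener list, exactly as
// the single-threaded loop would.
void updateMultiThreaded(
    std::vector<QueuedMessage>& queue,
    const std::unordered_map<size_t, std::vector<Listener>>& listeners)
{
    std::vector<std::future<void>> tasks;

    for (const QueuedMessage& qm : queue) {
        tasks.push_back(std::async(std::launch::async, [qm, &listeners]() {
            // Concurrent const lookups into the map are safe as long as
            // nobody mutates it while tasks are in flight.
            auto it = listeners.find(qm.type);
            if (it != listeners.end()) {
                for (const Listener& fn : it->second)
                    fn(qm.msg);
            }
        }));
    }

    // Wait for every task before declaring the queue flushed.
    for (std::future<void>& t : tasks)
        t.wait();

    queue.clear();
}
```
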

class BroadcastTask : public ITask<Allocator>
{
public:
    BroadcastTask(MessageBroadcaster<Allocator>& broadcaster, const AHashString<Allocator>& msg_type, void* msg);

    void doTask(void);

private:
    Pair<ReadWriteSpinLock, ListenerList>& _listeners;
    MessageBroadcaster<Allocator>& _broadcaster;
    void* _msg;
};

Again, this class is simple. doTask() does exactly what I described in the single-threaded version, but with some threading precautions. Unlike the single-threaded version, we have no idea when these tasks are going to be processed. It could be this frame, or next frame, or a minute from now. If you have things that are dependent on certain tasks, you can wait for the tasks to be completed. However, if your messages are extremely time critical, you probably shouldn’t be using messages.
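The “threading precautions” amount to taking the listener list’s read lock before dispatching, so a registration (a write) can’t mutate the list mid-iteration. Here is a minimal sketch of that shape, with std::shared_mutex standing in for the ReadWriteSpinLock from the class above and std::function for the IFunction listeners.

```cpp
#include <cassert>
#include <functional>
#include <shared_mutex>
#include <vector>

using Listener = std::function<void(const void*)>;

// Stand-in for the Pair<ReadWriteSpinLock, ListenerList> the real
// BroadcastTask holds a reference to.
struct ListenerListEntry
{
    std::shared_mutex lock;
    std::vector<Listener> listeners;
};

struct BroadcastTask
{
    ListenerListEntry& _listeners;
    const void* _msg;

    void doTask(void)
    {
        // Many broadcast tasks may read the same list concurrently;
        // only register/unregister needs the exclusive (write) lock.
        std::shared_lock<std::shared_mutex> read_lock(_listeners.lock);

        for (const Listener& fn : _listeners.listeners)
            fn(_msg);
    }
};
```

A spin lock is a reasonable choice here because dispatch holds the lock only long enough to walk a short list, so contention is brief.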

So, that’s pretty much how message broadcasting is implemented in Gaff/Shibboleth. If you want to see the full source code, you can always click one of the many links in this post to the Bitbucket repositories. I’m always looking for feedback on all of my projects on Bitbucket and would love to hear your thoughts/questions/concerns!

Until next time!
