You need one CPGThread object per process. This object is set up to be a Singleton, so whenever you call its constructor you will always get the same object back. You also need one CPG object (or subclass thereof) for each CPG you want to join.
A standard pattern would work a bit like this:
from corosync.CPG import CPG
from corosync.CPGThread import CPGThread

class MyCPG(CPG):
    def config_change(self, members, left, joined):
        # Act on configuration changes here. This will be called
        # in the CPG Thread
        pass

    def message_received(self, address, message):
        # Act on received messages here. This will be called
        # in the CPG Thread
        pass

if __name__ == "__main__":
    cpg = MyCPG("CPGNAME")
    try:
        CPGThread().add(cpg)
        CPGThread().start()
        cpg.send_message("Some message or other")
    except:
        CPGThread().stop()
When using the CPGThread() class in your own programs, you almost certainly do not want to subclass it,
but will want to make use of its Singleton nature.
CPG objects may be given to the CPGThread().add() method before or after calling CPGThread().start()
to start the thread. Adding one afterwards may lead to a very small delay (at most 1s) in its initial operation,
as the thread will be sleeping for at most that long.
The CPGThread() is a daemon thread, so it will exit unceremoniously when all other non-daemon threads have exited.
You do not need to stop it explicitly. However, if you choose to call CPGThread().wait(), you might want to call
CPGThread().stop() from a callback so that your wait completes.
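The wait-then-stop-from-a-callback pattern can be illustrated without Corosync at all. The following is a minimal sketch using only the standard library; `StandInThread`, its `on_tick` callback and the 10 ms poll interval are hypothetical stand-ins invented for this illustration and say nothing about how CPGThread is implemented internally.

```python
import threading


class StandInThread(threading.Thread):
    """Hypothetical stand-in for a monitoring thread (not the corosync class)."""

    def __init__(self, on_tick):
        # daemon=True: the thread dies with the process, as described above
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self._on_tick = on_tick

    def run(self):
        # Invoke the callback until somebody asks the thread to stop
        while not self._stop_requested.is_set():
            self._on_tick(self)
            self._stop_requested.wait(0.01)

    def stop(self):
        self._stop_requested.set()

    def wait(self, timeout=None):
        # Block the caller until the thread has finished
        self.join(timeout)


ticks = []


def callback(thread):
    # Runs in the worker thread; stops the worker on the third call
    ticks.append(1)
    if len(ticks) == 3:
        thread.stop()


worker = StandInThread(callback)
worker.start()
worker.wait(timeout=5)  # returns once the callback has called stop()
```

The main thread blocks in `wait()` and is released only because the callback, running in the worker thread, requested the stop.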
CPGThread().add() adds the given CPG object to the monitoring thread. To remove it, call deactivate()
on the CPG object itself.
The CPGThread module also exports the Singleton class which is used as the metaclass for the CPGThread()
class. While the CPG() class itself is not a Singleton, it is likely that users will want to make at least
some of their subclasses of it Singletons, which they can do by
from corosync.CPGThread import Singleton
from corosync.CPG import CPG
class MySingletonCPG(CPG, metaclass=Singleton):
    pass
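For readers unfamiliar with Singleton metaclasses, here is a minimal sketch of the general technique. `SingletonSketch`, `Base` and `OnlyOne` are hypothetical names for this illustration; the Singleton class exported by the CPGThread module may well be implemented differently.

```python
import threading


class SingletonSketch(type):
    """Illustrative metaclass: at most one instance per class."""

    _instances = {}
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        # Construct the instance on the first call only; later calls
        # return the cached object and ignore their arguments.
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Base:
    def __init__(self, name):
        self.name = name


class OnlyOne(Base, metaclass=SingletonSketch):
    pass


first = OnlyOne("alpha")
second = OnlyOne("beta")  # same object as first; "beta" is ignored
```

One consequence worth knowing in this sketch is that constructor arguments passed on later calls are silently discarded, so a Singleton subclass is best constructed with the same arguments everywhere.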
Constructor. Creates a new CPG object managing membership of the named group. Corosync limits group names to a maximum length of 128 octets.
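Because the limit is counted in octets rather than characters, a name containing non-ASCII characters can exceed it sooner than expected. The sketch below checks a name before use; `check_group_name` is a hypothetical helper, and UTF-8 encoding is an assumption, since the encoding the library applies to group names is not stated here.

```python
MAX_GROUP_NAME_OCTETS = 128  # limit quoted in the constructor description


def check_group_name(name: str) -> bytes:
    """Return the encoded name, or raise ValueError if it is too long.

    Assumption: the name is encoded as UTF-8 before being handed to Corosync.
    """
    encoded = name.encode("utf-8")
    if len(encoded) > MAX_GROUP_NAME_OCTETS:
        raise ValueError(
            f"group name is {len(encoded)} octets, "
            f"limit is {MAX_GROUP_NAME_OCTETS}"
        )
    return encoded


ok = check_group_name("CPGNAME")

try:
    # 65 two-octet characters: 65 characters but 130 octets
    check_group_name("é" * 65)
    too_long_rejected = False
except ValueError:
    too_long_rejected = True
```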
config_change() is called when the configuration of the CPG changes. Its arguments (members, left and joined) are lists of tuples, with each tuple
representing an address, in the form (node, pid, reason).
- node is the Corosync Node ID
- pid is the Process ID of the process on that node
- reason is the reason for the membership change. It is only useful in the left and joined lists, and is one of CPG.REASON_JOIN, CPG.REASON_LEAVE, CPG.REASON_NODEUP or CPG.REASON_NODEDOWN
This method is called from the CPGThread() thread.
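As an illustration of working with these argument lists, the sketch below reduces one configuration change to plain (node, pid) sets. It is self-contained and does not import the library: the `REASON_*` values are placeholders standing in for the `CPG.REASON_*` constants, `summarize_change` is a hypothetical helper, and `None` is used for the reason in the members list only because that field is not useful there.

```python
# Placeholder values; in real code use the constants on the CPG class
REASON_JOIN = "join"
REASON_LEAVE = "leave"
REASON_NODEUP = "nodeup"
REASON_NODEDOWN = "nodedown"


def summarize_change(members, left, joined):
    """Reduce one configuration change to sets of (node, pid) pairs."""
    return {
        "members": {(node, pid) for node, pid, _reason in members},
        "joined": {(node, pid) for node, pid, _reason in joined},
        # Processes that left the group deliberately
        "left_cleanly": {(n, p) for n, p, r in left if r == REASON_LEAVE},
        # Processes that disappeared because their node went down
        "lost": {(n, p) for n, p, r in left if r == REASON_NODEDOWN},
    }


summary = summarize_change(
    members=[(1, 100, None), (2, 200, None)],
    left=[(3, 300, REASON_NODEDOWN)],
    joined=[(2, 200, REASON_JOIN)],
)
```

A function like this could be called from inside a config_change() override, bearing in mind that it would then run in the CPGThread() thread.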
message_received() is called when a message is received. The address is a tuple as shown above, although its reason field is not
useful here. The message is a bytes-like object.
This method is called from the CPGThread() thread.
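Since the payload arrives as a bytes-like object, structured data has to be serialized by the application. The following is a minimal sketch of one possible convention (JSON encoded as UTF-8); `encode_payload` and `decode_payload` are hypothetical helpers, and whether send_message() expects str or bytes is not stated here, so the encoding side is exercised only locally.

```python
import json


def encode_payload(data: dict) -> bytes:
    """Serialize a dict to UTF-8 JSON with a stable key order."""
    return json.dumps(data, sort_keys=True).encode("utf-8")


def decode_payload(message) -> dict:
    """Decode any bytes-like object (bytes, bytearray, memoryview)."""
    return json.loads(bytes(message).decode("utf-8"))


wire = encode_payload({"op": "ping", "seq": 1})
roundtrip = decode_payload(memoryview(wire))
```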
This is a utility method which is not strictly related to Closed Process Groups. It asks Corosync whether it is quorate and returns a Boolean.
This is a utility method which is not strictly related to Closed Process Groups. It fetches a Corosync configuration item, for example:
print(f"Node 0 ID = {cpg.cmap_get('nodelist.node.0.nodeid')}")
print(f"Node 0 Address = {cpg.cmap_get('nodelist.node.0.ring0_addr')}")
Returns a tuple identifying the address of this process in the CPG. This is only valid when the CPG object is actually part of a CPG, but it is safe to call in one of the callback functions to determine whether a message is your own.
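The own-message check can be sketched as a comparison of the node and pid fields only. `is_own_message` is a hypothetical helper; it assumes both addresses follow the (node, pid, reason) layout described earlier and ignores the reason field, which is not meaningful for received messages.

```python
def is_own_message(sender, own_address):
    """True when sender and own_address name the same process.

    Only node and pid are compared; any reason field is ignored.
    """
    return tuple(sender[:2]) == tuple(own_address[:2])


own = (1, 4242, None)  # illustrative address of this process
from_self = is_own_message((1, 4242, None), own)
from_peer = is_own_message((2, 4242, None), own)
```

In a message_received() override, the second argument would be the tuple returned by the method described above.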
Causes the CPG to become active. The CPGThread calls this for you shortly after you call CPGThread().add(); it is documented here to remind users that the CPG
does not become active immediately. Activation is not complete until the first call to config_change() which includes your own
address in the joined list. You will see this callback, but everything required has been handled internally, so you do not need to add special processing for it yourself.
Deactivates the CPG. This is also not instant. It begins the process of leaving the group, which completes when a config_change() call arrives that includes your own
address in the left list. You will see this callback, but everything required has been handled internally, so you do not need to add special processing for it yourself.
This will also cause the CPG to be removed from the CPGThread(), though you can re-join by passing it to CPGThread().add() again.
Sends a message to the group. Strictly speaking, there is another argument, guarantee, which can be supplied to indicate the message delivery guarantee, but only two types of guarantee are
implemented in Corosync at present and they are equivalent, so there is no reason to provide anything different.
When called right after calling CPGThread().add(), this method may block for a short time. The message cannot be sent until the process of joining the group is complete, and that
is an asynchronous process. This call will wait for it to complete if needed. When this happens, there will be a call to the config_change() callback in the CPGThread() handling thread
before this method returns.
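The ordering described here (the join completes and config_change() runs before the blocked send returns) can be modelled with a `threading.Event`. This is only a sketch of the behaviour under stated assumptions: `JoinGate`, its methods and the 50 ms delay are hypothetical, and the library's actual mechanism is not documented in this section.

```python
import threading


class JoinGate:
    """Hypothetical model of a send that waits for the join to finish."""

    def __init__(self):
        self._joined = threading.Event()
        self.log = []

    def config_change(self, joined_self):
        # Stands in for the callback that runs in the handling thread
        self.log.append("config_change")
        if joined_self:
            self._joined.set()

    def send(self, text, timeout=5.0):
        # Block until the join has completed, then "send"
        if not self._joined.wait(timeout):
            raise TimeoutError("join did not complete in time")
        self.log.append(f"sent:{text}")


gate = JoinGate()
# Simulate the asynchronous join completing 50 ms later in another thread
timer = threading.Timer(0.05, gate.config_change, args=(True,))
timer.start()
gate.send("hello")  # blocks briefly, returns after config_change ran
timer.join()
```

In this model the log always records the configuration change before the send, mirroring the guarantee described above.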