User guide
One image is better than 100 words:
Channels
A channel is a chain of processing steps, the nodes, which use or transform the message contents. A channel is specialised for one type of input and can be linked to an endpoint to receive incoming messages.
A channel receives a message, processes it and returns the response.
Channels are the main components of pypeman. To process a message, you first create a channel, then add the nodes that process it.
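The idea of a channel as an ordered chain of nodes can be sketched without pypeman at all. In the minimal illustration below, ToyNode and ToyChannel are hypothetical stand-ins written for this example, not pypeman classes; they only show how each node receives the output of the previous one.

```python
import asyncio


class ToyNode:
    """Hypothetical stand-in for a node: one processing step."""

    def __init__(self, func):
        self.func = func

    async def handle(self, msg):
        return self.func(msg)


class ToyChannel:
    """Hypothetical stand-in for a channel: an ordered chain of nodes."""

    def __init__(self, name):
        self.name = name
        self.nodes = []

    def add(self, *nodes):
        self.nodes.extend(nodes)
        return self

    async def handle(self, msg):
        # Each node receives the output of the previous node.
        for node in self.nodes:
            msg = await node.handle(msg)
        return msg


channel = ToyChannel("demo")
channel.add(ToyNode(str.strip), ToyNode(str.upper))
result = asyncio.run(channel.handle("  hello  "))
print(result)
```

A real pypeman channel does more than this (message stores, sub-channels, start and stop states), as the reference below describes.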
-
class pypeman.channels.BaseChannel(name=None, parent_channel=None, loop=None, message_store_factory=None) Base channels are generic channels. If you want to create a new channel, inherit from this base class and call the self.handle(msg) method with the generated message.
Parameters: - name – The channel name is mandatory and must be unique across the whole project. The name gives a way to get the channel in test mode.
- parent_channel – Used with sub-channels. Don’t specify it yourself.
- loop – To specify a custom event loop.
- message_store_factory – You can specify a message store (see below) at channel initialisation if you want to save all processed messages. Pass an instance of the wanted message store factory as the message_store_factory argument.
-
add(*args) Add specified nodes to channel (Shortcut for append).
Parameters: args – Nodes to add.
-
append(*args) Append specified nodes to channel.
Parameters: args – Nodes to add.
-
case(*conditions, names=None, message_store_factory=None) Case between multiple conditions. For each condition specified, this method returns a channel, in the same order as the conditions are given. When processing a message, the conditions are evaluated successively and the first one returning true triggers the corresponding channel, which processes the message. When that channel has finished processing, the next node is called.
Parameters: - conditions – Multiple conditions, one for each returned channel. Each should be a boolean or a function that takes a msg argument and returns a boolean.
- message_store_factory – Allows you to specify a message store factory for all channels of this case.
Returns: one channel per condition parameter.
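The "first condition that returns true wins" rule described for case can be illustrated in isolation. The sketch below is not pypeman code: first_match, conditions and branches are hypothetical names, and plain strings stand in for the channels that case would return.

```python
def first_match(conditions, msg):
    """Return the index of the first condition that holds for msg, or None."""
    for index, condition in enumerate(conditions):
        # A condition is either a plain boolean or a callable taking the message.
        holds = condition(msg) if callable(condition) else bool(condition)
        if holds:
            return index
    return None


conditions = [
    lambda msg: msg["type"] == "order",
    lambda msg: msg["type"] == "invoice",
    True,  # a constant condition acts as a catch-all branch
]
branches = ["orders", "invoices", "fallback"]

selected = branches[first_match(conditions, {"type": "invoice"})]
print(selected)
```

Because evaluation stops at the first match, the order in which conditions are given matters: a catch-all condition placed first would hide every later branch.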
-
fork(name=None, message_store_factory=None) Create a new channel that processes a copy of the message at this point. Sub-channels are executed in parallel with the main process.
Returns: The forked channel
-
get_node(name) Return node with name in argument. Mainly used in tests.
Parameters: name – The searched node name. Returns: Instance of Node or None if none found.
-
graph(prefix='', dot=False) Generate a text graph for this channel.
-
graph_dot(end='') Generate a compatible dot graph for this channel.
-
handle(msg) Overload this method only if you know what you are doing, but call it from the child class to add behaviour.
Parameters: msg – The message to process. Returns: The processed message.
-
handle_and_wait(msg) Handle a message synchronously. Mainly used for testing purposes.
Parameters: msg – Message to process Returns: Processed message.
-
is_stopped() Returns: True if channel is in stopped or stopping state.
-
process(msg) Overload this method only if you know what you are doing. Called by the subhandle method.
Parameters: msg – The message to process. Returns: The processed message.
-
replay(msg_id) This method allows you to replay a message from channel message_store.
Parameters: msg_id – Message id to replay. Returns: The result of the processing.
-
start() Start the channel. Called before starting the process. Can be overloaded to define a specific start procedure.
-
status Getter for status
-
stop() Stop the channel. Called when pypeman shuts down.
-
subhandle(msg) Overload this method only if you know what you are doing. Called by the handle method.
Parameters: msg – The message to process. Returns: The processed message.
-
when(condition, name=None, message_store_factory=None) Create a new channel branch that is executed only if the condition is true. This channel replaces the rest of the current channel’s processing.
Parameters: condition – Can be a value or a function that takes a message argument. Returns: The conditional path channel.
-
class pypeman.contrib.time.CronChannel(*args, cron='', **kwargs) This channel launches processing at the specified time interval. The first node of the channel receives a payload with the datetime of execution.
Parameters: cron – Sets the interval. Accepts any aiocron-compatible string.
-
start() Start the channel. Called before starting the process. Can be overloaded to define a specific start procedure.
-
-
class pypeman.contrib.http.HttpChannel(*args, endpoint=None, method='*', url='/', encoding=None, **kwargs) Channel that handles HTTP connections. The HTTP message is the message payload and some headers become message metadata. Needs the aiohttp Python dependency to work.
Parameters: - endpoint – HTTP endpoint used to get connections.
- method – Method filter.
- url – Only messages with a matching URL are sent to this channel.
- encoding – Encoding of the message. Defaults to ‘utf-8’.
-
start() Start the channel. Called before starting the process. Can be overloaded to define a specific start procedure.
-
class pypeman.channels.FileWatcherChannel(*args, basedir='', regex='.*', interval=1, binary_file=False, path='', **kwargs) Watch for file changes or creation. The file content becomes the message payload. filepath is in the message meta.
-
start() Start the channel. Called before starting the process. Can be overloaded to define a specific start procedure.
-
-
class pypeman.contrib.ftp.FTPWatcherChannel(*args, host='', port=21, credentials='', basedir='', regex='.*', interval=60, delete_after=False, encoding='utf-8', thread_pool=None, sort_function=<built-in function sorted>, **kwargs) Channel that watches an FTP server for file creation.
-
download_file(filename) Download a file from ftp asynchronously.
Parameters: filepath – file path to download. Returns: Content of the downloaded file.
-
get_file_and_process(filename) Download a file from FTP and launch channel processing on a message with the file content as payload. Also adds a filepath header with the FTP-relative path of the downloaded file.
Parameters: filename – file to download relative to basedir. Returns: processed result
-
start() Start the channel. Called before starting the process. Can be overloaded to define a specific start procedure.
-
tick() One iteration of watching.
-
watch_for_file() Watch recursively for new files on the FTP server. If a file matches the regex, it is downloaded and then processed as a message.
-
-
class pypeman.contrib.hl7.MLLPChannel(*args, endpoint=None, encoding='utf-8', **kwargs)
-
start() Start the channel. Called before starting the process. Can be overloaded to define a specific start procedure.
-
Nodes
A node is a processing unit in a channel. To create a node, inherit from pypeman.nodes.BaseNode and override the process(msg) method.
You can use persistent storage to save data between two pypeman executions, such as the last time a specific channel ran, by using the following two methods: save_data and restore_data.
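The save_data / restore_data pattern can be sketched with a toy node that remembers when it last ran. Everything here is an assumption made for illustration: ToyPersistentNode is not a pypeman class, a plain dict stands in for the configured persistence backend, and raising KeyError when no default is given is a guess at reasonable behaviour rather than documented pypeman behaviour.

```python
_MISSING = object()  # sentinel, so that None can still be a legitimate default


class ToyPersistentNode:
    """Hypothetical node-like class backed by a plain dict (not pypeman's backend)."""

    def __init__(self, backend):
        self.backend = backend

    def save_data(self, key, value):
        self.backend[key] = value

    def restore_data(self, key, default=_MISSING):
        if key in self.backend:
            return self.backend[key]
        if default is _MISSING:
            # Assumption: without a default, a missing key is an error.
            raise KeyError(key)
        return default

    def process(self, msg):
        # Read the previous run marker, then record the current one.
        previous = self.restore_data("last_run", default=None)
        self.save_data("last_run", msg["timestamp"])
        return {**msg, "previous_run": previous}


backend = {}  # survives across "executions" in this toy example
first = ToyPersistentNode(backend).process({"timestamp": 1})
second = ToyPersistentNode(backend).process({"timestamp": 2})
print(first["previous_run"], second["previous_run"])
```

The second node instance sees the value saved by the first because both share the same backend, which is the role the persistence backend plays between two pypeman executions.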
Specific nodes
Base for all nodes. You must inherit from this class if you want to create a new node for your project.
-
class pypeman.nodes.BaseNode(*args, name=None, log_output=False, **kwargs) Base of all nodes. If you create a new node, you must inherit from this class and implement the process method.
Parameters: - name – Name of node. Used in log or test.
- log_output – To enable output logging for this node.
Store_output_as: Store output message in msg.ctx as specified key
Store_input_as: Store input message in msg.ctx as specified key
Passthrough: If True, node is executed but output message is same as input
-
async_run(msg) Used to overload behaviour like thread Node without rewriting handle process
-
fullpath() Return the channel name and the node name joined by a dot.
-
handle(msg) Called by the channel to launch the process method on the message. Some other structural processing takes place here. Please don’t modify it unless you know what you are doing.
Parameters: msg – incoming message Returns: modified message after a process call and some treatment
-
mock(input=None, output=None) Allows you to mock the input or output of a node for testing purposes.
Parameters: - input – A message to replace the input in this node.
- output – A return message to replace processing of this mock.
-
process(msg) Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
restore_data(key, default=<object object>) Restore previously saved data from configured persistence backend.
Parameters: - key – Key of restored data.
- default – if key is missing, don’t raise exception and return this value instead.
Returns: Saved data if exist or default value if specified.
-
run(msg) Used to overload behaviour like thread Node without rewriting handle process
-
save_data(key, value) Save data in configured persistence backend for next usage.
Parameters: - key – Key of saved data.
- value – Value saved.
A thread node lets you create a node that executes its process method in another thread, to avoid blocking other nodes.
-
class pypeman.nodes.ThreadNode(*args, thread_pool=None, **kwargs) Inherit from this class instead of BaseNode to prevent a long-running node from blocking the main event loop.
-
run(msg) Used to overload behaviour like thread Node without rewriting handle process
-
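The general technique behind a thread node, handing a blocking call to a worker thread so the event loop stays free, can be shown with the standard library alone. This is a sketch of the idea, not pypeman's implementation; blocking_process and run_in_thread are hypothetical names.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_process(msg):
    """Stand-in for a slow, blocking process method."""
    time.sleep(0.05)
    return msg.upper()


async def run_in_thread(executor, msg):
    # Hand the blocking call to a worker thread and await its result.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_process, msg)


async def main():
    with ThreadPoolExecutor(max_workers=2) as executor:
        # The two calls overlap: each blocking call runs in its own worker thread.
        return await asyncio.gather(
            run_in_thread(executor, "a"),
            run_in_thread(executor, "b"),
        )


results = asyncio.run(main())
print(results)
```

Calling blocking_process directly inside a coroutine would instead freeze every other channel sharing the loop for the duration of the call.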
Other nodes
See pypeman.nodes and pypeman.contrib.
Messages
A message contains the core information processed by nodes and carried by channels. The message payload may be JSON, XML, SOAP, HL7, text, a Python object…
Useful attributes:
- payload: the message contents.
- meta: message metadata, should be used to add extra information about the payload.
- context: previous messages can be saved in the context dict for further access.
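How payload, meta and the context dict relate can be sketched with a small dataclass. ToyMessage below is a hypothetical stand-in built for this example, not pypeman.message.Message; it only mirrors the attribute names listed above and the add_context and copy methods documented below.

```python
import copy
from dataclasses import dataclass, field
from datetime import datetime
from uuid import uuid4


@dataclass
class ToyMessage:
    """Hypothetical message: payload, metadata and a context of earlier messages."""

    payload: object = None
    meta: dict = field(default_factory=dict)
    ctx: dict = field(default_factory=dict)
    uuid: str = field(default_factory=lambda: uuid4().hex)
    timestamp: datetime = field(default_factory=datetime.now)

    def add_context(self, key, msg):
        # Keep an earlier message reachable under a chosen key.
        self.ctx[key] = msg

    def copy(self):
        # A deep copy keeps the same uuid and timestamp.
        return copy.deepcopy(self)


original = ToyMessage(payload={"patient": "42"}, meta={"source": "http"})
transformed = original.copy()
transformed.payload = "converted"
transformed.add_context("original", original)
print(transformed.ctx["original"].payload)
```

Saving the original message in the context before transforming the payload lets a later node compare or fall back to the untouched input.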
-
class pypeman.message.Message(content_type='application/text', payload=None, meta=None) A message is the unit of information exchanged between the nodes of a channel.
A message has the following attributes:
- payload: The message content.
- meta: The message metadata.
- timestamp: The creation date of the message.
- uuid: UUID identifying the message.
- content_type: Used ?
- ctx: Current context, for when you want to save a message for later use.
Parameters: - payload – You can initialise the payload by setting this parameter.
- meta – Same as payload, you can initialise the meta by setting this parameter.
-
add_context(key, msg) Add a msg to the .ctx property with specified key.
Parameters: - key – Key to store message.
- msg – Message to store.
-
copy() Copy the message. Useful for channel forking purpose.
Returns: A copy of current message.
-
static from_dict(data) Convert an input dict, previously produced by the .to_dict() method, into a Message object.
Parameters: data – The input dict. Returns: The message object corresponding to the given data.
-
static from_json(data) Create a message from a previously saved JSON string.
Parameters: data – Data to read message from. Returns: A new message instance created from json data.
-
log(logger=<logging.Logger object>, log_level=10, payload=True, meta=True, context=False) Log a message.
Parameters: - logger – The logger to use.
- log_level – Log level for all logged lines.
- payload – Whether to log the payload.
- meta – Whether to log the meta.
- context – Whether to log the context.
-
renew() Copy the message but update the timestamp and uuid.
Returns: A copy of current message with new uuid and Timestamp.
-
timestamp_str() Return the timestamp as a formatted string.
-
to_dict() Convert the current message object to a dict. Payload is pickled.
Returns: A dict with an equivalent of message
-
to_json() Create json string for current message.
Returns: a json string equivalent for message.
-
to_print(payload=True, meta=True, context=False) Return a printable version of message.
Parameters: - payload – Whether to print the payload.
- meta – Whether to print the meta.
- context – Whether to print the context.
Endpoints
Endpoints are server instances used by channels to get messages from network protocols such as HTTP, SOAP or HL7. They listen on a specific port for a specific protocol.
-
class pypeman.contrib.http.HTTPEndpoint(adress=None, address=None, port=None, loop=None, http_args=None, host=None, sock=None, reuse_port=None) Endpoint to receive HTTP connections from outside.
-
class pypeman.contrib.hl7.MLLPEndpoint(address=None, port=None, encoding='utf-8', loop=None, host=None, sock=None, reuse_port=None)
Message Stores
A message store is useful for keeping a copy of all messages sent to a channel. It’s like a log, but with complete message data and metadata. This way you can trace all processing or replay a specific message (Not implemented yet). Each channel can have its own message store.
You don’t use message stores directly; instead you use a MessageStoreFactory instance, which allows a configuration to be reused.
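The factory indirection can be sketched as follows: one factory object holds the configuration and hands out one store per store_id. ToyMemoryStore and ToyMemoryStoreFactory are hypothetical classes written for this example; they borrow the method names get_store, store, get and total from the reference below but are not pypeman's implementations.

```python
class ToyMemoryStore:
    """Hypothetical in-memory store: keeps messages in a list."""

    def __init__(self):
        self.messages = []

    def store(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1  # the list index serves as message id

    def get(self, id):
        return {"id": id, "message": self.messages[id]}

    def total(self):
        return len(self.messages)


class ToyMemoryStoreFactory:
    """Hypothetical factory: one store per store_id, created on first request."""

    def __init__(self):
        self._stores = {}

    def get_store(self, store_id):
        if store_id not in self._stores:
            self._stores[store_id] = ToyMemoryStore()
        return self._stores[store_id]


factory = ToyMemoryStoreFactory()
store_a = factory.get_store("channel_a")
store_b = factory.get_store("channel_b")
message_id = store_a.store("hello")
print(store_a.get(message_id), store_b.total())
```

Passing the same factory to several channels gives each channel its own store while the storage configuration is written only once.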
Generic classes
-
class pypeman.msgstore.MessageStoreFactory A message store factory generates a message store instance for a specific store_id.
-
get_store(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class pypeman.msgstore.MessageStore A MessageStore keeps a history of processed messages. Mainly used in channels.
-
change_message_state(id, new_state) Change the state of the message with the given id.
Parameters: - id – Message specific store id.
- new_state – Target state.
-
get(id) Return the message corresponding to the given id, along with its status.
Parameters: id – Message id. Message store dependant. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search(start=0, count=10, order_by='timestamp') Return a list of messages, each with its store-specific id and processing status.
Parameters: - start – First element.
- count – Count of elements since first element.
- order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns: A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
start() Called at startup to initialize store.
-
store(msg) Store a message in the store.
Parameters: msg – The message to store. Returns: Id for this specific message.
-
total() Returns: total count of messages
-
Message store factories
-
class pypeman.msgstore.NullMessageStoreFactory Return a NullMessageStore that does nothing at all.
-
get_store(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class pypeman.msgstore.FakeMessageStoreFactory Return a fake message store.
-
get_store(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class pypeman.msgstore.MemoryMessageStoreFactory Return a memory message store. All messages are lost when pypeman stops.
-
get_store(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class pypeman.msgstore.FileMessageStoreFactory(path) Generate a FileMessageStore message store instance. Stores a file in the <base_path>/<store_id>/<month>/<day>/ hierarchy.
-
get_store(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-