API¶
Channels¶
-
class
pypeman.channels.BaseChannel(name=None, parent_channel=None, loop=None, message_store_factory=None)¶ Base channels are generic channels. To create a new channel, inherit from this base class and call the self.handle(msg) method with each generated message.
Parameters: - name – Channel name. It is mandatory and must be unique across the whole project. The name gives a way to retrieve the channel in test mode.
- parent_channel – Used with sub channels. Do not set it yourself.
- loop – Custom event loop to use.
- message_store_factory – Message store factory instance to use (see below) if you want to save all processed messages.
-
add(*args)¶ Add specified nodes to channel (Shortcut for append).
Parameters: args – Nodes to add.
-
append(*args)¶ Append specified nodes to channel.
Parameters: args – Nodes to add.
-
case(*conditions, names=None, message_store_factory=None)¶ Case between multiple conditions. For each condition specified, this method returns a channel, in the same order as the conditions are given. When processing a message, conditions are evaluated in order and the first one that evaluates to true triggers processing of the message by the corresponding channel. When that channel has finished processing, the next node is called.
Parameters: - conditions – Multiple conditions, one for each returned channel. Each should be a boolean or a function that takes a msg argument and returns a boolean.
- message_store_factory – Allows you to specify a message store factory for all channels of this case.
Returns: One channel per condition parameter.
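The first-match rule described for case() can be sketched in plain Python. This is only an illustration of the evaluation order, not pypeman code: pick_branch and the dict-shaped messages are hypothetical names introduced here.

```python
def pick_branch(conditions, msg):
    """Return the index of the first condition that holds for msg, or None."""
    for index, condition in enumerate(conditions):
        # A condition is either a plain boolean or a callable taking the message.
        result = condition(msg) if callable(condition) else condition
        if result:
            return index
    return None


# Hypothetical conditions: two predicates and a catch-all boolean.
conditions = [
    lambda msg: msg["type"] == "order",
    lambda msg: msg["type"] == "invoice",
    True,
]
```

Here pick_branch(conditions, {"type": "invoice"}) selects index 1, and any other type falls through to the catch-all at index 2.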
-
fork(name=None, message_store_factory=None)¶ Create a new channel that processes a copy of the message at this point. Subchannels are executed in parallel with the main process.
Returns: The forked channel
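The idea behind fork(), a copy of the message processed alongside the main path, can be sketched with asyncio alone. This does not use pypeman; main_path, forked_path and the dict message are hypothetical stand-ins, and the use of a deep copy is an assumption.

```python
import asyncio
import copy


async def main_path(msg):
    # Stand-in for the remaining nodes of the main channel.
    msg["steps"].append("main")
    return msg


async def forked_path(msg):
    # Stand-in for the nodes of the forked channel.
    msg["steps"].append("fork")
    return msg


async def handle(msg):
    # The fork works on its own copy, so neither path sees the other's changes.
    forked_copy = copy.deepcopy(msg)
    fork_task = asyncio.create_task(forked_path(forked_copy))
    result = await main_path(msg)
    # Awaited here only so the sketch can show both results.
    forked_result = await fork_task
    return result, forked_result


main_result, fork_result = asyncio.run(handle({"steps": []}))
```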
-
get_node(name)¶ Return the node with the given name. Mainly used in tests.
Parameters: name – The searched node name. Returns: Instance of Node or None if none found.
-
graph(prefix='', dot=False)¶ Generate a text graph for this channel.
-
graph_dot(end='')¶ Generate a compatible dot graph for this channel.
-
handle(msg)¶ Override this method only if you know what you are doing, and call the parent implementation from the child class when adding behaviour.
Parameters: msg – Message to process. Returns: Processed message
-
handle_and_wait(msg)¶ Handle a message synchronously. Mainly used for testing.
Parameters: msg – Message to process Returns: Processed message.
-
is_stopped()¶ Returns: True if channel is in stopped or stopping state.
-
process(msg)¶ Override this method only if you know what you are doing. Called by the subhandle method.
Parameters: msg – Message to process. Returns: Processed message
-
replay(msg_id)¶ This method allows you to replay a message from channel message_store.
Parameters: msg_id – Message id to replay. Returns: The result of the processing.
-
start()¶ Start the channel. Called before processing starts. Can be overridden to define a specific start procedure.
-
status¶ Getter for status
-
stop()¶ Stop the channel. Called when pypeman shuts down.
-
subhandle(msg)¶ Override this method only if you know what you are doing. Called by the handle method.
Parameters: msg – Message to process. Returns: Processed message
-
when(condition, name=None, message_store_factory=None)¶ Create a new channel branch that is executed only if condition is true. This branch replaces all further processing in the current channel.
Parameters: condition – Can be a value or a function that takes a message argument. Returns: The conditional path channel.
-
class
pypeman.channels.Case(*args, names=None, parent_channel=None, message_store_factory=None, loop=None)¶ Case node used internally by the BaseChannel .case() method. Do not use it directly.
-
exception
pypeman.channels.ChannelStopped¶ The channel is stopped and cannot process messages.
-
class
pypeman.channels.ConditionSubChannel(condition=<function ConditionSubChannel.<lambda>>, **kwargs)¶ ConditionSubChannel is used to create an alternative path. Its processing replaces all further channel processing.
-
subhandle(msg)¶ Override this method only if you know what you are doing. Called by the handle method.
Parameters: msg – Message to process. Returns: Processed message
-
-
exception
pypeman.channels.Dropped¶ Used to stop processing when the message is considered processed. The default success response should be returned.
-
class
pypeman.channels.FileWatcherChannel(*args, basedir='', regex='.*', interval=1, binary_file=False, path='', **kwargs)¶ Watch for file changes or creation. The file content becomes the message payload; filepath is stored in the message meta.
-
start()¶ Start the channel. Called before processing starts. Can be overridden to define a specific start procedure.
-
-
exception
pypeman.channels.Rejected¶ Used to tell the caller that the message is invalid, with an error return.
-
class
pypeman.channels.SubChannel(name=None, parent_channel=None, loop=None, message_store_factory=None)¶ Subchannel used for forking channel processing.
-
process(msg)¶ Override this method only if you know what you are doing. Called by the subhandle method.
Parameters: msg – Message to process. Returns: Processed message
-
Endpoint¶
Node¶
-
class
pypeman.nodes.B64Decode(*args, altchars=None, **kwargs)¶ Decode payload from byte to specified encoding
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.B64Encode(*args, altchars=None, **kwargs)¶ Encode payload in specified encoding to byte.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
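The altchars argument of B64Encode and B64Decode has the same name as the one in the standard library base64 module. The sketch below shows what that argument does there; whether the nodes pass it straight through to base64 is an assumption, as the reference does not say.

```python
import base64

# Three bytes chosen so that the encoded form uses both special characters.
payload = b"\xfb\xff\xfe"

# Standard alphabet: '+' and '/' are used for values 62 and 63.
standard = base64.b64encode(payload)

# altchars replaces '+' and '/' with the two given bytes, here '-' and '_'.
url_safe = base64.b64encode(payload, altchars=b"-_")

# Decoding must use the same altchars to recover the original bytes.
decoded = base64.b64decode(url_safe, altchars=b"-_")
```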
-
-
class
pypeman.nodes.BaseNode(*args, name=None, log_output=False, **kwargs)¶ Base of all nodes. If you create a new node, you must inherit from this class and implement the process method.
Parameters: - name – Name of the node. Used in logs and tests.
- log_output – Enables output logging for this node.
- store_output_as – Store the output message in msg.ctx under the specified key.
- store_input_as – Store the input message in msg.ctx under the specified key.
- passthrough – If True, the node is executed but the output message is the same as the input.
-
async_run(msg)¶ Hook used to override execution behaviour (as the thread node does) without rewriting the handle process.
-
fullpath()¶ Return the channel name and node name joined by a dot.
-
handle(msg)¶ Called by the channel to launch the process method on the message. Some other structural processing takes place here. Do not modify it unless you know what you are doing.
Parameters: msg – Incoming message. Returns: Modified message after a process call and some treatment.
-
mock(input=None, output=None)¶ Allows mocking the input or output of a node for testing purposes.
Parameters: - input – A message to replace the input of this node.
- output – A return message to replace the processing of this node.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
restore_data(key, default=<object object>)¶ Restore previously saved data from configured persistence backend.
Parameters: - key – Key of restored data.
- default – If the key is missing, return this value instead of raising an exception.
Returns: Saved data if it exists, or the default value if specified.
-
run(msg)¶ Hook used to override execution behaviour (as the thread node does) without rewriting the handle process.
-
save_data(key, value)¶ Save data in configured persistence backend for next usage.
Parameters: - key – Key of saved data.
- value – Value saved.
-
class
pypeman.nodes.Decode(*args, **kwargs)¶ Decode payload from byte to specified encoding
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Drop(message=None, *args, **kwargs)¶ Use this node to tell the channel the message is Dropped.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.DropNode(*args, **kwargs)¶
-
class
pypeman.nodes.Email(*args, host=None, port=None, user=None, password=None, ssl=False, start_tls=False, subject=None, sender=None, recipients=None, content=None, **kwargs)¶ Node that sends an email.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Empty(*args, name=None, log_output=False, **kwargs)¶ Return an empty new message.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Encode(*args, **kwargs)¶ Encode payload in specified encoding to byte.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.FileReader(filename=None, filepath=None, binary_file=False, *args, **kwargs)¶ Reads a file and sets payload to the file’s contents.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.FileWriter(filepath=None, binary_mode=False, safe_file=True, *args, **kwargs)¶ Write a file with the message content.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.JsonToPython(*args, encoding='utf-8', **kwargs)¶ Convert a JSON message payload to a Python dict.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Log(*args, **kwargs)¶ Node that shows some information about the node, channel and message. Use it for debugging.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Map(*args, **kwargs)¶ Used to map input message keys->values to other keys->values.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.MappingNode(*args, **kwargs)¶
-
class
pypeman.nodes.MessageStore(*args, **kwargs)¶
-
class
pypeman.nodes.PythonToJson(*args, encoding='utf-8', indent=None, **kwargs)¶ Convert a Python payload to JSON.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.RaiseError(*args, name=None, log_output=False, **kwargs)¶
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Save(*args, uri=None, **kwargs)¶ Save a message in specified uri
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.SaveFileBackend(path, filename, channel)¶ Backend used to store messages with the Save node.
-
class
pypeman.nodes.SaveNullBackend¶ For testing purposes.
-
class
pypeman.nodes.SetCtx(ctx_name, *args, **kwargs)¶ Push the message in the context with the key ctx_name
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.Sleep(*args, duration=1, **kwargs)¶ Wait duration seconds before returning message.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.nodes.ThreadNode(*args, thread_pool=None, **kwargs)¶ Inherit from this class instead of BaseNode to prevent a long-running node from blocking the main event loop.
-
run(msg)¶ Hook used to override execution behaviour (as the thread node does) without rewriting the handle process.
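The general asyncio technique for keeping blocking work off the event loop is to hand it to an executor. The sketch below shows that technique with the standard library only; it is not ThreadNode's implementation, and blocking_work and process_in_thread are hypothetical names.

```python
import asyncio
import time


def blocking_work(payload):
    # Stand-in for a slow, blocking operation.
    time.sleep(0.05)
    return payload.upper()


async def process_in_thread(payload):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, blocking_work, payload)


async def main():
    # The event loop stays free while both blocking calls run in worker threads.
    return await asyncio.gather(process_in_thread("a"), process_in_thread("b"))


results = asyncio.run(main())
```

A custom pool could be passed in place of None, which is presumably the role of the thread_pool argument.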
-
-
class
pypeman.nodes.ToOrderedDict(*args, **kwargs)¶ This node yields an ordered dict with the keys given in ‘keys’ and the values taken from the payload. If the payload does not contain certain values, defaults can be specified with defaults.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
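The behaviour described for ToOrderedDict can be approximated as follows. This is a sketch under assumptions: to_ordered_dict is a hypothetical function, the payload is taken to be a dict, and keys missing from both the payload and the defaults are simply skipped, which the real node may handle differently.

```python
from collections import OrderedDict


def to_ordered_dict(payload, keys, defaults=None):
    """Build an OrderedDict following the order of keys."""
    defaults = defaults or {}
    result = OrderedDict()
    for key in keys:
        if key in payload:
            result[key] = payload[key]
        elif key in defaults:
            # Fall back to the default when the payload lacks the key.
            result[key] = defaults[key]
        # Assumption: keys found in neither place are left out.
    return result


ordered = to_ordered_dict({"b": 2, "a": 1, "z": 9}, ["a", "b", "c"], {"c": 0})
```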
-
-
pypeman.nodes.callable_or_value(val, msg)¶ Return val(msg) if value is a callable else val.
-
pypeman.nodes.choose_first_not_none(*args)¶ Choose the first non-None alternative in args.
Parameters: args – List of alternatives. Returns: The first non-None alternative.
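The two helper functions are small enough to sketch from their descriptions. These are re-implementations for illustration, not the library source; returning None when every alternative is None is an assumption.

```python
def callable_or_value(val, msg):
    # Call val with the message if it is callable, otherwise return it as is.
    return val(msg) if callable(val) else val


def choose_first_not_none(*args):
    for arg in args:
        if arg is not None:
            return arg
    # Assumption: nothing usable was given, so return None.
    return None


# A fixed value and a function of the message resolve the same way.
fixed = callable_or_value("report.txt", {"name": "ignored"})
computed = callable_or_value(lambda msg: msg["name"] + ".txt", {"name": "report"})
```

Note that choose_first_not_none(None, 0, 5) returns 0: only None is skipped, not other falsy values.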
Contrib¶
FTP¶
-
class
pypeman.contrib.ftp.FTPConnection(host, port, credentials)¶ FTP connection manager.
-
class
pypeman.contrib.ftp.FTPFileDeleter(host='', port=21, credentials=None, filepath=None, **kwargs)¶ Node to delete a file from FTP.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.contrib.ftp.FTPFileReader(host='', port=21, credentials=None, filepath=None, **kwargs)¶ Node to read a file from FTP.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.contrib.ftp.FTPFileWriter(host='', port=21, credentials=None, filepath=None, **kwargs)¶ Node to write content to FTP. File is first written with .part concatenated to its name then renamed to avoid partial upload.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
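The ".part then rename" approach described for FTPFileWriter is a general pattern. The sketch below applies it to the local filesystem with the standard library, since a real FTP server is not available here; safe_write is a hypothetical name and this is not the node's code.

```python
import os
import tempfile


def safe_write(filepath, content):
    part_path = filepath + ".part"
    # Write everything to the temporary name first.
    with open(part_path, "wb") as handle:
        handle.write(content)
    # Rename only once the content is complete, so readers never see a partial file.
    os.replace(part_path, filepath)


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "report.txt")
    safe_write(target, b"hello")
    with open(target, "rb") as handle:
        written = handle.read()
    leftovers = os.listdir(workdir)
```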
-
-
class
pypeman.contrib.ftp.FTPHelper(host, port, credentials)¶ FTP helper to abstract ftp access.
-
delete(filepath)¶ Delete an FTP file.
Parameters: filepath – File to delete.
-
download_file(filepath)¶ Download a file from FTP asynchronously.
Parameters: filepath – Path of the file to download. Returns: Content of the file.
-
rename(fromfilepath, tofilepath)¶ Rename a file from one path to another on the FTP server.
Parameters: - fromfilepath – Original file to rename.
- tofilepath – Destination file.
-
upload_file(filepath, content)¶ Upload a file to FTP.
Parameters: - filepath – Path of the file to create.
- content – Content to upload.
-
-
class
pypeman.contrib.ftp.FTPWatcherChannel(*args, host='', port=21, credentials='', basedir='', regex='.*', interval=60, delete_after=False, encoding='utf-8', thread_pool=None, sort_function=<built-in function sorted>, **kwargs)¶ Channel that watches an FTP server for file creation.
-
download_file(filename)¶ Download a file from FTP asynchronously.
Parameters: filename – Path of the file to download. Returns: Content of the downloaded file.
-
get_file_and_process(filename)¶ Download a file from FTP and launch channel processing on a message with the result as payload. Also adds a filepath header with the FTP relative path of the downloaded file.
Parameters: filename – File to download, relative to basedir. Returns: Processed result.
-
start()¶ Start the channel. Called before processing starts. Can be overridden to define a specific start procedure.
-
tick()¶ One iteration of watching.
-
watch_for_file()¶ Watch recursively for new FTP files. If a file matches the regex, it is downloaded and then processed as a message.
-
HL7¶
-
class
pypeman.contrib.hl7.HL7ToPython(*args, **kwargs)¶ Convert hl7 payload to python struct.
-
process(msg)¶ Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
-
class
pypeman.contrib.hl7.MLLPChannel(*args, endpoint=None, encoding='utf-8', **kwargs)¶
-
start()¶ Start the channel. Called before processing starts. Can be overridden to define a specific start procedure.
-
-
class
pypeman.contrib.hl7.MLLPProtocol(handler, loop=None)¶ Minimal Lower-Layer Protocol (MLLP) takes the form:
<VT>[HL7 Message]<FS><CR>
- References:
-
connection_lost(exc)¶ Called when the connection is lost or closed. The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
-
connection_made(transport)¶ Called when a connection is made. The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
-
data_received(data)¶ Called when some data is received. The argument is a bytes object.
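The MLLP frame layout given for MLLPProtocol, <VT>[HL7 Message]<FS><CR>, maps to three control bytes: VT is 0x0B, FS is 0x1C and CR is 0x0D. A minimal framing sketch follows; mllp_wrap and mllp_unwrap are hypothetical helpers, not part of pypeman.

```python
VT = b"\x0b"  # vertical tab, start of block
FS = b"\x1c"  # file separator, end of block
CR = b"\x0d"  # carriage return, closes the frame


def mllp_wrap(hl7_message: bytes) -> bytes:
    """Surround an HL7 message with the MLLP start and end markers."""
    return VT + hl7_message + FS + CR


def mllp_unwrap(frame: bytes) -> bytes:
    """Strip the MLLP markers from one complete frame."""
    if not (frame.startswith(VT) and frame.endswith(FS + CR)):
        raise ValueError("not a complete MLLP frame")
    return frame[len(VT):-len(FS + CR)]


frame = mllp_wrap(b"MSH|^~\\&|SENDER")
```

A receiver built on data_received() would typically buffer incoming bytes until FS followed by CR appears before unwrapping, since a frame may arrive in several chunks.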
HTTP¶
-
class
pypeman.contrib.http.HTTPEndpoint(adress=None, address=None, port=None, loop=None, http_args=None, host=None, sock=None, reuse_port=None)¶ Endpoint to receive HTTP connections from outside.
-
class
pypeman.contrib.http.HttpChannel(*args, endpoint=None, method='*', url='/', encoding=None, **kwargs)¶ Channel that handles HTTP connections. The HTTP message is the message payload and some headers become metadata of the message. Requires the aiohttp Python dependency.
Parameters: - endpoint – HTTP endpoint used to get connections.
- method – Method filter.
- url – Only messages with a matching URL are sent to this channel.
- encoding – Encoding of the message. Defaults to ‘utf-8’.
-
start()¶ Start the channel. Called before processing starts. Can be overridden to define a specific start procedure.
-
class
pypeman.contrib.http.HttpRequest(url, *args, method=None, headers=None, auth=None, verify=True, params=None, client_cert=None, **kwargs)¶ HTTP request node.
Parameters: - url – URL to send the request to.
- method – ‘get’, ‘put’ or ‘post’. Uses meta[‘method’] if None. Defaults to ‘get’.
- headers – Headers for the request. Uses meta[‘headers’] if None.
- auth – Tuple or aiohttp.BasicAuth object.
- verify – Verify SSL. Defaults to True.
- params – GET parameters as a dict. Use a list for multiple values, e.g. {‘param1’: ‘omega’, ‘param2’: [‘alpha’, ‘beta’]}.
- client_cert – Tuple with the .crt and .key paths.
-
handle_request(msg)¶ Generate the URL and handle the request.
-
process(msg)¶ Handle the request.
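A params dict whose values may be lists conventionally expands to repeated query keys. The sketch below shows that expansion with the standard library urlencode; how HttpRequest and aiohttp encode the parameters internally is not stated in this reference, so treat this as an illustration of the convention only.

```python
from urllib.parse import urlencode

# Same shape as the params example: one scalar value and one list value.
params = {"param1": "omega", "param2": ["alpha", "beta"]}

# doseq=True emits one key=value pair per list element.
query = urlencode(params, doseq=True)
```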
-
-
class
pypeman.contrib.http.RequestNode(*args, **kwargs)¶
Time¶
-
class
pypeman.contrib.time.CronChannel(*args, cron='', **kwargs)¶ This channel launches processing at the specified time interval. The first node of the channel receives a payload with the datetime of execution.
Parameters: cron – Sets the interval. Accepts any aiocron-compatible string.
-
start()¶ Start the channel. Called before processing starts. Can be overridden to define a specific start procedure.
-
Message¶
-
class
pypeman.message.Message(content_type='application/text', payload=None, meta=None)¶ A message is the unit of information exchanged between the nodes of a channel.
A message has the following properties:
attribute payload: The message content.
attribute meta: The message metadata.
attribute timestamp: The creation date of the message.
attribute uuid: uuid to identify the message.
attribute content_type: Used ?
attribute ctx: Current context, for when you want to save a message for later use.
Parameters: - payload – You can initialise the payload by setting this param.
- meta – Same as payload, you can initialise the meta by setting this param.
-
add_context(key, msg)¶ Add a msg to the .ctx property with specified key.
Parameters: - key – Key to store message.
- msg – Message to store.
-
copy()¶ Copy the message. Useful for channel forking purpose.
Returns: A copy of current message.
-
static
from_dict(data)¶ Convert an input dict, previously produced by the .to_dict() method, into a Message object.
Parameters: data – The input dict. Returns: The message object corresponding to the given data.
-
static
from_json(data)¶ Create a message from previously saved json string.
Parameters: data – Data to read message from. Returns: A new message instance created from json data.
-
log(logger=<logging.Logger object>, log_level=10, payload=True, meta=True, context=False)¶ Log a message.
Parameters: - logger – Logger to use.
- log_level – Log level for all log entries.
- payload – Whether to log the payload.
- meta – Whether to log the meta.
- context – Whether to log the context.
-
renew()¶ Copy the message but update the timestamp and uuid.
Returns: A copy of current message with new uuid and Timestamp.
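The difference between copy() and renew() can be illustrated with a stand-in class. SketchMessage below is hypothetical and much simpler than pypeman's Message; in particular, the use of a deep copy is an assumption.

```python
import datetime
from copy import deepcopy
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class SketchMessage:
    payload: object = None
    meta: dict = field(default_factory=dict)
    timestamp: datetime.datetime = field(default_factory=datetime.datetime.now)
    uuid: str = field(default_factory=lambda: uuid4().hex)

    def copy(self):
        # Same identity (uuid, timestamp), independent payload and meta.
        return deepcopy(self)

    def renew(self):
        # Same content, but a new identity and creation time.
        fresh = self.copy()
        fresh.timestamp = datetime.datetime.now()
        fresh.uuid = uuid4().hex
        return fresh


original = SketchMessage(payload={"items": [1, 2]})
copied = original.copy()
renewed = original.renew()
```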
-
timestamp_str()¶ Return the timestamp as a formatted string.
-
to_dict()¶ Convert the current message object to a dict. Payload is pickled.
Returns: A dict with an equivalent of message
-
to_json()¶ Create json string for current message.
Returns: a json string equivalent for message.
-
to_print(payload=True, meta=True, context=False)¶ Return a printable version of the message.
Parameters: - payload – Whether to print the payload.
- meta – Whether to print the meta.
- context – Whether to print the context.
Message store¶
-
class
pypeman.msgstore.FakeMessageStore¶ For testing purposes.
-
get(id)¶ Return the message corresponding to the given id, with its status.
Parameters: id – Message id. Depends on the message store. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search(**kwargs)¶ Return a list of messages with their store-specific ids and processed status.
Parameters: - start – First element.
- count – Number of elements to return, starting from the first element.
- order_by – Message order. Allowed values: [‘timestamp’, ‘status’].
Returns: A list of dicts {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
store(msg)¶ Store a message in the store.
Parameters: msg – The message to store. Returns: Id for this specific message.
-
total()¶ Returns: total count of messages
-
-
class
pypeman.msgstore.FakeMessageStoreFactory¶ Return a fake message store.
-
get_store(store_id)¶ Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.FileMessageStore(path, store_id)¶ Store a file in a <base_path>/<store_id>/<month>/<day>/ hierarchy.
-
change_message_state(id, new_state)¶ Change the state of the message with the given id.
Parameters: - id – Message specific store id.
- new_state – Target state.
-
count_msgs()¶ Count messages by listing all directories. To be used at startup.
-
get(id)¶ Return the message corresponding to the given id, with its status.
Parameters: id – Message id. Depends on the message store. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search(start=0, count=10, order_by='timestamp')¶ Return a list of messages with their store-specific ids and processed status.
Parameters: - start – First element.
- count – Number of elements to return, starting from the first element.
- order_by – Message order. Allowed values: [‘timestamp’, ‘status’].
Returns: A list of dicts {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
sorted_list_directories(path, reverse=True)¶ Parameters: - path – Base path
- reverse – reverse order
Returns: List of directories in specified path ordered
-
start()¶ Called at startup to initialize store.
-
store(msg)¶ Store a file in a <base_path>/<store_id>/<month>/<day>/ hierarchy.
-
total()¶ Returns: total count of messages
-
-
class
pypeman.msgstore.FileMessageStoreFactory(path)¶ Generate a FileMessageStore message store instance. Store a file in a <base_path>/<store_id>/<month>/<day>/ hierarchy.
-
get_store(store_id)¶ Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.MemoryMessageStore(base_dict, store_id)¶ Store messages in memory
-
change_message_state(id, new_state)¶ Change the state of the message with the given id.
Parameters: - id – Message specific store id.
- new_state – Target state.
-
get(id)¶ Return the message corresponding to the given id, with its status.
Parameters: id – Message id. Depends on the message store. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search(start=0, count=10, order_by='timestamp')¶ Return a list of messages with their store-specific ids and processed status.
Parameters: - start – First element.
- count – Number of elements to return, starting from the first element.
- order_by – Message order. Allowed values: [‘timestamp’, ‘status’].
Returns: A list of dicts {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
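The start, count and order_by parameters of search() describe ordinary sort-then-slice pagination. The sketch below shows that logic over an in-memory list; it is not the store's implementation. The entries are simplified dicts, and mapping ‘timestamp’ to the message timestamp and ‘status’ to the ‘state’ field is an assumption.

```python
def search(entries, start=0, count=10, order_by="timestamp"):
    """Return count entries beginning at index start, after sorting."""
    sort_keys = {
        # Assumed mapping from order_by names to entry fields.
        "timestamp": lambda entry: entry["message"]["timestamp"],
        "status": lambda entry: entry["state"],
    }
    if order_by not in sort_keys:
        raise ValueError("unsupported order_by: %r" % order_by)
    ordered = sorted(entries, key=sort_keys[order_by])
    return ordered[start:start + count]


# Hypothetical stored entries; a real store would hold Message objects.
entries = [
    {"id": "a", "state": "processed", "message": {"timestamp": 3}},
    {"id": "b", "state": "error", "message": {"timestamp": 1}},
    {"id": "c", "state": "pending", "message": {"timestamp": 2}},
]

page = search(entries, start=1, count=2)
```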
-
store(msg)¶ Store a message in the store.
Parameters: msg – The message to store. Returns: Id for this specific message.
-
total()¶ Returns: total count of messages
-
-
class
pypeman.msgstore.MemoryMessageStoreFactory¶ Return a memory message store. All messages are lost when pypeman stops.
-
get_store(store_id)¶ Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.MessageStore¶ A MessageStore keeps a history of processed messages. Mainly used in channels.
-
change_message_state(id, new_state)¶ Change the state of the message with the given id.
Parameters: - id – Message specific store id.
- new_state – Target state.
-
get(id)¶ Return the message corresponding to the given id, with its status.
Parameters: id – Message id. Depends on the message store. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search(start=0, count=10, order_by='timestamp')¶ Return a list of messages with their store-specific ids and processed status.
Parameters: - start – First element.
- count – Number of elements to return, starting from the first element.
- order_by – Message order. Allowed values: [‘timestamp’, ‘status’].
Returns: A list of dicts {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
start()¶ Called at startup to initialize store.
-
store(msg)¶ Store a message in the store.
Parameters: msg – The message to store. Returns: Id for this specific message.
-
total()¶ Returns: total count of messages
-
-
class
pypeman.msgstore.MessageStoreFactory¶ A message store factory generates a message store instance for a specific store_id.
-
get_store(store_id)¶ Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.NullMessageStore¶ For testing purposes.
-
get(id)¶ Return the message corresponding to the given id, with its status.
Parameters: id – Message id. Depends on the message store. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search(**kwargs)¶ Return a list of messages with their store-specific ids and processed status.
Parameters: - start – First element.
- count – Number of elements to return, starting from the first element.
- order_by – Message order. Allowed values: [‘timestamp’, ‘status’].
Returns: A list of dicts {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
store(msg)¶ Store a message in the store.
Parameters: msg – The message to store. Returns: Id for this specific message.
-
total()¶ Returns: total count of messages
-
Events¶
-
class
pypeman.events.Event¶ Asyncio Event class.
-
add_handler(handler)¶ Add a new handler for this event.
-
fire(*args, **kargs)¶ Fire the current event. All handlers are executed.
-
getHandlerCount()¶ Return declared handler count.
-
receiver(handler)¶ Function decorator to add a handler.
-
remove_handler(handler)¶ Remove a previously defined handler for this event.
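The handler methods listed for Event follow a common observer pattern. The stand-in below shows how such a class could fit together; SketchEvent is hypothetical, and the assumptions that handlers are coroutine functions and that fire() awaits them one after another are not confirmed by this reference.

```python
import asyncio


class SketchEvent:
    def __init__(self):
        self._handlers = []

    def add_handler(self, handler):
        self._handlers.append(handler)

    def remove_handler(self, handler):
        self._handlers.remove(handler)

    def receiver(self, handler):
        # Decorator form of add_handler; returns the function unchanged.
        self.add_handler(handler)
        return handler

    async def fire(self, *args, **kwargs):
        # Assumption: handlers are coroutines, awaited in registration order.
        for handler in self._handlers:
            await handler(*args, **kwargs)


on_start = SketchEvent()
calls = []


@on_start.receiver
async def record(name):
    calls.append(name)


asyncio.run(on_start.fire("channel_a"))
```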
-
Remote admin¶
-
class
pypeman.remoteadmin.PypemanShell(url)¶
-
do_channels(arg)¶ List available channels
-
do_exit(arg)¶ Exit program
-
do_list(channel, arg)¶ List messages of selected channel. You can specify start, end and order_by arguments
-
do_push(channel, arg)¶ Inject message with text as payload for selected channel
-
do_replay(channel, arg)¶ Replay a message list by their ids
-
do_select(arg)¶ Select a channel by its name. Mandatory for channel-oriented commands
-
do_start(arg)¶ Start a channel by its name
-
do_stop(arg)¶ Stop a channel by its name
-
-
class
pypeman.remoteadmin.RemoteAdminClient(loop=None, url='ws://localhost:8091')¶ Remote admin client. To be used by the ipython shell or the pypeman shell.
Parameters: url – Pypeman WebSocket URL.
-
channels()¶ Return a list of available channels on remote instance.
-
exec(command)¶ Execute any valid python code on remote instance and return stdout result.
-
list_msg(channel, start=0, count=10, order_by='timestamp')¶ List messages of the specified channel from the remote instance (the first 10 by default).
Parameters: - channel – The channel name.
- start – Start index of the listing.
- count – Number of messages to list from the start index.
- order_by – Message order. Only ‘timestamp’ and ‘-timestamp’ are handled for now.
Returns: List of messages with their status.
-
push_msg(channel, text)¶ Push a new message built from the text param to the channel.
Parameters: - channel – The channel name.
- text – This text will be the payload of the message.
-
replay_msg(channel, msg_ids)¶ Replay the messages in the given id list on the specified channel of the remote instance.
Parameters: - channel – The channel name.
- msg_ids – List of message ids to replay.
Returns: List of results, one for each message. A result can be {‘error’: <msg_error>} for an id if an error occurs.
-
send_command(command, args=None)¶ Send a command to the remote instance.
-
start(channel)¶ Start the specified channel on the remote instance.
Parameters: channel – The channel name.
-
stop(channel)¶ Stop the specified channel on the remote instance.
Parameters: channel – The channel name.
-
-
class
pypeman.remoteadmin.RemoteAdminServer(loop=None, host='localhost', port='8091', ssl=None, url=None)¶ Exposes json/rpc functions to a client over a websocket.
-
channels()¶ Return a list of available channels.
-
command(websocket, path)¶ Generic function to handle a command from client.
-
exec(command)¶ Execute a python command on this instance and return the stdout result.
Parameters: command – The python command to execute. Can be multiline. Returns: Command stdout result.
-
get_channel(name)¶ Return a channel by its name.
-
list_msg(channel, start=0, count=10, order_by='timestamp')¶ List the first count messages from the message store of the specified channel.
Parameters: channel – The channel name.
-
push_msg(channel, text)¶ Push a message in the channel.
Parameters: - channel – The channel name.
- text – The text added to the payload.
-
replay_msg(channel, msg_ids)¶ Replay messages from the message store.
Parameters: - channel – The channel name.
- msg_ids – The list of message ids to replay.
-
start()¶ Start the remote admin server.
-
start_channel(channel)¶ Start the specified channel.
Parameters: channel – The name of the channel to start.
-
stop_channel(channel)¶ Stop the specified channel.
Parameters: channel – The name of the channel to stop.
-