User guide¶
One image is better than 100 words:
Channels¶
Channels are chains of processing, the nodes, which use or transform message contents. A channel is specialised for a type of input and can be linked to an endpoint for incoming messages.
A channel receives a message, processes it and returns the response.
Channels are the main components of pypeman. When you want to process a message, you first create a channel then add nodes to process the message.
-
class
pypeman.channels.
BaseChannel
(name=None, parent_channel=None, loop=None, message_store_factory=None) Base channel are generic channels. If you want to create new channel, inherit from the base class and call
self.handle(msg)
method with generated message.Parameters: - name – Channel name is mandatory and must be unique through the whole project. Name gives a way to get channel in test mode.
- parent_channel – Used with sub channels. Don’t specify yourself.
- loop – To specify a custom event loop.
- message_store_factory – You can specify a message store (see below) at channel initialisation if you want to save all processed message. Use message_store_factory argument with an instance of wanted message store factory.
-
add
(*args) Add specified nodes to channel (Shortcut for append).
Parameters: args – Nodes to add.
-
append
(*args) Append specified nodes to channel.
Parameters: args – Nodes to add.
-
case
(*conditions, names=None, message_store_factory=None) Case between multiple conditions. For each condition specified, a channel is returned by this method in same order as condition are given. When processing a message, conditions are evaluated successively and the first returning true triggers the corresponding channel processing the message. When channel processing is finished, the next node is called.
Parameters: - conditions – Multiple conditions, one for each returned channel.
Should be a boolean or a function that takes a
msg
argument and should return a boolean. - message_store_factory – Allows you to specify a message store factory for all channel of this case.
Returns: one channel by condition parameter.
- conditions – Multiple conditions, one for each returned channel.
Should be a boolean or a function that takes a
-
fork
(name=None, message_store_factory=None) Create a new channel that process a copy of the message at this point. Subchannels are executed in parallel of main process.
Returns: The forked channel
-
get_node
(name) Return node with name in argument. Mainly used in tests.
Parameters: name – The searched node name. Returns: Instance of Node or None if none found.
-
graph
(prefix='', dot=False) Generate a text graph for this channel.
-
graph_dot
(end='') Generate a compatible dot graph for this channel.
-
handle
(msg) Overload this method only if you know what you are doing but call it from child class to add behaviour.
Parameters: msg – To be processed msg. Returns: Processed message
-
handle_and_wait
(msg) Handle a message synchronously. Mainly used for testing purpose.
Parameters: msg – Message to process Returns: Processed message.
-
is_stopped
() Returns: True if channel is in stopped or stopping state.
-
process
(msg) Overload this method only if you know what you are doing. Called by
subhandle
method.Parameters: msg – To be processed msg. Returns: Processed message
-
replay
(msg_id) This method allows you to replay a message from channel message_store.
Parameters: msg_id – Message id to replay. Returns: The result of the processing.
-
start
() Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.
-
status
Getter for status
-
stop
() Stop the channel. Called when pypeman shutdown.
-
subhandle
(msg) Overload this method only if you know what you are doing. Called by
handle
method.Parameters: msg – To be processed msg. Returns: Processed message
-
when
(condition, name=None, message_store_factory=None) New channel bifurcation that is executed only if condition is True. This channel replace further current channel processing.
Parameters: condition – Can be a value or a function that takes a message argument. Returns: The conditional path channel.
-
class
pypeman.contrib.time.
CronChannel
(*args, cron='', **kwargs) This channel Launch processing at specified time interval. The first node of the channel receive a payload with the datetime of execution.
Params cron: This set the interval. Accept any aiocron compatible string. -
start
() Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.
-
-
class
pypeman.contrib.http.
HttpChannel
(*args, endpoint=None, method='*', url='/', encoding=None, **kwargs) Channel that handles Http connection. The Http message is the message payload and some headers become metadata of message. Needs
aiohttp
python dependency to work.Parameters: - endpoint – HTTP endpoint used to get connections.
- method – Method filter.
- url – Only matching urls messages will be sent to this channel.
- encoding – Encoding of message. Default to ‘utf-8’.
-
start
() Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.
-
class
pypeman.channels.
FileWatcherChannel
(*args, basedir='', regex='.*', interval=1, binary_file=False, path='', **kwargs) Watch for file change or creation. File content becomes message payload.
filepath
is in message meta.-
start
() Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.
-
-
class
pypeman.contrib.ftp.
FTPWatcherChannel
(*args, host='', port=21, credentials='', basedir='', regex='.*', interval=60, delete_after=False, encoding='utf-8', thread_pool=None, sort_function=<built-in function sorted>, **kwargs) Channel that watch ftp for file creation.
-
download_file
(filename) Download a file from ftp asynchronously.
Parameters: filepath – file path to download. Returns: Content of the downloaded file.
-
get_file_and_process
(filename) Download a file from ftp and launch channel processing on msg with result as payload. Also add a filepath header with ftp relative path of downloaded file.
Parameters: filename – file to download relative to basedir. Returns: processed result
-
start
() Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.
-
tick
() One iteration of watching.
-
watch_for_file
() Watch recursively for ftp new files. If file match regex, it is downloaded then processed in a message.
-
-
class
pypeman.contrib.hl7.
MLLPChannel
(*args, endpoint=None, encoding='utf-8', **kwargs) -
start
() Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.
-
Nodes¶
A node is a processing unit in a channel. To create a node, inherit form pypeman.nodes.BaseNode and override process(msg) method.
You can use persistence storage to save data between two pypeman
executions sush last time a specific channel ran by using the two
following methods: save_data
and
restore_data
Specific nodes¶
Base for all node. You must inherit from this class if you want to create a new node for your project.
-
class
pypeman.nodes.
BaseNode
(*args, name=None, log_output=False, **kwargs) Base of all Nodes. If you create a new node, you must inherit from this class and implement process method.
Parameters: - name – Name of node. Used in log or test.
- log_output – To enable output logging for this node.
Store_output_as: Store output message in msg.ctx as specified key
Store_input_as: Store input message in msg.ctx as specified key
Passthrough: If True, node is executed but output message is same as input
-
async_run
(msg) Used to overload behaviour like thread Node without rewriting handle process
-
fullpath
() Return the channel name and node name dot concatened.
-
handle
(msg) Handle message is called by channel to launch process method on it. Some other structural processing take place here. Please, don’t modify unless you know what you are doing.
Parameters: msg – incoming message Returns: modified message after a process call and some treatment
-
mock
(input=None, output=None) Allow to mock input or output of a node for testing purpose.
Parameters: - input – A message to replace the input in this node.
- output – A return message to replace processing of this mock.
-
process
(msg) Implement this function in child classes to create a new Node.
Parameters: msg – The incoming message Returns: The processed message
-
restore_data
(key, default=<object object>) Restore previously saved data from configured persistence backend.
Parameters: - key – Key of restored data.
- default – if key is missing, don’t raise exception and return this value instead.
Returns: Saved data if exist or default value if specified.
-
run
(msg) Used to overload behaviour like thread Node without rewriting handle process
-
save_data
(key, value) Save data in configured persistence backend for next usage.
Parameters: - key – Key of saved data.
- value – Value saved.
Thread node allow you to create node that execute is process method in another thread to avoid blocking nodes.
-
class
pypeman.nodes.
ThreadNode
(*args, thread_pool=None, **kwargs) Inherit from this class instead of BaseNode to avoid long run node blocking main event loop.
-
run
(msg) Used to overload behaviour like thread Node without rewriting handle process
-
Other nodes¶
See pypeman.nodes
and pypeman.contrib
.
Messages¶
Message contains the core information processed by nodes and carried by channel. The message payload may be: Json, Xml, Soap, Hl7, text, Python object…
Useful attributes:
- payload: the message contents.
- meta: message metadata, should be used to add extra information about the payload.
- context: previous messages can be saved in the context dict for further access.
-
class
pypeman.message.
Message
(content_type='application/text', payload=None, meta=None) A message is the unity of informations exchanged between nodes of a channel.
A message have following properties:
attribute payload: The message content. attribute meta: The message metadata. attribute timestamp: The creation date of message attribute uuid: uuid to identify message attribute content_type: Used ? attribute ctx: Current context when you want to save a message for later use. Parameters: - payload – You can initialise the payload by setting this param.
- meta – Same as payload, you can initialise the meta by setting this param.
-
add_context
(key, msg) Add a msg to the .ctx property with specified key.
Parameters: - key – Key to store message.
- msg – Message to store.
-
copy
() Copy the message. Useful for channel forking purpose.
Returns: A copy of current message.
-
static
from_dict
(data) Convert the input dict previously converted with .as_dict() method in Message object.
Parameters: data – The input dict. Returns: The message message object correponding to given data.
-
static
from_json
(data) Create a message from previously saved json string.
Parameters: data – Data to read message from. Returns: A new message instance created from json data.
-
log
(logger=<logging.Logger object>, log_level=10, payload=True, meta=True, context=False) Log a message.
Parameters: - logger – Logger
- log_level – log level for all log.
- payload – Whether log payload.
- meta – Whether log meta.
- context – Whether log context.
-
renew
() Copy the message but update the timestamp and uuid.
Returns: A copy of current message with new uuid and Timestamp.
-
timestamp_str
() Return timestamp formated string
-
to_dict
() Convert the current message object to a dict. Payload is pickled.
Returns: A dict with an equivalent of message
-
to_json
() Create json string for current message.
Returns: a json string equivalent for message.
-
to_print
(payload=True, meta=True, context=False) Return a printable version of message.
Parameters: - payload – Whether print payload.
- meta – Whether print meta.
- context – Whether print context.
Endpoints¶
Endpoints are server instances used by channel to get messages from net protocols like HTTP, Soap or HL7, …. They listen to a specific port for a specific protocol.
-
class
pypeman.contrib.http.
HTTPEndpoint
(adress=None, address=None, port=None, loop=None, http_args=None, host=None, sock=None, reuse_port=None) Endpoint to receive HTTP connection from outside.
-
class
pypeman.contrib.hl7.
MLLPEndpoint
(address=None, port=None, encoding='utf-8', loop=None, host=None, sock=None, reuse_port=None)
Message Stores¶
A Message store is really useful to keep a copy of all messages sent to a channel. It’s like a log but with complete message data and metadata. This way you can trace all processing or replay a specific message (Not implemented yet). Each channel can have its message store.
You don’t use message stores directly but a MessageStoreFactory instance to allow reuse of a configuration.
Generic classes¶
-
class
pypeman.msgstore.
MessageStoreFactory
Message store factory class can generate Message store instance for specific store_id.
-
get_store
(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.
MessageStore
A MessageStore keep an history of processed messages. Mainly used in channels.
-
change_message_state
(id, new_state) Change the id message state.
Parameters: - id – Message specific store id.
- new_state – Target state.
-
get
(id) Return one message corresponding to given id with his status.
Parameters: id – Message id. Message store dependant. Returns: A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
search
(start=0, count=10, order_by='timestamp') Return a list of message with store specific id and processed status.
Parameters: - start – First element.
- count – Count of elements since first element.
- order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns: A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
-
start
() Called at startup to initialize store.
-
store
(msg) Store a message in the store.
Parameters: msg – The message to store. Returns: Id for this specific message.
-
total
() Returns: total count of messages
-
Message store factories¶
-
class
pypeman.msgstore.
NullMessageStoreFactory
Return an NullMessageStore that do nothing at all.
-
get_store
(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.
FakeMessageStoreFactory
Return an Fake message store
-
get_store
(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.
MemoryMessageStoreFactory
Return a Memory message store. All message are lost at pypeman stop.
-
get_store
(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-
-
class
pypeman.msgstore.
FileMessageStoreFactory
(path) Generate a FileMessageStore message store instance. Store a file in <base_path>/<store_id>/<month>/<day>/ hierachy.
-
get_store
(store_id) Parameters: store_id – identifier of corresponding message store. Returns: A MessageStore corresponding to correct store_id.
-