User guide

One image is better than 100 words:

_images/general_view.png

Channels

Channels are chains of processing, the nodes, which use or transform message contents. A channel is specialised for a type of input and can be linked to an endpoint for incoming messages.

A channel receives a message, processes it and returns the response.

Channels are the main components of pypeman. When you want to process a message, you first create a channel then add nodes to process the message.

class pypeman.channels.BaseChannel(name=None, parent_channel=None, loop=None, message_store_factory=None)

Base channel are generic channels. If you want to create new channel, inherit from the base class and call self.handle(msg) method with generated message.

Parameters:
  • name – Channel name is mandatory and must be unique through the whole project. Name gives a way to get channel in test mode.
  • parent_channel – Used with sub channels. Don’t specify yourself.
  • loop – To specify a custom event loop.
  • message_store_factory – You can specify a message store (see below) at channel initialisation if you want to save all processed message. Use message_store_factory argument with an instance of wanted message store factory.
add(*args)

Add specified nodes to channel (Shortcut for append).

Parameters:args – Nodes to add.
append(*args)

Append specified nodes to channel.

Parameters:args – Nodes to add.
case(*conditions, names=None, message_store_factory=None)

Case between multiple conditions. For each condition specified, a channel is returned by this method in same order as condition are given. When processing a message, conditions are evaluated successively and the first returning true triggers the corresponding channel processing the message. When channel processing is finished, the next node is called.

Parameters:
  • conditions – Multiple conditions, one for each returned channel. Should be a boolean or a function that takes a msg argument and should return a boolean.
  • message_store_factory – Allows you to specify a message store factory for all channel of this case.
Returns:

one channel by condition parameter.

fork(name=None, message_store_factory=None)

Create a new channel that process a copy of the message at this point. Subchannels are executed in parallel of main process.

Returns:The forked channel
get_node(name)

Return node with name in argument. Mainly used in tests.

Parameters:name – The searched node name.
Returns:Instance of Node or None if none found.
graph(prefix='', dot=False)

Generate a text graph for this channel.

graph_dot(end='')

Generate a compatible dot graph for this channel.

handle(msg)

Overload this method only if you know what you are doing but call it from child class to add behaviour.

Parameters:msg – To be processed msg.
Returns:Processed message
handle_and_wait(msg)

Handle a message synchronously. Mainly used for testing purpose.

Parameters:msg – Message to process
Returns:Processed message.
is_stopped()
Returns:True if channel is in stopped or stopping state.
process(msg)

Overload this method only if you know what you are doing. Called by subhandle method.

Parameters:msg – To be processed msg.
Returns:Processed message
replay(msg_id)

This method allows you to replay a message from channel message_store.

Parameters:msg_id – Message id to replay.
Returns:The result of the processing.
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

status

Getter for status

stop()

Stop the channel. Called when pypeman shutdown.

subhandle(msg)

Overload this method only if you know what you are doing. Called by handle method.

Parameters:msg – To be processed msg.
Returns:Processed message
when(condition, name=None, message_store_factory=None)

New channel bifurcation that is executed only if condition is True. This channel replace further current channel processing.

Parameters:condition – Can be a value or a function that takes a message argument.
Returns:The conditional path channel.
class pypeman.contrib.time.CronChannel(*args, cron='', **kwargs)

This channel Launch processing at specified time interval. The first node of the channel receive a payload with the datetime of execution.

Params cron:This set the interval. Accept any aiocron compatible string.
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

class pypeman.contrib.http.HttpChannel(*args, endpoint=None, method='*', url='/', encoding=None, **kwargs)

Channel that handles Http connection. The Http message is the message payload and some headers become metadata of message. Needs aiohttp python dependency to work.

Parameters:
  • endpoint – HTTP endpoint used to get connections.
  • method – Method filter.
  • url – Only matching urls messages will be sent to this channel.
  • encoding – Encoding of message. Default to ‘utf-8’.
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

class pypeman.channels.FileWatcherChannel(*args, basedir='', regex='.*', interval=1, binary_file=False, path='', **kwargs)

Watch for file change or creation. File content becomes message payload. filepath is in message meta.

start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

class pypeman.contrib.ftp.FTPWatcherChannel(*args, host='', port=21, credentials='', basedir='', regex='.*', interval=60, delete_after=False, encoding='utf-8', thread_pool=None, sort_function=<built-in function sorted>, **kwargs)

Channel that watch ftp for file creation.

download_file(filename)

Download a file from ftp asynchronously.

Parameters:filepath – file path to download.
Returns:Content of the downloaded file.
get_file_and_process(filename)

Download a file from ftp and launch channel processing on msg with result as payload. Also add a filepath header with ftp relative path of downloaded file.

Parameters:filename – file to download relative to basedir.
Returns:processed result
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

tick()

One iteration of watching.

watch_for_file()

Watch recursively for ftp new files. If file match regex, it is downloaded then processed in a message.

class pypeman.contrib.hl7.MLLPChannel(*args, endpoint=None, encoding='utf-8', **kwargs)
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

Nodes

A node is a processing unit in a channel. To create a node, inherit form pypeman.nodes.BaseNode and override process(msg) method.

You can use persistence storage to save data between two pypeman executions sush last time a specific channel ran by using the two following methods: save_data and restore_data

Specific nodes

Base for all node. You must inherit from this class if you want to create a new node for your project.

class pypeman.nodes.BaseNode(*args, name=None, log_output=False, **kwargs)

Base of all Nodes. If you create a new node, you must inherit from this class and implement process method.

save_data

Parameters:
  • name – Name of node. Used in log or test.
  • log_output – To enable output logging for this node.
Store_output_as:
 

Store output message in msg.ctx as specified key

Store_input_as:

Store input message in msg.ctx as specified key

Passthrough:

If True, node is executed but output message is same as input

async_run(msg)

Used to overload behaviour like thread Node without rewriting handle process

fullpath()

Return the channel name and node name dot concatened.

handle(msg)

Handle message is called by channel to launch process method on it. Some other structural processing take place here. Please, don’t modify unless you know what you are doing.

Parameters:msg – incoming message
Returns:modified message after a process call and some treatment
mock(input=None, output=None)

Allow to mock input or output of a node for testing purpose.

Parameters:
  • input – A message to replace the input in this node.
  • output – A return message to replace processing of this mock.
process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
restore_data(key, default=<object object>)

Restore previously saved data from configured persistence backend.

Parameters:
  • key – Key of restored data.
  • default – if key is missing, don’t raise exception and return this value instead.
Returns:

Saved data if exist or default value if specified.

run(msg)

Used to overload behaviour like thread Node without rewriting handle process

save_data(key, value)

Save data in configured persistence backend for next usage.

Parameters:
  • key – Key of saved data.
  • value – Value saved.

Thread node allow you to create node that execute is process method in another thread to avoid blocking nodes.

class pypeman.nodes.ThreadNode(*args, thread_pool=None, **kwargs)

Inherit from this class instead of BaseNode to avoid long run node blocking main event loop.

run(msg)

Used to overload behaviour like thread Node without rewriting handle process

Other nodes

See pypeman.nodes and pypeman.contrib.

Messages

Message contains the core information processed by nodes and carried by channel. The message payload may be: Json, Xml, Soap, Hl7, text, Python object…

Useful attributes:

  • payload: the message contents.
  • meta: message metadata, should be used to add extra information about the payload.
  • context: previous messages can be saved in the context dict for further access.
class pypeman.message.Message(content_type='application/text', payload=None, meta=None)

A message is the unity of informations exchanged between nodes of a channel.

A message have following properties:

attribute payload:
 The message content.
attribute meta:The message metadata.
attribute timestamp:
 The creation date of message
attribute uuid:uuid to identify message
attribute content_type:
 Used ?
attribute ctx:Current context when you want to save a message for later use.
Parameters:
  • payload – You can initialise the payload by setting this param.
  • meta – Same as payload, you can initialise the meta by setting this param.
add_context(key, msg)

Add a msg to the .ctx property with specified key.

Parameters:
  • key – Key to store message.
  • msg – Message to store.
copy()

Copy the message. Useful for channel forking purpose.

Returns:A copy of current message.
static from_dict(data)

Convert the input dict previously converted with .as_dict() method in Message object.

Parameters:data – The input dict.
Returns:The message message object correponding to given data.
static from_json(data)

Create a message from previously saved json string.

Parameters:data – Data to read message from.
Returns:A new message instance created from json data.
log(logger=<logging.Logger object>, log_level=10, payload=True, meta=True, context=False)

Log a message.

Parameters:
  • logger – Logger
  • log_level – log level for all log.
  • payload – Whether log payload.
  • meta – Whether log meta.
  • context – Whether log context.
renew()

Copy the message but update the timestamp and uuid.

Returns:A copy of current message with new uuid and Timestamp.
timestamp_str()

Return timestamp formated string

to_dict()

Convert the current message object to a dict. Payload is pickled.

Returns:A dict with an equivalent of message
to_json()

Create json string for current message.

Returns:a json string equivalent for message.
to_print(payload=True, meta=True, context=False)

Return a printable version of message.

Parameters:
  • payload – Whether print payload.
  • meta – Whether print meta.
  • context – Whether print context.

Endpoints

Endpoints are server instances used by channel to get messages from net protocols like HTTP, Soap or HL7, …. They listen to a specific port for a specific protocol.

class pypeman.contrib.http.HTTPEndpoint(adress=None, address=None, port=None, loop=None, http_args=None, host=None, sock=None, reuse_port=None)

Endpoint to receive HTTP connection from outside.

class pypeman.contrib.hl7.MLLPEndpoint(address=None, port=None, encoding='utf-8', loop=None, host=None, sock=None, reuse_port=None)

Message Stores

A Message store is really useful to keep a copy of all messages sent to a channel. It’s like a log but with complete message data and metadata. This way you can trace all processing or replay a specific message (Not implemented yet). Each channel can have its message store.

You don’t use message stores directly but a MessageStoreFactory instance to allow reuse of a configuration.

Generic classes

class pypeman.msgstore.MessageStoreFactory

Message store factory class can generate Message store instance for specific store_id.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.MessageStore

A MessageStore keep an history of processed messages. Mainly used in channels.

change_message_state(id, new_state)

Change the id message state.

Parameters:
  • id – Message specific store id.
  • new_state – Target state.
get(id)

Return one message corresponding to given id with his status.

Parameters:id – Message id. Message store dependant.
Returns:A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
search(start=0, count=10, order_by='timestamp')

Return a list of message with store specific id and processed status.

Parameters:
  • start – First element.
  • count – Count of elements since first element.
  • order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns:

A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.

start()

Called at startup to initialize store.

store(msg)

Store a message in the store.

Parameters:msg – The message to store.
Returns:Id for this specific message.
total()
Returns:total count of messages

Message store factories

class pypeman.msgstore.NullMessageStoreFactory

Return an NullMessageStore that do nothing at all.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.FakeMessageStoreFactory

Return an Fake message store

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.MemoryMessageStoreFactory

Return a Memory message store. All message are lost at pypeman stop.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.FileMessageStoreFactory(path)

Generate a FileMessageStore message store instance. Store a file in <base_path>/<store_id>/<month>/<day>/ hierachy.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.