API

Channels

class pypeman.channels.BaseChannel(name=None, parent_channel=None, loop=None, message_store_factory=None)

Base channel are generic channels. If you want to create new channel, inherit from the base class and call self.handle(msg) method with generated message.

Parameters:
  • name – Channel name is mandatory and must be unique through the whole project. Name gives a way to get channel in test mode.
  • parent_channel – Used with sub channels. Don’t specify yourself.
  • loop – To specify a custom event loop.
  • message_store_factory – You can specify a message store (see below) at channel initialisation if you want to save all processed message. Use message_store_factory argument with an instance of wanted message store factory.
add(*args)

Add specified nodes to channel (Shortcut for append).

Parameters:args – Nodes to add.
append(*args)

Append specified nodes to channel.

Parameters:args – Nodes to add.
case(*conditions, names=None, message_store_factory=None)

Case between multiple conditions. For each condition specified, a channel is returned by this method in same order as condition are given. When processing a message, conditions are evaluated successively and the first returning true triggers the corresponding channel processing the message. When channel processing is finished, the next node is called.

Parameters:
  • conditions – Multiple conditions, one for each returned channel. Should be a boolean or a function that takes a msg argument and should return a boolean.
  • message_store_factory – Allows you to specify a message store factory for all channel of this case.
Returns:

one channel by condition parameter.

fork(name=None, message_store_factory=None)

Create a new channel that process a copy of the message at this point. Subchannels are executed in parallel of main process.

Returns:The forked channel
get_node(name)

Return node with name in argument. Mainly used in tests.

Parameters:name – The searched node name.
Returns:Instance of Node or None if none found.
graph(prefix='', dot=False)

Generate a text graph for this channel.

graph_dot(end='')

Generate a compatible dot graph for this channel.

handle(msg)

Overload this method only if you know what you are doing but call it from child class to add behaviour.

Parameters:msg – To be processed msg.
Returns:Processed message
handle_and_wait(msg)

Handle a message synchronously. Mainly used for testing purpose.

Parameters:msg – Message to process
Returns:Processed message.
is_stopped()
Returns:True if channel is in stopped or stopping state.
process(msg)

Overload this method only if you know what you are doing. Called by subhandle method.

Parameters:msg – To be processed msg.
Returns:Processed message
replay(msg_id)

This method allows you to replay a message from channel message_store.

Parameters:msg_id – Message id to replay.
Returns:The result of the processing.
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

status

Getter for status

stop()

Stop the channel. Called when pypeman shutdown.

subhandle(msg)

Overload this method only if you know what you are doing. Called by handle method.

Parameters:msg – To be processed msg.
Returns:Processed message
when(condition, name=None, message_store_factory=None)

New channel bifurcation that is executed only if condition is True. This channel replace further current channel processing.

Parameters:condition – Can be a value or a function that takes a message argument.
Returns:The conditional path channel.
class pypeman.channels.Case(*args, names=None, parent_channel=None, message_store_factory=None, loop=None)

Case node internally used for .case() BaseChannel method. Don’t use it.

exception pypeman.channels.ChannelStopped

The channel is stopped and can’t process message.

class pypeman.channels.ConditionSubChannel(condition=<function ConditionSubChannel.<lambda>>, **kwargs)

ConditionSubchannel used for make alternative path. This processing replace all further channel processing.

subhandle(msg)

Overload this method only if you know what you are doing. Called by handle method.

Parameters:msg – To be processed msg.
Returns:Processed message
exception pypeman.channels.Dropped

Used to stop process as message is processed. Default success should be returned.

class pypeman.channels.FileWatcherChannel(*args, basedir='', regex='.*', interval=1, binary_file=False, path='', **kwargs)

Watch for file change or creation. File content becomes message payload. filepath is in message meta.

start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

exception pypeman.channels.Rejected

Used to tell caller the message is invalid with a error return.

class pypeman.channels.SubChannel(name=None, parent_channel=None, loop=None, message_store_factory=None)

Subchannel used for forking channel processing.

process(msg)

Overload this method only if you know what you are doing. Called by subhandle method.

Parameters:msg – To be processed msg.
Returns:Processed message

Endpoint

Node

class pypeman.nodes.B64Decode(*args, altchars=None, **kwargs)

Decode payload from byte to specified encoding

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.B64Encode(*args, altchars=None, **kwargs)

Encode payload in specified encoding to byte.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.BaseNode(*args, name=None, log_output=False, **kwargs)

Base of all Nodes. If you create a new node, you must inherit from this class and implement process method.

save_data

Parameters:
  • name – Name of node. Used in log or test.
  • log_output – To enable output logging for this node.
Store_output_as:
 

Store output message in msg.ctx as specified key

Store_input_as:

Store input message in msg.ctx as specified key

Passthrough:

If True, node is executed but output message is same as input

async_run(msg)

Used to overload behaviour like thread Node without rewriting handle process

fullpath()

Return the channel name and node name dot concatened.

handle(msg)

Handle message is called by channel to launch process method on it. Some other structural processing take place here. Please, don’t modify unless you know what you are doing.

Parameters:msg – incoming message
Returns:modified message after a process call and some treatment
mock(input=None, output=None)

Allow to mock input or output of a node for testing purpose.

Parameters:
  • input – A message to replace the input in this node.
  • output – A return message to replace processing of this mock.
process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
restore_data(key, default=<object object>)

Restore previously saved data from configured persistence backend.

Parameters:
  • key – Key of restored data.
  • default – if key is missing, don’t raise exception and return this value instead.
Returns:

Saved data if exist or default value if specified.

run(msg)

Used to overload behaviour like thread Node without rewriting handle process

save_data(key, value)

Save data in configured persistence backend for next usage.

Parameters:
  • key – Key of saved data.
  • value – Value saved.
class pypeman.nodes.Decode(*args, **kwargs)

Decode payload from byte to specified encoding

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Drop(message=None, *args, **kwargs)

Use this node to tell the channel the message is Dropped.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.DropNode(*args, **kwargs)
class pypeman.nodes.Email(*args, host=None, port=None, user=None, password=None, ssl=False, start_tls=False, subject=None, sender=None, recipients=None, content=None, **kwargs)

Node that send Email.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Empty(*args, name=None, log_output=False, **kwargs)

Return an empty new message.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Encode(*args, **kwargs)

Encode payload in specified encoding to byte.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.FileReader(filename=None, filepath=None, binary_file=False, *args, **kwargs)

Reads a file and sets payload to the file’s contents.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.FileWriter(filepath=None, binary_mode=False, safe_file=True, *args, **kwargs)

Write a file with the message content.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.JsonToPython(*args, encoding='utf-8', **kwargs)

Convert json message payload to python dict.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Log(*args, **kwargs)

Node to show some information about node, channel and message. Use for debug.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Map(*args, **kwargs)

Used to map input message keys->values to another keys->values

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.MappingNode(*args, **kwargs)
class pypeman.nodes.MessageStore(*args, **kwargs)
class pypeman.nodes.PythonToJson(*args, encoding='utf-8', indent=None, **kwargs)

Convert python payload to json.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.RaiseError(*args, name=None, log_output=False, **kwargs)
process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Save(*args, uri=None, **kwargs)

Save a message in specified uri

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.SaveFileBackend(path, filename, channel)

Backend used to store message with Save node.

class pypeman.nodes.SaveNullBackend

For testing purpose

class pypeman.nodes.SetCtx(ctx_name, *args, **kwargs)

Push the message in the context with the key ctx_name

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.Sleep(*args, duration=1, **kwargs)

Wait duration seconds before returning message.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.nodes.ThreadNode(*args, thread_pool=None, **kwargs)

Inherit from this class instead of BaseNode to avoid long run node blocking main event loop.

run(msg)

Used to overload behaviour like thread Node without rewriting handle process

class pypeman.nodes.ToOrderedDict(*args, **kwargs)

this node yields an ordered dict with the keys ‘keys’ and the values from the payload if the payload does not contain certain values defaults can be specified with defaults

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
pypeman.nodes.callable_or_value(val, msg)

Return val(msg) if value is a callable else val.

pypeman.nodes.choose_first_not_none(*args)

Choose first non None alternative in args. :param args: alternative list :return: the first non None alternative.

Contrib

FTP

class pypeman.contrib.ftp.FTPConnection(host, port, credentials)

FTP connection manager.

class pypeman.contrib.ftp.FTPFileDeleter(host='', port=21, credentials=None, filepath=None, **kwargs)

Node to delete a file from FTP.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.contrib.ftp.FTPFileReader(host='', port=21, credentials=None, filepath=None, **kwargs)

Node to read a file from FTP.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.contrib.ftp.FTPFileWriter(host='', port=21, credentials=None, filepath=None, **kwargs)

Node to write content to FTP. File is first written with .part concatenated to its name then renamed to avoid partial upload.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.contrib.ftp.FTPHelper(host, port, credentials)

FTP helper to abstract ftp access.

delete(filepath)

Delete an FTP file. :param filepath: File to delete.

download_file(filepath)

Download a file from ftp asynchronously. :param filepath: file path to download. :return: content of the file.

rename(fromfilepath, tofilepath)

Rename a file from path to another path in ftp. :param fromfilepath: original file to rename. :param tofilepath: destination file.

upload_file(filepath, content)

Upload an file to ftp. :param filepath: Path of file to create. :param content: Content to upload.

class pypeman.contrib.ftp.FTPWatcherChannel(*args, host='', port=21, credentials='', basedir='', regex='.*', interval=60, delete_after=False, encoding='utf-8', thread_pool=None, sort_function=<built-in function sorted>, **kwargs)

Channel that watch ftp for file creation.

download_file(filename)

Download a file from ftp asynchronously.

Parameters:filepath – file path to download.
Returns:Content of the downloaded file.
get_file_and_process(filename)

Download a file from ftp and launch channel processing on msg with result as payload. Also add a filepath header with ftp relative path of downloaded file.

Parameters:filename – file to download relative to basedir.
Returns:processed result
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

tick()

One iteration of watching.

watch_for_file()

Watch recursively for ftp new files. If file match regex, it is downloaded then processed in a message.

HL7

class pypeman.contrib.hl7.HL7ToPython(*args, **kwargs)

Convert hl7 payload to python struct.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.contrib.hl7.MLLPChannel(*args, endpoint=None, encoding='utf-8', **kwargs)
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

class pypeman.contrib.hl7.MLLPProtocol(handler, loop=None)
Minimal Lower-Layer Protocol (MLLP) takes the form:
<VT>[HL7 Message]<FS><CR>
References:
connection_lost(exc)

Called when the connection is lost or closed. The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)

Called when a connection is made. The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)

Called when some data is received. The argument is a bytes object.

class pypeman.contrib.hl7.PythonToHL7(*args, **kwargs)

Convert python payload to HL7. Must be HL7 structure.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message

HTTP

class pypeman.contrib.http.HTTPEndpoint(adress=None, address=None, port=None, loop=None, http_args=None, host=None, sock=None, reuse_port=None)

Endpoint to receive HTTP connection from outside.

class pypeman.contrib.http.HttpChannel(*args, endpoint=None, method='*', url='/', encoding=None, **kwargs)

Channel that handles Http connection. The Http message is the message payload and some headers become metadata of message. Needs aiohttp python dependency to work.

Parameters:
  • endpoint – HTTP endpoint used to get connections.
  • method – Method filter.
  • url – Only matching urls messages will be sent to this channel.
  • encoding – Encoding of message. Default to ‘utf-8’.
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

class pypeman.contrib.http.HttpRequest(url, *args, method=None, headers=None, auth=None, verify=True, params=None, client_cert=None, **kwargs)

Http request node :param url: url to send. :param method: ‘get’, ‘put’ or ‘post’, use meta[‘method’] if None, Default to ‘get’. :param headers: headers for request, use meta[‘headers’] if None. :param auth: tuple or aiohttp.BasicAuth object. :param verify: verify ssl. Default True. :param params: get params in dict. List for multiple elements, ex :

{‘param1’: ‘omega’, param2: [‘alpha’, ‘beta’]}
Parameters:client_cert – tuple with .crt and .key path
handle_request(msg)

generate url and handle request

process(msg)

handles request

class pypeman.contrib.http.RequestNode(*args, **kwargs)

Time

class pypeman.contrib.time.CronChannel(*args, cron='', **kwargs)

This channel Launch processing at specified time interval. The first node of the channel receive a payload with the datetime of execution.

Params cron:This set the interval. Accept any aiocron compatible string.
start()

Start the channel. Called before starting processus. Can be overloaded to specify specific start procedure.

XML

class pypeman.contrib.xml.PythonToXML(*args, **kwargs)

Convert python payload to XML.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message
class pypeman.contrib.xml.XMLToPython(*args, **kwargs)

Convert XML message payload to python dict.

process(msg)

Implement this function in child classes to create a new Node.

Parameters:msg – The incoming message
Returns:The processed message

Message

class pypeman.message.Message(content_type='application/text', payload=None, meta=None)

A message is the unity of informations exchanged between nodes of a channel.

A message have following properties:

attribute payload:
 The message content.
attribute meta:The message metadata.
attribute timestamp:
 The creation date of message
attribute uuid:uuid to identify message
attribute content_type:
 Used ?
attribute ctx:Current context when you want to save a message for later use.
Parameters:
  • payload – You can initialise the payload by setting this param.
  • meta – Same as payload, you can initialise the meta by setting this param.
add_context(key, msg)

Add a msg to the .ctx property with specified key.

Parameters:
  • key – Key to store message.
  • msg – Message to store.
copy()

Copy the message. Useful for channel forking purpose.

Returns:A copy of current message.
static from_dict(data)

Convert the input dict previously converted with .as_dict() method in Message object.

Parameters:data – The input dict.
Returns:The message message object correponding to given data.
static from_json(data)

Create a message from previously saved json string.

Parameters:data – Data to read message from.
Returns:A new message instance created from json data.
log(logger=<logging.Logger object>, log_level=10, payload=True, meta=True, context=False)

Log a message.

Parameters:
  • logger – Logger
  • log_level – log level for all log.
  • payload – Whether log payload.
  • meta – Whether log meta.
  • context – Whether log context.
renew()

Copy the message but update the timestamp and uuid.

Returns:A copy of current message with new uuid and Timestamp.
timestamp_str()

Return timestamp formated string

to_dict()

Convert the current message object to a dict. Payload is pickled.

Returns:A dict with an equivalent of message
to_json()

Create json string for current message.

Returns:a json string equivalent for message.
to_print(payload=True, meta=True, context=False)

Return a printable version of message.

Parameters:
  • payload – Whether print payload.
  • meta – Whether print meta.
  • context – Whether print context.

Message store

class pypeman.msgstore.FakeMessageStore

For testing purpose

get(id)

Return one message corresponding to given id with his status.

Parameters:id – Message id. Message store dependant.
Returns:A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
search(**kwargs)

Return a list of message with store specific id and processed status.

Parameters:
  • start – First element.
  • count – Count of elements since first element.
  • order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns:

A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.

store(msg)

Store a message in the store.

Parameters:msg – The message to store.
Returns:Id for this specific message.
total()
Returns:total count of messages
class pypeman.msgstore.FakeMessageStoreFactory

Return an Fake message store

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.FileMessageStore(path, store_id)

Store a file in <base_path>/<store_id>/<month>/<day>/ hierachy.

change_message_state(id, new_state)

Change the id message state.

Parameters:
  • id – Message specific store id.
  • new_state – Target state.
count_msgs()

Count message by listing all directories. To be used at startup.

get(id)

Return one message corresponding to given id with his status.

Parameters:id – Message id. Message store dependant.
Returns:A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
search(start=0, count=10, order_by='timestamp')

Return a list of message with store specific id and processed status.

Parameters:
  • start – First element.
  • count – Count of elements since first element.
  • order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns:

A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.

sorted_list_directories(path, reverse=True)
Parameters:
  • path – Base path
  • reverse – reverse order
Returns:

List of directories in specified path ordered

start()

Called at startup to initialize store.

store(msg)

Store a file in <base_path>/<store_id>/<month>/<day>/ hierachy.

total()
Returns:total count of messages
class pypeman.msgstore.FileMessageStoreFactory(path)

Generate a FileMessageStore message store instance. Store a file in <base_path>/<store_id>/<month>/<day>/ hierachy.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.MemoryMessageStore(base_dict, store_id)

Store messages in memory

change_message_state(id, new_state)

Change the id message state.

Parameters:
  • id – Message specific store id.
  • new_state – Target state.
get(id)

Return one message corresponding to given id with his status.

Parameters:id – Message id. Message store dependant.
Returns:A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
search(start=0, count=10, order_by='timestamp')

Return a list of message with store specific id and processed status.

Parameters:
  • start – First element.
  • count – Count of elements since first element.
  • order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns:

A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.

store(msg)

Store a message in the store.

Parameters:msg – The message to store.
Returns:Id for this specific message.
total()
Returns:total count of messages
class pypeman.msgstore.MemoryMessageStoreFactory

Return a Memory message store. All message are lost at pypeman stop.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.MessageStore

A MessageStore keep an history of processed messages. Mainly used in channels.

change_message_state(id, new_state)

Change the id message state.

Parameters:
  • id – Message specific store id.
  • new_state – Target state.
get(id)

Return one message corresponding to given id with his status.

Parameters:id – Message id. Message store dependant.
Returns:A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
search(start=0, count=10, order_by='timestamp')

Return a list of message with store specific id and processed status.

Parameters:
  • start – First element.
  • count – Count of elements since first element.
  • order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns:

A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.

start()

Called at startup to initialize store.

store(msg)

Store a message in the store.

Parameters:msg – The message to store.
Returns:Id for this specific message.
total()
Returns:total count of messages
class pypeman.msgstore.MessageStoreFactory

Message store factory class can generate Message store instance for specific store_id.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.
class pypeman.msgstore.NullMessageStore

For testing purpose

get(id)

Return one message corresponding to given id with his status.

Parameters:id – Message id. Message store dependant.
Returns:A dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.
search(**kwargs)

Return a list of message with store specific id and processed status.

Parameters:
  • start – First element.
  • count – Count of elements since first element.
  • order_by – Message order. Allowed values : [‘timestamp’, ‘status’].
Returns:

A list of dict {‘id’:<message_id>, ‘state’: <message_state>, ‘message’: <message_object>}.

store(msg)

Store a message in the store.

Parameters:msg – The message to store.
Returns:Id for this specific message.
total()
Returns:total count of messages
class pypeman.msgstore.NullMessageStoreFactory

Return an NullMessageStore that do nothing at all.

get_store(store_id)
Parameters:store_id – identifier of corresponding message store.
Returns:A MessageStore corresponding to correct store_id.

Events

class pypeman.events.Event

Asyncio Event class.

add_handler(handler)

Add a new handler for this event.

fire(*args, **kargs)

Fire current event. All handler are going to be executed.

getHandlerCount()

Return declared handler count.

receiver(handler)

Function decorator to add an handler.

remove_handler(handler)

Remove a previously defined handler for this event.

Remote admin

class pypeman.remoteadmin.PypemanShell(url)
do_channels(arg)

List avaible channels

do_exit(arg)

Exit program

do_list(channel, arg)

List messages of selected channel. You can specify start, end and order_by arguments

do_push(channel, arg)

Inject message with text as payload for selected channel

do_replay(channel, arg)

Replay a message list by their ids

do_select(arg)

Select a channel by is name. Mandatory for channel oriented command

do_start(arg)

Start a channel by his name

do_stop(arg)

Stop a channel by his name

class pypeman.remoteadmin.RemoteAdminClient(loop=None, url='ws://localhost:8091')

Remote admin client. To be use by ipython shell or pypeman shell.

Params url:Pypeman Websocket url.
channels()

Return a list of available channels on remote instance.

exec(command)

Execute any valid python code on remote instance and return stdout result.

list_msg(channel, start=0, count=10, order_by='timestamp')

List first 10 messages on specified channel from remote instance.

Params channel:The channel name.
Params start:Start index of listing.
Params count:Count from index.
Params order_by:
 Message order. only ‘timestamp’ and ‘-timestamp’ handled for now.
Returns:list of message with status.
push_msg(channel, text)

Push a new message from text param to the channel.

Params channel:The channel name.
Params text:This text will be the payload of the message.
replay_msg(channel, msg_ids)

Replay specified message from id list of specified channel on remote instance.

Params channel:The channel name.
Params msg_ids:Message id list to replay
Returns:List of result for each message. Result can be {‘error’: <msg_error>} for one id if error occurs.
send_command(command, args=None)

Send a command to remote instance

start(channel)

Start the specified channel on remote instance.

Params channel:The channel name.
stop(channel)

Stop the specified channel on remote instance.

Params channel:The channel name.
class pypeman.remoteadmin.RemoteAdminServer(loop=None, host='localhost', port='8091', ssl=None, url=None)

Expose json/rpc function to a client by a websocket.

channels()

Return a list of available channels.

command(websocket, path)

Generic function to handle a command from client.

exec(command)

Execute a python command on this instance and return the stdout result.

Parameters:command – The python command to execute. Can be multiline.
Returns:Command stdout result.
get_channel(name)

return channel by is name.all

list_msg(channel, start=0, count=10, order_by='timestamp')

List first count messages from message store of specified channel.

Params channel:The channel name.
push_msg(channel, text)

Push a message in the channel.

Params channel:The channel name.
Params msg_ids:The text added to the payload.
replay_msg(channel, msg_ids)

Replay messages from message store.

Params channel:The channel name.
Params msg_ids:The message ids list to replay.
start()

Start remote admin server

start_channel(channel)

Start the specified channel

Params channel:The channel name to start.
stop_channel(channel)

Stop the specified channel

Params channel:The channel name to stop.