Welcome to motorway’s documentation!
Pipeline

class motorway.pipeline.Pipeline(controller_bind_address='0.0.0.0:7007', run_controller=True, run_webserver=True, run_connection_discovery=True)

definition()
Extend this method in your motorway.pipeline.Pipeline subclass, e.g.:

    class WordCountPipeline(Pipeline):
        def definition(self):
            self.add_ramp(WordRamp, 'sentence')
            self.add_intersection(SentenceSplitIntersection, 'sentence', 'word', processes=2)
            self.add_intersection(WordCountIntersection, 'word', 'word_count', grouper_cls=HashRingGrouper, processes=2)
            self.add_intersection(AggregateIntersection, 'word_count', grouper_cls=HashRingGrouper, processes=1)

Ramps

class motorway.ramp.Ramp(runs_on_controller=False, process_uuid=None)
All messages must at some point start at a ramp, which ingests data into the pipeline from an external system or generates data itself (such as the random words in the tutorial).

failed(_id)
Called when a message has failed somewhere in the pipeline. The message might not be entirely finished processing at this point, and this function might be called multiple times.
Parameters: _id – The id of the message that failed

next()
This function is called continuously by the ramp.
Warning: Do not block this for a long time or make a while True loop inside it. Between every motorway.ramp.Ramp.next() run, some essential operations are run, including receiving acks from the motorway.controller.Controller.
Yield: motorway.messages.Message instance

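The warning about not blocking inside next() can be illustrated with a small, self-contained sketch. It does not import motorway: StubMessage and QueueBackedRamp are hypothetical stand-ins, and the queue.Queue source is an assumption about where the data comes from. The point is only that each call takes at most one item and returns immediately when nothing is waiting.

```python
import queue
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in for motorway.messages.Message."""
    ramp_unique_id: str
    content: dict


class QueueBackedRamp:
    """Hypothetical ramp-like class; it does not subclass motorway.ramp.Ramp."""

    def __init__(self, source):
        self.source = source  # a queue.Queue filled by some external producer
        self.counter = 0

    def next(self):
        # Take at most one item and return right away when nothing is waiting,
        # so the caller can do its housekeeping between runs.
        try:
            sentence = self.source.get_nowait()
        except queue.Empty:
            return
        self.counter += 1
        yield StubMessage(str(self.counter), {"sentence": sentence})


source = queue.Queue()
source.put("hello world")
ramp = QueueBackedRamp(source)
first_run = list(ramp.next())   # one message
second_run = list(ramp.next())  # empty: the queue is drained
```

A blocking source.get() or a while True loop in the same place would hold up everything that has to happen between two next() runs.
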
Intersections

class motorway.intersection.Intersection(process_uuid=None)
Intersections receive messages and generate either:

- A spin-off message
  Spin-off messages will keep track of the state of the entire message tree and re-run it if it fails. This means that if you want to re-run the message all the way from the ramp in case of an error, you should make a spin-off message.

      Message.new(message, {'word': 'hello', 'count': 1}, grouping_value='hello')

- A brand new message
  The message will be created with the intersection as producer. The intersection will not receive feedback on whether the message succeeded, and hence the message will not be retried in the case of an error.

      Message(uuid.uuid4()

process(message)
This function is called continuously by the intersection.
Parameters: message – motorway.messages.Message instance, or list() if using motorway.decorators.batch_process()
Yield: motorway.messages.Message instance

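As a rough illustration of a process() method that yields spin-off messages, the sketch below splits a sentence into words. It runs without motorway: StubMessage and SentenceSplitSketch are hypothetical stand-ins. StubMessage.new only mirrors the documented argument order (message, content, grouping_value), and acknowledging the incoming message at the end of process() is an assumption about typical usage, not confirmed behaviour.

```python
class StubMessage:
    """Hypothetical stand-in for motorway.messages.Message."""

    def __init__(self, ramp_unique_id, content=None, grouping_value=None):
        self.ramp_unique_id = ramp_unique_id
        self.content = content
        self.grouping_value = grouping_value
        self.acked = False

    @classmethod
    def new(cls, message, content, grouping_value=None):
        # A spin-off keeps the id of the message it was derived from.
        return cls(message.ramp_unique_id, content, grouping_value)

    def ack(self):
        self.acked = True


class SentenceSplitSketch:
    """Hypothetical intersection-like class, not a motorway Intersection."""

    def process(self, message):
        # One spin-off per word, grouped by the word itself.
        for word in message.content["sentence"].split():
            yield StubMessage.new(message, {"word": word}, grouping_value=word)
        # Assumption: the incoming message is acknowledged once it is handled.
        message.ack()


incoming = StubMessage("abc-1", {"sentence": "hello big world"})
spin_offs = list(SentenceSplitSketch().process(incoming))
```

Every spin-off carries the same ramp_unique_id as the incoming message, which is what lets a failure be traced back to the ramp.
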
Messages

class motorway.messages.Message(ramp_unique_id, content=None, ack_value=None, controller_queue=None, grouping_value=None, error_message=None, process_name=None, producer_uuid=None, destination_endpoint=None, destination_uuid=None)
Parameters:
- ramp_unique_id – the unique message ID delivered back upon completion to the ramp
- content – any JSON-serializable content
- grouping_value – String that can be used for routing messages consistently to the same receiver

ack(time_consumed=None)
Send a message to the controller that this message was properly processed.

fail(error_message='', capture_exception=True)
Send a message to the controller that this message failed to process.

classmethod from_message(message, controller_queue, process_name=None)
Parameters:
- message – Message dict (converted from JSON)
- controller_queue –
- process_name – UUID of the process processing this message (as string)

classmethod new(message, content, grouping_value=None, error_message=None)
Creates a new message based on an existing message. As a consequence, the two are tracked together, and the tap will not be notified until every message in the chain is properly ack’ed.
Parameters:
- message – Message instance, as received by the intersection
- content – Any value that can be serialized into JSON
- grouping_value – String that can be used for routing messages consistently to the same receiver

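To make "routing messages consistently to the same receiver" concrete, here is a minimal sketch of one way a grouping value could be mapped to a receiver. It uses a plain hash-modulo scheme, which is an assumption chosen for brevity and is not how motorway's groupers are implemented. pick_receiver and the receiver names are hypothetical.

```python
import hashlib


def pick_receiver(grouping_value, receivers):
    """Map a grouping value to one receiver, deterministically."""
    # hashlib is used instead of the built-in hash(), whose result for
    # strings can differ between Python processes.
    digest = hashlib.md5(grouping_value.encode("utf-8")).hexdigest()
    return receivers[int(digest, 16) % len(receivers)]


receivers = ["worker-0", "worker-1", "worker-2"]  # hypothetical receiver names
first = pick_receiver("hello", receivers)
second = pick_receiver("hello", receivers)  # same value, same receiver
```

With hash-modulo, changing the number of receivers remaps most values. A hash ring limits that reshuffling.
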

send_control_message(controller_queue, time_consumed=None, process_name=None, destination_endpoint=None, destination_uuid=None, sender=None)
Control messages are notifications that a new message has been created, so the controller can keep track of this particular message and let the ramp know once the entire tree of messages has been completed.
This is called implicitly on yield Message(_id, 'message').
Parameters: process_name – UUID of the process processing this message (as string)

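The bookkeeping described here, where a tree of messages is tracked until every member has been acknowledged, can be sketched with a simple counter per ramp message id. TreeTracker is a hypothetical class written for illustration. It is not the controller's actual algorithm, and it assumes that a spin-off is always registered before its parent is acknowledged.

```python
class TreeTracker:
    """Hypothetical bookkeeping, not motorway.controller.Controller."""

    def __init__(self):
        self.pending = {}    # ramp_unique_id -> messages not yet acked
        self.completed = []  # ramp_unique_ids whose whole tree is acked

    def message_created(self, ramp_unique_id):
        # Plays the role of a control message: one more message to wait for.
        self.pending[ramp_unique_id] = self.pending.get(ramp_unique_id, 0) + 1

    def message_acked(self, ramp_unique_id):
        self.pending[ramp_unique_id] -= 1
        if self.pending[ramp_unique_id] == 0:
            # The last outstanding message is done: the ramp can be told.
            del self.pending[ramp_unique_id]
            self.completed.append(ramp_unique_id)


tracker = TreeTracker()
tracker.message_created("abc-1")  # message yielded by the ramp
tracker.message_created("abc-1")  # first spin-off
tracker.message_created("abc-1")  # second spin-off
tracker.message_acked("abc-1")    # original message acked
tracker.message_acked("abc-1")    # first spin-off acked
done_early = list(tracker.completed)  # still empty at this point
tracker.message_acked("abc-1")    # second spin-off acked
```

Only the final ack moves the id into completed, which corresponds to the ramp being notified once the entire tree is finished.
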
Contrib modules
These are add-ons which are shipped with motorway but are not part of the “core”.

Amazon Kinesis
The Kinesis ramp is by far the most advanced ramp available. It mimics the behavior of the Amazon Kinesis Client Library (KCL) but, unlike KCL, doesn’t depend on Java.
The interface is very simple: subclass motorway.contrib.amazon_kinesis.ramps.KinesisRamp and set the attribute “stream_name” to the name you used for the stream in AWS.
Similarly, there is an intersection which allows you to “dump” content into a Kinesis stream. It works the exact same way.

class motorway.contrib.amazon_kinesis.ramps.KinesisRamp(shard_threads_enabled=True, **kwargs)

can_claim_shard(shard_id)
Determine whether or not a given shard can be claimed because:
- It’s currently not being processed by another process
- It’s unevenly balanced between the consuming nodes/workers
Parameters: shard_id –
Returns: bool

claim_shard(shard_id)
Atomically update the shard in DynamoDB.
Parameters: shard_id –
Returns: bool

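The idea behind an atomic claim is that checking the current owner and writing the new one happen as a single step, so two workers can never both succeed. The sketch below shows that idea with an in-memory table guarded by a lock. InMemoryShardTable, its claim method and the worker names are all hypothetical, and nothing here talks to DynamoDB or reflects how KinesisRamp stores its state.

```python
import threading


class InMemoryShardTable:
    """Hypothetical stand-in for a table that supports conditional writes."""

    def __init__(self):
        self._owners = {}
        self._lock = threading.Lock()

    def claim(self, shard_id, worker_id):
        # Check and write under one lock, so two workers cannot both succeed.
        with self._lock:
            if self._owners.get(shard_id) is not None:
                return False
            self._owners[shard_id] = worker_id
            return True


table = InMemoryShardTable()
won = table.claim("shardId-000000000000", "worker-a")   # first claim succeeds
lost = table.claim("shardId-000000000000", "worker-b")  # shard already owned
```

In a real datastore, a conditional write would play the role of the lock.
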

next()
This function is called continuously by the ramp.
Warning: Do not block this for a long time or make a while True loop inside it. Between every motorway.ramp.Ramp.next() run, some essential operations are run, including receiving acks from the motorway.controller.Controller.
Yield: motorway.messages.Message instance

class motorway.contrib.amazon_kinesis.intersections.KinesisInsertIntersection(**kwargs)

process(messages)
Wait 1 second and get up to 500 items.
Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys. Each shard can support writes of up to 1,000 records per second, up to a maximum data write total of 1 MB per second.
This means we can run 2 intersections (2 x 500 records) submitting to the same shard before hitting the write limit (1,000 records/sec).
If we hit the write limit, we wait 2 seconds and try to send the failed records again, rinse and repeat.
If any error other than ProvisionedThroughputExceededException or InternalFailure is returned in the response, we log it at log level error and dump the message for replayability instead of raising an exception that would drop the whole batch. So if you are going to use this intersection in production, be sure to monitor and handle the messages with log level error!
Parameters: messages –

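The request limits quoted above (500 records per request, 1 MB per record, 5 MB per request including partition keys) can be turned into a small batching helper. This is a sketch under stated assumptions, not the intersection's own code: split_into_batches is a hypothetical function, 1 MB is taken to mean 1024 * 1024 bytes, and records are assumed to be (partition_key, data) tuples of str and bytes.

```python
MAX_RECORDS = 500                     # records per request
MAX_RECORD_BYTES = 1024 * 1024        # assumption: 1 MB = 1024 * 1024 bytes
MAX_REQUEST_BYTES = 5 * 1024 * 1024   # whole request, partition keys included


def split_into_batches(records):
    """Group (partition_key, data) tuples into batches that fit the limits."""
    batches, current, current_bytes = [], [], 0
    for partition_key, data in records:
        if len(data) > MAX_RECORD_BYTES:
            raise ValueError("record exceeds the per-record limit")
        size = len(partition_key.encode("utf-8")) + len(data)
        # Start a new batch when either the count or the byte limit is hit.
        too_many = len(current) == MAX_RECORDS
        too_big = current_bytes + size > MAX_REQUEST_BYTES
        if current and (too_many or too_big):
            batches.append(current)
            current, current_bytes = [], 0
        current.append((partition_key, data))
        current_bytes += size
    if current:
        batches.append(current)
    return batches


small_records = [("key", b"x")] * 1200
batch_sizes = [len(batch) for batch in split_into_batches(small_records)]
```

Small records are limited by the record count, while records near the per-record limit are limited by the request size instead.
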
Amazon SQS

class motorway.contrib.amazon_sqs.ramps.SQSRamp(*args, **kwargs)

next()
This function is called continuously by the ramp.
Warning: Do not block this for a long time or make a while True loop inside it. Between every motorway.ramp.Ramp.next() run, some essential operations are run, including receiving acks from the motorway.controller.Controller.
Yield: motorway.messages.Message instance

class motorway.contrib.amazon_sqs.intersections.SQSInsertIntersection(**kwargs)

process(message)
This function is called continuously by the intersection.
Parameters: message – motorway.messages.Message instance, or list() if using motorway.decorators.batch_process()
Yield: motorway.messages.Message instance

License
Copyright 2014 Plecto ApS
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.