Amazon Kinesis

The Kinesis ramp is by far the most advanced ramp available. It mimics the behavior of the Amazon Kinesis Client Library (KCL) but, unlike KCL, does not depend on Java.

The interface is simple: subclass motorway.contrib.amazon_kinesis.ramps.KinesisRamp and set the attribute "stream_name" to the name you gave the stream in AWS.

Similarly, there is an intersection that lets you "dump" content into a Kinesis stream. It works exactly the same way.

class motorway.contrib.amazon_kinesis.ramps.KinesisRamp(shard_threads_enabled=True, **kwargs)[source]
can_claim_shard(shard_id)[source]

Determine whether a given shard can be claimed, because either:

  1. it is currently not being processed by another process, or
  2. the shards are unevenly balanced between the consuming nodes/workers

Parameters: shard_id
Returns: bool
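The two conditions above can be sketched as follows. This is an illustrative stand-in, not motorway's actual implementation; the shards mapping, the heartbeat timeout, and the worker ids are all assumptions:

```python
import time


def can_claim_shard(shard_id, shards, worker_id, heartbeat_timeout=30):
    """Hypothetical sketch of the two claim conditions.

    `shards` maps shard_id -> (owner_worker_id, last_heartbeat_timestamp).
    """
    owner, last_heartbeat = shards[shard_id]
    # 1. The current owner stopped heartbeating, so the shard is orphaned
    #    and no other process is actively consuming it.
    if time.time() - last_heartbeat > heartbeat_timeout:
        return True
    # 2. Shards are unevenly balanced: this worker holds fewer shards than
    #    a fair share while the current owner holds more than a fair share.
    counts = {}
    for shard_owner, _ in shards.values():
        counts[shard_owner] = counts.get(shard_owner, 0) + 1
    fair_share = len(shards) / float(len(counts) or 1)
    return counts.get(worker_id, 0) < fair_share and counts.get(owner, 0) > fair_share
```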
claim_shard(shard_id)[source]

Atomically update the shard in DynamoDB.

Parameters: shard_id
Returns: bool
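An atomic claim is typically a DynamoDB conditional write: the update succeeds only if the shard still belongs to the owner we observed, so two workers cannot claim it at once. A hedged sketch, assuming a boto3-style table object and a `shard_id`/`worker_id` schema (not motorway's actual code):

```python
def claim_shard(table, shard_id, worker_id, expected_owner):
    """Claim shard_id for worker_id, but only if expected_owner still owns it.

    `table` is assumed to expose a boto3-style update_item().
    """
    try:
        table.update_item(
            Key={'shard_id': shard_id},
            UpdateExpression='SET worker_id = :new',
            ConditionExpression='worker_id = :old',
            ExpressionAttributeValues={':new': worker_id, ':old': expected_owner},
        )
        return True
    except Exception:
        # boto3 raises ConditionalCheckFailedException when another worker
        # won the race; treat that as a failed claim.
        return False
```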
next()[source]

This function is called continuously by the ramp.

Warning

Do not block this for a long time or make a while True loop inside it. Between every motorway.ramp.Ramp.next() run, some essential operations are performed, including receiving acks from the motorway.controller.Controller

Yield: motorway.messages.Message instance
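The contract can be sketched like this. The queue-backed ramp below is a hypothetical illustration of "yield what is ready, then return quickly", not the KinesisRamp internals:

```python
from queue import Empty, Queue


class SketchRamp(object):
    """Hypothetical ramp: a background thread fills self.queue, and next()
    drains whatever is currently available without blocking the control loop."""

    def __init__(self):
        self.queue = Queue()

    def next(self):
        # Yield everything that is ready right now, then return so the
        # framework can run its bookkeeping (e.g. receiving acks) between calls.
        try:
            while True:
                yield self.queue.get_nowait()
        except Empty:
            return
```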
process_shard(shard_id)[source]

Every shard gets, at startup, an active thread that runs this function, which either consumes data from the shard or waits until it is ready to consume.

Parameters: shard_id
Returns:
success(_id)[source]

Called when a message was successfully ack’ed through the entire pipeline.

Parameters: _id – The id of the message that was successful
class motorway.contrib.amazon_kinesis.intersections.KinesisInsertIntersection(**kwargs)[source]
process(messages)[source]

Wait 1 second and collect up to 500 items.

Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys. Each shard can support writes of up to 1,000 records per second, up to a maximum data write total of 1 MB per second. This means we can run 2 intersections (2 x 500 records) submitting to the same shard before hitting the write limit (1,000 records/sec).

If we hit the write limit, we wait 2 seconds and try to send the failed records again, rinse and repeat. If the response contains any error other than ProvisionedThroughputExceededException or InternalFailure, we log it at log level error and dump the message for replayability instead of raising an exception that would drop the whole batch. So if you are going to use this intersection in production, be sure to monitor and handle the messages logged at level error!

Parameters: messages
Returns:
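The retry loop described above can be sketched as follows. This is an illustrative stand-in, not the actual intersection code: the client is assumed to expose a boto3-style `put_records`, the 2-second wait comes from the description, and the helper name and injectable `sleep` are assumptions for testability:

```python
import time

# Errors worth retrying, per the description above; anything else is
# kept aside for logging and replay.
RETRYABLE_ERRORS = ('ProvisionedThroughputExceededException', 'InternalFailure')


def put_records_with_retry(client, stream_name, records, wait=2, sleep=time.sleep):
    """Send a batch of records, resending throttled ones until they succeed.

    Returns the records that failed with a non-retryable error, so the
    caller can log them at level error and dump them for replay instead
    of dropping the whole batch.
    """
    dead_letters = []
    while records:
        response = client.put_records(StreamName=stream_name, Records=records)
        retry = []
        for record, result in zip(records, response['Records']):
            code = result.get('ErrorCode')
            if code in RETRYABLE_ERRORS:
                retry.append(record)          # throttled: try again later
            elif code:
                dead_letters.append(record)   # anything else: keep for replay
        records = retry
        if records:
            sleep(wait)  # back off before resending the failed records
    return dead_letters
```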