Amazon Kinesis¶
The Kinesis ramp is by far the most advanced available. It actually mimicks the behavior of the Amazon Kinesis Client Library but doesn’t depend on Java like KCL.
The interface is very simple, just subclass motorway.contrib.amazon_kinesis.ramps.KinesisRamp
and add the
attribute “stream_name” according to the name you used for the stream in AWS.
Similarly, there is an intersection which allows you to “dump” content into a Kinesis stream. It works the exact same way.
-
class
motorway.contrib.amazon_kinesis.ramps.
KinesisRamp
(shard_threads_enabled=True, **kwargs)[source]¶ -
can_claim_shard
(shard_id)[source]¶ Determine whether or not a given shard can be claimed because of
- It’s currently not being processed by another process
- It’s unevenly balanced between the consuming nodes/workers
Parameters: shard_id – Returns: bool
-
claim_shard
(shard_id)[source]¶ Atomically update the shard in DynamoDB
Parameters: shard_id – Returns: bool
-
next
()[source]¶ This function is called continuously by the ramp.
Warning
Do not block this for a long time or make a while True loop inside it. Betwen every
motorway.ramp.Ramp.next()
run, some essential operations are run, including receiving acks from themotorway.controller.Controller
Yield: motorway.messages.Message
instance
-
-
class
motorway.contrib.amazon_kinesis.intersections.
KinesisInsertIntersection
(**kwargs)[source]¶ -
process
(messages)[source]¶ wait 1 second and get up to 500 items Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys. Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MB per second. This means we can run 2 intersections (2 x 500 records) submitting to the same shard before hitting the write limit (1000 records/sec) If we hit the write limit we wait 2 seconds and try to send the records that failed again, rinse and repeat If any other error than ProvisionedThroughputExceededException or InternalFailure is returned in the response we log it using loglevel error and dump the message for replayability instead of raising an exception that would drop the whole batch. So if you are going to use this intersection in production be sure to monitor and handle the messages with log level error! :param messages: :return:
-