Source code for motorway.contrib.amazon_sqs.intersections

from motorway.intersection import Intersection
import boto3
from utils import sqs_encode_to_json


[docs]class SQSInsertIntersection(Intersection): queue_name = None def __init__(self, **kwargs): super(SQSInsertIntersection, self).__init__(**kwargs) self.sqs = boto3.resource(**self.connection_parameters()) assert self.queue_name, "Please define attribute queue_name on your SQSRamp" self.queue = self.sqs.create_queue(QueueName=self.queue_name) def connection_parameters(self): return { 'region_name': 'eu-west-1', 'service_name': 'sqs' # Add this or use ENV VARS # 'aws_access_key_id': '', # 'aws_secret_access_key': '' }
[docs] def process(self, message): self.queue.send_message(MessageBody=sqs_encode_to_json(message.content)) self.ack(message) yield