import random
from Queue import Empty
import logging
import multiprocessing
from threading import Thread
import time
from time import time as _time
import uuid
import datetime
from motorway.exceptions import SocketBlockedException
from motorway.messages import Message
from motorway.mixins import GrouperMixin, SendMessageMixin, ConnectionMixin
from motorway.threads import ThreadRunner
from motorway.utils import set_timeouts_on_socket
import zmq
# Module-level logger; Intersection._process uses it to report exceptions raised while processing messages.
logger = logging.getLogger(__name__)
class Intersection(GrouperMixin, SendMessageMixin, ConnectionMixin, ThreadRunner):
    """
    Intersections receive messages and generate either:

    - A spin-off message

      Spin-off messages will keep track of the state of the entire message tree and re-run it if failed. This means
      that if you want to re-run the message all the way from the ramp in case of an error, you should make a
      spin-off message (sketch; see ``Message.new`` in :mod:`motorway.messages` for the exact signature)::

          Message.new(message, {
              'word': 'hello',
              'count': 1
          }, grouping_value='hello')

    - A brand new message

      The message will be created with the intersection as producer. The intersection will not receive feedback if it
      is successful or not and hence will not be re-tried in the case of an error (remaining arguments elided; see
      :class:`motorway.messages.Message`)::

          Message(uuid.uuid4(), ...)
    """

    # When true, failures are reported back via ``message.fail()`` and outgoing messages are sent with
    # ``control_message=True``; receive_messages also waits for ``controller_sock`` before processing.
    send_control_messages = True

    def __init__(self, process_uuid=None):
        """
        Set up the per-process bookkeeping attributes.

        :param process_uuid: optional object exposing ``.hex`` (e.g. :class:`uuid.UUID`); when falsy, a random
            UUID4 is generated instead.
        """
        super(Intersection, self).__init__()
        self.messages_processed = 0  # Running total of messages handed to process()
        self.process_uuid = process_uuid.hex if process_uuid else uuid.uuid4().hex
        self.process_name = multiprocessing.current_process().name
        self.receive_port = None  # Set by receive_messages once the PULL socket is bound
        self.send_socks = {}
        self.send_grouper = None
        self.controller_sock = None  # Not assigned in this class body; receive_messages polls until it is truthy
        self.message_batch_start = datetime.datetime.now()  # This is used to time how much time messages take
        self.process_id_to_name = {}  # Maps UUIDs to human readable names
        # NOTE(review): the attribute name suggests tcp endpoint -> process UUID; confirm where it is populated
        self.process_address_to_uuid = {}
        self.grouper_instance = None

    def thread_factory(self, input_stream, output_stream=None, refresh_connection_stream=None, grouper_cls=None, process_uuid=None):
        """
        Build the factories for the two threads this intersection runs.

        Both threads share the single :class:`zmq.Context` created here.

        :param input_stream: forwarded to ``connection_thread`` as ``input_queue``
        :param output_stream: forwarded to ``connection_thread`` as ``output_queue`` and to :meth:`receive_messages`
        :param refresh_connection_stream: forwarded to ``connection_thread``
        :param grouper_cls: forwarded to both thread targets
        :param process_uuid: accepted for interface compatibility; not used by this method
        :return: list of two zero-argument callables, each returning a new, unstarted :class:`threading.Thread`
            (connection thread first, message-processing thread second)
        """
        context = zmq.Context()

        # Create Thread Factories :-)
        connection_kwargs = {
            'refresh_connection_stream': refresh_connection_stream,
            'context': context,
            'input_queue': input_stream,
            'output_queue': output_stream,
            'grouper_cls': grouper_cls
        }
        thread_update_connections_factory = lambda: Thread(target=self.connection_thread, name="connection_thread", kwargs=connection_kwargs)

        main_kwargs = {
            'context': context,
            'output_stream': output_stream,
            'grouper_cls': grouper_cls,
        }
        thread_main_factory = lambda: Thread(target=self.receive_messages, name="message_producer", kwargs=main_kwargs)

        return [thread_update_connections_factory, thread_main_factory]

    def _process(self, receive_sock, controller_sock=None):
        """
        Receive one message (or one batch) from ``receive_sock`` and run it through :meth:`process`.

        If :meth:`process` carries a truthy ``batch_process`` attribute, messages are collected until either
        ``process.wait`` seconds have elapsed or ``process.limit`` messages were read; otherwise a single message is
        read, and the method returns immediately if nothing arrives within the one second poll timeout.

        Every non-``None`` item yielded by :meth:`process` is forwarded with ``send_message`` as long as
        ``self.send_socks`` is non-empty. Any exception raised while processing is logged and, when
        ``send_control_messages`` is set, the received message(s) are marked as failed.

        :param receive_sock: ZMQ socket to poll and read JSON messages from
        :param controller_sock: passed through to ``Message.from_message``
        """
        try:
            # One poller serves both the batch and the single-message path
            poller = zmq.Poller()
            poller.register(receive_sock, zmq.POLLIN)

            if getattr(self.process, 'batch_process', None):
                value = []
                end_time = _time() + self.process.wait
                while end_time > _time() and len(value) < self.process.limit:
                    socks = dict(poller.poll(timeout=1000))
                    if socks.get(receive_sock) == zmq.POLLIN:
                        value.append(receive_sock.recv_json())
                message_count = len(value)
            else:
                socks = dict(poller.poll(timeout=1000))
                if socks.get(receive_sock) != zmq.POLLIN:
                    return  # Nothing arrived within the poll timeout
                value = receive_sock.recv_json()
                message_count = 1

            if value:
                self.messages_processed += message_count
                if isinstance(value, list):
                    message = [Message.from_message(m, controller_sock, process_name=self.process_uuid) for m in value]
                else:
                    message = Message.from_message(value, controller_sock, process_name=self.process_uuid)
                try:
                    self.message_batch_start = datetime.datetime.now()
                    for generated_message in self.process(message):
                        if generated_message is not None and self.send_socks:
                            self.send_message(generated_message, self.process_uuid, time_consumed=(datetime.datetime.now() - self.message_batch_start), control_message=self.send_control_messages)
                            # Restart the clock so the next outgoing message is timed on its own
                            self.message_batch_start = datetime.datetime.now()
                except Exception as e:
                    logger.error(str(e), exc_info=True)
                    # Don't send control messages if e.g. web server or other system process
                    if self.send_control_messages:
                        failed_messages = message if isinstance(message, list) else [message]
                        for failed_message in failed_messages:
                            failed_message.fail()
        except Empty:  # Didn't receive anything from ZMQ
            pass

    def ack(self, message, retries=100):
        """
        Acknowledge ``message``, retrying while the socket reports :class:`zmq.Again`.

        The time elapsed since ``self.message_batch_start`` is reported as ``time_consumed``; the timer is reset
        after a successful acknowledge.

        :param message: message object exposing ``ack(time_consumed=...)``
        :param retries: maximum number of attempts
        :raises SocketBlockedException: if every attempt raised :class:`zmq.Again`
        """
        for _attempt in xrange(retries):
            try:
                message.ack(time_consumed=(datetime.datetime.now() - self.message_batch_start))
                break
            except zmq.Again:
                time.sleep(random.randrange(1, 3))  # to avoid peak loads when running multiple processes
        else:  # if for loop exited cleanly (no break)
            raise SocketBlockedException("Acknowledge for process %s could not be sent after %s attempts" % (self.process_name, retries))
        self.message_batch_start = datetime.datetime.now()

    def fail(self, message, **kwargs):
        """
        Mark ``message`` as failed.

        :param message: message object exposing ``fail(**kwargs)``
        :param kwargs: forwarded unchanged to ``message.fail``
        """
        message.fail(**kwargs)

    def process(self, message):
        """
        This function is called continuously by the intersection.

        Subclasses must override it; the base implementation always raises.

        :yield: :class:`motorway.messages.Message` instance
        :param message: :class:`motorway.messages.Message` instance or :func:`list` if using
            :func:`motorway.decorators.batch_process`
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def receive_messages(self, context=None, output_stream=None, grouper_cls=None):
        """
        Continuously read and process using _process function

        Binds a PULL socket to a random TCP port (stored in ``self.receive_port``) and then loops forever.

        :param context: ZMQ context used to create the socket; despite the ``None`` default it must be provided,
            because ``context.socket`` is called unconditionally
        :param output_stream: not used by this method
        :param grouper_cls: not used by this method
        """
        receive_sock = context.socket(zmq.PULL)
        self.receive_port = receive_sock.bind_to_random_port("tcp://*")
        set_timeouts_on_socket(receive_sock)

        if self.send_control_messages:
            # Block until something else assigns controller_sock (it is not set anywhere in this class body)
            while not self.controller_sock:
                time.sleep(1)

        while True:
            self._process(receive_sock, controller_sock=self.controller_sock)