Tortoise and the Hare
*********************

Controllers have two methods of talking with Tor…

* **Synchronous** - Most commonly you make a request to Tor then
  receive its reply. The "get_info()" calls in the first tutorial are
  an example of this.

* **Asynchronous** - Controllers can subscribe to be notified when
  various kinds of events occur within Tor (see the "EventType").
  Stem’s users provide a callback function to "add_event_listener()"
  which is then notified when the event occurs.

Try to avoid lengthy operations within event callbacks. They’re
notified by a single dedicated event thread, and blocking this thread
will prevent the delivery of further events.

With that out of the way lets see an example. The following is a
curses application that graphs the bandwidth usage of Tor…

[image]

To do this it listens to **BW events** (the class for which is a
"BandwidthEvent"). These are events that Tor emits each second saying
the number of bytes downloaded and uploaded.

   import curses
   import functools

   from stem.control import EventType, Controller
   from stem.util import str_tools

   # colors that curses can handle

   COLOR_LIST = {
     "red": curses.COLOR_RED,
     "green": curses.COLOR_GREEN,
     "yellow": curses.COLOR_YELLOW,
     "blue": curses.COLOR_BLUE,
     "cyan": curses.COLOR_CYAN,
     "magenta": curses.COLOR_MAGENTA,
     "black": curses.COLOR_BLACK,
     "white": curses.COLOR_WHITE,
   }

   GRAPH_WIDTH = 40
   GRAPH_HEIGHT = 8

   DOWNLOAD_COLOR = "green"
   UPLOAD_COLOR = "blue"

   def main():
     with Controller.from_port(port = 9051) as controller:
       controller.authenticate()

       try:
         # This makes curses initialize and call draw_bandwidth_graph() with a
         # reference to the screen, followed by additional arguments (in this
         # case just the controller).

         curses.wrapper(draw_bandwidth_graph, controller)
       except KeyboardInterrupt:
         pass  # the user hit ctrl+c

   def draw_bandwidth_graph(stdscr, controller):
     window = Window(stdscr)

     # (downloaded, uploaded) tuples for the last 40 seconds

     bandwidth_rates = [(0, 0)] * GRAPH_WIDTH

     # Making a partial that wraps the window and bandwidth_rates with a function
     # for Tor to call when it gets a BW event. This causes the 'window' and
     # 'bandwidth_rates' to be provided as the first two arguments whenever
     # 'bw_event_handler()' is called.

     bw_event_handler = functools.partial(_handle_bandwidth_event, window, bandwidth_rates)

     # Registering this listener with Tor. Tor reports a BW event each second.

     controller.add_event_listener(bw_event_handler, EventType.BW)

     # Pause the main thread until the user hits any key... and no, don't you dare
     # ask where the 'any' key is. :P

     stdscr.getch()

   def _handle_bandwidth_event(window, bandwidth_rates, event):
     # callback for when tor provides us with a BW event

     bandwidth_rates.insert(0, (event.read, event.written))
     bandwidth_rates = bandwidth_rates[:GRAPH_WIDTH]  # truncate old values
     _render_graph(window, bandwidth_rates)

   def _render_graph(window, bandwidth_rates):
     window.erase()

     download_rates = [entry[0] for entry in bandwidth_rates]
     upload_rates = [entry[1] for entry in bandwidth_rates]

     # show the latest values at the top

     label = "Downloaded (%s/s):" % str_tools.size_label(download_rates[0], 1)
     window.addstr(0, 1, label, DOWNLOAD_COLOR, curses.A_BOLD)

     label = "Uploaded (%s/s):" % str_tools.size_label(upload_rates[0], 1)
     window.addstr(0, GRAPH_WIDTH + 7, label, UPLOAD_COLOR, curses.A_BOLD)

     # draw the graph bounds in KB

     max_download_rate = max(download_rates)
     max_upload_rate = max(upload_rates)

     window.addstr(1, 1, "%4i" % (max_download_rate / 1024), DOWNLOAD_COLOR)
     window.addstr(GRAPH_HEIGHT, 1, "   0", DOWNLOAD_COLOR)

     window.addstr(1, GRAPH_WIDTH + 7, "%4i" % (max_upload_rate / 1024), UPLOAD_COLOR)
     window.addstr(GRAPH_HEIGHT, GRAPH_WIDTH + 7, "   0", UPLOAD_COLOR)

     # draw the graph

     for col in range(GRAPH_WIDTH):
       col_height = GRAPH_HEIGHT * download_rates[col] / max(max_download_rate, 1)

       for row in range(col_height):
         window.addstr(GRAPH_HEIGHT - row, col + 6, " ", DOWNLOAD_COLOR, curses.A_STANDOUT)

       col_height = GRAPH_HEIGHT * upload_rates[col] / max(max_upload_rate, 1)

       for row in range(col_height):
         window.addstr(GRAPH_HEIGHT - row, col + GRAPH_WIDTH + 12, " ", UPLOAD_COLOR, curses.A_STANDOUT)

     window.refresh()

   class Window(object):
     """
     Simple wrapper for the curses standard screen object.
     """

     def __init__(self, stdscr):
       self._stdscr = stdscr

       # Mappings of names to the curses color attribute. Initially these all
       # reference black text, but if the terminal can handle color then
       # they're set with that foreground color.

       self._colors = dict([(color, 0) for color in COLOR_LIST])

       # allows for background transparency

       try:
         curses.use_default_colors()
       except curses.error:
         pass

       # makes the cursor invisible

       try:
         curses.curs_set(0)
       except curses.error:
         pass

       # initializes colors if the terminal can handle them

       try:
         if curses.has_colors():
           color_pair = 1

           for name, foreground in COLOR_LIST.items():
             background = -1  # allows for default (possibly transparent) background
             curses.init_pair(color_pair, foreground, background)
             self._colors[name] = curses.color_pair(color_pair)
             color_pair += 1
       except curses.error:
         pass

     def addstr(self, y, x, msg, color = None, attr = curses.A_NORMAL):
       # Curses throws an error if we try to draw a message that spans out of the
       # window's bounds (... seriously?), so doing our best to avoid that.

       if color is not None:
         if color not in self._colors:
           recognized_colors = ", ".join(self._colors.keys())
           raise ValueError("The '%s' color isn't recognized: %s" % (color, recognized_colors))

         attr |= self._colors[color]

       max_y, max_x = self._stdscr.getmaxyx()

       if max_x > x and max_y > y:
         try:
           self._stdscr.addstr(y, x, msg[:max_x - x], attr)
         except:
           pass  # maybe an edge case while resizing the window

     def erase(self):
       self._stdscr.erase()

     def refresh(self):
       self._stdscr.refresh()

   if __name__ == '__main__':
     main()


Advanced Listeners
==================

When you attach a listener to a "Controller" events are processed
within a dedicated thread. This is convenient for simple uses, but can
make troubleshooting your code confusing. For example, exceptions have
nowhere to propagate…

   import time

   from stem.control import EventType, Controller


   def broken_handler(event):
     print('start of broken_handler')
     raise ValueError('boom')
     print('end of broken_handler')


   with Controller.from_port() as controller:
     controller.authenticate()
     controller.add_event_listener(broken_handler, EventType.BW)
     time.sleep(2)

   % python demo.py
   start of broken_handler
   start of broken_handler
   start of broken_handler

… and processing events slower than they’re received will make your
listener fall behind. This can result in a memory leak for long
running processes…

   import time

   from stem.control import EventType, Controller


   with Controller.from_port() as controller:
     def slow_handler(event):
       age = time.time() - event.arrived_at
       unprocessed_count = controller._event_queue.qsize()

       print("processing a BW event that's %0.1f seconds old (%i more events are waiting)" % (age, unprocessed_count))
       time.sleep(5)

     controller.authenticate()
     controller.add_event_listener(slow_handler, EventType.BW)
     time.sleep(10)

   % python demo.py
   processing a BW event that's 0.9 seconds old (0 more events are waiting)
   processing a BW event that's 4.9 seconds old (3 more events are waiting)
   processing a BW event that's 8.9 seconds old (7 more events are waiting)

Avoid performing heavy business logic directly within listeners. For
example, a producer/consumer pattern sidesteps these issues…

   import queue
   import time

   from stem.control import EventType, Controller


   with Controller.from_port() as controller:
     controller.authenticate()

     start_time = time.time()
     event_queue = queue.Queue()

     controller.add_event_listener(lambda event: event_queue.put(event), EventType.BW)

     while time.time() - start_time < 2:
       event = event_queue.get()
       print('I got a BW event for %i bytes downloaded and %i bytes uploaded' % (event.read, event.written))

   % python demo.py
   I got a BW event for 20634 bytes downloaded and 2686 bytes uploaded
   I got a BW event for 0 bytes downloaded and 0 bytes uploaded
   I got a BW event for 0 bytes downloaded and 0 bytes uploaded
