add a timeout parameter: mails processing for too long will be retried
This commit is contained in:
@@ -157,6 +157,25 @@ Mails are flagged according to their state, in the ``process_messages`` method:
|
|||||||
it anymore
|
it anymore
|
||||||
|
|
||||||
|
|
||||||
|
Specifying a timeout
|
||||||
|
~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
To avoid a mail from staying in the "processing" state for too long (for
|
||||||
|
example because a previous ``process_message`` started processing it, but then
|
||||||
|
failed), you may specify a ``timeout`` parameter (in seconds) when
|
||||||
|
instantiating MailBot:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
from mailbot import MailBot
|
||||||
|
|
||||||
|
|
||||||
|
mailbot = MailBot('imap.myserver.com', 'username', 'password', timeout=180)
|
||||||
|
|
||||||
|
This doesn't mean that the mail will be reset after 3 minutes, but that when
|
||||||
|
``process_messages`` is called, it'll first reset mails that are in the
|
||||||
|
processing state and older than 3 minutes.
|
||||||
|
|
||||||
Specifying rules
|
Specifying rules
|
||||||
----------------
|
----------------
|
||||||
|
|
||||||
|
|||||||
@@ -100,6 +100,32 @@ class MailReceivedTest(MailBotTestCase):
|
|||||||
ids = self.mb.client.search(['Flagged'])
|
ids = self.mb.client.search(['Flagged'])
|
||||||
self.assertEqual(ids, [])
|
self.assertEqual(ids, [])
|
||||||
|
|
||||||
|
def test_reset_timeout_messages(self):
|
||||||
|
self.mb.timeout = -180 # 3 minutes in the future!
|
||||||
|
self.mb.client.append(self.home_folder,
|
||||||
|
message_from_string('').as_string())
|
||||||
|
ids = self.mb.client.search(['Unseen'])
|
||||||
|
self.assertEqual(ids, [1])
|
||||||
|
|
||||||
|
self.mb.mark_processing(1)
|
||||||
|
self.mb.reset_timeout_messages()
|
||||||
|
|
||||||
|
self.assertEquals(self.mb.client.get_flags([1]), {1: ()})
|
||||||
|
|
||||||
|
def test_reset_timeout_messages_no_old_message(self):
|
||||||
|
self.mb.timeout = 180 # 3 minutes ago
|
||||||
|
self.mb.client.append(self.home_folder,
|
||||||
|
message_from_string('').as_string())
|
||||||
|
ids = self.mb.client.search(['Unseen'])
|
||||||
|
self.assertEqual(ids, [1])
|
||||||
|
|
||||||
|
self.mb.mark_processing(1)
|
||||||
|
self.mb.reset_timeout_messages()
|
||||||
|
|
||||||
|
# reset_timeout_messages didn't reset the message
|
||||||
|
self.assertEquals(self.mb.client.get_flags([1]),
|
||||||
|
{1: ('\\Flagged', '\\Seen')})
|
||||||
|
|
||||||
def test_process_messages(self):
|
def test_process_messages(self):
|
||||||
# real mail
|
# real mail
|
||||||
email_file = join(dirname(dirname(__file__)),
|
email_file = join(dirname(dirname(__file__)),
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
from email import message_from_string
|
from email import message_from_string
|
||||||
|
|
||||||
from imapclient import IMAPClient
|
from imapclient import IMAPClient
|
||||||
@@ -17,11 +18,23 @@ class MailBot(object):
|
|||||||
imapclient = IMAPClient
|
imapclient = IMAPClient
|
||||||
|
|
||||||
def __init__(self, host, username, password, port=None, use_uid=True,
|
def __init__(self, host, username, password, port=None, use_uid=True,
|
||||||
ssl=False, stream=False):
|
ssl=False, stream=False, timeout=None):
|
||||||
|
"""Create, connect and login the MailBot.
|
||||||
|
|
||||||
|
All parameters except from ``timeout`` are used by IMAPClient.
|
||||||
|
|
||||||
|
The timeout parameter is the number of seconds a mail is allowed to
|
||||||
|
stay in the processing state. Mails older than this timeout will have
|
||||||
|
their processing flag removed on the next ``process_messages`` run,
|
||||||
|
allowing MailBot to try processing them again.
|
||||||
|
|
||||||
|
"""
|
||||||
self.client = self.imapclient(host, port=port, use_uid=use_uid,
|
self.client = self.imapclient(host, port=port, use_uid=use_uid,
|
||||||
ssl=ssl, stream=stream)
|
ssl=ssl, stream=stream)
|
||||||
self.client.login(username, password)
|
self.client.login(username, password)
|
||||||
self.client.select_folder(self.home_folder)
|
self.client.select_folder(self.home_folder)
|
||||||
|
self.client.normalise_times = False # deal with UTC everywhere
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
def get_message_ids(self):
|
def get_message_ids(self):
|
||||||
"""Return the list of IDs of messages to process."""
|
"""Return the list of IDs of messages to process."""
|
||||||
@@ -41,6 +54,7 @@ class MailBot(object):
|
|||||||
def process_messages(self):
|
def process_messages(self):
|
||||||
"""Process messages: check which callbacks should be triggered."""
|
"""Process messages: check which callbacks should be triggered."""
|
||||||
from . import CALLBACKS_MAP
|
from . import CALLBACKS_MAP
|
||||||
|
self.reset_timeout_messages()
|
||||||
messages = self.get_messages()
|
messages = self.get_messages()
|
||||||
|
|
||||||
for uid, msg in messages.items():
|
for uid, msg in messages.items():
|
||||||
@@ -50,6 +64,28 @@ class MailBot(object):
|
|||||||
self.process_message(message, callback_class, rules)
|
self.process_message(message, callback_class, rules)
|
||||||
self.mark_processed(uid)
|
self.mark_processed(uid)
|
||||||
|
|
||||||
|
def reset_timeout_messages(self):
|
||||||
|
"""Remove the \\Flagged and \\Seen flags from mails that are too old.
|
||||||
|
|
||||||
|
This makes sure that no mail stays in a processing state without
|
||||||
|
actually being processed. This could happen if a callback timeouts,
|
||||||
|
fails, if MailBot is killed before having finished the processing...
|
||||||
|
|
||||||
|
"""
|
||||||
|
if self.timeout is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
ids = self.client.search(['Flagged', 'Seen'])
|
||||||
|
messages = self.client.fetch(ids, ['INTERNALDATE'])
|
||||||
|
|
||||||
|
# compare datetimes without tzinfo, as UTC
|
||||||
|
date_pivot = datetime.utcnow() - timedelta(seconds=self.timeout)
|
||||||
|
to_reset = [msg_id for msg_id, data in messages.iteritems()
|
||||||
|
if data['INTERNALDATE'].replace(tzinfo=None) < date_pivot]
|
||||||
|
|
||||||
|
if to_reset:
|
||||||
|
self.client.remove_flags(to_reset, ['\\Flagged', '\\Seen'])
|
||||||
|
|
||||||
def mark_processing(self, uid):
|
def mark_processing(self, uid):
|
||||||
"""Mark the message corresponding to uid as being processed."""
|
"""Mark the message corresponding to uid as being processed."""
|
||||||
self.client.add_flags([uid], ['\\Flagged', '\\Seen'])
|
self.client.add_flags([uid], ['\\Flagged', '\\Seen'])
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from mock import patch, sentinel, Mock, DEFAULT, call
|
from mock import patch, sentinel, Mock, DEFAULT, call
|
||||||
|
|
||||||
from . import MailBotTestCase
|
from . import MailBotTestCase
|
||||||
@@ -10,6 +12,7 @@ class TestableMailBot(MailBot):
|
|||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.client = Mock()
|
self.client = Mock()
|
||||||
|
self.timeout = None
|
||||||
|
|
||||||
|
|
||||||
class MailBotClientTest(MailBotTestCase):
|
class MailBotClientTest(MailBotTestCase):
|
||||||
@@ -115,3 +118,26 @@ class MailBotTest(MailBotClientTest):
|
|||||||
['\\Flagged'])
|
['\\Flagged'])
|
||||||
self.bot.client.add_flags.assert_called_once_with([sentinel.id],
|
self.bot.client.add_flags.assert_called_once_with([sentinel.id],
|
||||||
['\\Seen'])
|
['\\Seen'])
|
||||||
|
|
||||||
|
def test_reset_timeout_messages_timeout_none(self):
|
||||||
|
self.bot.timeout = None # don't reset messages, no timeout!
|
||||||
|
self.bot.reset_timeout_messages()
|
||||||
|
self.assertFalse(self.bot.client.search.mock_calls)
|
||||||
|
self.assertFalse(self.bot.client.remove_flags.mock_calls)
|
||||||
|
|
||||||
|
def test_reset_timeout_messages_timeout(self):
|
||||||
|
self.bot.timeout = 0 # always reset messages
|
||||||
|
self.bot.client.search.return_value = [sentinel.id1, sentinel.id2]
|
||||||
|
past = datetime.utcnow() - timedelta(minutes=10)
|
||||||
|
future = datetime.utcnow() + timedelta(minutes=10)
|
||||||
|
self.bot.client.fetch.return_value = {
|
||||||
|
sentinel.id1: {'INTERNALDATE': past, 'SEQ': 1}, # too old: reset
|
||||||
|
sentinel.id2: {'INTERNALDATE': future, 'SEQ': 2}}
|
||||||
|
|
||||||
|
self.bot.reset_timeout_messages()
|
||||||
|
|
||||||
|
self.bot.client.search.assert_called_once_with(['Flagged', 'Seen'])
|
||||||
|
self.bot.client.fetch.assert_called_once_with(
|
||||||
|
[sentinel.id1, sentinel.id2], ['INTERNALDATE'])
|
||||||
|
self.bot.client.remove_flags.assert_called_once_with(
|
||||||
|
[sentinel.id1], ['\\Flagged', '\\Seen'])
|
||||||
|
|||||||
Reference in New Issue
Block a user