decode headers (and body) before checking against rules

This commit is contained in:
Mathieu Agopian
2013-03-28 15:26:40 +01:00
parent 43e23412e4
commit dafc93198b
3 changed files with 85 additions and 2 deletions

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from collections import defaultdict
from email.header import decode_header
from re import findall
@@ -47,7 +48,13 @@ class Callback(object):
return None
# if item is not in header, then item == 'body'
value = message.get(item, self.get_email_body(message))
if item == 'body':
value = self.get_email_body(message)
else:
value = message[item]
# decode header (might be encoded as latin-1, utf-8...
value = ' '.join(chunk.decode(encoding or 'ASCII')
for chunk, encoding in decode_header(value))
for regexp in regexps: # store all captures for easy access
self.matches[item] += findall(regexp, value)
@@ -72,7 +79,9 @@ class Callback(object):
filename = part.get_filename()
if content_type == 'text/plain' and filename is None:
# text body of the mail, not an attachment
return part.get_payload()
encoding = part.get_content_charset() or 'ASCII'
return part.get_payload(decode=True).decode(encoding)
return ''
def trigger(self):

View File

@@ -0,0 +1,56 @@
Delivered-To: foo+RANDOM_KEY@example.com
Received: by 10.194.34.7 with SMTP id v7csp101053wji;
Fri, 15 Mar 2013 02:28:52 -0700 (PDT)
Return-Path: <foo+RANDOM_KEY@example.com>
Received-SPF: pass (example.com: domain of foo+RANDOM_KEY@example.com designates 1.2.3.4 as permitted sender) client-ip=1.2.3.4
Authentication-Results: mr.google.com;
spf=pass (example.com: domain of foo+RANDOM_KEY@example.com designates 1.2.3.4 as permitted sender) smtp.mail=foo+RANDOM_KEY@example.com;
dkim=pass header.i=@example.com
X-Received: from mr.google.com ([10.182.31.109])
by 10.182.31.109 with SMTP id z13mr2632031obh.37.1363339731787 (num_hops = 1);
Fri, 15 Mar 2013 02:28:51 -0700 (PDT)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
d=example.com; s=20120113;
h=mime-version:x-received:date:message-id:subject:from:to
:content-type;
bh=EDdIiN1bkSUqRxA5ZGCbAxWo/K7ayqdf9ZDEQqAGvDU=;
b=nAVPcbc78q8Uyq8ENfiLD4R1x0Oi7kw5nMAI+eppmCqPxzeM2FITiyyz8M2WQ8rnJl
28ONzknzAEXl6Hm09EDmwgrVLXxM+x2fbNQ8DWkXtFx+3GlOP0OlE2KC2ObWZK2BxVo0
FIEsAZpt/mH4KikhOsHR6J868f/vB/0W6M7JtQGzFhbd6xjEbETDIVlPloYfmZBHs4Rp
nO7fP/VBRvWLFV/VK/OlYVXdS0FhptdCV7Zd4UKTIg5kd6rlAaZuW0KhGe6RXr0ou+aU
nqq0vSoMVK7BeKKGsA61f4YJ5qTAx4eSbOw8mYhQtnLI7qoNrS4h8iiXLWoNnxCEW9UI
YBXA==
MIME-Version: 1.0
X-Received: by 10.182.31.109 with SMTP id z13mr2632031obh.37.1363339731783;
Fri, 15 Mar 2013 02:28:51 -0700 (PDT)
Received: by 10.182.98.129 with HTTP; Fri, 15 Mar 2013 02:28:51 -0700 (PDT)
Date: Fri, 15 Mar 2013 10:28:51 +0100
Message-ID: <CAB-JLVBXqYpS1GzujSAopk3cS1Xo8C8A+bQew0_jkOpAJu1pFw@mail.example.com>
Subject: =?UTF-8?Q?test_cr=C3=A9ation_bannette?=
From: Foo Bar <foo.bar@example.com>
To: =?ISO-8859-1?Q?test_cr=E9ation?= <testmagopian@gmail.com>
Cc: foo@example.com, bar@example.com
Content-Type: multipart/mixed; boundary=14dae93b5c806bd71504d7f3442a
--14dae93b5c806bd71504d7f3442a
Content-Type: multipart/alternative; boundary=14dae93b5c806bd71204d7f34428
--14dae93b5c806bd71204d7f34428
Content-Type: text/plain; charset=UTF-8
Test de création de bannette
--14dae93b5c806bd71204d7f34428
Content-Type: text/html; charset=UTF-8
<div dir="ltr">Test de création de bannette<br></div>
--14dae93b5c806bd71204d7f34428--
--14dae93b5c806bd71504d7f3442a
Content-Type: text/plain; charset=US-ASCII; name="test.txt"
Content-Disposition: attachment; filename="test.txt"
Content-Transfer-Encoding: base64
X-Attachment-Id: f_heb58ogq0
dGVzdCBmaWxlCg==
--14dae93b5c806bd71504d7f3442a--

View File

@@ -73,6 +73,15 @@ class CallbackTest(MailBotTestCase):
self.assertEqual(callback.matches['to'],
['RANDOM_KEY', 'RANDOM_KEY_2'])
def test_check_item_to_encoded(self):
# "to" may be a list of several emails
email_file = join(dirname(__file__), 'mails/mail_encoded_headers.txt')
email = message_from_file(open(email_file, 'r'))
callback = Callback(email, {})
self.assertTrue(callback.check_item('to', [r'(.*) <testmagopian']))
self.assertEqual(callback.matches['to'], [u'test création'])
def test_check_item_body(self):
email_file = join(dirname(__file__), 'mails/mail_with_attachment.txt')
email = message_from_file(open(email_file, 'r'))
@@ -101,6 +110,15 @@ class CallbackTest(MailBotTestCase):
email = message_from_file(open(email_file, 'r'))
self.assertEqual(callback.get_email_body(email), 'Mail content here\n')
def test_get_email_body_encoded(self):
callback = Callback('foo', 'bar')
# real email with encoded mail body
email_file = join(dirname(__file__), 'mails/mail_encoded_headers.txt')
email = message_from_file(open(email_file, 'r'))
self.assertEqual(callback.get_email_body(email),
u'Test de création de bannette\n')
def test_trigger(self):
callback = Callback('foo', 'bar')