Refactoring, initial test suite
This commit is contained in:
@@ -1,20 +1,5 @@
|
||||
import email
|
||||
import re
|
||||
import StringIO
|
||||
from imbox.imap import ImapTransport
|
||||
from imbox.parser import get_mail_addresses, decode_mail_header
|
||||
|
||||
|
||||
class Struct(object):
    """
    Simple record object whose attributes are taken from keyword arguments.
    """

    def __init__(self, **entries):
        # Every keyword argument becomes an instance attribute of the same name.
        for attr_name, attr_value in entries.items():
            self.__dict__[attr_name] = attr_value

    def keys(self):
        """
        Return the names of the attributes stored on this instance.
        """
        return vars(self).keys()

    def __repr__(self):
        """
        Render the instance as the string form of its attribute dictionary.
        """
        return str(vars(self))
|
||||
|
||||
from imbox.parser import parse_email
|
||||
|
||||
class Imbox(object):
|
||||
|
||||
@@ -23,98 +8,11 @@ class Imbox(object):
|
||||
server = ImapTransport(hostname, ssl=ssl)
|
||||
self.connection = server.connect(username, password)
|
||||
|
||||
def parse_attachment(self, message_part):
    """
    Build an attachment description from a single MIME part.

    Returns None when the part has no Content-Disposition header or when
    the disposition type is not "attachment". Otherwise returns a dict with:

    - 'content-type': the part's content type
    - 'size': length of the decoded payload
    - 'content': in-memory binary file object holding the decoded payload
    - 'filename': raw value of the filename parameter, if present
      (surrounding quotes are kept exactly as they appear in the header)
    - 'create-date': raw value of the creation date parameter, if present
    """
    # Imported locally so the block does not depend on the Python 2 only
    # StringIO module; io.BytesIO exists on both Python 2.6+ and Python 3.
    import io

    # Check again if this is a valid attachment
    content_disposition = message_part.get("Content-Disposition", None)
    if content_disposition is None:
        return None

    dispositions = content_disposition.strip().split(";")
    if dispositions[0].strip().lower() != "attachment":
        return None

    file_data = message_part.get_payload(decode=True)
    if file_data is None:
        # get_payload(decode=True) yields None for multipart containers;
        # report them as empty instead of failing on len(None).
        file_data = b""

    attachment = {
        'content-type': message_part.get_content_type(),
        'size': len(file_data),
        'content': io.BytesIO(file_data),
    }

    for param in dispositions[1:]:
        # Split on the first "=" only so values containing "=" stay intact,
        # and skip fragments without one (e.g. after a trailing ";").
        name, separator, value = param.partition("=")
        if not separator:
            continue
        name = name.lower()

        if 'file' in name:
            attachment['filename'] = value

        # RFC 2183 names this parameter "creation-date"; the historical
        # "create-date" spelling is still accepted.
        if 'create-date' in name or 'creation-date' in name:
            attachment['create-date'] = value

    return attachment
|
||||
|
||||
|
||||
def parse_email(self, raw_email):
    """
    Parse a raw RFC 822 message string into a Struct.

    The returned Struct carries:

    - body: dict with 'plain' and 'html' lists of decoded payloads
    - attachments: list of attachment dicts (only set when at least one
      attachment was found)
    - sent_from / sent_to: results of get_mail_addresses for From / To
    - headers: list of {'Name': ..., 'Value': ...} for selected headers
    - subject, date, message-id: decoded header values, when present
      ('message-id' contains a hyphen, so it is reachable via getattr only)
    """
    email_message = email.message_from_string(raw_email)
    maintype = email_message.get_content_maintype()
    parsed_email = {}

    body = {
        "plain": [],
        "html": []
    }
    attachments = []

    if maintype == 'multipart':
        for part in email_message.walk():
            content_type = part.get_content_type()
            content_disposition = part.get('Content-Disposition', None)

            if content_disposition is None:
                # Parts without a disposition are treated as body text.
                if content_type == "text/plain":
                    body['plain'].append(part.get_payload(decode=True))
                elif content_type == "text/html":
                    body['html'].append(part.get_payload(decode=True))
            elif content_disposition:
                # parse_attachment returns None for dispositions other than
                # "attachment"; do not leak those None entries to callers.
                attachment = self.parse_attachment(part)
                if attachment is not None:
                    attachments.append(attachment)

    elif maintype == 'text':
        body['plain'].append(email_message.get_payload(decode=True))

    if attachments:
        parsed_email['attachments'] = attachments

    parsed_email['body'] = body

    parsed_email['sent_from'] = get_mail_addresses(email_message, 'from')
    parsed_email['sent_to'] = get_mail_addresses(email_message, 'to')

    # Header names are case-insensitive, so compare lower-cased names
    # (e.g. both "Message-ID" and "Message-Id" are recognised).
    value_headers_keys = ('subject', 'date', 'message-id')
    key_value_header_keys = ('received-spf',
                             'mime-version',
                             'x-spam-status',
                             'x-spam-score',
                             'content-type')

    parsed_email['headers'] = []
    # Iterate the message's header pairs directly: converting to a dict
    # first would silently drop repeated headers.
    for key, value in email_message.items():
        valid_key_name = key.lower()

        if valid_key_name in value_headers_keys:
            parsed_email[valid_key_name] = decode_mail_header(value)

        if valid_key_name in key_value_header_keys:
            parsed_email['headers'].append({'Name': key,
                                            'Value': value})

    return Struct(**parsed_email)
|
||||
|
||||
def fetch_by_uid(self, uid):
    """
    Fetch the message with the given UID and return it parsed.

    The raw message is taken from the first element of the fetch response
    and handed to the module-level parse_email helper.
    """
    # BODY.PEEK[] is used so the messages are not marked as read.
    status, data = self.connection.uid('fetch', uid, '(BODY.PEEK[])')
    raw_email = data[0][1]

    # Parse once with the shared parser; the former self.parse_email call
    # produced a result that was immediately overwritten.
    email_object = parse_email(raw_email)

    return email_object
|
||||
|
||||
|
||||
101
imbox/parser.py
101
imbox/parser.py
@@ -1,6 +1,20 @@
|
||||
import re
|
||||
import StringIO
|
||||
import email
|
||||
from email.header import decode_header
|
||||
|
||||
|
||||
class Struct(object):
    """
    Attribute container populated from arbitrary keyword arguments.
    """

    def __init__(self, **entries):
        # Copy each keyword argument into the instance namespace.
        namespace = vars(self)
        for field, field_value in entries.items():
            namespace[field] = field_value

    def keys(self):
        """
        Return the attribute names currently held by the instance.
        """
        namespace = vars(self)
        return namespace.keys()

    def __repr__(self):
        """
        Return the string form of the instance's attribute dictionary.
        """
        namespace = vars(self)
        return str(namespace)
|
||||
|
||||
|
||||
def decode_mail_header(value, default_charset='us-ascii'):
|
||||
"""
|
||||
Decode a header value into a unicode string.
|
||||
@@ -30,4 +44,89 @@ def get_mail_addresses(message, header_name):
|
||||
addresses[index]={'name': decode_mail_header(address_name), 'email': address_email}
|
||||
|
||||
return addresses
|
||||
|
||||
|
||||
def parse_attachment(message_part):
    """
    Build an attachment description from a single MIME part.

    Returns None when the part has no Content-Disposition header or when
    the disposition type is not "attachment". Otherwise returns a dict with:

    - 'content-type': the part's content type
    - 'size': length of the decoded payload
    - 'content': in-memory binary file object holding the decoded payload
    - 'filename': raw value of the filename parameter, if present
      (surrounding quotes are kept exactly as they appear in the header)
    - 'create-date': raw value of the creation date parameter, if present
    """
    # Imported locally so the block does not depend on the Python 2 only
    # StringIO module; io.BytesIO exists on both Python 2.6+ and Python 3.
    import io

    # Check again if this is a valid attachment
    content_disposition = message_part.get("Content-Disposition", None)
    if content_disposition is None:
        return None

    dispositions = content_disposition.strip().split(";")
    if dispositions[0].strip().lower() != "attachment":
        return None

    file_data = message_part.get_payload(decode=True)
    if file_data is None:
        # get_payload(decode=True) yields None for multipart containers;
        # report them as empty instead of failing on len(None).
        file_data = b""

    attachment = {
        'content-type': message_part.get_content_type(),
        'size': len(file_data),
        'content': io.BytesIO(file_data),
    }

    for param in dispositions[1:]:
        # Split on the first "=" only so values containing "=" stay intact,
        # and skip fragments without one (e.g. after a trailing ";").
        name, separator, value = param.partition("=")
        if not separator:
            continue
        name = name.lower()

        if 'file' in name:
            attachment['filename'] = value

        # RFC 2183 names this parameter "creation-date"; the historical
        # "create-date" spelling is still accepted.
        if 'create-date' in name or 'creation-date' in name:
            attachment['create-date'] = value

    return attachment
|
||||
|
||||
def parse_email(raw_email):
    """
    Parse a raw RFC 822 message string into a Struct.

    The returned Struct carries:

    - body: dict with 'plain' and 'html' lists of decoded payloads
    - attachments: list of attachment dicts (only set when at least one
      attachment was found)
    - sent_from / sent_to: results of get_mail_addresses for From / To
    - headers: list of {'Name': ..., 'Value': ...} for selected headers
    - subject, date, message-id: decoded header values, when present
      ('message-id' contains a hyphen, so it is reachable via getattr only)
    """
    email_message = email.message_from_string(raw_email)
    maintype = email_message.get_content_maintype()
    parsed_email = {}

    body = {
        "plain": [],
        "html": []
    }
    attachments = []

    if maintype == 'multipart':
        for part in email_message.walk():
            content_type = part.get_content_type()
            content_disposition = part.get('Content-Disposition', None)

            if content_disposition is None:
                # Parts without a disposition are treated as body text.
                if content_type == "text/plain":
                    body['plain'].append(part.get_payload(decode=True))
                elif content_type == "text/html":
                    body['html'].append(part.get_payload(decode=True))
            elif content_disposition:
                # parse_attachment returns None for dispositions other than
                # "attachment"; do not leak those None entries to callers.
                attachment = parse_attachment(part)
                if attachment is not None:
                    attachments.append(attachment)

    elif maintype == 'text':
        body['plain'].append(email_message.get_payload(decode=True))

    if attachments:
        parsed_email['attachments'] = attachments

    parsed_email['body'] = body

    parsed_email['sent_from'] = get_mail_addresses(email_message, 'from')
    parsed_email['sent_to'] = get_mail_addresses(email_message, 'to')

    # Header names are case-insensitive, so compare lower-cased names
    # (e.g. both "Message-ID" and "Message-Id" are recognised).
    value_headers_keys = ('subject', 'date', 'message-id')
    key_value_header_keys = ('received-spf',
                             'mime-version',
                             'x-spam-status',
                             'x-spam-score',
                             'content-type')

    parsed_email['headers'] = []
    # Iterate the message's header pairs directly: converting to a dict
    # first would silently drop repeated headers.
    for key, value in email_message.items():
        valid_key_name = key.lower()

        if valid_key_name in value_headers_keys:
            parsed_email[valid_key_name] = decode_mail_header(value)

        if valid_key_name in key_value_header_keys:
            parsed_email['headers'].append({'Name': key,
                                            'Value': value})

    return Struct(**parsed_email)
|
||||
@@ -0,0 +1,64 @@
|
||||
import unittest
|
||||
import email
|
||||
from imbox.parser import *
|
||||
|
||||
# Sample multipart/alternative message (one text/plain part, one text/html
# part, no attachments) used as the fixture for the parser tests.
# The folded "boundary=" line must keep its leading whitespace so that it is
# read as a continuation of the Content-Type header.
raw_email = """Delivered-To: johndoe@gmail.com
X-Originating-Email: [martin@amon.cx]
Message-ID: <test0@example.com>
Return-Path: martin@amon.cx
Date: Tue, 30 Jul 2013 15:56:29 +0300
From: Martin Rusev <martin@amon.cx>
MIME-Version: 1.0
To: John Doe <johndoe@gmail.com>
Subject: Test email - no attachment
Content-Type: multipart/alternative;
    boundary="------------080505090108000500080106"
X-OriginalArrivalTime: 30 Jul 2013 12:56:43.0604 (UTC) FILETIME=[3DD52140:01CE8D24]

--------------080505090108000500080106
Content-Type: text/plain; charset="ISO-8859-1"; format=flowed
Content-Transfer-Encoding: 7bit

Hi, this is a test email with no attachments

--------------080505090108000500080106
Content-Type: text/html; charset="ISO-8859-1"
Content-Transfer-Encoding: 7bit

<html><head>
<meta http-equiv="content-type" content="text/html; charset=ISO-8859-1"></head><body
bgcolor="#FFFFFF" text="#000000">
Hi, this is a test email with no <span style="font-weight: bold;">attachments</span><br>
</body>
</html>

--------------080505090108000500080106--
"""
|
||||
|
||||
class TestParser(unittest.TestCase):
    """
    Unit tests for the helpers exposed by imbox.parser.
    """

    def test_parse_email(self):
        """
        The Subject header of the fixture is exposed as ``subject``.
        """
        expected_subject = u'Test email - no attachment'
        result = parse_email(raw_email)

        self.assertEqual(expected_subject, result.subject)

    # TODO - Complete the test suite
    def test_parse_attachment(self):
        pass

    def test_decode_mail_header(self):
        pass

    def test_get_mail_addresses(self):
        """
        A single To / From header yields one name/email dictionary.
        """
        # (raw header line, header name to look up, expected address list)
        cases = (
            ("To: John Doe <johndoe@gmail.com>",
             'to',
             [{'email': 'johndoe@gmail.com', 'name': u'John Doe'}]),
            ("From: John Smith <johnsmith@gmail.com>",
             'from',
             [{'email': 'johnsmith@gmail.com', 'name': u'John Smith'}]),
        )

        for raw_header, header_name, expected in cases:
            message_object = email.message_from_string(raw_header)
            self.assertEqual(expected, get_mail_addresses(message_object, header_name))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user