17 Commits

Author SHA1 Message Date
5b27fa1b76 removed errant dash that was causing Travis-CI to fail 2018-07-26 11:06:48 -04:00
9cffb51a81 Added missing documentation for all supported query keyword arguments. Fixes #124. Added Pycharm directory to .gitignore.
Fixed var names in documentation of query keywords

moved Messages and Imbox to their own modules, imported Imbox.imbox into __init__.py and put it in __all__. fixes #130.

clarified in documentation and Imbox.messages logging that, unless a folder is specified in the kwargs to Imbox.messages, the returned messages will be from the inbox.  In the documentation this is accomplished exclusively by the var names. fixes #128.

amended `8df7d7c` to reflect manual changes made to `README.rst` in current master, but also added `inbox_` to several var names to make that explicit in the documentation.  Added flags to messages returned by `fetch_email_by_uid`, using the new function `parse_flags` in `parser.py`.  Fixes #126.

added TODO back into query.py
2018-07-26 10:50:10 -04:00
Martin Rusev
06aa4e054b Update README.rst 2018-07-26 14:47:02 +02:00
Martin Rusev
63bddbd73c Update README.rst 2018-07-26 14:45:34 +02:00
Martin Rusev
34149efed5 Merge pull request #129 from zevaverbach/encapsulate_generator
Created Message class to encapsulate the "messages" generator
2018-07-25 19:46:49 +02:00
73fafcb368 made Messages methods private, as well as uid_list. 2018-07-25 12:11:59 -04:00
8e26e92a39 Moved query_ids from Imbox to Messages.
Moved `fetch_list` to `Messages` and renamed `fetch_email_list`, moving assignment of uid_list to the constructor of `Messages`.

Moved `fetch_by_uid` from `Imbox` to a pure function in `parser` module.

Replaced call to `Imbox.fetch_list` with a call to `Messages` instead.

created `Messages` class, which encapsulates the generator `Messages.fetch_email_list`, while also implementing `__len__` to show how many emails match a given query, and `__iter__` to refresh the `fetch_email_list` generator when it's exhausted.  It also implements `__getitem__` to support indexing of the emails matching a query.  Messages requires the `connection` and `parser_policy` established in `Imbox` and accepts arbitrary keyword arguments, which it uses in the IMAP query as well as in the `__repr__`.
2018-07-25 11:48:19 -04:00
635d15441e shortened up the ImapTransport constructor, a couple of similar cleanups 2018-07-25 09:00:21 -04:00
55f64a1922 removed some unused args, renamed to avoid shadowing a name in the global scope. 2018-07-25 08:46:59 -04:00
Martin Rusev
a1801af56e Merge pull request #119 from sblondon/factorize-version
Reuse __version__ for the library version
2018-04-05 10:21:07 +02:00
Stephane Blondon
6a0da4b105 reuse __version__ for the library version 2018-04-04 18:27:27 +02:00
Martin Rusev
5d09b6f3e4 Merge pull request #117 from sblondon/fix-encoding-in-logger
Fix encoding in logger
2018-04-04 15:43:56 +02:00
Stephane Blondon
ecb62b585c refactoring: string formatting more readable 2018-03-21 18:31:08 +01:00
Stephane Blondon
cbb46ef078 fix decoding of sender e-mail if badly encoded 2018-03-21 18:24:59 +01:00
Stephane Blondon
79ce81aa9d fix spelling error 2018-03-19 20:56:22 +01:00
Martin Rusev
96ce737df5 Merge pull request #112 from wagner-certat/version
Expose version in library
2018-01-08 12:56:01 +01:00
Sebastian Wagner
a94682fc3c Expose version in library 2018-01-08 12:07:21 +01:00
11 changed files with 209 additions and 122 deletions

3
.gitignore vendored
View File

@@ -31,3 +31,6 @@ nosetests.xml
example.* example.*
example.py example.py
# PyCharm
.idea/

View File

@@ -36,36 +36,45 @@ Usage
ssl=True, ssl=True,
ssl_context=None, ssl_context=None,
starttls=False) as imbox: starttls=False) as imbox:
# Get all folders # Get all folders
status, folders_with_additional_info = imbox.folders() status, folders_with_additional_info = imbox.folders()
# Gets all messages # Gets all messages from the inbox
all_messages = imbox.messages() all_inbox_messages = imbox.messages()
# Unread messages # Unread messages
unread_messages = imbox.messages(unread=True) unread_inbox_messages = imbox.messages(unread=True)
# Flagged messages
inbox_flagged_messages = imbox.messages(flagged=True)
# Un-flagged messages
inbox_unflagged_messages = imbox.messages(unflagged=True)
# Messages sent FROM # Messages sent FROM
messages_from = imbox.messages(sent_from='martin@amon.cx') inbox_messages_from = imbox.messages(sent_from='sender@example.org')
# Messages sent TO # Messages sent TO
messages_from = imbox.messages(sent_to='martin@amon.cx') inbox_messages_to = imbox.messages(sent_to='receiver@example.org')
# Messages received before specific date # Messages received before specific date
messages_from = imbox.messages(date__lt=datetime.date(2013, 7, 31)) inbox_messages_received_before = imbox.messages(date__lt=datetime.date(2018, 7, 31))
# Messages received after specific date # Messages received after specific date
messages_from = imbox.messages(date__gt=datetime.date(2013, 7, 30)) inbox_messages_received_after = imbox.messages(date__gt=datetime.date(2018, 7, 30))
# Messages received on a specific date # Messages received on a specific date
messages_from = imbox.messages(date__on=datetime.date(2013, 7, 30)) inbox_messages_received_on_date = imbox.messages(date__on=datetime.date(2018, 7, 30))
# Messages from a specific folder # Messages from a specific folder
messages_folder = imbox.messages(folder='Social') messages_from_folder = imbox.messages(folder='Social')
# Messages whose subjects contain a string
inbox_messages_subject_christmas = imbox.messages(subject='Christmas')
for uid, message in all_inbox_messages:
for uid, message in all_messages:
# Every message is an object with the following keys # Every message is an object with the following keys
message.sent_from message.sent_from

View File

@@ -1,93 +1,8 @@
from imbox.imap import ImapTransport from imbox.imbox import Imbox
from imbox.parser import parse_email
from imbox.query import build_search_query
import logging __version_info__ = (0, 9, 5)
logger = logging.getLogger(__name__) __version__ = '.'.join([str(x) for x in __version_info__])
class Imbox: __all__ = ['Imbox']
def __init__(self, hostname, username=None, password=None, ssl=True,
port=None, ssl_context=None, policy=None, starttls=False):
self.server = ImapTransport(hostname, ssl=ssl, port=port,
ssl_context=ssl_context, starttls=starttls)
self.hostname = hostname
self.username = username
self.password = password
self.parser_policy = policy
self.connection = self.server.connect(username, password)
logger.info("Connected to IMAP Server with user {username} on {hostname}{ssl}".format(
hostname=hostname, username=username, ssl=(" over SSL" if ssl or starttls else "")))
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.logout()
def logout(self):
self.connection.close()
self.connection.logout()
logger.info("Disconnected from IMAP Server {username}@{hostname}".format(
hostname=self.hostname, username=self.username))
def query_uids(self, **kwargs):
query = build_search_query(**kwargs)
message, data = self.connection.uid('search', None, query)
if data[0] is None:
return []
return data[0].split()
def fetch_by_uid(self, uid):
message, data = self.connection.uid('fetch', uid, '(BODY.PEEK[])')
logger.debug("Fetched message for UID {}".format(int(uid)))
raw_email = data[0][1]
email_object = parse_email(raw_email, policy=self.parser_policy)
return email_object
def fetch_list(self, **kwargs):
uid_list = self.query_uids(**kwargs)
logger.debug("Fetch all messages for UID in {}".format(uid_list))
for uid in uid_list:
yield (uid, self.fetch_by_uid(uid))
def mark_seen(self, uid):
logger.info("Mark UID {} with \\Seen FLAG".format(int(uid)))
self.connection.uid('STORE', uid, '+FLAGS', '(\\Seen)')
def mark_flag(self, uid):
logger.info("Mark UID {} with \\Flagged FLAG".format(int(uid)))
self.connection.uid('STORE', uid, '+FLAGS', '(\\Flagged)')
def delete(self, uid):
logger.info("Mark UID {} with \\Deleted FLAG and expunge.".format(int(uid)))
mov, data = self.connection.uid('STORE', uid, '+FLAGS', '(\\Deleted)')
self.connection.expunge()
def copy(self, uid, destination_folder):
logger.info("Copy UID {} to {} folder".format(int(uid), str(destination_folder)))
return self.connection.uid('COPY', uid, destination_folder)
def move(self, uid, destination_folder):
logger.info("Move UID {} to {} folder".format(int(uid), str(destination_folder)))
if self.copy(uid, destination_folder):
self.delete(uid)
def messages(self, *args, **kwargs):
folder = kwargs.get('folder', False)
msg = ""
if folder:
self.connection.select(folder)
msg = " from folder '{}'".format(folder)
logger.info("Fetch list of messages{}".format(msg))
return self.fetch_list(**kwargs)
def folders(self):
return self.connection.list()

View File

@@ -10,22 +10,16 @@ class ImapTransport:
def __init__(self, hostname, port=None, ssl=True, ssl_context=None, starttls=False): def __init__(self, hostname, port=None, ssl=True, ssl_context=None, starttls=False):
self.hostname = hostname self.hostname = hostname
self.port = port
kwargs = {}
if ssl: if ssl:
self.transport = IMAP4_SSL self.port = port or 993
if not self.port:
self.port = 993
if ssl_context is None: if ssl_context is None:
ssl_context = pythonssllib.create_default_context() ssl_context = pythonssllib.create_default_context()
kwargs["ssl_context"] = ssl_context self.server = IMAP4_SSL(self.hostname, self.port, ssl_context=ssl_context)
else: else:
self.transport = IMAP4 self.port = port or 143
if not self.port: self.server = IMAP4(self.hostname, self.port)
self.port = 143
self.server = self.transport(self.hostname, self.port, **kwargs)
if starttls: if starttls:
self.server.starttls() self.server.starttls()
logger.debug("Created IMAP4 transport for {host}:{port}" logger.debug("Created IMAP4 transport for {host}:{port}"

72
imbox/imbox.py Normal file
View File

@@ -0,0 +1,72 @@
from imbox.imap import ImapTransport
from imbox.messages import Messages
import logging
logger = logging.getLogger(__name__)
class Imbox:
def __init__(self, hostname, username=None, password=None, ssl=True,
port=None, ssl_context=None, policy=None, starttls=False):
self.server = ImapTransport(hostname, ssl=ssl, port=port,
ssl_context=ssl_context, starttls=starttls)
self.hostname = hostname
self.username = username
self.password = password
self.parser_policy = policy
self.connection = self.server.connect(username, password)
logger.info("Connected to IMAP Server with user {username} on {hostname}{ssl}".format(
hostname=hostname, username=username, ssl=(" over SSL" if ssl or starttls else "")))
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.logout()
def logout(self):
self.connection.close()
self.connection.logout()
logger.info("Disconnected from IMAP Server {username}@{hostname}".format(
hostname=self.hostname, username=self.username))
def mark_seen(self, uid):
logger.info("Mark UID {} with \\Seen FLAG".format(int(uid)))
self.connection.uid('STORE', uid, '+FLAGS', '(\\Seen)')
def mark_flag(self, uid):
logger.info("Mark UID {} with \\Flagged FLAG".format(int(uid)))
self.connection.uid('STORE', uid, '+FLAGS', '(\\Flagged)')
def delete(self, uid):
logger.info("Mark UID {} with \\Deleted FLAG and expunge.".format(int(uid)))
self.connection.expunge()
def copy(self, uid, destination_folder):
logger.info("Copy UID {} to {} folder".format(int(uid), str(destination_folder)))
return self.connection.uid('COPY', uid, destination_folder)
def move(self, uid, destination_folder):
logger.info("Move UID {} to {} folder".format(int(uid), str(destination_folder)))
if self.copy(uid, destination_folder):
self.delete(uid)
def messages(self, **kwargs):
folder = kwargs.get('folder', False)
if folder:
self.connection.select(folder)
msg = " from folder '{}'".format(folder)
else:
msg = " from inbox"
logger.info("Fetch list of messages{}".format(msg))
return Messages(connection=self.connection,
parser_policy=self.parser_policy,
**kwargs)
def folders(self):
return self.connection.list()

62
imbox/messages.py Normal file
View File

@@ -0,0 +1,62 @@
from imbox.parser import fetch_email_by_uid
from imbox.query import build_search_query
import logging
logger = logging.getLogger(__name__)
class Messages:
def __init__(self,
connection,
parser_policy,
**kwargs):
self.connection = connection
self.parser_policy = parser_policy
self.kwargs = kwargs
self._uid_list = self._query_uids(**kwargs)
logger.debug("Fetch all messages for UID in {}".format(self._uid_list))
def _fetch_email(self, uid):
return fetch_email_by_uid(uid=uid,
connection=self.connection,
parser_policy=self.parser_policy)
def _query_uids(self, **kwargs):
query_ = build_search_query(**kwargs)
message, data = self.connection.uid('search', None, query_)
if data[0] is None:
return []
return data[0].split()
def _fetch_email_list(self):
for uid in self._uid_list:
yield uid, self._fetch_email(uid)
def __repr__(self):
if len(self.kwargs) > 0:
return 'Messages({})'.format('\n'.join('{}={}'.format(key, value)
for key, value in self.kwargs.items()))
return 'Messages(ALL)'
def __iter__(self):
return self._fetch_email_list()
def __next__(self):
return self
def __len__(self):
return len(self._uid_list)
def __getitem__(self, index):
uids = self._uid_list[index]
if not isinstance(uids, list):
uid = uids
return uid, self._fetch_email(uid)
return [(uid, self._fetch_email(uid))
for uid in uids]

View File

@@ -1,14 +1,17 @@
import imaplib
import io import io
import re import re
import email import email
import base64 import base64
import quopri import quopri
import sys
import time import time
from datetime import datetime from datetime import datetime
from email.header import decode_header from email.header import decode_header
from imbox.utils import str_encode, str_decode from imbox.utils import str_encode, str_decode
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -33,7 +36,10 @@ def decode_mail_header(value, default_charset='us-ascii'):
return str_decode(str_encode(value, default_charset, 'replace'), default_charset) return str_decode(str_encode(value, default_charset, 'replace'), default_charset)
else: else:
for index, (text, charset) in enumerate(headers): for index, (text, charset) in enumerate(headers):
logger.debug("Mail header no. {}: {} encoding {}".format(index, str_decode(text, charset or 'utf-8'), charset)) logger.debug("Mail header no. {index}: {data} encoding {charset}".format(
index=index,
data=str_decode(text, charset or 'utf-8', 'replace'),
charset=charset))
try: try:
headers[index] = str_decode(text, charset or default_charset, headers[index] = str_decode(text, charset or default_charset,
'replace') 'replace')
@@ -54,7 +60,7 @@ def get_mail_addresses(message, header_name):
for index, (address_name, address_email) in enumerate(addresses): for index, (address_name, address_email) in enumerate(addresses):
addresses[index] = {'name': decode_mail_header(address_name), addresses[index] = {'name': decode_mail_header(address_name),
'email': address_email} 'email': address_email}
logger.debug("{} Mail addressees in message: <{}> {}".format(header_name.upper(), address_name, address_email)) logger.debug("{} Mail address in message: <{}> {}".format(header_name.upper(), address_name, address_email))
return addresses return addresses
@@ -63,7 +69,7 @@ def decode_param(param):
values = v.split('\n') values = v.split('\n')
value_results = [] value_results = []
for value in values: for value in values:
match = re.search(r'=\?((?:\w|-)+)\?(Q|B)\?(.+)\?=', value) match = re.search(r'=\?((?:\w|-)+)\?([QB])\?(.+)\?=', value)
if match: if match:
encoding, type_, code = match.groups() encoding, type_, code = match.groups()
if type_ == 'Q': if type_ == 'Q':
@@ -124,6 +130,28 @@ def decode_content(message):
return content return content
def fetch_email_by_uid(uid, connection, parser_policy):
message, data = connection.uid('fetch', uid, '(BODY.PEEK[] FLAGS)')
logger.debug("Fetched message for UID {}".format(int(uid)))
raw_headers, raw_email = data[0]
email_object = parse_email(raw_email, policy=parser_policy)
flags = parse_flags(raw_headers.decode())
email_object.__dict__['flags'] = flags
return email_object
def parse_flags(headers):
"""Copied from https://github.com/girishramnani/gmail/blob/master/gmail/message.py"""
if len(headers) == 0:
return []
if sys.version_info[0] == 3:
headers = bytes(headers, "ascii")
return list(imaplib.ParseFlags(headers))
def parse_email(raw_email, policy=None): def parse_email(raw_email, policy=None):
if isinstance(raw_email, bytes): if isinstance(raw_email, bytes):
raw_email = str_encode(raw_email, 'utf-8', errors='ignore') raw_email = str_encode(raw_email, 'utf-8', errors='ignore')
@@ -137,9 +165,7 @@ def parse_email(raw_email, policy=None):
except UnicodeEncodeError: except UnicodeEncodeError:
email_message = email.message_from_string(raw_email.encode('utf-8'), **email_parse_kwargs) email_message = email.message_from_string(raw_email.encode('utf-8'), **email_parse_kwargs)
maintype = email_message.get_content_maintype() maintype = email_message.get_content_maintype()
parsed_email = {} parsed_email = {'raw_email': raw_email}
parsed_email['raw_email'] = raw_email
body = { body = {
"plain": [], "plain": [],
@@ -159,7 +185,7 @@ def parse_email(raw_email, policy=None):
content = decode_content(part) content = decode_content(part)
is_inline = content_disposition is None \ is_inline = content_disposition is None \
or content_disposition.startswith("inline") or content_disposition.startswith("inline")
if content_type == "text/plain" and is_inline: if content_type == "text/plain" and is_inline:
body['plain'].append(content) body['plain'].append(content)
elif content_type == "text/html" and is_inline: elif content_type == "text/html" and is_inline:

View File

@@ -8,8 +8,7 @@ logger = logging.getLogger(__name__)
def format_date(date): def format_date(date):
if isinstance(date, datetime.date): if isinstance(date, datetime.date):
return date.strftime('%d-%b-%Y') return date.strftime('%d-%b-%Y')
else: return date
return date
def build_search_query(**kwargs): def build_search_query(**kwargs):

View File

@@ -1,14 +1,16 @@
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def str_encode(value='', encoding=None, errors='strict'): def str_encode(value='', encoding=None, errors='strict'):
logger.debug("Encode str {} with and errors {}".format(value, encoding, errors)) logger.debug("Encode str {} with and errors {}".format(value, encoding, errors))
return str(value, encoding, errors) return str(value, encoding, errors)
def str_decode(value='', encoding=None, errors='strict'): def str_decode(value='', encoding=None, errors='strict'):
if isinstance(value, str): if isinstance(value, str):
return bytes(value, encoding, errors).decode('utf-8') return bytes(value, encoding, errors).decode('utf-8')
elif isinstance(value, bytes): elif isinstance(value, bytes):
return value.decode(encoding or 'utf-8', errors=errors) return value.decode(encoding or 'utf-8', errors=errors)
else: else:
raise TypeError( "Cannot decode '{}' object".format(value.__class__) ) raise TypeError("Cannot decode '{}' object".format(value.__class__))

View File

@@ -1,7 +1,9 @@
from setuptools import setup from setuptools import setup
import os import os
version = '0.9.5' import imbox
version = imbox.__version__
def read(filename): def read(filename):

View File

@@ -336,6 +336,9 @@ class TestParser(unittest.TestCase):
from_message_object = email.message_from_string("From: John Smith <johnsmith@gmail.com>") from_message_object = email.message_from_string("From: John Smith <johnsmith@gmail.com>")
self.assertEqual([{'email': 'johnsmith@gmail.com', 'name': 'John Smith'}], get_mail_addresses(from_message_object, 'from')) self.assertEqual([{'email': 'johnsmith@gmail.com', 'name': 'John Smith'}], get_mail_addresses(from_message_object, 'from'))
invalid_encoding_in_from_message_object = email.message_from_string("From: =?UTF-8?Q?C=E4cilia?= <caciliahxg827m@example.org>")
self.assertEqual([{'email': 'caciliahxg827m@example.org', 'name': 'C<EFBFBD>cilia'}], get_mail_addresses(invalid_encoding_in_from_message_object, 'from'))
def test_parse_email_with_policy(self): def test_parse_email_with_policy(self):
if not SMTP: if not SMTP:
return return