Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dfca46886f | |||
| 6d527a2f1d | |||
| 80d6104387 | |||
|
|
0464a5a74d | ||
| 5b27fa1b76 | |||
| 9cffb51a81 | |||
|
|
06aa4e054b | ||
|
|
63bddbd73c | ||
| 2838639bfa | |||
|
|
34149efed5 | ||
| 73fafcb368 | |||
| 8e26e92a39 | |||
| 635d15441e | |||
| 55f64a1922 | |||
| 71942a69e8 | |||
| b4cb03e145 | |||
| b53a1e6837 | |||
|
|
a1801af56e | ||
|
|
6a0da4b105 | ||
|
|
5d09b6f3e4 | ||
|
|
ecb62b585c | ||
|
|
cbb46ef078 | ||
|
|
79ce81aa9d | ||
|
|
96ce737df5 | ||
|
|
a94682fc3c |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -31,3 +31,6 @@ nosetests.xml
|
||||
|
||||
example.*
|
||||
example.py
|
||||
|
||||
# PyCharm
|
||||
.idea/
|
||||
|
||||
37
README.rst
37
README.rst
@@ -36,36 +36,53 @@ Usage
|
||||
ssl=True,
|
||||
ssl_context=None,
|
||||
starttls=False) as imbox:
|
||||
|
||||
# Get all folders
|
||||
status, folders_with_additional_info = imbox.folders()
|
||||
|
||||
# Gets all messages
|
||||
all_messages = imbox.messages()
|
||||
# Gets all messages from the inbox
|
||||
all_inbox_messages = imbox.messages()
|
||||
|
||||
# Unread messages
|
||||
unread_messages = imbox.messages(unread=True)
|
||||
unread_inbox_messages = imbox.messages(unread=True)
|
||||
|
||||
# Flagged messages
|
||||
inbox_flagged_messages = imbox.messages(flagged=True)
|
||||
|
||||
# Un-flagged messages
|
||||
inbox_unflagged_messages = imbox.messages(unflagged=True)
|
||||
|
||||
# Flagged messages
|
||||
flagged_messages = imbox.messages(flagged=True)
|
||||
|
||||
# Un-flagged messages
|
||||
unflagged_messages = imbox.messages(unflagged=True)
|
||||
|
||||
# Messages sent FROM
|
||||
messages_from = imbox.messages(sent_from='martin@amon.cx')
|
||||
inbox_messages_from = imbox.messages(sent_from='sender@example.org')
|
||||
|
||||
# Messages sent TO
|
||||
messages_from = imbox.messages(sent_to='martin@amon.cx')
|
||||
inbox_messages_to = imbox.messages(sent_to='receiver@example.org')
|
||||
|
||||
# Messages received before specific date
|
||||
messages_from = imbox.messages(date__lt=datetime.date(2013, 7, 31))
|
||||
inbox_messages_received_before = imbox.messages(date__lt=datetime.date(2018, 7, 31))
|
||||
|
||||
# Messages received after specific date
|
||||
messages_from = imbox.messages(date__gt=datetime.date(2013, 7, 30))
|
||||
inbox_messages_received_after = imbox.messages(date__gt=datetime.date(2018, 7, 30))
|
||||
|
||||
# Messages received on a specific date
|
||||
messages_from = imbox.messages(date__on=datetime.date(2013, 7, 30))
|
||||
inbox_messages_received_on_date = imbox.messages(date__on=datetime.date(2018, 7, 30))
|
||||
|
||||
# Messages from a specific folder
|
||||
messages_folder = imbox.messages(folder='Social')
|
||||
messages_from_folder = imbox.messages(folder='Social')
|
||||
|
||||
# Messages whose subjects contain a string
|
||||
inbox_messages_subject_christmas = imbox.messages(subject='Christmas')
|
||||
|
||||
# Messages from a specific folder
|
||||
messages_in_folder_social = imbox.messages(folder='Social')
|
||||
|
||||
for uid, message in all_messages:
|
||||
for uid, message in all_inbox_messages:
|
||||
# Every message is an object with the following keys
|
||||
|
||||
message.sent_from
|
||||
|
||||
@@ -1,93 +1,8 @@
|
||||
from imbox.imap import ImapTransport
|
||||
from imbox.parser import parse_email
|
||||
from imbox.query import build_search_query
|
||||
from imbox.imbox import Imbox
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
__version_info__ = (0, 9, 5)
|
||||
__version__ = '.'.join([str(x) for x in __version_info__])
|
||||
|
||||
|
||||
class Imbox:
|
||||
__all__ = ['Imbox']
|
||||
|
||||
def __init__(self, hostname, username=None, password=None, ssl=True,
|
||||
port=None, ssl_context=None, policy=None, starttls=False):
|
||||
|
||||
self.server = ImapTransport(hostname, ssl=ssl, port=port,
|
||||
ssl_context=ssl_context, starttls=starttls)
|
||||
self.hostname = hostname
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.parser_policy = policy
|
||||
self.connection = self.server.connect(username, password)
|
||||
logger.info("Connected to IMAP Server with user {username} on {hostname}{ssl}".format(
|
||||
hostname=hostname, username=username, ssl=(" over SSL" if ssl or starttls else "")))
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
self.logout()
|
||||
|
||||
def logout(self):
|
||||
self.connection.close()
|
||||
self.connection.logout()
|
||||
logger.info("Disconnected from IMAP Server {username}@{hostname}".format(
|
||||
hostname=self.hostname, username=self.username))
|
||||
|
||||
def query_uids(self, **kwargs):
|
||||
query = build_search_query(**kwargs)
|
||||
message, data = self.connection.uid('search', None, query)
|
||||
if data[0] is None:
|
||||
return []
|
||||
return data[0].split()
|
||||
|
||||
def fetch_by_uid(self, uid):
|
||||
message, data = self.connection.uid('fetch', uid, '(BODY.PEEK[])')
|
||||
logger.debug("Fetched message for UID {}".format(int(uid)))
|
||||
raw_email = data[0][1]
|
||||
|
||||
email_object = parse_email(raw_email, policy=self.parser_policy)
|
||||
|
||||
return email_object
|
||||
|
||||
def fetch_list(self, **kwargs):
|
||||
uid_list = self.query_uids(**kwargs)
|
||||
logger.debug("Fetch all messages for UID in {}".format(uid_list))
|
||||
|
||||
for uid in uid_list:
|
||||
yield (uid, self.fetch_by_uid(uid))
|
||||
|
||||
def mark_seen(self, uid):
|
||||
logger.info("Mark UID {} with \\Seen FLAG".format(int(uid)))
|
||||
self.connection.uid('STORE', uid, '+FLAGS', '(\\Seen)')
|
||||
|
||||
def mark_flag(self, uid):
|
||||
logger.info("Mark UID {} with \\Flagged FLAG".format(int(uid)))
|
||||
self.connection.uid('STORE', uid, '+FLAGS', '(\\Flagged)')
|
||||
|
||||
def delete(self, uid):
|
||||
logger.info("Mark UID {} with \\Deleted FLAG and expunge.".format(int(uid)))
|
||||
mov, data = self.connection.uid('STORE', uid, '+FLAGS', '(\\Deleted)')
|
||||
self.connection.expunge()
|
||||
|
||||
def copy(self, uid, destination_folder):
|
||||
logger.info("Copy UID {} to {} folder".format(int(uid), str(destination_folder)))
|
||||
return self.connection.uid('COPY', uid, destination_folder)
|
||||
|
||||
def move(self, uid, destination_folder):
|
||||
logger.info("Move UID {} to {} folder".format(int(uid), str(destination_folder)))
|
||||
if self.copy(uid, destination_folder):
|
||||
self.delete(uid)
|
||||
|
||||
def messages(self, *args, **kwargs):
|
||||
folder = kwargs.get('folder', False)
|
||||
msg = ""
|
||||
|
||||
if folder:
|
||||
self.connection.select(folder)
|
||||
msg = " from folder '{}'".format(folder)
|
||||
|
||||
logger.info("Fetch list of messages{}".format(msg))
|
||||
return self.fetch_list(**kwargs)
|
||||
|
||||
def folders(self):
|
||||
return self.connection.list()
|
||||
|
||||
@@ -10,22 +10,16 @@ class ImapTransport:
|
||||
|
||||
def __init__(self, hostname, port=None, ssl=True, ssl_context=None, starttls=False):
|
||||
self.hostname = hostname
|
||||
self.port = port
|
||||
kwargs = {}
|
||||
|
||||
if ssl:
|
||||
self.transport = IMAP4_SSL
|
||||
if not self.port:
|
||||
self.port = 993
|
||||
self.port = port or 993
|
||||
if ssl_context is None:
|
||||
ssl_context = pythonssllib.create_default_context()
|
||||
kwargs["ssl_context"] = ssl_context
|
||||
self.server = IMAP4_SSL(self.hostname, self.port, ssl_context=ssl_context)
|
||||
else:
|
||||
self.transport = IMAP4
|
||||
if not self.port:
|
||||
self.port = 143
|
||||
self.port = port or 143
|
||||
self.server = IMAP4(self.hostname, self.port)
|
||||
|
||||
self.server = self.transport(self.hostname, self.port, **kwargs)
|
||||
if starttls:
|
||||
self.server.starttls()
|
||||
logger.debug("Created IMAP4 transport for {host}:{port}"
|
||||
|
||||
13
imbox/imap.pyi
Normal file
13
imbox/imap.pyi
Normal file
@@ -0,0 +1,13 @@
|
||||
from imaplib import IMAP4, IMAP4_SSL
|
||||
from ssl import SSLContext
|
||||
from typing import Optional, Union, Tuple, List
|
||||
|
||||
|
||||
class ImapTransport:
|
||||
|
||||
def __init__(self, hostname: str, port: Optional[int], ssl: bool,
|
||||
ssl_context: Optional[SSLContext], starttls: bool) -> None: ...
|
||||
|
||||
def list_folders(self) -> Tuple[str, List[bytes]]: ...
|
||||
|
||||
def connect(self, username: str, password: str) -> Union[IMAP4, IMAP4_SSL]: ...
|
||||
72
imbox/imbox.py
Normal file
72
imbox/imbox.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from imbox.imap import ImapTransport
|
||||
from imbox.messages import Messages
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Imbox:
|
||||
|
||||
def __init__(self, hostname, username=None, password=None, ssl=True,
|
||||
port=None, ssl_context=None, policy=None, starttls=False):
|
||||
|
||||
self.server = ImapTransport(hostname, ssl=ssl, port=port,
|
||||
ssl_context=ssl_context, starttls=starttls)
|
||||
self.hostname = hostname
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.parser_policy = policy
|
||||
self.connection = self.server.connect(username, password)
|
||||
logger.info("Connected to IMAP Server with user {username} on {hostname}{ssl}".format(
|
||||
hostname=hostname, username=username, ssl=(" over SSL" if ssl or starttls else "")))
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
self.logout()
|
||||
|
||||
def logout(self):
|
||||
self.connection.close()
|
||||
self.connection.logout()
|
||||
logger.info("Disconnected from IMAP Server {username}@{hostname}".format(
|
||||
hostname=self.hostname, username=self.username))
|
||||
|
||||
def mark_seen(self, uid):
|
||||
logger.info("Mark UID {} with \\Seen FLAG".format(int(uid)))
|
||||
self.connection.uid('STORE', uid, '+FLAGS', '(\\Seen)')
|
||||
|
||||
def mark_flag(self, uid):
|
||||
logger.info("Mark UID {} with \\Flagged FLAG".format(int(uid)))
|
||||
self.connection.uid('STORE', uid, '+FLAGS', '(\\Flagged)')
|
||||
|
||||
def delete(self, uid):
|
||||
logger.info("Mark UID {} with \\Deleted FLAG and expunge.".format(int(uid)))
|
||||
self.connection.expunge()
|
||||
|
||||
def copy(self, uid, destination_folder):
|
||||
logger.info("Copy UID {} to {} folder".format(int(uid), str(destination_folder)))
|
||||
return self.connection.uid('COPY', uid, destination_folder)
|
||||
|
||||
def move(self, uid, destination_folder):
|
||||
logger.info("Move UID {} to {} folder".format(int(uid), str(destination_folder)))
|
||||
if self.copy(uid, destination_folder):
|
||||
self.delete(uid)
|
||||
|
||||
def messages(self, **kwargs):
|
||||
folder = kwargs.get('folder', False)
|
||||
|
||||
if folder:
|
||||
self.connection.select(folder)
|
||||
msg = " from folder '{}'".format(folder)
|
||||
else:
|
||||
msg = " from inbox"
|
||||
|
||||
logger.info("Fetch list of messages{}".format(msg))
|
||||
return Messages(connection=self.connection,
|
||||
parser_policy=self.parser_policy,
|
||||
**kwargs)
|
||||
|
||||
def folders(self):
|
||||
return self.connection.list()
|
||||
31
imbox/imbox.pyi
Normal file
31
imbox/imbox.pyi
Normal file
@@ -0,0 +1,31 @@
|
||||
import datetime
|
||||
from email._policybase import Policy
|
||||
from inspect import Traceback
|
||||
from ssl import SSLContext
|
||||
from typing import Optional, Union, Tuple, List
|
||||
|
||||
|
||||
class Imbox:
|
||||
|
||||
def __init__(self, hostname: str, username: Optional[str], password: Optional[str], ssl: bool,
|
||||
port: Optional[int], ssl_context: Optional[SSLContext], policy: Optional[Policy], starttls: bool): ...
|
||||
|
||||
def __enter__(self) -> 'Imbox': ...
|
||||
|
||||
def __exit__(self, type: Exception, value: str, traceback: Traceback) -> None: ...
|
||||
|
||||
def logout(self) -> None: ...
|
||||
|
||||
def mark_seen(self, uid: bytes) -> None: ...
|
||||
|
||||
def mark_flag(self, uid: bytes) -> None: ...
|
||||
|
||||
def delete(self, uid: bytes) -> None: ...
|
||||
|
||||
def copy(self, uid: bytes, destination_folder: Union[bytes, str]) -> Tuple[str, Union[list, List[None, bytes]]]: ...
|
||||
|
||||
def move(self, uid: bytes, destination_folder: Union[bytes, str]) -> None: ...
|
||||
|
||||
def messages(self, **kwargs: Union[bool, str, datetime.date]) -> 'Messages': ...
|
||||
|
||||
def folders(self) -> Tuple[str, List[bytes]]: ...
|
||||
62
imbox/messages.py
Normal file
62
imbox/messages.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from imbox.parser import fetch_email_by_uid
|
||||
from imbox.query import build_search_query
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Messages:
|
||||
|
||||
def __init__(self,
|
||||
connection,
|
||||
parser_policy,
|
||||
**kwargs):
|
||||
|
||||
self.connection = connection
|
||||
self.parser_policy = parser_policy
|
||||
self.kwargs = kwargs
|
||||
self._uid_list = self._query_uids(**kwargs)
|
||||
|
||||
logger.debug("Fetch all messages for UID in {}".format(self._uid_list))
|
||||
|
||||
def _fetch_email(self, uid):
|
||||
return fetch_email_by_uid(uid=uid,
|
||||
connection=self.connection,
|
||||
parser_policy=self.parser_policy)
|
||||
|
||||
def _query_uids(self, **kwargs):
|
||||
query_ = build_search_query(**kwargs)
|
||||
message, data = self.connection.uid('search', None, query_)
|
||||
if data[0] is None:
|
||||
return []
|
||||
return data[0].split()
|
||||
|
||||
def _fetch_email_list(self):
|
||||
for uid in self._uid_list:
|
||||
yield uid, self._fetch_email(uid)
|
||||
|
||||
def __repr__(self):
|
||||
if len(self.kwargs) > 0:
|
||||
return 'Messages({})'.format('\n'.join('{}={}'.format(key, value)
|
||||
for key, value in self.kwargs.items()))
|
||||
return 'Messages(ALL)'
|
||||
|
||||
def __iter__(self):
|
||||
return self._fetch_email_list()
|
||||
|
||||
def __next__(self):
|
||||
return self
|
||||
|
||||
def __len__(self):
|
||||
return len(self._uid_list)
|
||||
|
||||
def __getitem__(self, index):
|
||||
uids = self._uid_list[index]
|
||||
|
||||
if not isinstance(uids, list):
|
||||
uid = uids
|
||||
return uid, self._fetch_email(uid)
|
||||
|
||||
return [(uid, self._fetch_email(uid))
|
||||
for uid in uids]
|
||||
28
imbox/messages.pyi
Normal file
28
imbox/messages.pyi
Normal file
@@ -0,0 +1,28 @@
|
||||
import datetime
|
||||
from email._policybase import Policy
|
||||
from imaplib import IMAP4, IMAP4_SSL
|
||||
from typing import Union, List, Generator, Tuple
|
||||
|
||||
|
||||
class Messages:
|
||||
|
||||
def __init__(self,
|
||||
connection: Union[IMAP4, IMAP4_SSL],
|
||||
parser_policy: Policy,
|
||||
**kwargs: Union[bool, str, datetime.date]) -> None: ...
|
||||
|
||||
def _fetch_email(self, uid: bytes) -> 'Struct': ...
|
||||
|
||||
def _query_uids(self, **kwargs: Union[bool, str, datetime.date]) -> List[bytes]: ...
|
||||
|
||||
def _fetch_email_list(self) -> Generator[Tuple[bytes, 'Struct']]: ...
|
||||
|
||||
def __repr__(self) -> str: ...
|
||||
|
||||
def __iter__(self) -> Generator[Tuple[bytes, 'Struct']]: ...
|
||||
|
||||
def __next__(self) -> 'Messages': ...
|
||||
|
||||
def __len__(self) -> int: ...
|
||||
|
||||
def __getitem__(self, index) -> Union['Struct', List['Struct']]: ...
|
||||
@@ -1,14 +1,17 @@
|
||||
import imaplib
|
||||
import io
|
||||
import re
|
||||
import email
|
||||
import base64
|
||||
import quopri
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
from email.header import decode_header
|
||||
from imbox.utils import str_encode, str_decode
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -33,7 +36,10 @@ def decode_mail_header(value, default_charset='us-ascii'):
|
||||
return str_decode(str_encode(value, default_charset, 'replace'), default_charset)
|
||||
else:
|
||||
for index, (text, charset) in enumerate(headers):
|
||||
logger.debug("Mail header no. {}: {} encoding {}".format(index, str_decode(text, charset or 'utf-8'), charset))
|
||||
logger.debug("Mail header no. {index}: {data} encoding {charset}".format(
|
||||
index=index,
|
||||
data=str_decode(text, charset or 'utf-8', 'replace'),
|
||||
charset=charset))
|
||||
try:
|
||||
headers[index] = str_decode(text, charset or default_charset,
|
||||
'replace')
|
||||
@@ -54,7 +60,7 @@ def get_mail_addresses(message, header_name):
|
||||
for index, (address_name, address_email) in enumerate(addresses):
|
||||
addresses[index] = {'name': decode_mail_header(address_name),
|
||||
'email': address_email}
|
||||
logger.debug("{} Mail addressees in message: <{}> {}".format(header_name.upper(), address_name, address_email))
|
||||
logger.debug("{} Mail address in message: <{}> {}".format(header_name.upper(), address_name, address_email))
|
||||
return addresses
|
||||
|
||||
|
||||
@@ -63,7 +69,7 @@ def decode_param(param):
|
||||
values = v.split('\n')
|
||||
value_results = []
|
||||
for value in values:
|
||||
match = re.search(r'=\?((?:\w|-)+)\?(Q|B)\?(.+)\?=', value)
|
||||
match = re.search(r'=\?((?:\w|-)+)\?([QB])\?(.+)\?=', value)
|
||||
if match:
|
||||
encoding, type_, code = match.groups()
|
||||
if type_ == 'Q':
|
||||
@@ -124,6 +130,28 @@ def decode_content(message):
|
||||
return content
|
||||
|
||||
|
||||
def fetch_email_by_uid(uid, connection, parser_policy):
|
||||
message, data = connection.uid('fetch', uid, '(BODY.PEEK[] FLAGS)')
|
||||
logger.debug("Fetched message for UID {}".format(int(uid)))
|
||||
|
||||
raw_headers, raw_email = data[0]
|
||||
|
||||
email_object = parse_email(raw_email, policy=parser_policy)
|
||||
flags = parse_flags(raw_headers.decode())
|
||||
email_object.__dict__['flags'] = flags
|
||||
|
||||
return email_object
|
||||
|
||||
|
||||
def parse_flags(headers):
|
||||
"""Copied from https://github.com/girishramnani/gmail/blob/master/gmail/message.py"""
|
||||
if len(headers) == 0:
|
||||
return []
|
||||
if sys.version_info[0] == 3:
|
||||
headers = bytes(headers, "ascii")
|
||||
return list(imaplib.ParseFlags(headers))
|
||||
|
||||
|
||||
def parse_email(raw_email, policy=None):
|
||||
if isinstance(raw_email, bytes):
|
||||
raw_email = str_encode(raw_email, 'utf-8', errors='ignore')
|
||||
@@ -137,9 +165,7 @@ def parse_email(raw_email, policy=None):
|
||||
except UnicodeEncodeError:
|
||||
email_message = email.message_from_string(raw_email.encode('utf-8'), **email_parse_kwargs)
|
||||
maintype = email_message.get_content_maintype()
|
||||
parsed_email = {}
|
||||
|
||||
parsed_email['raw_email'] = raw_email
|
||||
parsed_email = {'raw_email': raw_email}
|
||||
|
||||
body = {
|
||||
"plain": [],
|
||||
@@ -159,7 +185,7 @@ def parse_email(raw_email, policy=None):
|
||||
content = decode_content(part)
|
||||
|
||||
is_inline = content_disposition is None \
|
||||
or content_disposition.startswith("inline")
|
||||
or content_disposition.startswith("inline")
|
||||
if content_type == "text/plain" and is_inline:
|
||||
body['plain'].append(content)
|
||||
elif content_type == "text/html" and is_inline:
|
||||
|
||||
30
imbox/parser.pyi
Normal file
30
imbox/parser.pyi
Normal file
@@ -0,0 +1,30 @@
|
||||
import datetime
|
||||
from email._policybase import Policy
|
||||
from email.message import Message
|
||||
from imaplib import IMAP4_SSL
|
||||
import io
|
||||
from typing import Union, Dict, List, KeysView, Tuple, Optional
|
||||
|
||||
|
||||
class Struct:
|
||||
def __init__(self, **entries: Union[
|
||||
str, datetime.datetime, Dict[str, str], list, List[Dict[str, str]]
|
||||
]) -> None: ...
|
||||
|
||||
def keys(self) -> KeysView: ...
|
||||
|
||||
def __repr__(self) -> str: ...
|
||||
|
||||
def decode_mail_header(value: str, default_charset: str) -> str: ...
|
||||
|
||||
def get_mail_addresses(message: Message, header_name: str) -> List[Dict[str, str]]: ...
|
||||
|
||||
def decode_param(param: str) -> Tuple[str, str]: ...
|
||||
|
||||
def parse_attachment(message_part: Message) -> Optional[Dict[str, Union[int, str, io.BytesIO]]]: ...
|
||||
|
||||
def decode_content(message: Message) -> str: ...
|
||||
|
||||
def fetch_email_by_uid(uid: bytes, connection: IMAP4_SSL, parser_policy: Optional[Policy]) -> Struct: ...
|
||||
|
||||
def parse_email(raw_email: bytes, policy: Optional[Policy]) -> Struct: ...
|
||||
@@ -8,8 +8,7 @@ logger = logging.getLogger(__name__)
|
||||
def format_date(date):
|
||||
if isinstance(date, datetime.date):
|
||||
return date.strftime('%d-%b-%Y')
|
||||
else:
|
||||
return date
|
||||
return date
|
||||
|
||||
|
||||
def build_search_query(**kwargs):
|
||||
|
||||
7
imbox/query.pyi
Normal file
7
imbox/query.pyi
Normal file
@@ -0,0 +1,7 @@
|
||||
import datetime
|
||||
from typing import Union
|
||||
|
||||
|
||||
def format_date(date: Union[str, datetime.date]) -> str: ...
|
||||
|
||||
def build_search_query(**kwargs: Union[bool, str, datetime.date]) -> str: ...
|
||||
@@ -1,14 +1,16 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def str_encode(value='', encoding=None, errors='strict'):
|
||||
logger.debug("Encode str {} with and errors {}".format(value, encoding, errors))
|
||||
return str(value, encoding, errors)
|
||||
|
||||
|
||||
def str_decode(value='', encoding=None, errors='strict'):
|
||||
if isinstance(value, str):
|
||||
return bytes(value, encoding, errors).decode('utf-8')
|
||||
elif isinstance(value, bytes):
|
||||
return value.decode(encoding or 'utf-8', errors=errors)
|
||||
else:
|
||||
raise TypeError( "Cannot decode '{}' object".format(value.__class__) )
|
||||
raise TypeError("Cannot decode '{}' object".format(value.__class__))
|
||||
|
||||
6
imbox/utils.pyi
Normal file
6
imbox/utils.pyi
Normal file
@@ -0,0 +1,6 @@
|
||||
from typing import Optional, Union
|
||||
|
||||
|
||||
def str_encode(value: Union[str, bytes], encoding: Optional[str], errors: str) -> str: ...
|
||||
|
||||
def str_decode(value: Union[str, bytes], encoding: Optional[str], errors: str) -> Union[str, bytes]: ...
|
||||
4
setup.py
4
setup.py
@@ -1,7 +1,9 @@
|
||||
from setuptools import setup
|
||||
import os
|
||||
|
||||
version = '0.9.5'
|
||||
import imbox
|
||||
|
||||
version = imbox.__version__
|
||||
|
||||
|
||||
def read(filename):
|
||||
|
||||
@@ -336,6 +336,9 @@ class TestParser(unittest.TestCase):
|
||||
from_message_object = email.message_from_string("From: John Smith <johnsmith@gmail.com>")
|
||||
self.assertEqual([{'email': 'johnsmith@gmail.com', 'name': 'John Smith'}], get_mail_addresses(from_message_object, 'from'))
|
||||
|
||||
invalid_encoding_in_from_message_object = email.message_from_string("From: =?UTF-8?Q?C=E4cilia?= <caciliahxg827m@example.org>")
|
||||
self.assertEqual([{'email': 'caciliahxg827m@example.org', 'name': 'C<EFBFBD>cilia'}], get_mail_addresses(invalid_encoding_in_from_message_object, 'from'))
|
||||
|
||||
def test_parse_email_with_policy(self):
|
||||
if not SMTP:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user