Source code for mailsuite.imap

import logging
from typing import Union, List, Dict
import time
import socket
from ssl import (CERT_NONE, SSLError, CertificateError, SSLContext,
                 create_default_context)

import imapclient
import imapclient.exceptions
import imaplib

import mailsuite.utils

logger = logging.getLogger(__name__)


[docs]class MaxRetriesExceeded(RuntimeError): """Raised when the maximum number of retries in exceeded"""
def _chunks(list_like_object, n: int): """Yield successive n-sized chunks from l.""" for i in range(0, len(list_like_object), n): yield list_like_object[i:i + n]
[docs]class IMAPClient(imapclient.IMAPClient): """A simplified IMAP client""" def _normalize_folder(self, folder_name: str) -> str: """ Returns an appropriate path based on the namespace (if any) and hierarchy separator Args: folder_name: The path to correct Returns: A corrected path """ if folder_name in ["", "*", "INBOX"]: return imapclient.IMAPClient._normalize_folder(self, folder_name) folder_name = folder_name.rstrip("/") folder_name = folder_name.replace(self._path_prefix, "") if not self._hierarchy_separator == "/": folder_name = folder_name.replace(self._hierarchy_separator, "") folder_name = folder_name.replace("/", self._hierarchy_separator) folder_name = "{0}{1}".format(self._path_prefix, folder_name) return imapclient.IMAPClient._normalize_folder(self, folder_name) def _start_idle(self, idle_callback, idle_timeout: int = 30): """ Starts an IMAP IDLE session Args: idle_callback: A callback function idle_timeout: Number of seconds to wait for an IDLE response """ if self._idle_supported is False: raise imapclient.exceptions.IMAPClientError( "IDLE is not supported by the server") idle_callback(self) idle_start_time = time.monotonic() self.idle() while True: try: # Refresh the IDLE session every 5 minutes to stay connected if time.monotonic() - idle_start_time > 5 * 60: logger.info("IMAP: Refreshing IDLE session") self.idle_done() idle_start_time = time.monotonic() self.idle(self) responses = self.idle_check(timeout=idle_timeout) if responses is not None: if len(responses) == 0: # Gmail/G-Suite returns an empty list self.idle_done() idle_callback(self) idle_start_time = time.monotonic() self.idle() else: for r in responses: if r[0] != 0 and r[1] == b'RECENT': self.idle_done() idle_callback(self) idle_start_time = time.monotonic() self.idle() break except (KeyError, socket.error, BrokenPipeError, ConnectionResetError): logger.debug("IMAP error: Connection reset") self.reset_connection() except imapclient.exceptions.IMAPClientError as error: error = error.__str__().lstrip("b'").rstrip("'").rstrip(".") # Workaround for random Exchange/Microsoft 365 IMAP errors if "unexpected response" in error or "BAD" in error: self.reset_connection() except KeyboardInterrupt: break try: self.idle_done() except BrokenPipeError: pass def __init__(self, host: str, username: str, password: str, port: int = None, ssl: bool = True, ssl_context: SSLContext = None, verify: bool = True, timeout: int = 30, max_retries: int = 4, initial_folder: str = "INBOX", idle_callback=None, idle_timeout: int = 30): """ Connects to an IMAP server Args: host: The server hostname or IP address username: The username password: The password port: The port ssl: Use SSL or TLS ssl_context: For more advanced TLS options verify: Verify the SSL/TLS certificate timeout: Number of seconds to wait for an operation max_retries: The maximum number of retries after a timeout initial_folder: The initial folder to select idle_callback: The function to call when new messages are detected idle_timeout: Number of seconds to wait for an IDLE response """ if ssl_context is None: ssl_context = create_default_context() if verify is False: ssl_context.check_hostname = False ssl_context.verify_mode = CERT_NONE self._init_args = dict(host=host, username=username, password=password, port=port, ssl=ssl, ssl_context=ssl_context, verify=verify, timeout=timeout, max_retries=max_retries, initial_folder=initial_folder, idle_callback=idle_callback, idle_timeout=idle_timeout) self.max_retries = max_retries self.idle_callback = idle_callback self.idle_timeout = idle_timeout self._path_prefix = "" self._hierarchy_separator = "" if not ssl: logger.info("Connecting to IMAP over plain text") imapclient.IMAPClient.__init__(self, host=host, port=port, ssl=ssl, ssl_context=ssl_context, use_uid=True, timeout=timeout) try: if not ssl and b"STARTTLS" in self.capabilities(): logger.info("IMAP server supports STARTTLS ... activating now") self.starttls(ssl_context=ssl_context) self.login(username, password) self.server_capabilities = self.capabilities() self._move_supported = b"MOVE" in self.server_capabilities self._idle_supported = b"IDLE" in self.server_capabilities self._namespace = b"NAMESPACE" in self.server_capabilities self._hierarchy_separator = self.list_folders()[0][1] if not self._hierarchy_separator: self._hierarchy_separator = "" if type(self._hierarchy_separator) is bytes: self._hierarchy_separator = bytes( self._hierarchy_separator).decode("utf-8") if self._namespace: self._namespace = self.namespace() personal_namespace = self._namespace.personal if len(personal_namespace) > 0: self._hierarchy_separator = personal_namespace[0][1] if not personal_namespace[0][0] == "": self._path_prefix = personal_namespace[0][0] if type(self._path_prefix) is bytes: self._path_prefix = self._path_prefix.decode( "utf-8") else: self._namespace = None self.select_folder(initial_folder) except (ConnectionResetError, socket.error, TimeoutError, imapclient.exceptions.IMAPClientError) as error: error = error.__str__().lstrip("b'").rstrip("'").rstrip( ".") raise imapclient.exceptions.IMAPClientError(error) except ConnectionAbortedError: raise imapclient.exceptions.IMAPClientError("Connection aborted") except TimeoutError: raise imapclient.exceptions.IMAPClientError("Connection timed out") except SSLError as error: raise imapclient.exceptions.IMAPClientError( "SSL error: {0}".format(error.__str__())) except CertificateError as error: raise imapclient.exceptions.IMAPClientError( "Certificate error: {0}".format(error.__str__())) except BrokenPipeError: raise imapclient.exceptions.IMAPClientError("Broken pipe") if idle_callback is not None: self._start_idle(idle_callback, idle_timeout=idle_timeout)
[docs] def reset_connection(self): """Resets the connection to the IMAP server""" logger.info("Reconnecting to IMAP") try: self.shutdown() except Exception as e: logger.info( "Failed to log out: {0}".format(e.__str__())) self.__init__(self._init_args["host"], self._init_args["username"], self._init_args["password"], port=self._init_args["port"], ssl=self._init_args["ssl"], ssl_context=self._init_args["ssl_context"], verify=self._init_args["verify"], timeout=self._init_args["timeout"], max_retries=self._init_args["max_retries"], initial_folder=self._init_args["initial_folder"], idle_callback=self._init_args["idle_callback"], idle_timeout=self._init_args["idle_timeout"], )
[docs] def fetch_message(self, msg_uid: int, parse: bool = False, _attempt: int = 1) -> Union[str, Dict]: """ Fetch a message by UID, and optionally parse it Args: msg_uid: The message UID parse: Return parsed results from mailparser _attempt: The attempt number Returns: str: The raw mail message, including headers dict: A parsed email message """ try: raw_msg = self.fetch(msg_uid, ["RFC822"])[msg_uid] except (socket.timeout, imaplib.IMAP4.abort): _attempt = _attempt + 1 if _attempt > self.max_retries: raise MaxRetriesExceeded("Maximum retries exceeded") logger.info("Attempt {0} of {1} timed out. Retrying...".format( _attempt, self.max_retries)) self.reset_connection() return self.fetch_message(msg_uid, parse=parse, _attempt=_attempt) msg_keys = [b'RFC822', b'BODY[NULL]', b'BODY[]'] msg_key = '' for key in msg_keys: if key in raw_msg.keys(): msg_key = key break message = raw_msg[msg_key].decode("utf-8", "replace") if parse: message = mailsuite.utils.parse_email(message) return message
[docs] def delete_messages(self, msg_uids: Union[List[int], List[str], str, int], silent: bool = True, _attempt: int = 1): """ Deletes the given messages by Message UIDs Args: msg_uids: A list of UIDs of messages to delete silent: Do it silently _attempt: The attempt number """ logger.info("Deleting message UID(s) {0}".format(",".join( str(uid) for uid in msg_uids))) if type(msg_uids) is str or type(msg_uids) is int: msg_uids = [int(msg_uids)] try: imapclient.IMAPClient.delete_messages(self, msg_uids, silent=silent) imapclient.IMAPClient.expunge(self, msg_uids) except (socket.timeout, imaplib.IMAP4.abort): _attempt = _attempt + 1 if _attempt > self.max_retries: raise MaxRetriesExceeded("Maximum retries exceeded") logger.info("Attempt {0} of {1} timed out. Retrying...".format( _attempt, self.max_retries)) self.reset_connection() self.delete_messages(msg_uids, silent=silent, _attempt=_attempt)
[docs] def create_folder(self, folder_path: str, _attempt: int = 1): """ Creates an IMAP folder at the given path Args: folder_path: The path of the folder to create _attempt: The attempt number """ if not self.folder_exists(folder_path): logger.info("Creating folder: {0}".format(folder_path)) try: imapclient.IMAPClient.create_folder(self, folder_path) except (socket.timeout, imaplib.IMAP4.abort): _attempt = _attempt + 1 if _attempt > self.max_retries: raise MaxRetriesExceeded("Maximum retries exceeded") logger.info("Attempt {0} of {1} timed out. Retrying...".format( _attempt, self.max_retries)) self.reset_connection() self.create_folder(folder_path, _attempt=_attempt)
def _move_messages(self, msg_uids: Union[int, List[int]], folder_path: str): """ Move the emails with the given UIDs to the given folder Args: msg_uids: A UID or list of UIDs of messages to move folder_path: The path of the destination folder """ folder_path = folder_path.replace("\\", "/").rstrip("/") if type(msg_uids) is str or type(msg_uids) is int: msg_uids = [int(msg_uids)] for chunk in _chunks(msg_uids, 100): if self._move_supported: logger.info("Moving message UID(s) {0} to {1}".format( ",".join(str(uid) for uid in chunk), folder_path )) try: self.move(chunk, folder_path) except imapclient.exceptions.IMAPClientError as e: e = e.__str__().lstrip("b'").rstrip( "'").rstrip(".") message = "Error moving message UIDs" e = "{0} {1}: " "{2}".format(message, msg_uids, e) logger.info("IMAP error: {0}".format(e)) logger.info( "Copying message UID(s) {0} to {1} by copy".format( ",".join(str(uid) for uid in chunk), folder_path )) self.copy(msg_uids, folder_path) self.delete_messages(msg_uids) else: logger.info("Moving message UID(s) {0} to {1} by copy".format( ",".join(str(uid) for uid in chunk), folder_path )) self.copy(msg_uids, folder_path) self.delete_messages(msg_uids)
[docs] def move_messages(self, msg_uids: Union[int, List[int]], folder_path: str, _attempt: int = 1): """ Move the emails with the given UIDs to the given folder Args: msg_uids: A UID or list of UIDs of messages to move folder_path: The path of the destination folder _attempt: The attempt number """ try: self._move_messages(msg_uids, folder_path) except (socket.timeout, imaplib.IMAP4.abort): _attempt = _attempt + 1 if _attempt > self.max_retries: raise MaxRetriesExceeded("Maximum retries exceeded") logger.info("Attempt {0} of {1} timed out. Retrying...".format( _attempt, self.max_retries)) self.reset_connection() self._move_messages(msg_uids, folder_path)