# Source code for pywhoisxmlapi

# -*- coding: utf-8 -*-

"""An unofficial client for WhoisXMLAPI"""

import os
import time
import logging
from datetime import datetime
from io import StringIO
from csv import DictWriter

from requests import session

"""Copyright 2018 Sean Whalen

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""

# Package version; the API client reports it in its User-Agent header
__version__ = "2.1.7"

# Set the log line layout: timestamp, level, then the message
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s")


def _epoch_to_datetime(epoch_seconds):
    """
    Converts a UNIX timestamp to a naive ``datetime`` in local time

    Args:
        epoch_seconds: Seconds since the epoch; any value accepted by
            ``int()``. A fractional part is truncated.

    Returns:
        datetime: The corresponding local date and time
    """
    whole_seconds = int(epoch_seconds)
    return datetime.fromtimestamp(whole_seconds)


def _datetime_to_string(dt):
    """
    Formats a ``datetime`` as ``YYYY-MM-DD HH:MM:SS.ffffff``

    Args:
        dt: An object providing ``strftime`` (e.g. a ``datetime``)

    Returns:
        str: The formatted timestamp, including microseconds
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S.%f"
    return dt.strftime(timestamp_format)


class WhoisXMLAPIError(RuntimeError):
    """
    Raised when an error is returned by WhoisXMLAPI
    """
def flatten_whois(whois_record):
    """
    Flattens a WHOIS record into a single-level dictionary

    Contact sections (``registrant``, ``administrativeContact`` and
    ``technicalContact``) are expanded into ``<section>_<field>`` keys, with
    the bulky ``rawText`` field left out. List values (``nameServers`` host
    names and ``ips``) are joined with ``|``.

    Args:
        whois_record (dict): A WHOIS record; ``domainName`` is required

    Returns:
        dict: A flattened WHOIS result

    Raises:
        KeyError: If ``whois_record`` has no ``domainName``
    """
    flat_whois = {"domainName": whois_record["domainName"]}

    for key in ("createdDate", "updatedDate", "expiresDate"):
        if key in whois_record:
            flat_whois[key] = whois_record[key]

    for section in ("registrant", "administrativeContact",
                    "technicalContact"):
        if section not in whois_record:
            continue
        for key, value in whois_record[section].items():
            # Skip rawText rather than adding and deleting it afterwards;
            # not every contact carries it, and deleting a missing key
            # would raise KeyError
            if key == "rawText":
                continue
            flat_whois["{0}_{1}".format(section, key)] = value

    if "nameServers" in whois_record:
        # Tolerate a nameServers section without a hostNames list
        host_names = whois_record["nameServers"].get("hostNames") or []
        flat_whois["nameServers"] = "|".join(host_names)

    for key in ("status", "registrarName", "estimatedDomainAge"):
        if key in whois_record:
            flat_whois[key] = whois_record[key]

    if "ips" in whois_record:
        flat_whois["ips"] = "|".join(whois_record["ips"])

    return flat_whois
def whois_to_csv(whois_results):
    """
    Returns CSV content given a list of WHOIS results

    Each record is flattened with ``flatten_whois`` and written under a fixed
    set of columns. Flattened keys that are not one of those columns are
    ignored rather than aborting the export.

    Args:
        whois_results (list): A list of WHOIS results

    Returns:
        str: results as a CSV string, starting with a header row
    """
    contact_fields = ['name', 'organization', 'street1', 'street2', 'city',
                      'state', 'postalCode', 'country', 'countryCode',
                      'email', 'telephone', 'fax']
    fields = ['domainName', 'createdDate', 'updatedDate', 'expiresDate']
    for section in ('registrant', 'administrativeContact',
                    'technicalContact'):
        fields += ["{0}_{1}".format(section, name) for name in contact_fields]
    fields += ['nameServers', 'status', 'registrarName',
               'estimatedDomainAge', 'ips']

    output_file = StringIO(newline="\n")
    # extrasaction="ignore": flatten_whois copies every contact sub-field it
    # is given, so a record may carry keys outside the fixed column list;
    # the default ("raise") would turn those into a ValueError
    writer = DictWriter(output_file, fieldnames=fields,
                        extrasaction="ignore")
    writer.writeheader()
    writer.writerows([flatten_whois(record) for record in whois_results])

    return output_file.getvalue()
class WhoisXMLAPI(object):
    """
    A Python interface to WhoisXMLAPI

    .. note::
      ``api_key`` can be overridden by the ``WHOIS_KEY`` environment variable
    """

    _root = "https://www.whoisxmlapi.com"
    # The reverse MX/NS/IP lookups are fetched page by page; a page holding
    # at least this many results triggers a request for the next page
    _page_size = 300

    def __init__(self, api_key=None):
        """
        Configures the API client

        Args:
            api_key (str): WhoisXMLAPI API key; overridden by the
                ``WHOIS_KEY`` environment variable

        Raises:
            ValueError: If no API key is available from either source
        """
        # The environment variable wins over the argument when it is set
        api_key = os.environ.get("WHOIS_KEY", api_key)
        if api_key is None:
            raise ValueError(
                "API key must be provided as the api_key parameter, or the "
                "WHOIS_KEY environment variable")
        self._api_key = api_key
        self._session = session()
        self._session.headers.update(
            {"User-Agent": "pywhoisxmlapi/{0}".format(__version__)})

    @staticmethod
    def _check_terms(terms, exclude_terms):
        """
        Validates search term list sizes

        Args:
            terms (list): Terms to search for
            exclude_terms (list): Terms to exclude, or ``None``

        Returns:
            list: ``exclude_terms``, with ``None`` replaced by an empty list

        Raises:
            ValueError: If either list holds more than 4 items
        """
        if exclude_terms is None:
            exclude_terms = []
        if len(terms) > 4:
            raise ValueError("Number of terms cannot be greater than 4")
        if len(exclude_terms) > 4:
            raise ValueError(
                "Number of excluded terms cannot be greater than 4")
        return exclude_terms

    @staticmethod
    def _check_mode(mode):
        """
        Normalises and validates a ``preview``/``purchase`` mode string

        Args:
            mode (str): ``preview`` or ``purchase`` (case-insensitive)

        Returns:
            str: The lower-cased mode

        Raises:
            ValueError: If the mode is not ``preview`` or ``purchase``
        """
        mode = mode.lower()
        if mode not in ("preview", "purchase"):
            raise ValueError("mode must be preview or purchase")
        return mode

    @staticmethod
    def _convert_seen_timestamps(result):
        """
        Converts the ``first_seen`` and ``last_visit`` epoch values of a
        reverse lookup result to ``datetime`` objects, in place

        Args:
            result (dict): A single reverse MX/NS/IP result

        Returns:
            dict: The same ``result`` dictionary
        """
        for key in ("first_seen", "last_visit"):
            result[key] = _epoch_to_datetime(result[key])
        return result

    def _request(self, endpoint, params=None, post=False, parse_json=True):
        """
        Makes an API request

        Args:
            endpoint: The API endpoint to request; either a full URL or a
                path relative to the API root
            params: The parameters to pass; the dictionary is not modified
            post (bool): If True make a ``POST`` request with a JSON body
                instead of a ``GET`` with query parameters
            parse_json (bool): If True parse the response as JSON; ignored
                (treated as False) when a ``callback`` parameter is set

        Returns:
            dict: The API call results
            str: The raw response text when JSON parsing is disabled

        Raises:
            WhoisXMLAPIError: If the parsed response holds an
                ``ErrorMessage``
            requests.HTTPError: If the HTTP status indicates an error
        """
        # Copy so the caller's dictionary never gains the API key or the
        # output format entries
        params = dict(params) if params else dict()
        params["outputFormat"] = "json"
        params["format"] = "JSON"
        params["apiKey"] = self._api_key

        if not endpoint.lower().startswith("http"):
            # Strip leading slashes before joining, otherwise the URL ends
            # up with a double slash after the host name
            endpoint = "{0}/{1}".format(WhoisXMLAPI._root,
                                        endpoint.lstrip("/"))

        # Keep the API key out of the debug log
        logging.debug("Params: %s", dict(params, apiKey="<redacted>"))
        if post:
            response = self._session.post(endpoint, json=params)
        else:
            response = self._session.get(endpoint, params=params)
        logging.debug("Response %s", response.content)
        response.raise_for_status()

        # A JSONP response is JavaScript, not JSON, so hand it back as text
        if params.get("callback"):
            parse_json = False
        if not parse_json:
            return response.text

        results = response.json()
        if isinstance(results, dict) and "ErrorMessage" in results:
            error = results["ErrorMessage"]
            if isinstance(error, dict):
                error = error.get("msg", error)
            raise WhoisXMLAPIError(error)

        return results

    def _common_api_call(self, endpoint, params, cost, results_key,
                         since_date=None,
                         created_date_from=None, created_date_to=None,
                         updated_date_from=None, updated_date_to=None,
                         expired_date_from=None, expired_date_to=None):
        """
        Makes a ``POST`` request for the preview/purchase style endpoints

        Args:
            endpoint: The API endpoint to request
            params (dict): Request parameters; must contain ``mode``. The
                date filters given are added to this dictionary.
            cost: The value reported as ``DRSCreditCost`` in preview mode
            results_key (str): The response key holding the result list
            since_date (str): Optional ``sinceDate`` filter
            created_date_from (str): Optional ``createdDateFrom`` filter
            created_date_to (str): Optional ``createdDateTo`` filter
            updated_date_from (str): Optional ``updatedDateFrom`` filter
            updated_date_to (str): Optional ``updatedDateTo`` filter
            expired_date_from (str): Optional ``expiredDateFrom`` filter
            expired_date_to (str): Optional ``expiredDateTo`` filter

        Returns:
            dict: The response plus ``DRSCreditCost`` in preview mode
            list: ``response[results_key]`` otherwise, when present
        """
        date_filters = (
            ("sinceDate", since_date),
            ("createdDateFrom", created_date_from),
            ("createdDateTo", created_date_to),
            ("updatedDateFrom", updated_date_from),
            ("updatedDateTo", updated_date_to),
            ("expiredDateFrom", expired_date_from),
            ("expiredDateTo", expired_date_to),
        )
        for key, value in date_filters:
            if value:
                params[key] = value

        results = self._request(endpoint, params, post=True)
        if params["mode"] == "preview":
            results["DRSCreditCost"] = cost
        elif results_key in results:
            results = results[results_key]

        return results

    def _transform_api_call(self, transform, endpoint, params):
        """
        Fetches every page of a paged endpoint and transforms each result

        Args:
            transform: A callable applied to each result
            endpoint: The API endpoint to request
            params (dict): Request parameters; ``from`` is set on this
                dictionary to request follow-up pages

        Returns:
            list: The transformed results from all pages
        """
        results = []
        while True:
            page = self._request(endpoint, params=params)["result"]
            results += page
            if len(page) < self._page_size:
                break
            # Continue after the last name received so far
            params["from"] = results[-1]["name"]

        return list(map(transform, results))

    def get_account_balances(self):
        """
        Get all account balances

        Returns:
            dict: Account balances
        """
        endpoint = "/accountServices.php"
        params = dict(servicetype="accountbalance", output_format="JSON")

        return self._request(endpoint, params)

    def set_account_balance_warnings(self, warn_threshold=10,
                                     warn_threshold_enabled=True,
                                     warn_empty_enabled=True):
        """
        Sets account settings for balance warnings

        Args:
            warn_threshold: The value at which account balance warnings are
                sent
            warn_threshold_enabled (bool): Enable low balance warnings
            warn_empty_enabled (bool): Enable empty balance warnings

        Returns:
            dict: Acknowledgement
        """
        endpoint = "/accountServices.php"
        # Each flag is sent as 1 or 0, independently of the other
        params = dict(servicetype="accountUpdate",
                      output_format="JSON",
                      warn_threshold=warn_threshold,
                      warn_threshold_enabled=int(bool(warn_threshold_enabled)),
                      warn_empty_enabled=int(bool(warn_empty_enabled)))

        return self._request(endpoint, params)

    def whois(self, domain_name, prefer_fresh=False, da=0, ip=True,
              check_proxy_data=True, thin_whois=False, callback=None,
              parse=False, registry_raw_text=None, registrar_raw_text=None):
        """
        Returns parsed WHOIS data

        Args:
            domain_name (str): A domain name or IP address
            prefer_fresh (bool): Get the latest WHOIS record even if it's
                incomplete
            da (int): 0 - Do not check for domain availability;
                1 - quick check;
                2 - slower, but most accurate
            ip (bool): Resolve IP addresses
            check_proxy_data (bool): Check if registration matches known
                proxies
            thin_whois (bool): Only return data on the registrant, not the
                registrar
            callback (str): JSONP callback
            parse (bool): Parse the provided raw WHOIS data
            registry_raw_text (str): Optional raw WHOIS registration data to
                parse, rather than fetching it via the API
            registrar_raw_text (str): Optional raw WHOIS registration data to
                parse, rather than fetching it via the API

        Returns:
            dict: Parsed WHOIS data
            str: WHOIS data wrapped in JSONP if ``callback`` was specified
        """
        endpoint = "/whoisserver/WhoisService"
        # int() keeps da=2 as 2 while still sending True as 1
        params = dict(domainName=domain_name, da=int(da))

        flags = (("preferFresh", prefer_fresh),
                 ("ip", ip),
                 ("checkProxyData", check_proxy_data),
                 ("thinWhois", thin_whois),
                 ("_parse", parse))
        for key, enabled in flags:
            if enabled:
                params[key] = 1

        if callback:
            params["callback"] = callback
        if registry_raw_text:
            params["registryRawText"] = registry_raw_text
        if registrar_raw_text:
            # NOTE(review): lower camel case to match registryRawText; the
            # original sent "RegistrarRawText" - confirm against API docs
            params["registrarRawText"] = registrar_raw_text

        response = self._request(endpoint, params)
        # A JSONP response comes back as text and is returned unchanged
        if isinstance(response, dict):
            response = response["WhoisRecord"]

        return response

    def bulk_whois(self, domains, poll_interval=15):
        """
        Retrieves WHOIS data for multiple domains, in bulk.

        Args:
            domains (list): A list of domains to get WHOIS records for
            poll_interval: Seconds to wait before each status poll and
                before the CSV download

        Returns:
            dict: A dictionary with keys:
              - ``structured`` - parsed data in a dictionary
              - ``csv``: results in CSV format

        Raises:
            ValueError: If ``domains`` is not a list
        """
        if not isinstance(domains, list):
            raise ValueError("domains must be a list")
        if not domains:
            # Nothing to look up; avoid submitting an empty job
            return dict(structured=[], csv="")

        endpoint = "/BulkWhoisLookup/bulkServices/bulkWhois"
        response = self._request(endpoint, dict(domains=domains), post=True)

        params = dict(requestId=response["requestId"],
                      searchType="all",
                      maxRecords=len(domains),
                      startIndex=1)
        endpoint = "/BulkWhoisLookup/bulkServices/getRecords"
        records_left = len(domains)
        while records_left > 0:
            time.sleep(poll_interval)
            response = self._request(endpoint, params, post=True)
            records_left = response["recordsLeft"]

        structured = response["whoisRecords"]
        for result in structured:
            # domainFetchedTime is divided by 1000 before conversion
            # (presumably milliseconds since the epoch)
            result["domainFetchedTime"] = _epoch_to_datetime(
                float(result["domainFetchedTime"]) / 1000)

        endpoint = "/BulkWhoisLookup/bulkServices/download"
        time.sleep(poll_interval)
        raw_csv = self._request(endpoint, params, post=True,
                                parse_json=False)
        # Remove blank lines while keeping the rows on separate lines
        csv_text = "\n".join(line for line in raw_csv.splitlines() if line)

        return dict(structured=structured, csv=csv_text)

    def reverse_whois(self, terms, exclude_terms=None, search_type="current",
                      mode="preview",
                      created_date_to=None, created_date_from=None,
                      updated_date_to=None, updated_date_from=None,
                      expired_date_to=None, expired_date_from=None):
        """
        Conducts a reverse WHOIS search

        Args:
            terms (list): Terms to search for (max 4)
            exclude_terms (list): Terms to filter by (max 4)
            search_type (str): ``current`` or ``historic``
            mode (str): ``preview`` or ``purchase``
            created_date_to (str): Search through domains created before
                the given date (``YYYY-MM-DD`` format)
            created_date_from (str): Search through domains created after
                the given date (``YYYY-MM-DD`` format)
            updated_date_to (str): Search through domains updated before
                the given date (``YYYY-MM-DD`` format)
            updated_date_from (str): Search through domains updated after
                the given date (``YYYY-MM-DD`` format)
            expired_date_to (str): Search through domains expired before
                the given date (``YYYY-MM-DD`` format)
            expired_date_from (str): Search through domains expired after
                the given date (``YYYY-MM-DD`` format)

        Returns:
            dict: A dictionary of preview data
            list: A list of results

        Raises:
            ValueError: If a term list is too long, or ``search_type`` or
                ``mode`` is not an accepted value
        """
        endpoint = "https://reverse-whois-api.whoisxmlapi.com/api/v2"
        exclude_terms = self._check_terms(terms, exclude_terms)
        search_type = search_type.lower()
        if search_type not in ("current", "historic"):
            raise ValueError("Search type must be current or historic")
        mode = self._check_mode(mode)

        params = dict(mode=mode,
                      searchType=search_type,
                      basicSearchTerms=dict(include=terms,
                                            exclude=exclude_terms))
        drs_credit_cost = 1

        return self._common_api_call(endpoint, params, drs_credit_cost,
                                     "domainsList",
                                     created_date_from=created_date_from,
                                     created_date_to=created_date_to,
                                     updated_date_from=updated_date_from,
                                     updated_date_to=updated_date_to,
                                     expired_date_from=expired_date_from,
                                     expired_date_to=expired_date_to)

    def whois_history(self, domain, since_date=None,
                      created_date_from=None, created_date_to=None,
                      updated_date_from=None, updated_date_to=None,
                      expired_date_from=None, expired_date_to=None,
                      mode="preview"):
        """
        Returns WHOIS history for a given domain

        Args:
            domain (str): The domain
            since_date (str): Only return domains created or deleted since
                this date, (``YYYY-MM-DD`` format)
            created_date_to (str): Search through domains created before
                the given date (``YYYY-MM-DD`` format)
            created_date_from (str): Search through domains created after
                the given date (``YYYY-MM-DD`` format)
            updated_date_to (str): Search through domains updated before
                the given date (``YYYY-MM-DD`` format)
            updated_date_from (str): Search through domains updated after
                the given date (``YYYY-MM-DD`` format)
            expired_date_to (str): Search through domains expired before
                the given date (``YYYY-MM-DD`` format)
            expired_date_from (str): Search through domains expired after
                the given date (``YYYY-MM-DD`` format)
            mode (str): ``preview`` or ``purchase``

        Returns:
            dict: A dictionary of preview data
            list: A list of parsed historic WHOIS records, starting with
            the current record

        Raises:
            ValueError: If ``mode`` is not ``preview`` or ``purchase``
        """
        endpoint = "https://whois-history-api.whoisxmlapi.com/api/v1"
        mode = self._check_mode(mode)
        params = dict(domainName=domain, mode=mode)

        return self._common_api_call(endpoint, params, 50, "records",
                                     since_date=since_date,
                                     created_date_from=created_date_from,
                                     created_date_to=created_date_to,
                                     updated_date_from=updated_date_from,
                                     updated_date_to=updated_date_to,
                                     expired_date_from=expired_date_from,
                                     expired_date_to=expired_date_to)

    def brand_alert(self, terms, exclude_terms=None, since_date=None,
                    created_date_from=None, created_date_to=None,
                    updated_date_from=None, updated_date_to=None,
                    expired_date_from=None, expired_date_to=None,
                    mode="preview"):
        """
        Lists newly created or deleted domains based on brand terms

        Args:
            terms (list): Brand terms to include in the search (max 4)
            exclude_terms (list): Terms to exclude (max 4)
            since_date (str): Only return domains created or deleted since
                this date, (``YYYY-MM-DD`` format)
            created_date_to (str): Search through domains created before
                the given date (``YYYY-MM-DD`` format)
            created_date_from (str): Search through domains created after
                the given date (``YYYY-MM-DD`` format)
            updated_date_to (str): Search through domains updated before
                the given date (``YYYY-MM-DD`` format)
            updated_date_from (str): Search through domains updated after
                the given date (``YYYY-MM-DD`` format)
            expired_date_to (str): Search through domains expired before
                the given date (``YYYY-MM-DD`` format)
            expired_date_from (str): Search through domains expired after
                the given date (``YYYY-MM-DD`` format)
            mode (str): ``preview`` or ``purchase``

        Returns:
            dict: A dictionary of preview data
            list: A list of dictionaries containing the ``domainName``, and
            its ``action`` (i.e. ``added`` or ``dropped``)

        Raises:
            ValueError: If a term list is too long or ``mode`` is invalid
        """
        endpoint = "https://brand-alert-api.whoisxmlapi.com/api/v2"
        exclude_terms = self._check_terms(terms, exclude_terms)
        # Normalise before building params so the preview check in
        # _common_api_call sees the lower-cased value
        mode = self._check_mode(mode)

        # NOTE(review): exclusions are sent as excludeSearchTerms to mirror
        # includeSearchTerms; the original sent "excludeTerms" - confirm
        # against the Brand Alert API reference
        params = dict(includeSearchTerms=terms,
                      excludeSearchTerms=exclude_terms,
                      mode=mode)

        return self._common_api_call(endpoint, params, 10, "domainsList",
                                     since_date=since_date,
                                     created_date_from=created_date_from,
                                     created_date_to=created_date_to,
                                     updated_date_from=updated_date_from,
                                     updated_date_to=updated_date_to,
                                     expired_date_from=expired_date_from,
                                     expired_date_to=expired_date_to)

    def registrant_alert(self, terms, exclude_terms=None, since_date=None,
                         created_date_from=None, created_date_to=None,
                         updated_date_from=None, updated_date_to=None,
                         expired_date_from=None, expired_date_to=None,
                         mode="preview"):
        """
        Lists newly created or deleted domains based on registrant

        Args:
            terms (list): Registrant terms to include in the search (max 4)
            exclude_terms (list): Terms to exclude (max 4)
            since_date (str): Only return domains created or deleted since
                this date, (``YYYY-MM-DD`` format)
            created_date_to (str): Search through domains created before
                the given date (``YYYY-MM-DD`` format)
            created_date_from (str): Search through domains created after
                the given date (``YYYY-MM-DD`` format)
            updated_date_to (str): Search through domains updated before
                the given date (``YYYY-MM-DD`` format)
            updated_date_from (str): Search through domains updated after
                the given date (``YYYY-MM-DD`` format)
            expired_date_to (str): Search through domains expired before
                the given date (``YYYY-MM-DD`` format)
            expired_date_from (str): Search through domains expired after
                the given date (``YYYY-MM-DD`` format)
            mode (str): ``preview`` or ``purchase``

        Returns:
            dict: A dictionary of preview data
            list: A list of dictionaries containing the ``domainName``, and
            its ``action`` (i.e. ``added`` or ``dropped``)

        Raises:
            ValueError: If a term list is too long or ``mode`` is invalid
        """
        endpoint = "https://registrant-alert-api.whoisxmlapi.com/api/v2"
        exclude_terms = self._check_terms(terms, exclude_terms)
        mode = self._check_mode(mode)

        params = dict(mode=mode,
                      basicSearchTerms=dict(include=terms,
                                            exclude=exclude_terms))

        return self._common_api_call(endpoint, params, 10, "domainsList",
                                     since_date=since_date,
                                     created_date_from=created_date_from,
                                     created_date_to=created_date_to,
                                     updated_date_from=updated_date_from,
                                     updated_date_to=updated_date_to,
                                     expired_date_from=expired_date_from,
                                     expired_date_to=expired_date_to)

    def dns_lookup(self, domain_name, record_type="_all", callback=None):
        """
        Performs a DNS lookup

        Args:
            domain_name (str): The domain name or sub-domain to lookup
            record_type (str): The DNS resource record type to query for,
                or ``_all`` for all record types
            callback (str): A JSONP callback

        Returns:
            dict: Lookup results
            str: lookup results data wrapped in JSONP if ``callback`` was
            specified
        """
        endpoint = "/whoisserver/DNSService"
        params = dict(domainName=domain_name, recordType=record_type,
                      callback=callback)

        return self._request(endpoint, params)

    def domain_availability(self, domain_name, mode="DNS_ONLY"):
        """
        Check domain name availability

        Args:
            domain_name (str): The domain name to check
            mode (str): ``DNS_ONLY`` or ``DNS_AND_WHOIS``

        Returns:
            bool: The availability of the domain name

        Raises:
            ValueError: If ``mode`` is not an accepted value
        """
        endpoint = "/whoisserver/WhoisService"
        mode = mode.upper()
        if mode not in ("DNS_ONLY", "DNS_AND_WHOIS"):
            raise ValueError("Mode must be DNS_ONLY or DNS_AND_WHOIS")
        params = dict(cmd="GET_DN_AVAILABILITY", getMode=mode,
                      domainName=domain_name)

        results = self._request(endpoint, params)
        # NOTE(review): the service appears to nest the answer under
        # "DomainInfo"; both shapes are accepted - confirm against API docs
        if isinstance(results.get("DomainInfo"), dict):
            results = results["DomainInfo"]

        return results["domainAvailability"] == "AVAILABLE"

    def verify_email(self, email_address):
        """
        Returns metadata about an email address

        Args:
            email_address (str): The email address to verify

        Returns:
            dict: email verification results
        """
        endpoint = "https://emailverification.whoisxmlapi.com/api/v1"
        params = dict(emailAddress=email_address)

        return self._request(endpoint, params=params)

    def reverse_mx(self, mx):
        """
        Performs a reverse MX query

        Args:
            mx (str): A MX hostname

        Returns:
            list: A list of results
        """
        endpoint = "https://reverse-mx-api.whoisxmlapi.com/api/v1"
        params = dict(mx=mx)

        return self._transform_api_call(self._convert_seen_timestamps,
                                        endpoint, params)

    def reverse_ns(self, ns):
        """
        Performs a reverse NS query

        Args:
            ns (str): A nameserver hostname

        Returns:
            list: A list of results
        """
        endpoint = "https://reverse-ns-api.whoisxmlapi.com/api/v1"
        params = dict(ns=ns)

        return self._transform_api_call(self._convert_seen_timestamps,
                                        endpoint, params)

    def reverse_ip(self, ip):
        """
        Performs a reverse IP/DNS query

        Args:
            ip (str): An IPv4 or IPv6 address

        Returns:
            list: A list of results
        """
        endpoint = "https://reverse-ip-api.whoisxmlapi.com/api/v1"
        params = dict(ip=ip)

        return self._transform_api_call(self._convert_seen_timestamps,
                                        endpoint, params)

    def domain_reputation(self, domain, mode="fast"):
        """
        Checks a domain's reputation

        Args:
            domain (str): The domain to check
            mode (str): ``fast`` - some heavy tests and data collectors
                will be disabled (1 credit) or ``full`` - all the data and
                the tests will be processed (3 credits)

        Returns:
            float: A number ranging from 0.0 being most malicious to 100.0
            being most safe

        Raises:
            ValueError: If ``mode`` is not ``fast`` or ``full``
        """
        endpoint = "https://domain-reputation-api.whoisxmlapi.com/api/v1"
        mode = mode.lower()
        if mode not in ("fast", "full"):
            raise ValueError("Mode must be fast or full")
        params = dict(domainName=domain, mode=mode)

        return self._request(endpoint, params)["reputationScore"]

    def ip_geolocation(self, ip):
        """
        Returns geolocation information about an IP address

        Args:
            ip (str): An IPv4 or IPv6 address

        Returns:
            dict: Geolocation information
        """
        endpoint = "https://geoipify.whoisxmlapi.com/api/v1"
        params = dict(ipAddress=ip)

        return self._request(endpoint, params)["location"]

    def netblocks(self, ip=None, mask=None, org=None, limit=10000):
        """
        Returns netblock information about a given IP or organisation

        .. note::
          You must specify ``ip`` or ``org``, but not both

        Args:
            ip (str): An IP address
            mask (int): A subnet mask integer
            org (list): A list of organisation strings
            limit (int): Max number of records to return (1-10000)

        Returns:
            The parsed API response holding the netblocks

        Raises:
            ValueError: If neither or both of ``ip`` and ``org`` are given
        """
        # Exactly one of ip / org must be supplied
        if bool(ip) == bool(org):
            raise ValueError("ip or org must be specified")

        params = dict(ip=ip, mask=mask, org=org, limit=limit)
        endpoint = "https://ip-netblocks-api.whoisxmlapi.com/api/v1"

        return self._request(endpoint, params)