Source code for pyepp.epp

"""
EPP Communicator Module
"""

import ssl
import socket
import struct
import logging
import sys
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Optional, Any

from bs4 import BeautifulSoup

from pyepp.command_templates import LOGOUT_XML, LOGIN_XML, HELLO_XML, template_engine

LENGTH_FIELD_SIZE = 4
CRLF_SIZE = 2


[docs] class EppCommunicatorException(Exception): """ EPP communicator exception. """
[docs] class EppResultCode(Enum): """ EPP result codes enumeration. """ SUCCESS = 1000 SUCCESS_ACTION_PENDING = 1001 SUCCESS_NO_MESSAGE = 1300 SUCCESS_ACK_TO_DEQUEUE = 1301 SUCCESS_END_SESSION = 1500 UNKNOWN_COMMAND = 2000 COMMAND_SYNTAX_ERROR = 2001 COMMAND_USE_ERROR = 2002 REQUIRED_PARAMETER_MISSING = 2003 PARAMETER_RANGE_ERROR = 2004 PARAMETER_SYNTAX_ERROR = 2005 UNIMPLEMENTED_PROTOCOL_VERSION = 2100 UNIMPLEMENTED_COMMAND = 2101 UNIMPLEMENTED_OPTION = 2102 UNIMPLEMENTED_EXTENSION = 2103 BILLING_FAILURE = 2104 OBJECT_NOT_ELIGIBLE_FOR_RENEWAL = 2105 OBJECT_NOT_ELIGIBLE_FOR_TRANSFER = 2106 AUTHENTICATION_ERROR = 2200 AUTHORIZATION_ERROR = 2201 INVALID_AUTHORIZATION_INFORMATION = 2202 OBJECT_PENDING_TRANSFER = 2300 OBJECT_NOT_PENDING_TRANSFER = 2301 OBJECT_EXISTS = 2302 OBJECT_DOES_NOT_EXIST = 2303 OBJECT_STATUS_PROHIBITS_OPERATION = 2304 OBJECT_ASSOCIATION_PROHIBITS_OPERATION = 2305 PARAMETER_VALUE_POLICY_ERROR = 2306 UNIMPLEMENTED_OBJECT_SERVICE = 2307 DATA_MANAGEMENT_POLICY_VIOLATION = 2308 COMMAND_FAILED = 2400 COMMAND_FAILED_CLOSING_CONNECTION = 2500 AUTHENTICATION_ERROR_CLOSING_CONNECTION = 2501 SESSION_LIMIT_EXCEEDED_CLOSING_CONNECTION = 2502
[docs] def get_format_32() -> str: """ Get the size of C integers. We need 32 bits unsigned. From http://www.bortzmeyer.org/4934.html """ return ">I"
# pylint: disable=too-many-instance-attributes
[docs] @dataclass class EppResultData: """Epp result data structure.""" code: int message: str raw_response: str result_data: Any reason: Optional[str] = None client_transaction_id: Optional[str] = None server_transaction_id: Optional[str] = None repository_object_id: Optional[str] = None def __setitem__(self, key, value): self.__dict__[key] = value def __getitem__(self, key): return self.__dict__[key] def __len__(self): return len(self.__dict__)
[docs] def to_dict(self) -> dict: """Convert an EppResultData object to a dictionary.""" return asdict(self)
[docs] class EppCommunicator: """ An EPP client for connecting to EPP server. """ # pylint: disable=too-many-instance-attributes,too-many-arguments def __init__( self, server: str, port: str, client_cert: Optional[str] = None, client_key: Optional[str] = None, dry_run: Optional[bool] = False, ) -> None: """ :param server: EPP server to connect to. :param port: EPP port to connect to. :param client_cert: Path to client certificate :param client_key: Path to client key :param dry_run: dry run the request """ self._server = server self._port = port self._user = None self._client_cert = client_cert self._client_key = client_key self._dry_run = dry_run self._format_32 = get_format_32() self._context = None self._socket = None self._ssl_socket = None self.greeting = None @property def user(self): """User property""" return self._user def _unpack_data(self, data: int) -> str: """ Unpack data. :param bytes data: data :return: unpacked data :rtype: str """ return struct.unpack(self._format_32, data)[0] def _pack_data(self, data: int) -> bytes: """ Pack the data. :param data: data :return: bytes """ return struct.pack(self._format_32, data) def _read(self) -> bytes: """ Read the response from the socket. :return: Response :rtype: Optional[bytes] """ length = self._ssl_socket.read(LENGTH_FIELD_SIZE) buffer = bytes() if not length: return None total_bytes = self._unpack_data(length) - LENGTH_FIELD_SIZE while len(buffer) < total_bytes: remaining = total_bytes - len(buffer) chunk = self._ssl_socket.recv(remaining) if not chunk: return None buffer += chunk logging.info("Received %s/%s bytes", len(buffer), total_bytes) return buffer def _write(self, xml: str) -> int: """ Write the request into the socket. :param str xml: XML Command :return: Number of send bytes :rtype: int """ # +4 for the length field itself (section 4 mandates that) # +2 for the CRLF at the end xml_bytes = xml.encode("utf-8") length = self._pack_data(len(xml_bytes) + LENGTH_FIELD_SIZE + CRLF_SIZE) self._ssl_socket.sendall(length) xml += "\r\n" data_to_send = xml.encode("utf-8") self._ssl_socket.sendall(data_to_send) return len(data_to_send) def _execute_command(self, cmd: str) -> bytes: """ Execute the command. Sending the request to the server and receive the response. :param str cmd: XML command :return: Response :rtype: bytes """ # Print the xml command and exit the app if self._dry_run: print(cmd) sys.exit() logging.debug("Sending xml to server :\n%s", cmd) self._write(cmd) response = self._read() if response is None: raise EppCommunicatorException("Cannot connect to server. Please re-login!") logging.debug("Received xml response from server :\n%s", response) return response
[docs] def connect(self) -> bytes: """ Initial connect to the server. :return: Greeting message :rtype: bytes :raises EppCommunicatorException: When there is any errors """ try: self._context = ssl.create_default_context() self._context.minimum_version = ssl.TLSVersion.TLSv1_2 self._context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 self._context.load_default_certs() if self._client_cert and self._client_key: self._context.load_cert_chain( certfile=self._client_cert, keyfile=self._client_key ) self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) self._socket.settimeout(10) self._ssl_socket = self._context.wrap_socket( self._socket, server_hostname=self._server ) self._ssl_socket.connect((self._server, int(self._port))) self.greeting = self._read() logging.debug(BeautifulSoup(self.greeting, "xml")) return self.greeting except Exception as ex: logging.error("Could not setup a sec sure connection. %s", str(ex)) raise EppCommunicatorException( "Could not setup a sec sure connection" ) from ex
[docs] def execute(self, cmd: str) -> EppResultData: """ Execute the command. Sending the request to the server and receive the response. :param str cmd: XML Command :return: Result object :rtype: EppResultData :raises EppCommunicatorException: When there is any errors. """ try: if not self.greeting and not self._dry_run: raise EppCommunicatorException( "The connection to the server has not been established yet!" ) raw_response = self._execute_command(cmd) xml_response = BeautifulSoup(raw_response, "xml") response = xml_response.find("response") result = xml_response.find("result") message = result.find("msg").string try: code = int(result.get("code")) except AttributeError as exc: raise EppCommunicatorException("Could not get result code.") from exc reason = None if code not in ( EppResultCode.SUCCESS.value, EppResultCode.SUCCESS_END_SESSION.value, ): reason = result.find("reason").string if result.find("reason") else None client_transaction_id = ( response.find("clTRID").text if response.find("clTRID") else None ) server_transaction_id = ( response.find("svTRID").text if response.find("svTRID") else None ) repository_object_id = ( response.find("roid").text if response.find("roid") else None ) logging.debug("Command executed:\n%s", xml_response) return EppResultData( code=code, message=message, reason=reason, raw_response=raw_response, client_transaction_id=client_transaction_id, server_transaction_id=server_transaction_id, repository_object_id=repository_object_id, result_data=None, ) except EppCommunicatorException as epp_ex: raise epp_ex except Exception as ex: raise EppCommunicatorException(ex) from ex
[docs] def hello(self) -> bytes: """ Send Hello command the server. :return: Greeting response :rtype: bytes """ logging.debug("Send Hello command to the server!") greeting = self._execute_command(HELLO_XML) return greeting
[docs] def login( self, user: str, password: str, extensions: Optional[list[str]] = None ) -> EppResultData: """ Login the user to EPP server. :param user: username :param password: password :param extensions: A list of supported extension URIs :return: Result object :rtype: EppResultData :raises EppCommunicatorException: When there are any errors. """ if extensions is None: extensions = [] self._user = user command_template = template_engine.from_string(LOGIN_XML) command = command_template.render( user=user, password=password, extensions=extensions ) result = self.execute(command) if result.code == EppResultCode.SUCCESS.value: logging.info( "User %s logged in to %s:%s", self._user, self._server, self._port ) elif result.code == EppResultCode.PARAMETER_RANGE_ERROR.value: raise EppCommunicatorException( "Incorrect user name or password. Please try again!" ) else: raise EppCommunicatorException( f"Something went wrong! Code: {result.code} - Message: " f"{result.message} - Reason {result.reason}" ) return result
[docs] def logout(self) -> EppResultData: """ Logout the user from EPP server. :return: Result object :rtype: EppResultData """ logout = self.execute(LOGOUT_XML) self._socket.close() logging.info( "User %s logged out from %s:%s", self._user, self._server, self._port ) return logout