# Source code for sos_access.client

import logging
import socket
import ssl
import time

from sos_access.exceptions import (
    SOSAccessClientException,
    IncorrectlyConfigured,
    InvalidLength,
    InvalidXML,
    WrongContent,
    NotAuthorized,
    NotTreatedNotDistributed,
    MandatoryDataMissing,
    ServiceUnavailable,
    DuplicateAlarm,
    OtherError,
    XMLHeaderError,
    PingToOften,
    ServerSystemError,
    TCPTransportError,
    XMLParseError,
)
from sos_access.schemas import (
    AlarmRequestSchema,
    AlarmRequest,
    AlarmResponseSchema,
    PingRequest,
    PingRequestSchema,
    PingResponseSchema,
    NewAuthRequestSchema,
    NewAuthResponseSchema,
    NewAuthRequest,
    PingResponse,
    AlarmResponse,
    NewAuthResponse,
)

from sos_access.decorators import alternating_retry

# Module-level logger, named after this module so applications can configure
# its output separately.
logger = logging.getLogger(__name__)


# TODO: Implement a Point class for the different ways of generating a
#       geographical point and use it for position.
# TODO: What event codes are there? An official list?
# TODO: Find a better description of what Section and Detector are used for in the receiver
# TODO: Document monitored connections
# TODO: add proper logging
# TODO: convert time to utc aware


class SOSAccessClient:
    """
    Client implementing the SOS Access v4 protocol to be used for sending
    Alarm Requests to alarm operators.

    :param str transmitter_code: The code that identifies the transmitter.
        Supplied by alarm operator
    :param str transmitter_type: The transmitter type. Supplied by alarm
        operator
    :param str authentication: Password for the alarm transmitter. Supplied
        by alarm operator
    :param str receiver_id: ID for the receiving system. Needed for the
        protocol, but might not be needed by the alarm operator.
    :param (str, int) receiver_address: Tuple of host (IP or FQDN) and port
    :param (str, int) secondary_receiver_address: Tuple of host (IP or FQDN)
        and port
    :param bool use_single_receiver: To enable not supplying
        secondary_receiver_address
    :param bool use_tls: Indicates if the transmission of alarm should be
        encrypted using TLS. This library does not support SSLv3 since it is
        insecure.
    :raises IncorrectlyConfigured: If no secondary receiver address is given
        and ``use_single_receiver`` is not set.
    """

    MAX_RETRY = 3
    ENCODING = "latin-1"  # it is in the specs only to allow iso-8859-1

    # Maps an unsuccessful response status to the exception to raise and the
    # explanation appended after ``response.info``. Statuses that are not
    # listed here do not raise.
    _STATUS_ERRORS = {
        1: (InvalidLength, "Message is too long."),
        2: (InvalidXML, "Invalid XML content."),
        3: (WrongContent, "Wrong data content. Ex: too long text in a field."),
        4: (
            NotAuthorized,
            "Wrong combination of transmitter, instance and password",
        ),
        # Fail over should kick in.
        5: (
            NotTreatedNotDistributed,
            "Error in processing. It has neither been treated or distributed",
        ),
        7: (MandatoryDataMissing, "Mandatory XML tag missing"),
        9: (
            ServiceUnavailable,
            "Heartbeat is not enabled on the server for "
            "this transmitter or you are not authorized to use it.",
        ),
        10: (DuplicateAlarm, "The same alarm was received multiple times"),
        98: (ServerSystemError, "General receiver error"),
        # Failover should kick in.
        99: (OtherError, "Unknown receiver error"),
        100: (XMLHeaderError, "Invalid or missing XML header"),
        101: (PingToOften, "Heartbeat is sent too often"),
    }

    def __init__(
        self,
        transmitter_code,
        transmitter_type,
        authentication,
        receiver_id,
        receiver_address,
        secondary_receiver_address=None,
        use_single_receiver=False,
        use_tls=False,
    ):
        self.transmitter_code = transmitter_code
        self.transmitter_type = transmitter_type
        self.authentication = authentication
        self.receiver_id = receiver_id
        self.receiver_address = receiver_address
        self.secondary_receiver_address = secondary_receiver_address
        self.use_single_receiver = use_single_receiver
        self.use_tls = use_tls

        if self.secondary_receiver_address is None and not use_single_receiver:
            raise IncorrectlyConfigured(
                "Both primary and secondary receiver address is needed."
            )

        # load all schemas so we don't have to remember to do it again.
        self.alarm_request_schema = AlarmRequestSchema()
        self.alarm_response_schema = AlarmResponseSchema()
        self.ping_request_schema = PingRequestSchema()
        self.ping_response_schema = PingResponseSchema()
        self.new_auth_request_schema = NewAuthRequestSchema()
        self.new_auth_response_schema = NewAuthResponseSchema()

    def __repr__(self):
        # The authentication value is deliberately left out of the repr.
        return (
            f"{self.__class__.__name__}("
            f"transmitter_code={self.transmitter_code}, "
            f"transmitter_type={self.transmitter_type}, "
            f"authentication=<redacted>, "
            f"receiver_id={self.receiver_id}, "
            f"receiver_address={self.receiver_address}, "
            f"secondary_receiver_address={self.secondary_receiver_address}, "
            f"use_single_receiver={self.use_single_receiver}, "
            f"use_tls={self.use_tls})"
        )

    def _build_alarm_request(self, alarm_type, event_code, **details) -> AlarmRequest:
        """
        Build an :class:`AlarmRequest` populated with this client's
        transmitter settings.

        :param str alarm_type: Alarm type code, "AL" for a new alarm and "RE"
            for a restore.
        :param str event_code: The event code of the alarm.
        :param details: Remaining optional :class:`AlarmRequest` fields,
            passed through unchanged.
        :return: :class:`AlarmRequest`
        """
        return AlarmRequest(
            event_code=event_code,
            transmitter_type=self.transmitter_type,
            transmitter_code=self.transmitter_code,
            authentication=self.authentication,
            receiver=self.receiver_id,
            alarm_type=alarm_type,
            **details,
        )

    def send_alarm(
        self,
        event_code,
        transmitter_time=None,
        reference=None,
        transmitter_area=None,
        section=None,
        section_text=None,
        detector=None,
        detector_text=None,
        additional_info=None,
        position=None,
    ) -> AlarmResponse:
        """
        Sends an alarm to the receiver.

        :param str event_code: The event code of the alarm.
        :param datetime.datetime transmitter_time: Time of the device or
            system sending the alarm
        :param str reference: A reference that will show up in logs on the
            alarm receiver.
        :param str transmitter_area: Can be used to control different action
            at the alarm operator on the same transmitter and event_code.
        :param str section: Section ID
        :param str section_text: Section Description
        :param str detector: Detector ID
        :param str detector_text: Detector Description
        :param str|dict|list|set|tuple additional_info: Extra information
            about alarm
        :param str position: Position data
        :return: :class:`AlarmResponse` from alarm receiver
        """
        alarm_request = self._build_alarm_request(
            "AL",
            event_code,
            transmitter_time=transmitter_time,
            reference=reference,
            transmitter_area=transmitter_area,
            section=section,
            section_text=section_text,
            detector=detector,
            detector_text=detector_text,
            additional_info=additional_info,
            position=position,
        )
        logger.info(f"Sending alarm request: {alarm_request}")
        return self._send_alarm(alarm_request)

    def restore_alarm(
        self,
        event_code,
        transmitter_time=None,
        reference=None,
        transmitter_area=None,
        section=None,
        section_text=None,
        detector=None,
        detector_text=None,
        additional_info=None,
        position=None,
    ) -> AlarmResponse:
        """
        Restores an alarm in the receiver.

        :param str event_code: The event code of the alarm.
        :param datetime.datetime transmitter_time: Time of the device or
            system sending the alarm
        :param str reference: A reference that will show up in logs on the
            alarm receiver.
        :param str transmitter_area: Can be used to control different action
            at the alarm operator on the same transmitter and event_code.
        :param str section: Section ID
        :param str section_text: Section Description
        :param str detector: Detector ID
        :param str detector_text: Detector Description
        :param str|dict|list|set|tuple additional_info: Extra information
            about alarm
        :param str position: Position data
        :return: :class:`AlarmResponse` from alarm receiver

        .. todo:: Is all the extra fields necessary on alarm restore?
        """
        alarm_request = self._build_alarm_request(
            "RE",
            event_code,
            transmitter_time=transmitter_time,
            reference=reference,
            transmitter_area=transmitter_area,
            section=section,
            section_text=section_text,
            detector=detector,
            detector_text=detector_text,
            additional_info=additional_info,
            position=position,
        )
        logger.info(f"Sending restore alarm request: {alarm_request}")
        return self._send_alarm(alarm_request)

    @alternating_retry
    def _send_alarm(
        self, alarm_request: AlarmRequest, secondary=False
    ) -> AlarmResponse:
        """
        Sending function so it is wrappable with alternating_retry and we can
        check the status of the response object

        :param AlarmRequest alarm_request: The :class:`AlarmRequest` to send.
        :param bool secondary: Indicates if the secondary receiver should be
            used. Is used by alternating_retry
        :return: :class:`AlarmResponse`
        """
        out_data = self.alarm_request_schema.dump(alarm_request)
        # NOTE(review): the request is built with the authentication value, so
        # the serialized data presumably contains it -- confirm that DEBUG
        # logs are handled accordingly.
        logger.debug(f"Sending SOS Access Data: {out_data}")
        alarm_response = self.transmit(
            out_data, self.alarm_response_schema, secondary=secondary
        )
        logger.info(f"Received alarm response: {alarm_response}")
        self._check_response_status(alarm_response)
        return alarm_response

    def ping(self, reference=None) -> PingResponse:
        """
        Sends a heart beat message to indicate to the alarm operator that the
        alarm device is still operational

        :param str reference: Is used as reference for the request and can be
            searched for in logs.
        :return: :class:`PingResponse`
        """
        ping_request = PingRequest(
            authentication=self.authentication,
            transmitter_code=self.transmitter_code,
            transmitter_type=self.transmitter_type,
            reference=reference,
        )
        logger.info(f"Sending {ping_request}")
        return self._send_ping(ping_request)

    @alternating_retry
    def _send_ping(self, ping_request: PingRequest, secondary=False) -> PingResponse:
        """
        Sending function so it is wrappable with alternating_retry and we can
        check the status of the response object

        :param PingRequest ping_request: The :class:`PingRequest` to send.
        :param bool secondary: Indicates if the secondary receiver should be
            used. Is used by alternating_retry
        :return: :class:`PingResponse`
        """
        out_data = self.ping_request_schema.dump(ping_request)
        logger.debug(f"Sending SOS Access data: {out_data}")
        ping_response = self.transmit(
            out_data, self.ping_response_schema, secondary=secondary
        )
        logger.info(f"Received {ping_response}")
        self._check_response_status(ping_response)
        return ping_response

    def request_new_auth(self, reference=None) -> NewAuthResponse:
        """
        Send a request for new password on the server. This is used so that
        you can have a standard password when deploying devices but it is
        changed to something the alarm installer does not know when the alarm
        is operational

        :param str reference: Is used as reference for the request and can be
            searched for in logs.
        :return: :class:`NewAuthResponse`
        """
        new_auth_request = NewAuthRequest(
            authentication=self.authentication,
            transmitter_code=self.transmitter_code,
            transmitter_type=self.transmitter_type,
            reference=reference,
        )
        logger.info(f"Sending {new_auth_request}")
        return self._send_request_new_auth(new_auth_request)

    @alternating_retry
    def _send_request_new_auth(
        self, new_auth_request: NewAuthRequest, secondary=False
    ) -> NewAuthResponse:
        """
        Sending function so it is wrappable with alternating_retry and we can
        check the status of the response object

        :param NewAuthRequest new_auth_request: The :class:`NewAuthRequest`
            to send.
        :param bool secondary: Indicates if the secondary receiver should be
            used. Is used by alternating_retry
        :return: :class:`NewAuthResponse`
        """
        out_data = self.new_auth_request_schema.dump(new_auth_request)
        logger.debug(f"Sending SOS Access data: {out_data}")
        new_auth_response = self.transmit(
            out_data, self.new_auth_response_schema, secondary=secondary
        )
        logger.info(f"Received {new_auth_response}")
        self._check_response_status(new_auth_response)
        return new_auth_response

    def transmit(self, data, response_schema, secondary=False):
        """
        Will create a TCP connection, send the request, receive the response
        and then close the TCP connection.

        :param str data: The SOS Access XML data to be sent.
        :param AlarmResponseSchema|PingResponseSchema|NewAuthResponseSchema response_schema:
            The schema used to deserialize the response.
        :param bool secondary: Indicates if to use the secondary receiver.
        :return: The response object loaded by ``response_schema``.
        """
        if secondary:
            address = self.secondary_receiver_address
        else:
            address = self.receiver_address

        logger.info(
            f"Starting new connection to {address} with " f"secure={self.use_tls}"
        )
        with TCPTransport(address, secure=self.use_tls) as transport:
            transport.connect()
            transport.send(data.encode(self.ENCODING))
            return self._receive(transport, response_schema)

    def _receive(self, transport, response_schema, timeout=10):
        """
        Some alarm receivers will send the response in several packets. Try
        and parse for each packet and if it doesnt work read some more.

        :param TCPTransport transport: the current transport that has a
            connection to the alarm receiver
        :param AlarmResponseSchema|PingResponseSchema|NewAuthResponseSchema response_schema:
            The schema used to deserialize the response.
        :param int timeout: Indicates how long to try and read more data.
        :return: The response object loaded by ``response_schema``.
        :raises SOSAccessClientException: If no parsable response was read
            within ``timeout`` seconds, or the receiver closed the connection
            before a complete response arrived.
        """
        in_data = ""
        # A monotonic clock keeps the deadline unaffected by system clock
        # adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            chunk = transport.receive()
            if not chunk:
                # An empty read means the peer closed the connection. No more
                # data can arrive, so fail now instead of re-parsing the same
                # buffer in a tight loop until the timeout expires.
                raise SOSAccessClientException(
                    "Connection closed before a complete response was received"
                )
            in_data = in_data + chunk.decode(self.ENCODING)
            try:
                response = response_schema.load(in_data)
            except XMLParseError:
                # Not parsable yet; read another packet.
                continue
            logger.debug(f"Received SOS Access Data: {in_data}")
            return response

        raise SOSAccessClientException("Reading response within timeout failed")

    @staticmethod
    def _check_response_status(response):
        """
        Checks the status of the response and raise appropriate errors if the
        request wasn't successful.

        :param AlarmResponse|PingResponse|NewAuthResponse response: Response
            object
        :raises SOSAccessClientException: The specific error class mapped to
            the response status; see ``_STATUS_ERRORS``.
        """
        # TODO: Test!
        error = SOSAccessClient._STATUS_ERRORS.get(response.status)
        if error is not None:
            exception_class, explanation = error
            raise exception_class(f"{response.info}: {explanation}")
class TCPTransport:
    """
    A context manager for TCP sockets to make sure they are closed correctly.
    Can create both secure and non secure sockets.

    :param (str, int) address: Tuple of host and port used by
        :meth:`connect` when no other address is given.
    :param bool secure: If True the socket is wrapped in a TLS context.
    :param timeout: Timeout in seconds set on the socket.
    """

    def __init__(self, address, secure=False, timeout=5):
        self.address = address
        self.timeout = timeout
        self.socket = self._get_socket(secure, timeout)

    def __enter__(self):
        # If we dont do anything in __enter__ we dont have to handle exceptions
        # twice....
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the socket and re-raise socket related errors as
        :class:`TCPTransportError`. Other exceptions propagate unchanged.
        """
        self.socket.close()
        # Catch any error related to the socket and raise as TCP error.
        # ssl.SSLError, socket.timeout, socket.error and IOError are all
        # OSError (sub)classes, and so are ConnectionRefusedError,
        # ConnectionResetError and socket.gaierror, which an exact type match
        # would miss.
        if exc_type is not None and issubclass(exc_type, OSError):
            raise TCPTransportError(str(exc_val)) from exc_val

    def _get_socket(self, secure=False, timeout=None):
        """
        Returns socket. If secure is True the socket is wrapped in an SSL
        Context
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout or self.timeout)
        if secure:
            # SOS Access v4 is using SSL/TLS for encryption, not for
            # authentication and verification. There is no cert and no
            # hostname to check, so verification is switched off explicitly.
            # A client-side protocol is required: since Python 3.10
            # create_default_context(Purpose.CLIENT_AUTH) returns a
            # server-side context that cannot open client connections.
            self.context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # check_hostname must be disabled before verify_mode can be set
            # to CERT_NONE.
            self.context.check_hostname = False
            self.context.verify_mode = ssl.CERT_NONE
            return self.context.wrap_socket(sock)
        else:
            return sock

    def connect(self, address=None):
        """
        Connect the socket.

        :param (str, int) address: Optional address overriding the one given
            at construction.
        """
        _address = address or self.address
        self.socket.connect(_address)

    def send(self, data):
        """Send already encoded data (bytes) over the socket."""
        self.socket.sendall(data)

    def receive(self):
        """
        Receive up to 4096 bytes from the socket.

        :return: The raw bytes read; decoding is left to the caller. Empty
            bytes means the peer has closed the connection.
        """
        data = self.socket.recv(4096)
        return data