Source code for federation.entities.diaspora.mappers

import logging
from datetime import datetime
from typing import Callable, List

# noinspection PyPackageRequirements
from Crypto.PublicKey.RSA import RsaKey
from lxml import etree

from federation.entities.base import Comment, Follow, Post, Profile, Reaction, Retraction, Share
from federation.entities.diaspora.entities import (
    DiasporaComment, DiasporaContact, DiasporaLike, DiasporaPost,
    DiasporaProfile, DiasporaReshare, DiasporaRetraction,
    DiasporaImage)
from federation.entities.diaspora.mixins import DiasporaRelayableMixin
from federation.entities.mixins import BaseEntity
from federation.protocols.diaspora.signatures import get_element_child_info
from federation.types import UserType, ReceiverVariant
from federation.utils.diaspora import retrieve_and_parse_profile

logger = logging.getLogger("federation")

MAPPINGS = {
    "status_message": DiasporaPost,
    "comment": DiasporaComment,
    "photo": DiasporaImage,
    "like": DiasporaLike,
    "profile": DiasporaProfile,
    "retraction": DiasporaRetraction,
    "contact": DiasporaContact,
    "reshare": DiasporaReshare,
}

TAGS = [
    # Order is important. Any top level tags should be before possibly child tags
    "reshare", "status_message", "comment", "like", "request", "profile", "retraction", "photo", "contact",
]

BOOLEAN_KEYS = (
    "public",
    "nsfw",
    "following",
    "sharing",
)

DATETIME_KEYS = (
    "created_at",
)

INTEGER_KEYS = (
    "height",
    "width",
)


def xml_children_as_dict(node):
    """Turn the children of node <xml> into a dict, keyed by tag name.

    This is only a shallow conversation - child nodes are not recursively processed.
    """
    return dict((e.tag, e.text) for e in node)


def check_sender_and_entity_handle_match(sender_handle, entity_handle):
    """Ensure that sender and entity handles match.

    Basically we've already verified the sender is who they say when receiving the payload. However, the sender might
    be trying to set another author in the payload itself, since Diaspora has the sender in both the payload headers
    AND the object. We must ensure they're the same.
    """
    if sender_handle != entity_handle:
        logger.warning("sender_handle and entity_handle don't match, aborting! sender_handle: %s, entity_handle: %s",
                       sender_handle, entity_handle)
        return False
    return True


def element_to_objects(
        element: etree.ElementTree, sender: str, sender_key_fetcher: Callable[[str], str] = None, user: UserType = None,
) -> List:
    """Transform an Element to a list of entities recursively.

    Possible child entities are added to each entity ``_children`` list.

    Optional parameter ``sender_key_fetcher`` can be a function to fetch sender public key.
    If not given, key will always be fetched over the network. The function should take sender as the only parameter.
    """
    entities = []
    cls = MAPPINGS.get(element.tag)
    if not cls:
        return []

    attrs = xml_children_as_dict(element)
    transformed = transform_attributes(attrs, cls)
    if hasattr(cls, "fill_extra_attributes"):
        transformed = cls.fill_extra_attributes(transformed)
    entity = cls(**transformed)
    # Add protocol name
    entity._source_protocol = "diaspora"
    # Save element object to entity for possible later use
    entity._source_object = etree.tostring(element)

    # Save receivers on the entity
    if user:
        # Single receiver
        entity._receivers = [UserType(id=user.id, receiver_variant=ReceiverVariant.ACTOR)]
    else:
        # Followers
        entity._receivers = [UserType(id=sender, receiver_variant=ReceiverVariant.FOLLOWERS)]

    if issubclass(cls, DiasporaRelayableMixin):
        # If relayable, fetch sender key for validation
        entity._xml_tags = get_element_child_info(element, "tag")
        if sender_key_fetcher:
            entity._sender_key = sender_key_fetcher(entity.actor_id)
        else:
            profile = retrieve_and_parse_profile(entity.handle)
            if profile:
                entity._sender_key = profile.public_key
    else:
        # If not relayable, ensure handles match
        if not check_sender_and_entity_handle_match(sender, entity.handle):
            return []
    try:
        entity.validate()
    except ValueError as ex:
        logger.error("Failed to validate entity %s: %s", entity, ex, extra={
            "attrs": attrs,
            "transformed": transformed,
        })
        return []

    # Extract mentions
    if hasattr(entity, "extract_mentions"):
        entity.extract_mentions()

    # Do child elements
    for child in element:
        # noinspection PyProtectedMember
        entity._children.extend(element_to_objects(child, sender, user=user))
    # Add to entities list
    entities.append(entity)
    return entities


def message_to_objects(
        message: str, sender: str, sender_key_fetcher:Callable[[str], str]=None, user: UserType =None,
) -> List:
    """Takes in a message extracted by a protocol and maps it to entities.

    :param message: XML payload
    :type message: str
    :param sender: Payload sender id
    :type message: str
    :param sender_key_fetcher: Function to fetch sender public key. If not given, key will always be fetched
        over network. The function should take sender handle as the only parameter.
    :param user: Optional receiving user object. If given, should have a `handle`.
    :returns: list of entities
    """
    doc = etree.fromstring(message)
    if doc.tag in TAGS:
        return element_to_objects(doc, sender, sender_key_fetcher, user)
    return []


def transform_attributes(attrs, cls):
    """Transform some attribute keys.

    :param attrs: Properties from the XML
    :type attrs: dict
    :param cls: Class of the entity
    :type cls: class
    """
    transformed = {}
    for key, value in attrs.items():
        if value is None:
            value = ""
        if key == "text":
            transformed["raw_content"] = value
        elif key == "activitypub_id":
            transformed["id"] = value
        elif key == "author":
            if cls == DiasporaProfile:
                # Diaspora Profile XML message contains no GUID. We need the guid. Fetch it.
                profile = retrieve_and_parse_profile(value)
                transformed['id'] = value
                transformed["guid"] = profile.guid
            else:
                transformed["actor_id"] = value
            transformed["handle"] = value
        elif key == 'guid':
            if cls != DiasporaProfile:
                transformed["id"] = value
                transformed["guid"] = value
        elif key in ("root_author", "recipient"):
            transformed["target_id"] = value
            transformed["target_handle"] = value
        elif key in ("target_guid", "root_guid", "parent_guid"):
            transformed["target_id"] = value
            transformed["target_guid"] = value
        elif key == "thread_parent_guid":
            transformed["root_target_id"] = value
            transformed["root_target_guid"] = value
        elif key in ("first_name", "last_name"):
            values = [attrs.get('first_name'), attrs.get('last_name')]
            values = [v for v in values if v]
            transformed["name"] = " ".join(values)
        elif key == "image_url":
            if "image_urls" not in transformed:
                transformed["image_urls"] = {}
            transformed["image_urls"]["large"] = value
        elif key == "image_url_small":
            if "image_urls" not in transformed:
                transformed["image_urls"] = {}
            transformed["image_urls"]["small"] = value
        elif key == "image_url_medium":
            if "image_urls" not in transformed:
                transformed["image_urls"] = {}
            transformed["image_urls"]["medium"] = value
        elif key == "tag_string":
            if value:
                transformed["tag_list"] = value.replace("#", "").split(" ")
        elif key == "bio":
            transformed["raw_content"] = value
        elif key == "searchable":
            transformed["public"] = True if value == "true" else False
        elif key in ["target_type"] and cls == DiasporaRetraction:
            transformed["entity_type"] = DiasporaRetraction.entity_type_from_remote(value)
        elif key == "remote_photo_path":
            transformed["url"] = f"{value}{attrs.get('remote_photo_name')}"
        elif key == "author_signature":
            transformed["signature"] = value
        elif key in BOOLEAN_KEYS:
            transformed[key] = True if value == "true" else False
        elif key in DATETIME_KEYS:
            transformed[key] = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
        elif key in INTEGER_KEYS:
            transformed[key] = int(value)
        else:
            transformed[key] = value
    return transformed


[docs]def get_outbound_entity(entity: BaseEntity, private_key: RsaKey): """Get the correct outbound entity for this protocol. We might have to look at entity values to decide the correct outbound entity. If we cannot find one, we should raise as conversion cannot be guaranteed to the given protocol. Private key of author is needed to be passed for signing the outbound entity. :arg entity: An entity instance which can be of a base or protocol entity class. :arg private_key: Private key of sender as an RSA object :returns: Protocol specific entity class instance. :raises ValueError: If conversion cannot be done. """ if getattr(entity, "outbound_doc", None): # If the entity already has an outbound doc, just return the entity as is return entity outbound = None cls = entity.__class__ if cls in [DiasporaPost, DiasporaImage, DiasporaComment, DiasporaLike, DiasporaProfile, DiasporaRetraction, DiasporaContact, DiasporaReshare]: # Already fine outbound = entity elif cls == Post: outbound = DiasporaPost.from_base(entity) elif cls == Comment: outbound = DiasporaComment.from_base(entity) elif cls == Reaction: if entity.reaction == "like": outbound = DiasporaLike.from_base(entity) elif cls == Follow: outbound = DiasporaContact.from_base(entity) elif cls == Profile: outbound = DiasporaProfile.from_base(entity) elif cls == Retraction: outbound = DiasporaRetraction.from_base(entity) elif cls == Share: outbound = DiasporaReshare.from_base(entity) if not outbound: raise ValueError("Don't know how to convert this base entity to Diaspora protocol entities.") if isinstance(outbound, DiasporaRelayableMixin) and not outbound.signature: # Sign by author if not signed yet. We don't want to overwrite any existing signature in the case # that this is being sent by the parent author outbound.sign(private_key) # If missing, also add same signature to `parent_author_signature`. This is required at the moment # in all situations but is apparently being removed. # TODO: remove this once Diaspora removes the extra signature outbound.parent_signature = outbound.signature # Validate the entity outbound.validate(direction="outbound") return outbound