From 16951ccc2232903f99a5f8dc9bdba855f738c5e6 Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Fri, 21 Jul 2023 15:12:07 +0200 Subject: [PATCH] Transfers: bittorrent transfertool. Close #6479 Additional metadata is added to each file did: the merkle root and piece layers used by the bittorrent v2 format. This way, we can reconstruct the .torrent files from this data, allowing us to transfer files directly between RSEs using bittorrent clients running on each of the RSEs. This, initial implementation, relies on the qBittorrent client, but other clients which support bittorrent v2 could be added later. A new RSE protocol was needed for the task. The protocol must be configured with the hostname+port of the bittorrent's data channel and the 'magnet' scheme. We use custom extensions to the magnet format to store the name/scope/path of a replica, so the link is not currently importable into existing torrent clients, but this leaves the door open for the future. It would be possible to generate magnet links which actually work with such clients directly in list-replicas. --- etc/docker/dev/configure_qbittorrent.py | 135 ++++++++++ etc/docker/dev/docker-compose.ports.yml | 4 + etc/docker/dev/docker-compose.yml | 13 +- etc/docker/dev/web1/entrypoint.sh | 34 +++ etc/docker/dev/xrd/entrypoint.sh | 9 +- .../test/extra/rucio_autotests_common.cfg | 2 +- etc/docker/test/extra/rucio_default.cfg | 2 +- lib/rucio/client/uploadclient.py | 18 +- lib/rucio/common/constants.py | 2 +- lib/rucio/common/types.py | 5 +- lib/rucio/common/utils.py | 238 +++++++++++++++++- lib/rucio/core/request.py | 4 +- lib/rucio/core/transfer.py | 16 +- lib/rucio/daemons/conveyor/poller.py | 18 +- lib/rucio/rse/protocols/bittorrent.py | 186 ++++++++++++++ lib/rucio/transfertool/bittorrent.py | 197 +++++++++++++++ lib/rucio/transfertool/bittorrent_driver.py | 52 ++++ .../bittorrent_driver_qbittorrent.py | 131 ++++++++++ lib/rucio/transfertool/transfertool.py | 2 +- requirements.txt | 2 + tests/conftest.py | 7 + tests/test_conveyor.py | 59 ++++- tests/test_upload.py | 6 - tests/test_utils.py | 26 +- tools/docker_activate_rses.sh | 8 + 25 files changed, 1138 insertions(+), 38 deletions(-) create mode 100644 etc/docker/dev/configure_qbittorrent.py create mode 100755 etc/docker/dev/web1/entrypoint.sh create mode 100644 lib/rucio/rse/protocols/bittorrent.py create mode 100644 lib/rucio/transfertool/bittorrent.py create mode 100644 lib/rucio/transfertool/bittorrent_driver.py create mode 100644 lib/rucio/transfertool/bittorrent_driver_qbittorrent.py diff --git a/etc/docker/dev/configure_qbittorrent.py b/etc/docker/dev/configure_qbittorrent.py new file mode 100644 index 0000000000..477a35e6c7 --- /dev/null +++ b/etc/docker/dev/configure_qbittorrent.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re +import os +import sys +import time +import urllib.request +import urllib.parse + + +def wait_for_server(host, port, max_wait): + timer = 0 + while timer < max_wait: + try: + response = urllib.request.urlopen(urllib.request.Request(f'http://{host}:{port}')) + if response.status == 200: + return + except Exception: + if timer > max_wait: + raise + time.sleep(1) + timer += 1 + + +def configure( + host, + port, + username, + password, + new_config, +): + scheme = 'http' + headers = { + 'content-type': 'application/x-www-form-urlencoded' + } + + # Authenticate + req = urllib.request.Request( + f'{scheme}://{host}:{port}/api/v2/auth/login', + headers=headers, + data=urllib.parse.urlencode({ + 'username': username, + 'password': password + }).encode(), + method='POST', + ) + response = urllib.request.urlopen(req) + headers['Cookie'] = response.getheader("Set-Cookie") + + # Update the config + req = urllib.request.Request( + f'{scheme}://{host}:{port}/api/v2/app/setPreferences', + headers=headers, + data=urllib.parse.urlencode({'json': json.dumps(new_config)}).encode(), + method='POST', + ) + urllib.request.urlopen(req) + + scheme = 'https' if new_config.get('use_https') else scheme + port = new_config.get('web_ui_port', port) + + # Logout + req = urllib.request.Request(f'{scheme}://{host}:{port}/api/v2/auth/logout', headers=headers, method='POST') + urllib.request.urlopen(req) + + +if __name__ == "__main__": + initial_config_done = False + extract_password = re.compile(r'.*: ([23456789ABCDEFGHIJKLMNPQRSTUVWXYZabcdefghjkmnpqrstuvwxyz]{9})$') + for line in sys.stdin: + if initial_config_done: + continue + + automatic_password = extract_password.search(line) + if not automatic_password: + continue + automatic_password = automatic_password.group(1) + + port = 8080 + host = 'localhost' + + wait_for_server(host=host, port=port, max_wait=60) + + config = { + 'listen_port': int(os.environ.get('QBITTORRENT_LISTEN_PORT', 10000)), + # 'ssl_enabled': True, + # 'ssl_listen_port': 20000, + 'upnp': False, + 'dht': False, + 'pex': False, + 'lsd': False, + 'encryption': 1, # require encryption + 'bypass_local_auth': False, + 'web_ui_upnp': False, + # 'web_ui_address': '', + 'enable_embedded_tracker': True, + 'embedded_tracker_port': int(os.environ.get('QBITTORRENT_TRACKER_PORT', 10001)), + 'enable_multi_connections_from_same_ip': True, + } + + if os.environ.get('QBITTORRENT_UI_PASSWORD'): + config['web_ui_password'] = os.environ['QBITTORRENT_UI_PASSWORD'] + if os.environ.get('QBITTORRENT_UI_USERNAME'): + config['web_ui_username'] = os.environ['QBITTORRENT_UI_USERNAME'] + if os.environ.get('QBITTORRENT_UI_PORT'): + config['web_ui_port'] = os.environ['QBITTORRENT_UI_PORT'] + + if os.environ.get('QBITTORRENT_UI_CERT') and os.environ.get('QBITTORRENT_UI_KEY'): + config['use_https'] = True + config['web_ui_https_cert_path'] = os.environ['QBITTORRENT_UI_CERT'] + config['web_ui_https_key_path'] = os.environ['QBITTORRENT_UI_KEY'] + + configure( + host=host, + port=port, + username='admin', + password=automatic_password, + new_config=config, + ) + + initial_config_done = True diff --git a/etc/docker/dev/docker-compose.ports.yml b/etc/docker/dev/docker-compose.ports.yml index e563bceea5..47338e05f1 100644 --- a/etc/docker/dev/docker-compose.ports.yml +++ b/etc/docker/dev/docker-compose.ports.yml @@ -46,6 +46,10 @@ services: xrd5: ports: - "127.0.0.1:1098:1098" + - "127.0.0.1:8098:8098" + web1: + ports: + - "127.0.0.1:8099:8099" minio: ports: - "127.0.0.1:9000:9000" diff --git a/etc/docker/dev/docker-compose.yml b/etc/docker/dev/docker-compose.yml index 5dda1b2b2d..261b4f3bc7 100644 --- a/etc/docker/dev/docker-compose.yml +++ b/etc/docker/dev/docker-compose.yml @@ -216,9 +216,14 @@ services: environment: - XRDHOST=xrd5 - XRDPORT=1098 + - QBITTORRENT_UI_USERNAME=rucio + - QBITTORRENT_UI_PASSWORD=rucio90df + - QBITTORRENT_UI_PORT=8098 + - QBITTORRENT_LISTEN_PORT=10000 volumes: - ./xrd/entrypoint.sh:/docker-entrypoint.sh:ro - ./xrd:/configs:ro + - ./configure_qbittorrent.py:/configure_qbittorrent.py:ro - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z - ../../certs/hostcert_xrd5.pem:/tmp/xrdcert.pem:Z - ../../certs/hostcert_xrd5.key.pem:/tmp/xrdkey.pem:Z @@ -228,8 +233,14 @@ services: hard: 10240 web1: image: rucio/webdav - command: bash -c "a2dismod want_digest; apache2 -D FOREGROUND" + environment: + - QBITTORRENT_UI_USERNAME=rucio + - QBITTORRENT_UI_PASSWORD=rucio90df + - QBITTORRENT_UI_PORT=8099 + - QBITTORRENT_LISTEN_PORT=10000 volumes: + - ./web1/entrypoint.sh:/usr/local/bin/docker-entrypoint.sh:ro + - ./configure_qbittorrent.py:/configure_qbittorrent.py:ro - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:ro - ../../certs/hostcert_web1.key.pem:/etc/grid-security/hostkey.pem:ro - ../../certs/hostcert_web1.pem:/etc/grid-security/hostcert.pem:Z diff --git a/etc/docker/dev/web1/entrypoint.sh b/etc/docker/dev/web1/entrypoint.sh new file mode 100755 index 0000000000..075072c7d2 --- /dev/null +++ b/etc/docker/dev/web1/entrypoint.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +. /etc/apache2/envvars + +a2dismod want_digest +a2dismod zgridsite +a2ensite default-ssl +a2dissite default + +if [ -n "$QBITTORRENT_UI_PORT" ] +then + export QBITTORRENT_UI_CERT=/etc/grid-security/hostcert.pem + export QBITTORRENT_UI_KEY=/etc/grid-security/hostkey.pem + chown www-data /var/www/ + su -s /bin/bash -c qbittorrent-nox www-data | tee >(python3 /configure_qbittorrent.py) & +fi + +exec "$@" diff --git a/etc/docker/dev/xrd/entrypoint.sh b/etc/docker/dev/xrd/entrypoint.sh index e045374847..dc1bfe5811 100755 --- a/etc/docker/dev/xrd/entrypoint.sh +++ b/etc/docker/dev/xrd/entrypoint.sh @@ -24,6 +24,11 @@ cp /tmp/xrdkey.pem /etc/grid-security/xrd/xrdkey.pem chown -R xrootd:xrootd /etc/grid-security/xrd chmod 0400 /etc/grid-security/xrd/xrdkey.pem -xrootd -R xrootd -n rucio -c /etc/xrootd/xrdrucio.cfg +if [ -n "$QBITTORRENT_UI_PORT" ] +then + export QBITTORRENT_UI_CERT=/tmp/xrdcert.pem + export QBITTORRENT_UI_KEY=/tmp/xrdkey.pem + su -s /bin/bash -c qbittorrent-nox xrootd | tee >(python3 /configure_qbittorrent.py) & +fi -exec "$@" +xrootd -R xrootd -n rucio -c /etc/xrootd/xrdrucio.cfg diff --git a/etc/docker/test/extra/rucio_autotests_common.cfg b/etc/docker/test/extra/rucio_autotests_common.cfg index 9d17512d53..c3117192f4 100644 --- a/etc/docker/test/extra/rucio_autotests_common.cfg +++ b/etc/docker/test/extra/rucio_autotests_common.cfg @@ -41,7 +41,7 @@ carbon_port = 8125 user_scope = travis [conveyor] -scheme = srm,root,davs,gsiftp,http,https,mock,file +scheme = srm,root,davs,gsiftp,http,https,mock,file,magnet transfertool = fts3 ftshosts = https://fts:8446 cacert = /opt/rucio/etc/rucio_ca.pem diff --git a/etc/docker/test/extra/rucio_default.cfg b/etc/docker/test/extra/rucio_default.cfg index fc9cb420f0..13319509f4 100644 --- a/etc/docker/test/extra/rucio_default.cfg +++ b/etc/docker/test/extra/rucio_default.cfg @@ -48,7 +48,7 @@ carbon_port = 8125 user_scope = docker [conveyor] -scheme = https,davs,gsiftp,root,srm,mock,file +scheme = https,davs,gsiftp,root,srm,mock,file,magnet #scheme = https #user_transfers = cms #user_activities = ['dummy_user_activity'] diff --git a/lib/rucio/client/uploadclient.py b/lib/rucio/client/uploadclient.py index 0a29d90648..8cc24759ba 100644 --- a/lib/rucio/client/uploadclient.py +++ b/lib/rucio/client/uploadclient.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import copy import json import logging @@ -24,13 +25,13 @@ from rucio import version from rucio.client.client import Client -from rucio.common.config import config_get_int, config_get +from rucio.common.config import config_get_int, config_get, config_get_bool from rucio.common.exception import (RucioException, RSEWriteBlocked, DataIdentifierAlreadyExists, RSEOperationNotSupported, DataIdentifierNotFound, NoFilesUploaded, NotAllFilesUploaded, FileReplicaAlreadyExists, ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError, RSEChecksumUnavailable, ScopeNotFound) from rucio.common.utils import (adler32, detect_client_location, execute, generate_uuid, make_valid_did, md5, send_trace, - retry, GLOBALLY_SUPPORTED_CHECKSUMS) + retry, bittorrent_v2_merkle_sha256, GLOBALLY_SUPPORTED_CHECKSUMS) from rucio.rse import rsemanager as rsemgr @@ -336,6 +337,18 @@ def _pick_random_rse(rse_expression): raise NotAllFilesUploaded() return 0 + def _add_bittorrent_meta(self, file, logger): + if not config_get_bool('client', 'register_bittorrent_meta', default=False): + return + + pieces_root, pieces_layers, piece_length = bittorrent_v2_merkle_sha256(os.path.join(file['dirname'], file['basename'])) + bittorrent_meta = { + 'bittorrent_pieces_root': base64.b64encode(pieces_root).decode(), + 'bittorrent_pieces_layers': base64.b64encode(pieces_layers).decode(), + 'bittorrent_piece_length': piece_length, + } + self.client.set_metadata_bulk(scope=file['did_scope'], name=file['did_name'], meta=bittorrent_meta) + def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None): """ Registers the given file in Rucio. Creates a dataset if @@ -410,6 +423,7 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals except DataIdentifierNotFound: logger(logging.DEBUG, 'File DID does not exist') self.client.add_replicas(rse=rse, files=[replica_for_api]) + self._add_bittorrent_meta(file=file, logger=logger) logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse) if not dataset_did_str: # only need to add rules for files if no dataset is given diff --git a/lib/rucio/common/constants.py b/lib/rucio/common/constants.py index cdd0731008..aab2ae14d0 100644 --- a/lib/rucio/common/constants.py +++ b/lib/rucio/common/constants.py @@ -48,7 +48,7 @@ SCHEME_MAP['srm'].append('davs') SCHEME_MAP['davs'].append('srm') -SUPPORTED_PROTOCOLS = ['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone'] +SUPPORTED_PROTOCOLS = ['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone', 'magnet'] FTS_STATE = namedtuple('FTS_STATE', ['SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', 'NOT_USED', 'CANCELED'])('SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', diff --git a/lib/rucio/common/types.py b/lib/rucio/common/types.py index 7c38982b01..c3b0628dd8 100644 --- a/lib/rucio/common/types.py +++ b/lib/rucio/common/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Optional, TypedDict, Union +from typing import Any, Callable, Optional, TypedDict, Union class InternalType(object): @@ -104,6 +104,9 @@ def __init__(self, scope, vo='def', fromExternal=True): super(InternalScope, self).__init__(value=scope, vo=vo, fromExternal=fromExternal) +LoggerFunction = Callable[..., Any] + + class RSEDomainLANDict(TypedDict): read: Optional[int] write: Optional[int] diff --git a/lib/rucio/common/utils.py b/lib/rucio/common/utils.py index c2647dab90..34e7b8b013 100644 --- a/lib/rucio/common/utils.py +++ b/lib/rucio/common/utils.py @@ -15,11 +15,13 @@ import argparse import base64 +import copy import datetime import errno import getpass import hashlib import io +import ipaddress import itertools import json import logging @@ -43,6 +45,7 @@ from uuid import uuid4 as uuid from xml.etree import ElementTree +import math import mmap import requests import zlib @@ -63,7 +66,7 @@ if TYPE_CHECKING: from collections.abc import Callable - from typing import TypeVar + from typing import TypeVar, Optional T = TypeVar('T') @@ -347,6 +350,218 @@ def crc32(file): CHECKSUM_ALGO_DICT['crc32'] = crc32 +def _next_pow2(num): + if not num: + return 0 + return math.ceil(math.log2(num)) + + +def _bittorrent_v2_piece_length_pow2(file_size: int) -> int: + """ + Automatically chooses the `piece size` so that `piece layers` + is kept small(er) than usually. This is a balancing act: + having a big piece_length requires more work on bittorrent client + side to validate hashes, but having it small requires more + place to store the `piece layers` in the database. + + Returns the result as the exponent 'x' for power of 2. + To get the actual length in bytes, the caller should compute 2^x. + """ + + # by the bittorrent v2 specification, the piece size is equal to block size = 16KiB + min_piece_len_pow2 = 14 # 2 ** 14 == 16 KiB + if not file_size: + return min_piece_len_pow2 + # Limit the maximum size of pieces_layers hash chain for bittorrent v2, + # because we'll have to store it in the database + max_pieces_layers_size_pow2 = 20 # 2 ** 20 == 1 MiB + # sha256 requires 2 ** 5 == 32 Bytes == 256 bits + hash_size_pow2 = 5 + + # The closest power of two bigger than the file size + file_size_pow2 = _next_pow2(file_size) + + # Compute the target size for the 'pieces layers' in the torrent + # (as power of two: the closest power-of-two smaller than the number) + # Will cap at max_pieces_layers_size for files larger than 1TB. + target_pieces_layers_size = math.sqrt(file_size) + target_pieces_layers_size_pow2 = min(math.floor(math.log2(target_pieces_layers_size)), max_pieces_layers_size_pow2) + target_piece_num_pow2 = max(target_pieces_layers_size_pow2 - hash_size_pow2, 0) + + piece_length_pow2 = max(file_size_pow2 - target_piece_num_pow2, min_piece_len_pow2) + return piece_length_pow2 + + +def bittorrent_v2_piece_length(file_size: int) -> int: + return 2 ** _bittorrent_v2_piece_length_pow2(file_size) + + +def bittorrent_v2_merkle_sha256(file) -> tuple[bytes, bytes, int]: + """ + Compute the .torrent v2 hash tree for the given file. + (http://www.bittorrent.org/beps/bep_0052.html) + In particular, it will return the root of the merkle hash + tree of the file, the 'piece layers' as described in the + previous BEP, and the chosen `piece size` + + This function will read the file in chunks of 16KiB + (which is the imposed block size by bittorrent v2) and compute + the sha256 hash of each block. When enough blocks are read + to form a `piece`, will compute the merkle hash root of the + piece from the hashes of its blocks. At the end, the hashes + of pieces are combined to create the global pieces_root. + """ + + # by the bittorrent v2 specification, the block size and the + # minimum piece size are both fixed to 16KiB + block_size = 16384 + block_size_pow2 = 14 # 2 ** 14 == 16 KiB + # sha256 requires 2 ** 5 == 32 Bytes == 256 bits + hash_size = 32 + + def _merkle_root(leafs: list[bytes], nb_levels: int, padding: bytes) -> bytes: + """ + Build the root of the merkle hash tree from the (possibly incomplete) leafs layer. + If len(leafs) < 2 ** nb_levels, it will be padded with the padding repeated as many times + as needed to have 2 ** nb_levels leafs in total. + """ + nodes = copy.copy(leafs) + level = nb_levels + + while level > 0: + for i in range(2 ** (level - 1)): + node1 = nodes[2 * i] if 2 * i < len(nodes) else padding + node2 = nodes[2 * i + 1] if 2 * i + 1 < len(nodes) else padding + h = hashlib.sha256(node1) + h.update(node2) + if i < len(nodes): + nodes[i] = h.digest() + else: + nodes.append(h.digest()) + level -= 1 + return nodes[0] if nodes else padding + + file_size = os.stat(file).st_size + piece_length_pow2 = _bittorrent_v2_piece_length_pow2(file_size) + + block_per_piece_pow2 = piece_length_pow2 - block_size_pow2 + piece_length = 2 ** piece_length_pow2 + block_per_piece = 2 ** block_per_piece_pow2 + piece_num = math.ceil(file_size / piece_length) + + remaining = file_size + remaining_in_block = min(file_size, block_size) + block_hashes = [] + piece_hashes = [] + current_hash = hashlib.sha256() + block_padding = bytes(hash_size) + with open(file, 'rb') as f: + while True: + data = f.read(remaining_in_block) + if not data: + break + + current_hash.update(data) + + remaining_in_block -= len(data) + remaining -= len(data) + + if not remaining_in_block: + block_hashes.append(current_hash.digest()) + if len(block_hashes) == block_per_piece or not remaining: + piece_hashes.append(_merkle_root(block_hashes, nb_levels=block_per_piece_pow2, padding=block_padding)) + block_hashes = [] + current_hash = hashlib.sha256() + remaining_in_block = min(block_size, remaining) + + if not remaining: + break + + if remaining or remaining_in_block or len(piece_hashes) != piece_num: + raise RucioException(f'Error while computing merkle sha256 of {file}') + + piece_padding = _merkle_root([], nb_levels=block_per_piece_pow2, padding=block_padding) + pieces_root = _merkle_root(piece_hashes, nb_levels=_next_pow2(piece_num), padding=piece_padding) + pieces_layers = b''.join(piece_hashes) if len(piece_hashes) > 1 else b'' + + return pieces_root, pieces_layers, piece_length + + +def merkle_sha256(file) -> str: + """ + The root of the sha256 merkle hash tree with leaf size of 16 KiB. + """ + pieces_root, _, _ = bittorrent_v2_merkle_sha256(file) + return pieces_root.hex() + + +CHECKSUM_ALGO_DICT['merkle_sha256'] = merkle_sha256 + + +def bencode(obj) -> bytes: + """ + Copied from the reference implementation of v2 bittorrent: + http://bittorrent.org/beps/bep_0052_torrent_creator.py + """ + + if isinstance(obj, int): + return b"i" + str(obj).encode() + b"e" + elif isinstance(obj, bytes): + return str(len(obj)).encode() + b":" + obj + elif isinstance(obj, str): + return bencode(obj.encode("utf-8")) + elif isinstance(obj, list): + return b"l" + b"".join(map(bencode, obj)) + b"e" + elif isinstance(obj, dict): + if all(isinstance(i, bytes) for i in obj.keys()): + items = list(obj.items()) + items.sort() + return b"d" + b"".join(map(bencode, itertools.chain(*items))) + b"e" + else: + raise ValueError("dict keys should be bytes " + str(obj.keys())) + raise ValueError("Allowed types: int, bytes, list, dict; not %s", type(obj)) + + +def construct_torrent( + scope: str, + name: str, + length: int, + piece_length: int, + pieces_root: bytes, + pieces_layers: "Optional[bytes]" = None, + trackers: "Optional[list[str]]" = None, +) -> "tuple[str, bytes]": + + torrent_dict = { + b'creation date': int(time.time()), + b'info': { + b'meta version': 2, + b'private': 1, + b'name': f'{scope}:{name}'.encode(), + b'piece length': piece_length, + b'file tree': { + name.encode(): { + b'': { + b'length': length, + b'pieces root': pieces_root, + } + } + } + }, + b'piece layers': {}, + } + if trackers: + torrent_dict[b'announce'] = trackers[0].encode() + if len(trackers) > 1: + torrent_dict[b'announce-list'] = [t.encode() for t in trackers] + if pieces_layers: + torrent_dict[b'piece layers'][pieces_root] = pieces_layers + + torrent_id = hashlib.sha256(bencode(torrent_dict[b'info'])).hexdigest()[:40] + torrent = bencode(torrent_dict) + return torrent_id, torrent + + def str_to_date(string): """ Converts a RFC-1123 string to the corresponding datetime value. @@ -908,6 +1123,27 @@ class Color: END = '\033[0m' +def resolve_ips(hostname: str) -> list[str]: + try: + ipaddress.ip_address(hostname) + return [hostname] + except ValueError: + pass + try: + addrinfo = socket.getaddrinfo(hostname, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) + return [ai[4][0] for ai in addrinfo] + except socket.gaierror: + pass + return [] + + +def resolve_ip(hostname: str) -> str: + ips = resolve_ips(hostname) + if ips: + return ips[0] + return hostname + + def detect_client_location(): """ Normally client IP will be set on the server side (request.remote_addr) diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index d99e3e055b..4c9c8ca112 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -781,8 +781,8 @@ def get_and_mark_next( dst_id = res_dict['dest_rse_id'] src_id = res_dict['source_rse_id'] - res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True) - res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True) if src_id is not None else None + res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True, load_attributes=True) + res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True, load_attributes=True) if src_id is not None else None result.append(res_dict) else: diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 86c615749f..faf1bbc7ce 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -45,14 +45,15 @@ from rucio.db.sqla.constants import DIDType, RequestState, RequestType, TransferLimitDirection from rucio.db.sqla.session import read_session, transactional_session, stream_session from rucio.rse import rsemanager as rsemgr -from rucio.transfertool.transfertool import TransferStatusReport +from rucio.transfertool.transfertool import TransferStatusReport, Transfertool +from rucio.transfertool.bittorrent import BittorrentTransfertool from rucio.transfertool.fts3 import FTS3Transfertool from rucio.transfertool.globus import GlobusTransferTool from rucio.transfertool.mock import MockTransfertool if TYPE_CHECKING: from collections.abc import Callable, Iterator, Iterable, Mapping, Sequence - from typing import Any, Optional + from typing import Any, Optional, Type from sqlalchemy.orm import Session from rucio.common.types import InternalAccount from rucio.core.topology import Topology @@ -72,10 +73,11 @@ DEFAULT_MULTIHOP_TOMBSTONE_DELAY = int(datetime.timedelta(hours=2).total_seconds()) -TRANSFERTOOL_CLASSES_BY_NAME = { +TRANSFERTOOL_CLASSES_BY_NAME: "dict[str, Type[Transfertool]]" = { FTS3Transfertool.external_name: FTS3Transfertool, GlobusTransferTool.external_name: GlobusTransferTool, MockTransfertool.external_name: MockTransfertool, + BittorrentTransfertool.external_name: BittorrentTransfertool, } @@ -157,6 +159,12 @@ def source_url(self, source: RequestSource) -> str: ) return url + def dest_protocol(self): + return self.protocol_factory.protocol(self.dst.rse, self.dst.scheme, self.operation_dest) + + def source_protocol(self, source: RequestSource): + return self.protocol_factory.protocol(source.rse, source.scheme, self.operation_src) + @property def use_ipv4(self): # If any source or destination rse is ipv4 only @@ -1444,7 +1452,7 @@ def prepare_transfers( logger(logging.WARNING, '%s: all available sources were filtered', rws) continue - update_dict: dict[Any, Any] = { + update_dict: "dict[Any, Any]" = { models.Request.state.name: _throttler_request_state( activity=rws.activity, source_rse=selected_source.rse, diff --git a/lib/rucio/daemons/conveyor/poller.py b/lib/rucio/daemons/conveyor/poller.py index 2ad0aa76f7..3d71a14c92 100644 --- a/lib/rucio/daemons/conveyor/poller.py +++ b/lib/rucio/daemons/conveyor/poller.py @@ -45,8 +45,6 @@ from rucio.db.sqla.constants import MYSQL_LOCK_WAIT_TIMEOUT_EXCEEDED, ORACLE_DEADLOCK_DETECTED_REGEX, ORACLE_RESOURCE_BUSY_REGEX, RequestState, RequestType from rucio.transfertool.transfertool import Transfertool from rucio.transfertool.fts3 import FTS3Transfertool -from rucio.transfertool.globus import GlobusTransferTool -from rucio.transfertool.mock import MockTransfertool if TYPE_CHECKING: from rucio.daemons.common import HeartbeatHandler @@ -132,18 +130,22 @@ def _handle_requests( for chunk in dict_chunks(transfers_by_eid, fts_bulk): try: - if transfertool == 'mock': - transfertool_obj = MockTransfertool(external_host=MockTransfertool.external_name) - elif transfertool == 'globus': - transfertool_obj = GlobusTransferTool(external_host=GlobusTransferTool.external_name) - else: + transfertool_cls = transfer_core.TRANSFERTOOL_CLASSES_BY_NAME.get(transfertool, FTS3Transfertool) + + transfertool_kwargs = {} + if transfertool_cls.external_name == FTS3Transfertool.external_name: account = None if oidc_account: if vo: account = InternalAccount(oidc_account, vo=vo) else: account = InternalAccount(oidc_account) - transfertool_obj = FTS3Transfertool(external_host=external_host, vo=vo, oidc_account=account) + transfertool_kwargs.update({ + 'vo': vo, + 'oidc_account': account, + }) + + transfertool_obj = transfertool_cls(external_host=external_host, **transfertool_kwargs) poll_transfers( transfertool_obj=transfertool_obj, transfers_by_eid=chunk, diff --git a/lib/rucio/rse/protocols/bittorrent.py b/lib/rucio/rse/protocols/bittorrent.py new file mode 100644 index 0000000000..ef30255865 --- /dev/null +++ b/lib/rucio/rse/protocols/bittorrent.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import logging +import os.path +import time +from urllib.parse import urlparse, urlencode, parse_qs + +from rucio.common import exception +from rucio.common.utils import construct_torrent, resolve_ip +from rucio.rse.protocols.protocol import RSEProtocol +from rucio.rse import rsemanager + +from rucio.common.extra import import_extras + +EXTRA_MODULES = import_extras(['libtorrent']) + +lt = None +if EXTRA_MODULES['libtorrent']: + import libtorrent as lt # pylint: disable=E0401 + +if getattr(rsemanager, 'CLIENT_MODE', None): + from rucio.client.didclient import DIDClient + + def _fetch_meta_client(rse_id: str, scope: str, name: str): + return DIDClient().get_metadata(scope=scope, name=name, plugin='all') + + _fetch_meta = _fetch_meta_client +else: + from rucio.common.types import InternalScope + from rucio.core.did import get_metadata + from rucio.core.rse import get_rse_vo + + def _fetch_meta_server(rse_id: str, scope: str, name: str): + vo = get_rse_vo(rse_id) + return get_metadata(scope=InternalScope(scope, vo=vo), name=name, plugin='all') + + _fetch_meta = _fetch_meta_server + + +class Default(RSEProtocol): + + def __init__(self, protocol_attr, rse_settings, logger=logging.log): + super(Default, self).__init__(protocol_attr, rse_settings, logger=logger) + self.logger = logger + + def lfns2pfns(self, lfns): + pfns = {} + prefix = self.attributes['prefix'] + + if not prefix.startswith('/'): + prefix = ''.join(['/', prefix]) + if not prefix.endswith('/'): + prefix = ''.join([prefix, '/']) + + host_port = '%s:%s' % (self.attributes['hostname'], str(self.attributes['port'])) + + lfns = [lfns] if isinstance(lfns, dict) else lfns + for lfn in lfns: + scope, name = lfn['scope'], lfn['name'] + + if 'path' in lfn and lfn['path'] is not None: + path = lfn['path'] if not lfn['path'].startswith('/') else lfn['path'][1:] + else: + path = self._get_path(scope=scope, name=name) + + scope_name = '%s:%s' % (scope, name) + + query = { + 'x.pe': host_port, + 'x.rucio_scope': scope, + 'x.rucio_name': name, + 'x.rucio_path': ''.join((prefix, path)) + } + pfns[scope_name] = 'magnet:?' + urlencode(query) + + return pfns + + def parse_pfns(self, pfns): + ret = dict() + pfns = [pfns] if isinstance(pfns, str) else pfns + + for pfn in pfns: + parsed = urlparse(pfn) + scheme = parsed.scheme + + query = parse_qs(parsed.query) + host_port = next(iter(query.get('x.pe', [])), ':') + hostname, port = host_port.split(':') + port = int(port) + path = next(iter(query.get('x.rucio_path', [])), '') + scope = next(iter(query.get('x.rucio_scope', [])), '') + name = next(iter(query.get('x.rucio_name', [])), '') + + # Protect against 'lazy' defined prefixes for RSEs in the repository + if not self.attributes['prefix'].startswith('/'): + self.attributes['prefix'] = '/' + self.attributes['prefix'] + if not self.attributes['prefix'].endswith('/'): + self.attributes['prefix'] += '/' + + if self.attributes['hostname'] != hostname: + if self.attributes['hostname'] != 'localhost': # In the database empty hostnames are replaced with localhost but for some URIs (e.g. file) a hostname is not included + raise exception.RSEFileNameNotSupported('Invalid hostname: provided \'%s\', expected \'%s\'' % (hostname, self.attributes['hostname'])) + + if self.attributes['port'] != port: + raise exception.RSEFileNameNotSupported('Invalid port: provided \'%s\', expected \'%s\'' % (port, self.attributes['port'])) + + if not path.startswith(self.attributes['prefix']): + raise exception.RSEFileNameNotSupported('Invalid prefix: provided \'%s\', expected \'%s\'' % ('/'.join(path.split('/')[0:len(self.attributes['prefix'].split('/')) - 1]), + self.attributes['prefix'])) # len(...)-1 due to the leading '/ + + # Spliting parsed.path into prefix, path, filename + prefix = self.attributes['prefix'] + path = path.partition(self.attributes['prefix'])[2] + path = '/'.join(path.split('/')[:-1]) + if not path.startswith('/'): + path = '/' + path + if path != '/' and not path.endswith('/'): + path = path + '/' + ret[pfn] = {'path': path, 'scope': scope, 'name': name, 'scheme': scheme, 'prefix': prefix, 'port': port, 'hostname': hostname, } + + return ret + + def connect(self): + pass + + def close(self): + pass + + def get(self, path, dest, transfer_timeout=None): + if not lt: + raise exception.RucioException('The libtorrent python package is required to perform this operation') + + [lfn] = self.parse_pfns([path]).values() + scope = lfn['scope'] + name = lfn['name'] + hostname = lfn['hostname'] + port = lfn['port'] + + meta = _fetch_meta(rse_id=self.rse['id'], scope=scope, name=name) + pieces_root = base64.b64decode(meta.get('bittorrent_pieces_root', '')) + if not pieces_root: + raise exception.RucioException('Torrent metadata missing. Cannot download file.') + + length = meta.get('bytes') + piece_length = meta.get('bittorrent_piece_length', 0) + pieces_layers = base64.b64decode(meta.get('bittorrent_pieces_layers', '')) + + _, torrent = construct_torrent( + scope=scope, + name=name, + length=length, + piece_length=piece_length, + pieces_root=pieces_root, + pieces_layers=pieces_layers, + ) + + ses = lt.session() # type: ignore # noqa + params = { + 'ti': lt.torrent_info(torrent), # type: ignore # noqa + 'save_path': os.path.dirname(dest), + 'name': os.path.basename(dest), + 'renamed_files': {0: os.path.basename(dest)}, + } + + handle = ses.add_torrent(params) + try: + handle.resume() + handle.connect_peer((resolve_ip(hostname), port)) + while handle.status().progress != 1.0: + time.sleep(0.25) + finally: + ses.remove_torrent(handle) diff --git a/lib/rucio/transfertool/bittorrent.py b/lib/rucio/transfertool/bittorrent.py new file mode 100644 index 0000000000..8fc1281702 --- /dev/null +++ b/lib/rucio/transfertool/bittorrent.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import logging +from os import path +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Type + +from rucio.common import types +from rucio.common.config import config_get +from rucio.common.extra import import_extras +from rucio.common.utils import construct_torrent +from rucio.core.did_meta_plugins import get_metadata +from rucio.transfertool.transfertool import Transfertool, TransferToolBuilder, TransferStatusReport +from .bittorrent_driver import BittorrentDriver + +if TYPE_CHECKING: + from rucio.core.request import DirectTransfer + from rucio.core.rse import RseData + +DRIVER_NAME_RSE_ATTRIBUTE = 'bittorrent_driver' +DRIVER_CLASSES_BY_NAME: dict[str, Type[BittorrentDriver]] = {} + +EXTRA_MODULES = import_extras(['qbittorrentapi']) + +if EXTRA_MODULES['qbittorrentapi']: + from .bittorrent_driver_qbittorrent import QBittorrentDriver + DRIVER_CLASSES_BY_NAME[QBittorrentDriver.external_name] = QBittorrentDriver + + +class BittorrentTransfertool(Transfertool): + """ + Use bittorrent to perform the peer-to-peer transfer. + """ + external_name = 'bittorrent' + supported_schemes = {'magnet'} + + required_rse_attrs = (DRIVER_NAME_RSE_ATTRIBUTE, ) + + def __init__(self, external_host: str, logger: types.LoggerFunction = logging.log) -> None: + super().__init__(external_host=external_host, logger=logger) + + self._drivers_by_rse_id = {} + self.ca_cert, self.ca_key = None, None + + self.tracker = config_get('transfers', 'bittorrent_tracker_addr', raise_exception=False, default=None) + + @classmethod + def _pick_management_api_driver_cls(cls: "Type[BittorrentTransfertool]", rse: "RseData") -> Optional[Type[BittorrentDriver]]: + driver_cls = DRIVER_CLASSES_BY_NAME.get(rse.attributes.get(DRIVER_NAME_RSE_ATTRIBUTE, '')) + if driver_cls is None: + return None + if not all(rse.attributes.get(attribute) is not None for attribute in driver_cls.required_rse_attrs): + return None + return driver_cls + + def _driver_for_rse(self, rse: "RseData") -> Optional[BittorrentDriver]: + driver = self._drivers_by_rse_id.get(rse.id) + if driver: + return driver + + driver_cls = self._pick_management_api_driver_cls(rse) + if not driver_cls: + return None + + driver = driver_cls.make_driver(rse) + self._drivers_by_rse_id[rse.id] = driver + return driver + + @staticmethod + def _get_torrent_meta(scope: "types.InternalScope", name: str) -> tuple[bytes, bytes, int]: + meta = get_metadata(scope=scope, name=name, plugin='all') + pieces_root = base64.b64decode(meta.get('bittorrent_pieces_root', '')) + pieces_layers = base64.b64decode(meta.get('bittorrent_pieces_layers', '')) + piece_length = meta.get('bittorrent_piece_length', 0) + return pieces_root, pieces_layers, piece_length + + @classmethod + def submission_builder_for_path( + cls: "Type[BittorrentTransfertool]", + transfer_path: "list[DirectTransfer]", + logger: types.LoggerFunction = logging.log + ) -> "tuple[list[DirectTransfer], Optional[TransferToolBuilder]]": + hop = transfer_path[0] + if hop.rws.byte_count == 0: + logger(logging.INFO, f"Bittorrent cannot transfer fully empty torrents. Skipping {hop}") + return [], None + + if not cls.can_perform_transfer(hop.src.rse, hop.dst.rse): + logger(logging.INFO, f"The required RSE attributes are not set. Skipping {hop}") + return [], None + + for rse in [hop.src.rse, hop.dst.rse]: + driver_cls = cls._pick_management_api_driver_cls(rse) + if not driver_cls: + logger(logging.INFO, f"The rse '{rse}' is not configured correctly for bittorrent") + return [], None + + pieces_root, _pieces_layers, piece_length = cls._get_torrent_meta(hop.rws.scope, hop.rws.name) + if not pieces_root or not piece_length: + logger(logging.INFO, "The required bittorrent metadata not set on the DID") + return [], None + + return [hop], TransferToolBuilder(cls, external_host='Bittorrent Transfertool') + + def group_into_submit_jobs(self, transfer_paths: "Sequence[list[DirectTransfer]]") -> list[dict[str, Any]]: + return [{'transfers': transfer_path, 'job_params': {}} for transfer_path in transfer_paths] + + @staticmethod + def _connect_directly(torrent_id: str, peers_drivers: Sequence[BittorrentDriver]) -> None: + peer_addr = [] + for i, driver in enumerate(peers_drivers): + peer_addr.append(driver.listen_addr()) + + for driver in peers_drivers: + driver.add_peers(torrent_id=torrent_id, peers=peer_addr) + + def submit(self, transfers: "Sequence[DirectTransfer]", job_params: dict[str, str], timeout: Optional[int] = None) -> str: + [transfer] = transfers + rws = transfer.rws + + tracker = transfer.dst.rse.attributes.get('bittorrent_tracker_addr', self.tracker) + + src_drivers = {} + for source in transfer.sources: + driver = self._driver_for_rse(source.rse) + if driver: + src_drivers[source] = driver + + dst_driver = self._driver_for_rse(transfer.dst.rse) + + if not dst_driver or not src_drivers: + raise Exception('Cannot initialize bittorrent drivers to submit transfers') + + pieces_root, pieces_layers, piece_length = self._get_torrent_meta(rws.scope, rws.name) + torrent_id, torrent = construct_torrent( + scope=str(rws.scope), + name=rws.name, + length=rws.byte_count, + piece_length=piece_length, + pieces_root=pieces_root, + pieces_layers=pieces_layers, + trackers=[tracker] if tracker else None, + ) + + for source, driver in src_drivers.items(): + source_protocol = transfer.source_protocol(source) + [lfn] = source_protocol.parse_pfns([transfer.source_url(source)]).values() + driver.add_torrent( + file_name=rws.name, + file_content=torrent, + download_location=lfn['prefix'] + path.dirname(lfn['path']), + seed_mode=True, + ) + + dest_protocol = transfer.dest_protocol() + [lfn] = dest_protocol.parse_pfns([transfer.dest_url]).values() + dst_driver.add_torrent( + file_name=rws.name, + file_content=torrent, + download_location=lfn['prefix'] + lfn['path'], + ) + + self._connect_directly(torrent_id, [dst_driver] + list(src_drivers.values())) + return torrent_id + + def bulk_query(self, requests_by_eid, timeout: Optional[int] = None) -> Mapping[str, Mapping[str, TransferStatusReport]]: + response = {} + for transfer_id, requests in requests_by_eid.items(): + for request_id, request in requests.items(): + driver = self._driver_for_rse(request['dst_rse']) + if not driver: + self.logger(f'Cannot instantiate BitTorrent driver for {request["dest_rse"]}') + continue + response.setdefault(transfer_id, {})[request_id] = driver.get_status(request_id=request_id, torrent_id=transfer_id) + return response + + def query(self, transfer_ids: Sequence[str], details: bool = False, timeout: Optional[int] = None) -> None: + pass + + def cancel(self, transfer_ids: Sequence[str], timeout: Optional[int] = None) -> None: + pass + + def update_priority(self, transfer_id: str, priority: int, timeout: Optional[int] = None) -> None: + pass diff --git a/lib/rucio/transfertool/bittorrent_driver.py b/lib/rucio/transfertool/bittorrent_driver.py new file mode 100644 index 0000000000..63679fa4f4 --- /dev/null +++ b/lib/rucio/transfertool/bittorrent_driver.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING, Sequence + +from rucio.common import types + +if TYPE_CHECKING: + from typing import Optional, Type + + from rucio.core.rse import RseData + from rucio.transfertool.transfertool import TransferStatusReport + + +class BittorrentDriver(metaclass=ABCMeta): + external_name = '' + required_rse_attrs = tuple() + + @classmethod + @abstractmethod + def make_driver(cls: "Type[BittorrentDriver]", rse: "RseData", logger: types.LoggerFunction = logging.log) -> "Optional[BittorrentDriver]": + pass + + @abstractmethod + def listen_addr(self) -> tuple[str, int]: + pass + + @abstractmethod + def add_torrent(self, file_name: str, file_content: bytes, download_location: str, seed_mode: bool = False) -> None: + pass + + @abstractmethod + def add_peers(self, torrent_id: str, peers: Sequence[tuple[str, int]]) -> None: + pass + + @abstractmethod + def get_status(self, request_id: str, torrent_id: str) -> "TransferStatusReport": + pass diff --git a/lib/rucio/transfertool/bittorrent_driver_qbittorrent.py b/lib/rucio/transfertool/bittorrent_driver_qbittorrent.py new file mode 100644 index 0000000000..81f57d54c0 --- /dev/null +++ b/lib/rucio/transfertool/bittorrent_driver_qbittorrent.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, cast, Optional, Sequence +from urllib.parse import urlparse + +import qbittorrentapi + +from rucio.common import types +from rucio.common.config import get_rse_credentials +from rucio.common.utils import resolve_ip +from rucio.core.oidc import request_token +from rucio.db.sqla.constants import RequestState +from rucio.transfertool.transfertool import TransferStatusReport +from .bittorrent_driver import BittorrentDriver + +if TYPE_CHECKING: + from typing import Type + from sqlalchemy.orm import Session + from rucio.core.rse import RseData + + +class QBittorrentTransferStatusReport(TransferStatusReport): + + supported_db_fields = [ + 'state', + 'external_id', + ] + + def __init__(self, request_id: str, external_id: str, qbittorrent_response: Optional[qbittorrentapi.TorrentDictionary]) -> None: + super().__init__(request_id) + + if qbittorrent_response and qbittorrent_response.state_enum.is_complete == 1: + new_state = RequestState.DONE + else: + new_state = RequestState.SUBMITTED + + self.state = new_state + self.external_id = None + if new_state in [RequestState.FAILED, RequestState.DONE]: + self.external_id = external_id + + def initialize(self, session: "Session", logger: types.LoggerFunction = logging.log) -> None: + pass + + def get_monitor_msg_fields(self, session: "Session", logger: types.LoggerFunction = logging.log) -> dict[str, str]: + return {'protocol': 'qbittorrent'} + + +class QBittorrentDriver(BittorrentDriver): + + external_name = 'qbittorrent' + required_rse_attrs = ('qbittorrent_management_address', ) + + @classmethod + def make_driver(cls: "Type[QBittorrentDriver]", rse: "RseData", logger: types.LoggerFunction = logging.log) -> "Optional[BittorrentDriver]": + + address = rse.attributes.get('qbittorrent_management_address') + if not address: + return None + + url = urlparse(address) + token = None + if url.scheme.lower() == 'https': + token = request_token(audience=url.hostname, scope='qbittorrent_admin') + else: + logging.debug(f'{cls.external_name} will not try token authentication. Requires HTTPS.') + + rse_cred = get_rse_credentials().get(rse.id, {}) + username = rse_cred.get('qbittorrent_username') + password = rse_cred.get('qbittorrent_password') + + if not (token or (username and password)): + return None + + return cls( + address=address, + username=username, + password=password, + token=token, + logger=logger, + ) + + def __init__(self, address: str, username: str, password: str, token: Optional[str] = None, logger: types.LoggerFunction = logging.log) -> None: + extra_headers = None + if token: + extra_headers = {'Authorization': 'Bearer ' + token} + + self.client = qbittorrentapi.Client( + host=address, + username=username, + password=password, + EXTRA_HEADERS=extra_headers, + FORCE_SCHEME_FROM_HOST=True, + ) + self.logger = logger + + def listen_addr(self) -> tuple[str, int]: + preferences = self.client.app_preferences() + port = cast(int, preferences['listen_port']) + ip = resolve_ip(urlparse(self.client.host).hostname or self.client.host) + return ip, port + + def add_torrent(self, file_name: str, file_content: bytes, download_location: str, seed_mode: bool = False) -> None: + self.client.torrents_add( + rename=file_name, + torrent_files=file_content, + save_path=download_location, + is_skip_checking=seed_mode, + is_sequential_download=True, + ) + + def add_peers(self, torrent_id: str, peers: Sequence[tuple[str, int]]) -> None: + self.client.torrents_add_peers(torrent_hashes=[torrent_id], peers=[f'{ip}:{port}' for ip, port in peers]) + + def get_status(self, request_id: str, torrent_id: str) -> TransferStatusReport: + info = self.client.torrents_info(torrent_hashes=[torrent_id]) + return QBittorrentTransferStatusReport(request_id, external_id=torrent_id, qbittorrent_response=info[0] if info else None) diff --git a/lib/rucio/transfertool/transfertool.py b/lib/rucio/transfertool/transfertool.py index 9956405845..6805288def 100644 --- a/lib/rucio/transfertool/transfertool.py +++ b/lib/rucio/transfertool/transfertool.py @@ -118,7 +118,7 @@ class Transfertool(object, metaclass=ABCMeta): external_name = '' required_rse_attrs = () - supported_schemes = set(SUPPORTED_PROTOCOLS) + supported_schemes = set(SUPPORTED_PROTOCOLS).difference(('magnet', )) def __init__(self, external_host, logger=logging.log): """ diff --git a/requirements.txt b/requirements.txt index 28532a404f..3ff2e95e9c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,8 @@ PyYAML==6.0.1 # globus_extras and globus-sdk==3.32.0 # globus_extras python3-saml==1.16.0 # saml_extras pymongo==4.6.0 # pymongo (metadata plugin) +libtorrent==2.0.9 # Support for the bittorrent transfertool +qbittorrent-api==2023.11.57 # qBittorrent plugin for the bittorrent tranfsertool # All dependencies needed to develop/test rucio should be defined here pytest==7.4.3 diff --git a/tests/conftest.py b/tests/conftest.py index 884e7cab49..851357e565 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -163,6 +163,13 @@ def dirac_client(): return DiracClient() +@pytest.fixture +def download_client(): + from rucio.client.downloadclient import DownloadClient + + return DownloadClient() + + @pytest.fixture def rest_client(): from rucio.tests.common import print_response diff --git a/tests/test_conveyor.py b/tests/test_conveyor.py index 4a21a2d93c..9846c80bc1 100644 --- a/tests/test_conveyor.py +++ b/tests/test_conveyor.py @@ -16,6 +16,7 @@ import threading import time from datetime import datetime, timedelta +from tempfile import TemporaryDirectory from unittest.mock import patch from urllib.parse import urlencode, urlparse, parse_qsl, urlunparse from sqlalchemy import update @@ -24,7 +25,7 @@ import rucio.daemons.reaper.reaper from rucio.common.types import InternalAccount -from rucio.common.utils import generate_uuid +from rucio.common.utils import generate_uuid, adler32 from rucio.common.exception import ReplicaNotFound, RequestNotFound from rucio.core import config as core_config from rucio.core import did as did_core @@ -56,6 +57,11 @@ TEST_FTS_HOST = 'https://fts:8446' +@transactional_session +def __update_request(request_id, *, session=None, **kwargs): + session.query(models.Request).filter_by(id=request_id).update(kwargs, synchronize_session=False) + + def __wait_for_replica_transfer(dst_rse_id, scope, name, max_wait_seconds=MAX_POLL_WAIT_SECONDS, transfertool=None): """ Wait for the replica to become AVAILABLE on the given RSE as a result of a pending transfer @@ -223,6 +229,7 @@ def __fake_source_ranking(*, session=None): assert __get_source(request_id=request['id'], src_rse_id=src_rse2_id, **did).ranking == 0 # Only group_bulk=1 part of the path was submitted. # run submitter again to copy from jump rse to destination rse + __update_request(request_core.get_request_by_did(rse_id=dst_rse_id, **did)['id'], last_processed_by=None) submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], partition_wait_time=0, transfertype='single', filter_transfertool=None) # Wait for the destination replica to become ready @@ -1007,10 +1014,6 @@ def test_lost_transfers(rse_factory, did_factory, root_account): rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) - @transactional_session - def __update_request(request_id, *, session=None, **kwargs): - session.query(models.Request).filter_by(id=request_id).update(kwargs, synchronize_session=False) - # Fake that the transfer is submitted and lost submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=0, transfertype='single', filter_transfertool=None) request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) @@ -1391,7 +1394,6 @@ def bulk_query(self, requests_by_eid, timeout=None): submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=0, transfertype='single', filter_transfertool=None) assert sorted(certs_used_by_submitter) == ['DEFAULT_DUMMY_CERT', 'NEW_VO_DUMMY_CERT'] - with patch('rucio.daemons.conveyor.poller.FTS3Transfertool', _FTSWrapper): poller(once=True, older_than=0, partition_wait_time=0) assert sorted(certs_used_by_poller) == ['DEFAULT_DUMMY_CERT', 'NEW_VO_DUMMY_CERT'] @@ -1503,6 +1505,7 @@ def on_submit(file): replica_core.get_replica(rse_id=rse_id, **did) # Final hop + __update_request(request_core.get_request_by_did(rse_id=rse_id_queued, **did)['id'], last_processed_by=None) submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=10, partition_wait_time=0, transfertype='single', filter_transfertool=None) replica = __wait_for_replica_transfer(dst_rse_id=rse_id_queued, **did) assert replica['state'] == ReplicaState.AVAILABLE @@ -1657,3 +1660,47 @@ def __setup_test(): preparer(once=True, sleep_time=1, bulk=100, partition_wait_time=0, ignore_availability=True) request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) assert request['state'] == RequestState.QUEUED + + +@skip_rse_tests_with_accounts +@pytest.mark.noparallel(groups=[NoParallelGroups.XRD, NoParallelGroups.SUBMITTER, NoParallelGroups.POLLER, NoParallelGroups.FINISHER]) +@pytest.mark.parametrize("file_config_mock", [{ + "overrides": [('client', 'register_bittorrent_meta', 'true')] +}], indirect=True) +def test_bittorrent_submission(did_factory, root_account, vo, download_client, file_config_mock): + src_rse = 'WEB1' + src_rse_id = rse_core.get_rse_id(rse=src_rse, vo=vo) + dst_rse = 'XRD5' + dst_rse_id = rse_core.get_rse_id(rse=dst_rse, vo=vo) + all_rses = [src_rse_id, dst_rse_id] + + did = did_factory.upload_test_file(src_rse) + + rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) + + mocked_credentials = { + src_rse_id: { + "qbittorrent_username": "rucio", + "qbittorrent_password": "rucio90df" + }, + dst_rse_id: { + "qbittorrent_username": "rucio", + "qbittorrent_password": "rucio90df" + } + } + with patch('rucio.transfertool.bittorrent_driver_qbittorrent.get_rse_credentials', return_value=mocked_credentials): + submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=0, transfertools=['bittorrent'], filter_transfertool=None) + request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) + assert request['state'] == RequestState.SUBMITTED + + replica = __wait_for_replica_transfer(dst_rse_id=dst_rse_id, max_wait_seconds=MAX_POLL_WAIT_SECONDS, transfertool='bittorrent', **did) + assert replica['state'] == ReplicaState.AVAILABLE + + with TemporaryDirectory() as tmp_dir: + download_client.download_dids([{ + 'did': '{scope}:{name}'.format(**did), + 'base_dir': tmp_dir, + 'rse': dst_rse, + 'no_subdir': True, + }]) + assert adler32(f'{tmp_dir}/{did["name"]}') == did_core.get_did(**did)['adler32'] diff --git a/tests/test_upload.py b/tests/test_upload.py index b245adfc74..7c62f8bf99 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -21,7 +21,6 @@ from tempfile import TemporaryDirectory from unittest.mock import patch -from rucio.client.downloadclient import DownloadClient from rucio.client.uploadclient import UploadClient from rucio.common.config import config_add_section, config_set from rucio.common.exception import InputValidationError, NoFilesUploaded, NotAllFilesUploaded @@ -37,11 +36,6 @@ def upload_client(): return UploadClient(logger=logger) -@pytest.fixture -def download_client(): - return DownloadClient() - - @pytest.fixture def rse(containerized_rses, rse_factory): if len(containerized_rses) > 0: diff --git a/tests/test_utils.py b/tests/test_utils.py index a12f89b25a..2c61d44b9b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -15,12 +15,13 @@ import datetime import logging +import os from re import match import pytest from rucio.common.exception import InvalidType -from rucio.common.utils import md5, adler32, parse_did_filter_from_string, Availability, retrying +from rucio.common.utils import md5, adler32, parse_did_filter_from_string, Availability, retrying, bittorrent_v2_merkle_sha256 from rucio.common.logging import formatted_logger @@ -181,3 +182,26 @@ def retry_on_attribute_error(): with pytest.raises(ValueError): retry_on_attribute_error() assert len(attempts) == 1 + + +def test_bittorrent_sa256_merkle(file_factory): + + def _sha256_merkle_via_libtorrent(file, piece_size=0): + import libtorrent as lt + file = str(file) + fs = lt.file_storage() + lt.add_files(fs, file) + t = lt.create_torrent(fs, flags=lt.create_torrent.v2_only, piece_size=piece_size) + lt.set_piece_hashes(t, os.path.dirname(file)) + + torrent = t.generate() + pieces_root = next(iter(next(iter(torrent[b'info'][b'file tree'].values())).values()))[b'pieces root'] + pieces_layers = torrent.get(b'piece layers', {}).get(pieces_root, b'') + piece_size = t.piece_length() + + return pieces_root, pieces_layers, piece_size + + for size in (1, 333, 1024, 16384, 16390, 32768, 32769, 49152, 65530, 65536, 81920, 2**20 - 2**17, 2**20, 2**20 + 2): + file = file_factory.file_generator(size=size) + root, layers, piece_size = bittorrent_v2_merkle_sha256(file) + assert (root, layers, piece_size) == _sha256_merkle_via_libtorrent(file, piece_size=piece_size) diff --git a/tools/docker_activate_rses.sh b/tools/docker_activate_rses.sh index fe278058ec..3e6eab7923 100755 --- a/tools/docker_activate_rses.sh +++ b/tools/docker_activate_rses.sh @@ -50,10 +50,12 @@ rucio-admin rse add-protocol --hostname xrd3 --scheme root --prefix //rucio --po rucio-admin rse add-protocol --hostname xrd4 --scheme root --prefix //rucio --port 1097 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD4 rucio-admin rse add-protocol --hostname xrd5 --scheme root --prefix //rucio --port 1098 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD5 rucio-admin rse add-protocol --hostname xrd5 --scheme davs --prefix //rucio --port 1098 --impl rucio.rse.protocols.gfal.Default --domain-json '{"wan": {"read": 2, "write": 2, "delete": 2, "third_party_copy_read": 2, "third_party_copy_write": 2}, "lan": {"read": 2, "write": 2, "delete": 2}}' XRD5 +rucio-admin rse add-protocol --hostname xrd5 --scheme magnet --prefix //rucio --port 10000 --impl rucio.rse.protocols.bittorrent.Default --domain-json '{"wan": {"read": 3, "write": 0, "delete": 0, "third_party_copy_read": 3, "third_party_copy_write": 3}, "lan": {"read": 3, "write": 0, "delete": 0}}' XRD5 rucio-admin rse add-protocol --hostname ssh1 --scheme scp --prefix /rucio --port 22 --impl rucio.rse.protocols.ssh.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' SSH1 rucio-admin rse add-protocol --hostname ssh1 --scheme rsync --prefix /rucio --port 22 --impl rucio.rse.protocols.ssh.Rsync --domain-json '{"wan": {"read": 2, "write": 2, "delete": 2, "third_party_copy_read": 2, "third_party_copy_write": 2}, "lan": {"read": 2, "write": 2, "delete": 2}}' SSH1 rucio-admin rse add-protocol --hostname ssh1 --scheme rclone --prefix /rucio --port 22 --impl rucio.rse.protocols.rclone.Default --domain-json '{"wan": {"read": 3, "write": 3, "delete": 3, "third_party_copy_read": 3, "third_party_copy_write": 3}, "lan": {"read": 3, "write": 3, "delete": 3}}' SSH1 rucio-admin rse add-protocol --hostname web1 --scheme davs --prefix /rucio --port 443 --impl rucio.rse.protocols.gfal.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 2}, "lan": {"read": 1, "write": 1, "delete": 1}}' WEB1 +rucio-admin rse add-protocol --hostname web1 --scheme magnet --prefix /var/www/webdav/data/rucio/ --port 10000 --impl rucio.rse.protocols.bittorrent.Default --domain-json '{"wan": {"read": 2, "write": 0, "delete": 0, "third_party_copy_read": 2, "third_party_copy_write": 2}, "lan": {"read": 2, "write": 0, "delete": 0}}' WEB1 # Set test_container_xrd attribute for xrd containers rucio-admin rse set-attribute --rse XRD1 --key test_container_xrd --value True @@ -62,8 +64,14 @@ rucio-admin rse set-attribute --rse XRD3 --key test_container_xrd --value True rucio-admin rse set-attribute --rse XRD4 --key test_container_xrd --value True rucio-admin rse set-attribute --rse SSH1 --key test_container_ssh --value True rucio-admin rse set-attribute --rse XRD5 --key oidc_support --value True +rucio-admin rse set-attribute --rse XRD5 --key bittorrent_driver --value qbittorrent +rucio-admin rse set-attribute --rse XRD5 --key qbittorrent_management_address --value https://xrd5:8098/ +rucio-admin rse set-attribute --rse XRD5 --key bittorrent_tracker_addr --value http://xrd5:10001/announce rucio-admin rse set-attribute --rse WEB1 --key oidc_support --value True rucio-admin rse set-attribute --rse WEB1 --key verify_checksum --value False +rucio-admin rse set-attribute --rse WEB1 --key bittorrent_driver --value qbittorrent +rucio-admin rse set-attribute --rse WEB1 --key qbittorrent_management_address --value https://web1:8099/ +rucio-admin rse set-attribute --rse WEB1 --key bittorrent_tracker_addr --value http://web1:10001/announce # Workaround, xrootd.py#connect returns with Auth Failed due to execution of the command in subprocess XrdSecPROTOCOL=gsi XRD_REQUESTTIMEOUT=10 XrdSecGSISRVNAMES=xrd1 xrdfs xrd1:1094 query config xrd1:1094