Skip to content

Commit

Permalink
Transfers: bittorrent transfertool. Close rucio#6479
Browse files Browse the repository at this point in the history
Additional metadata is added to each file did: the merkle root and
piece layers used by the bittorrent v2 format. This way, we can
reconstruct the .torrent files from this data, allowing us to
transfer files directly between RSEs using bittorrent clients
running on each of the RSEs.

This, initial implementation, relies on the qBittorrent client, but
other clients which support bittorrent v2 could be added later.

A new RSE protocol was needed for the task. The protocol must be
configured with the hostname+port of the bittorrent's data channel
and the 'magnet' scheme. We use custom extensions to the magnet
format to store the name/scope/path of a replica, so the link is not
currently importable into existing torrent clients, but this leaves
the door open for the future. It would be possible to generate magnet
links which actually work with such clients directly in list-replicas.
  • Loading branch information
Radu Carpa authored and bari12 committed Jan 26, 2024
1 parent 43db233 commit 16951cc
Show file tree
Hide file tree
Showing 25 changed files with 1,138 additions and 38 deletions.
135 changes: 135 additions & 0 deletions etc/docker/dev/configure_qbittorrent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
import os
import sys
import time
import urllib.request
import urllib.parse


def wait_for_server(host, port, max_wait):
timer = 0
while timer < max_wait:
try:
response = urllib.request.urlopen(urllib.request.Request(f'http://{host}:{port}'))
if response.status == 200:
return
except Exception:
if timer > max_wait:
raise
time.sleep(1)
timer += 1


def configure(
host,
port,
username,
password,
new_config,
):
scheme = 'http'
headers = {
'content-type': 'application/x-www-form-urlencoded'
}

# Authenticate
req = urllib.request.Request(
f'{scheme}://{host}:{port}/api/v2/auth/login',
headers=headers,
data=urllib.parse.urlencode({
'username': username,
'password': password
}).encode(),
method='POST',
)
response = urllib.request.urlopen(req)
headers['Cookie'] = response.getheader("Set-Cookie")

# Update the config
req = urllib.request.Request(
f'{scheme}://{host}:{port}/api/v2/app/setPreferences',
headers=headers,
data=urllib.parse.urlencode({'json': json.dumps(new_config)}).encode(),
method='POST',
)
urllib.request.urlopen(req)

scheme = 'https' if new_config.get('use_https') else scheme
port = new_config.get('web_ui_port', port)

# Logout
req = urllib.request.Request(f'{scheme}://{host}:{port}/api/v2/auth/logout', headers=headers, method='POST')
urllib.request.urlopen(req)


if __name__ == "__main__":
initial_config_done = False
extract_password = re.compile(r'.*: ([23456789ABCDEFGHIJKLMNPQRSTUVWXYZabcdefghjkmnpqrstuvwxyz]{9})$')
for line in sys.stdin:
if initial_config_done:
continue

automatic_password = extract_password.search(line)
if not automatic_password:
continue
automatic_password = automatic_password.group(1)

port = 8080
host = 'localhost'

wait_for_server(host=host, port=port, max_wait=60)

config = {
'listen_port': int(os.environ.get('QBITTORRENT_LISTEN_PORT', 10000)),
# 'ssl_enabled': True,
# 'ssl_listen_port': 20000,
'upnp': False,
'dht': False,
'pex': False,
'lsd': False,
'encryption': 1, # require encryption
'bypass_local_auth': False,
'web_ui_upnp': False,
# 'web_ui_address': '',
'enable_embedded_tracker': True,
'embedded_tracker_port': int(os.environ.get('QBITTORRENT_TRACKER_PORT', 10001)),
'enable_multi_connections_from_same_ip': True,
}

if os.environ.get('QBITTORRENT_UI_PASSWORD'):
config['web_ui_password'] = os.environ['QBITTORRENT_UI_PASSWORD']
if os.environ.get('QBITTORRENT_UI_USERNAME'):
config['web_ui_username'] = os.environ['QBITTORRENT_UI_USERNAME']
if os.environ.get('QBITTORRENT_UI_PORT'):
config['web_ui_port'] = os.environ['QBITTORRENT_UI_PORT']

if os.environ.get('QBITTORRENT_UI_CERT') and os.environ.get('QBITTORRENT_UI_KEY'):
config['use_https'] = True
config['web_ui_https_cert_path'] = os.environ['QBITTORRENT_UI_CERT']
config['web_ui_https_key_path'] = os.environ['QBITTORRENT_UI_KEY']

configure(
host=host,
port=port,
username='admin',
password=automatic_password,
new_config=config,
)

initial_config_done = True
4 changes: 4 additions & 0 deletions etc/docker/dev/docker-compose.ports.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ services:
xrd5:
ports:
- "127.0.0.1:1098:1098"
- "127.0.0.1:8098:8098"
web1:
ports:
- "127.0.0.1:8099:8099"
minio:
ports:
- "127.0.0.1:9000:9000"
Expand Down
13 changes: 12 additions & 1 deletion etc/docker/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,14 @@ services:
environment:
- XRDHOST=xrd5
- XRDPORT=1098
- QBITTORRENT_UI_USERNAME=rucio
- QBITTORRENT_UI_PASSWORD=rucio90df
- QBITTORRENT_UI_PORT=8098
- QBITTORRENT_LISTEN_PORT=10000
volumes:
- ./xrd/entrypoint.sh:/docker-entrypoint.sh:ro
- ./xrd:/configs:ro
- ./configure_qbittorrent.py:/configure_qbittorrent.py:ro
- ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z
- ../../certs/hostcert_xrd5.pem:/tmp/xrdcert.pem:Z
- ../../certs/hostcert_xrd5.key.pem:/tmp/xrdkey.pem:Z
Expand All @@ -228,8 +233,14 @@ services:
hard: 10240
web1:
image: rucio/webdav
command: bash -c "a2dismod want_digest; apache2 -D FOREGROUND"
environment:
- QBITTORRENT_UI_USERNAME=rucio
- QBITTORRENT_UI_PASSWORD=rucio90df
- QBITTORRENT_UI_PORT=8099
- QBITTORRENT_LISTEN_PORT=10000
volumes:
- ./web1/entrypoint.sh:/usr/local/bin/docker-entrypoint.sh:ro
- ./configure_qbittorrent.py:/configure_qbittorrent.py:ro
- ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:ro
- ../../certs/hostcert_web1.key.pem:/etc/grid-security/hostkey.pem:ro
- ../../certs/hostcert_web1.pem:/etc/grid-security/hostcert.pem:Z
Expand Down
34 changes: 34 additions & 0 deletions etc/docker/dev/web1/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/bash
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

. /etc/apache2/envvars

a2dismod want_digest
a2dismod zgridsite
a2ensite default-ssl
a2dissite default

if [ -n "$QBITTORRENT_UI_PORT" ]
then
export QBITTORRENT_UI_CERT=/etc/grid-security/hostcert.pem
export QBITTORRENT_UI_KEY=/etc/grid-security/hostkey.pem
chown www-data /var/www/
su -s /bin/bash -c qbittorrent-nox www-data | tee >(python3 /configure_qbittorrent.py) &
fi

exec "$@"
9 changes: 7 additions & 2 deletions etc/docker/dev/xrd/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ cp /tmp/xrdkey.pem /etc/grid-security/xrd/xrdkey.pem
chown -R xrootd:xrootd /etc/grid-security/xrd
chmod 0400 /etc/grid-security/xrd/xrdkey.pem

xrootd -R xrootd -n rucio -c /etc/xrootd/xrdrucio.cfg
if [ -n "$QBITTORRENT_UI_PORT" ]
then
export QBITTORRENT_UI_CERT=/tmp/xrdcert.pem
export QBITTORRENT_UI_KEY=/tmp/xrdkey.pem
su -s /bin/bash -c qbittorrent-nox xrootd | tee >(python3 /configure_qbittorrent.py) &
fi

exec "$@"
xrootd -R xrootd -n rucio -c /etc/xrootd/xrdrucio.cfg
2 changes: 1 addition & 1 deletion etc/docker/test/extra/rucio_autotests_common.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ carbon_port = 8125
user_scope = travis

[conveyor]
scheme = srm,root,davs,gsiftp,http,https,mock,file
scheme = srm,root,davs,gsiftp,http,https,mock,file,magnet
transfertool = fts3
ftshosts = https://fts:8446
cacert = /opt/rucio/etc/rucio_ca.pem
Expand Down
2 changes: 1 addition & 1 deletion etc/docker/test/extra/rucio_default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ carbon_port = 8125
user_scope = docker

[conveyor]
scheme = https,davs,gsiftp,root,srm,mock,file
scheme = https,davs,gsiftp,root,srm,mock,file,magnet
#scheme = https
#user_transfers = cms
#user_activities = ['dummy_user_activity']
Expand Down
18 changes: 16 additions & 2 deletions lib/rucio/client/uploadclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import copy
import json
import logging
Expand All @@ -24,13 +25,13 @@

from rucio import version
from rucio.client.client import Client
from rucio.common.config import config_get_int, config_get
from rucio.common.config import config_get_int, config_get, config_get_bool
from rucio.common.exception import (RucioException, RSEWriteBlocked, DataIdentifierAlreadyExists, RSEOperationNotSupported,
DataIdentifierNotFound, NoFilesUploaded, NotAllFilesUploaded, FileReplicaAlreadyExists,
ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError, RSEChecksumUnavailable,
ScopeNotFound)
from rucio.common.utils import (adler32, detect_client_location, execute, generate_uuid, make_valid_did, md5, send_trace,
retry, GLOBALLY_SUPPORTED_CHECKSUMS)
retry, bittorrent_v2_merkle_sha256, GLOBALLY_SUPPORTED_CHECKSUMS)
from rucio.rse import rsemanager as rsemgr


Expand Down Expand Up @@ -336,6 +337,18 @@ def _pick_random_rse(rse_expression):
raise NotAllFilesUploaded()
return 0

def _add_bittorrent_meta(self, file, logger):
if not config_get_bool('client', 'register_bittorrent_meta', default=False):
return

pieces_root, pieces_layers, piece_length = bittorrent_v2_merkle_sha256(os.path.join(file['dirname'], file['basename']))
bittorrent_meta = {
'bittorrent_pieces_root': base64.b64encode(pieces_root).decode(),
'bittorrent_pieces_layers': base64.b64encode(pieces_layers).decode(),
'bittorrent_piece_length': piece_length,
}
self.client.set_metadata_bulk(scope=file['did_scope'], name=file['did_name'], meta=bittorrent_meta)

def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None):
"""
Registers the given file in Rucio. Creates a dataset if
Expand Down Expand Up @@ -410,6 +423,7 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals
except DataIdentifierNotFound:
logger(logging.DEBUG, 'File DID does not exist')
self.client.add_replicas(rse=rse, files=[replica_for_api])
self._add_bittorrent_meta(file=file, logger=logger)
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
if not dataset_did_str:
# only need to add rules for files if no dataset is given
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
SCHEME_MAP['srm'].append('davs')
SCHEME_MAP['davs'].append('srm')

SUPPORTED_PROTOCOLS = ['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone']
SUPPORTED_PROTOCOLS = ['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone', 'magnet']

FTS_STATE = namedtuple('FTS_STATE', ['SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', 'NOT_USED',
'CANCELED'])('SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY',
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Optional, TypedDict, Union
from typing import Any, Callable, Optional, TypedDict, Union


class InternalType(object):
Expand Down Expand Up @@ -104,6 +104,9 @@ def __init__(self, scope, vo='def', fromExternal=True):
super(InternalScope, self).__init__(value=scope, vo=vo, fromExternal=fromExternal)


LoggerFunction = Callable[..., Any]


class RSEDomainLANDict(TypedDict):
read: Optional[int]
write: Optional[int]
Expand Down
Loading

0 comments on commit 16951cc

Please sign in to comment.