Skip to content

Add 1-Way and 2-Way TLS Support to Bulk Import Functions #2672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions pymilvus/bulk_writer/bulk_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import json
import logging
from typing import List, Optional
from typing import List, Optional, Union

import requests

Expand Down Expand Up @@ -45,11 +45,38 @@ def _handle_response(url: str, res: json):


def _post_request(
url: str, api_key: str, params: {}, timeout: int = 20, **kwargs
url: str,
api_key: str,
params: {},
timeout: int = 20,
verify: Optional[Union[bool, str]] = True,
cert: Optional[Union[str, tuple]] = None,
**kwargs,
) -> requests.Response:
"""Send a POST request with 1-way / 2-way optional certificate validation

Args:
url (str): The endpoint URL
api_key (str): API key for authentication
params (dict): JSON parameters for the request
timeout (int): Timeout for the request
verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
or a string, which must be server's certificate path. Defaults to `True`.
cert (str, tuple, optional): if String, path to ssl client cert file.
if Tuple, ('cert', 'key') pair.

Returns:
requests.Response: Response object.
"""
try:
resp = requests.post(
url=url, headers=_http_headers(api_key), json=params, timeout=timeout, **kwargs
url=url,
headers=_http_headers(api_key),
json=params,
timeout=timeout,
verify=verify,
cert=cert,
**kwargs,
)
if resp.status_code != 200:
_throw(f"Failed to post url: {url}, status code: {resp.status_code}")
Expand Down Expand Up @@ -85,6 +112,8 @@ def bulk_import(
api_key: str = "",
access_key: str = "",
secret_key: str = "",
verify: Optional[Union[bool, str]] = True,
cert: Optional[Union[str, tuple]] = None,
**kwargs,
) -> requests.Response:
"""call bulkinsert restful interface to import files
Expand All @@ -103,6 +132,10 @@ def bulk_import(
api_key (str): API key to authenticate your requests.
access_key (str): access key to access the object storage
secret_key (str): secret key to access the object storage
verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
or a string, which must be server's certificate path. Defaults to `True`.
cert (str, tuple, optional): if String, path to ssl client cert file.
if Tuple, ('cert', 'key') pair.

Returns:
response of the restful interface
Expand All @@ -125,13 +158,21 @@ def bulk_import(
if isinstance(options, dict):
params["options"] = options

resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs)
resp = _post_request(
url=request_url, api_key=api_key, params=params, verify=verify, cert=cert, **kwargs
)
_handle_response(request_url, resp.json())
return resp


def get_import_progress(
url: str, job_id: str, cluster_id: str = "", api_key: str = "", **kwargs
url: str,
job_id: str,
cluster_id: str = "",
api_key: str = "",
verify: Optional[Union[bool, str]] = True,
cert: Optional[Union[str, tuple]] = None,
**kwargs,
) -> requests.Response:
"""get job progress

Expand All @@ -140,6 +181,10 @@ def get_import_progress(
job_id (str): a job id
cluster_id (str): id of a milvus instance(for cloud)
api_key (str): API key to authenticate your requests.
verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
or a string, which must be server's certificate path. Defaults to `True`.
cert (str, tuple, optional): if String, path to ssl client cert file.
if Tuple, ('cert', 'key') pair.

Returns:
response of the restful interface
Expand All @@ -151,7 +196,9 @@ def get_import_progress(
"clusterId": cluster_id,
}

resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs)
resp = _post_request(
url=request_url, api_key=api_key, params=params, verify=verify, cert=cert, **kwargs
)
_handle_response(request_url, resp.json())
return resp

Expand All @@ -163,6 +210,8 @@ def list_import_jobs(
api_key: str = "",
page_size: int = 10,
current_page: int = 1,
verify: Optional[Union[bool, str]] = True,
cert: Optional[Union[str, tuple]] = None,
**kwargs,
) -> requests.Response:
"""list jobs in a cluster
Expand All @@ -174,6 +223,10 @@ def list_import_jobs(
api_key (str): API key to authenticate your requests.
page_size (int): pagination size
current_page (int): pagination
verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
or a string, which must be server's certificate path. Defaults to `True`.
cert (str, tuple, optional): if String, path to ssl client cert file.
if Tuple, ('cert', 'key') pair.

Returns:
response of the restful interface
Expand All @@ -187,6 +240,8 @@ def list_import_jobs(
"currentPage": current_page,
}

resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs)
resp = _post_request(
url=request_url, api_key=api_key, params=params, verify=verify, cert=cert, **kwargs
)
_handle_response(request_url, resp.json())
return resp