From 4c34ef012091f71e73c896cb8afc9d1150ca3930 Mon Sep 17 00:00:00 2001 From: Steve Dower Date: Fri, 29 May 2026 12:43:15 +0100 Subject: [PATCH 1/3] Add URL escaping/unescaping for winhttp URL functions. Fixes #351 --- src/_native/winhttp.cpp | 155 +++++++++++++++++++++++++++++----------- src/manage/urlutils.py | 6 +- tests/test_urlutils.py | 20 ++++++ 3 files changed, 137 insertions(+), 44 deletions(-) diff --git a/src/_native/winhttp.cpp b/src/_native/winhttp.cpp index 9ca3293..3aea574 100644 --- a/src/_native/winhttp.cpp +++ b/src/_native/winhttp.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -108,7 +109,7 @@ static wchar_t **split_to_array(wchar_t *str, wchar_t sep) { } -static int crack_url(wchar_t *url, URL_COMPONENTS *parts, int add_nuls) { +static int crack_url(const wchar_t *url, URL_COMPONENTS *parts) { parts->lpszScheme = NULL; parts->lpszUserName = NULL; parts->lpszPassword = NULL; @@ -125,29 +126,66 @@ static int crack_url(wchar_t *url, URL_COMPONENTS *parts, int add_nuls) { winhttp_error(); return 0; } - if (add_nuls) { - if (parts->lpszScheme && parts->dwSchemeLength > 0) { - parts->lpszScheme[parts->dwSchemeLength] = L'\0'; - } - if (parts->lpszUserName && parts->dwUserNameLength > 0) { - parts->lpszUserName[parts->dwUserNameLength] = L'\0'; - } - if (parts->lpszPassword && parts->dwPasswordLength > 0) { - parts->lpszPassword[parts->dwPasswordLength] = L'\0'; - } - if (parts->lpszHostName && parts->dwHostNameLength > 0) { - parts->lpszHostName[parts->dwHostNameLength] = L'\0'; - } - if (parts->lpszUrlPath && parts->dwUrlPathLength > 0) { - parts->lpszUrlPath[parts->dwUrlPathLength] = L'\0'; + return 1; +} + +static wchar_t *_recode_url_part(bool encode, const wchar_t *url_part, DWORD cch, bool allow_env=false) +{ + if (!url_part) { + return NULL; + } + // Need to copy the incoming string to ensure it's null terminated + cch += 1; + wchar_t *url_string = (wchar_t *)PyMem_Malloc(sizeof(wchar_t) * cch); + if (!url_string) { + PyErr_NoMemory(); + return NULL; + } + wcsncpy_s(url_string, cch, url_part, cch - 1); + if (!url_string[0] || cch > 32767) { + // Too long/empty for the API, just bail out + return url_string; + } + if (allow_env && cch > 2 && url_string[0] == L'%' && url_string[cch - 2] == L'%') { + // Looks like an environment variable, so we won't change it. + return url_string; + } + + wchar_t *result = NULL; + HRESULT r = E_POINTER; + for (int retries = 3; retries > 0 && r == E_POINTER; --retries) { + result = (wchar_t *)PyMem_Realloc(result, sizeof(wchar_t) * cch); + if (!result) { + PyMem_Free(url_string); + PyErr_NoMemory(); + return NULL; } - if (parts->lpszExtraInfo && parts->dwExtraInfoLength > 0) { - parts->lpszExtraInfo[parts->dwExtraInfoLength] = L'\0'; + if (encode) { + // "SEGMENT_ONLY" means we want to escape the entire string + r = UrlEscapeW(url_string, result, &cch, URL_ESCAPE_SEGMENT_ONLY | URL_ESCAPE_ASCII_URI_COMPONENT); + } else { + r = UrlUnescapeW(url_string, result, &cch, 0); } } - return 1; + PyMem_Free(url_string); + if (r) { + err_SetFromWindowsErrWithMessage((DWORD)r); + return NULL; + } + return result; +} + +static wchar_t *encode_url_part(const wchar_t *url_part, DWORD cch, bool allow_env=false) +{ + return _recode_url_part(true, url_part, cch, allow_env); } +static wchar_t *decode_url_part(const wchar_t *url_part, DWORD cch, bool allow_env=false) +{ + return _recode_url_part(false, url_part, cch, allow_env); +} + + extern "C" { #define CHECK_WINHTTP(x) if (!x) { winhttp_error(); goto exit; } @@ -226,7 +264,6 @@ static bool winhttp_apply_proxy(HINTERNET hSession, HINTERNET hRequest, const wc PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { static const char * keywords[] = {"url", "method", "headers", "accepts", "chunksize", "on_progress", "on_cred_request", NULL}; wchar_t *url = NULL; - wchar_t *url2 = NULL; // a copy of url for splitting wchar_t *method = NULL; wchar_t *headers = NULL; wchar_t *accepts = NULL; @@ -246,7 +283,11 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { uint64_t content_length; PyObject *chunks = NULL; uint64_t content_read = 0; - size_t n = 0; + + wchar_t *hostname = NULL; + wchar_t *urlpath = NULL; + wchar_t *user = NULL; + wchar_t *pass = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&O&O&O&|nOO:winhttp_urlopen", keywords, as_utf16, &url, as_utf16, &method, as_utf16, &headers, as_utf16, &accepts, &chunksize, &on_progress, &on_cred_request)) { @@ -264,17 +305,19 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { if (!accepts_array) { goto exit; } - n = wcslen(url) + 1; - url2 = (wchar_t *)PyMem_Malloc(n * sizeof(wchar_t)); - if (!url2) { - PyErr_NoMemory(); + if (!crack_url(url, &url_parts)) { + goto exit; + } + hostname = decode_url_part(url_parts.lpszHostName, url_parts.dwHostNameLength); + if (!hostname) { goto exit; } - wcscpy_s(url2, n, url); - if (!crack_url(url2, &url_parts, 1)) { + urlpath = decode_url_part(url_parts.lpszUrlPath, url_parts.dwUrlPathLength); + if (!urlpath) { goto exit; } + hSession = WinHttpOpen( NULL, WINHTTP_ACCESS_TYPE_DEFAULT_PROXY, @@ -310,19 +353,16 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { hConnection = WinHttpConnect( hSession, - url_parts.lpszHostName, + hostname, url_parts.nPort, 0 ); CHECK_WINHTTP(hConnection); - if (url_parts.dwUrlPathLength && !url_parts.lpszUrlPath[0]) { - url_parts.lpszUrlPath[0] = L'/'; - } hRequest = WinHttpOpenRequest( hConnection, method, - url_parts.lpszUrlPath, + urlpath && urlpath[0] ? urlpath : L"/", NULL, WINHTTP_NO_REFERER, accepts_array, @@ -341,12 +381,20 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { )); if (url_parts.dwUserNameLength || url_parts.dwPasswordLength) { + user = decode_url_part(url_parts.lpszUserName, url_parts.dwUserNameLength); + if (!user) { + goto exit; + } + pass = decode_url_part(url_parts.lpszPassword, url_parts.dwPasswordLength); + if (!pass) { + goto exit; + } CHECK_WINHTTP(WinHttpSetCredentials( hRequest, WINHTTP_AUTH_TARGET_SERVER, WINHTTP_AUTH_SCHEME_BASIC, - url_parts.lpszUserName, - url_parts.lpszPassword, + user, + pass, NULL )); } @@ -464,11 +512,14 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { if (hSession) { WinHttpCloseHandle(hSession); } + PyMem_Free(user); + PyMem_Free(pass); + PyMem_Free(hostname); + PyMem_Free(urlpath); PyMem_Free(accepts_array); PyMem_Free(accepts); PyMem_Free(headers); PyMem_Free(method); - PyMem_Free(url2); PyMem_Free(url); return result; } @@ -510,19 +561,26 @@ PyObject *winhttp_urlsplit(PyObject *, PyObject *args, PyObject *kwargs) { return NULL; } URL_COMPONENTS url_parts = { sizeof(URL_COMPONENTS) }; - if (!crack_url(url, &url_parts, 0)) { + if (!crack_url(url, &url_parts)) { PyMem_Free(url); return NULL; } + // Deliberately not decoding host or path. We never use a blacklist, we only + // match against values specified by the user, or pass it to. If they want + // to provide the same URL with different encoding, that's their fault. + wchar_t *user = decode_url_part(url_parts.lpszUserName, url_parts.dwUserNameLength, true); + wchar_t *pass = decode_url_part(url_parts.lpszPassword, url_parts.dwPasswordLength, true); PyObject *r = Py_BuildValue("(u#u#u#u#nu#u#)", url_parts.lpszScheme, (Py_ssize_t)url_parts.dwSchemeLength, - url_parts.lpszUserName, (Py_ssize_t)url_parts.dwUserNameLength, - url_parts.lpszPassword, (Py_ssize_t)url_parts.dwPasswordLength, + user, (Py_ssize_t)(user ? wcslen(user) : 0), + pass, (Py_ssize_t)(pass ? wcslen(pass) : 0), url_parts.lpszHostName, (Py_ssize_t)url_parts.dwHostNameLength, (Py_ssize_t)url_parts.nPort, url_parts.lpszUrlPath, (Py_ssize_t)url_parts.dwUrlPathLength, url_parts.lpszExtraInfo, (Py_ssize_t)url_parts.dwExtraInfoLength ); + PyMem_Free(user); + PyMem_Free(pass); PyMem_Free(url); return r; } @@ -532,10 +590,12 @@ PyObject *winhttp_urlunsplit(PyObject *, PyObject *args, PyObject *kwargs) { static const char * keywords[] = {"scheme", "user", "password", "netloc", "port", "path", "extra", NULL}; URL_COMPONENTS url = { sizeof(URL_COMPONENTS) }; Py_ssize_t port = 0; + wchar_t *user = NULL; + wchar_t *pass = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&O&O&O&nO&O&:winhttp_urlunsplit", keywords, as_utf16, &url.lpszScheme, - as_utf16, &url.lpszUserName, - as_utf16, &url.lpszPassword, + as_utf16, &user, + as_utf16, &pass, as_utf16, &url.lpszHostName, &port, as_utf16, &url.lpszUrlPath, @@ -545,8 +605,16 @@ PyObject *winhttp_urlunsplit(PyObject *, PyObject *args, PyObject *kwargs) { } DWORD cch = 0; PyObject *r = NULL; + url.lpszUserName = encode_url_part(user, user ? wcslen(user) : 0, true); + if (user && !url.lpszUserName) { + goto exit; + } + url.lpszPassword = encode_url_part(pass, pass ? wcslen(pass) : 0, true); + if (pass && !url.lpszPassword) { + goto exit; + } url.nPort = (INTERNET_PORT)port; - if (WinHttpCreateUrl(&url, ICU_ESCAPE, NULL, &cch)) { + if (WinHttpCreateUrl(&url, 0, NULL, &cch)) { // Success path, because it should've failed with ERROR_INSUFFICIENT_BUFFER PyErr_SetString(PyExc_ValueError, "unable to unsplit URL"); } else if (GetLastError() != ERROR_INSUFFICIENT_BUFFER) { @@ -556,7 +624,7 @@ PyObject *winhttp_urlunsplit(PyObject *, PyObject *args, PyObject *kwargs) { wchar_t *buf = (wchar_t*)PyMem_Malloc(cch * sizeof(wchar_t)); if (!buf) { PyErr_NoMemory(); - } else if (!WinHttpCreateUrl(&url, ICU_ESCAPE, buf, &cch)) { + } else if (!WinHttpCreateUrl(&url, 0, buf, &cch)) { winhttp_error(); PyMem_Free(buf); } else { @@ -564,6 +632,9 @@ PyObject *winhttp_urlunsplit(PyObject *, PyObject *args, PyObject *kwargs) { PyMem_Free(buf); } } +exit: + PyMem_Free(user); + PyMem_Free(pass); PyMem_Free(url.lpszScheme); PyMem_Free(url.lpszUserName); PyMem_Free(url.lpszPassword); diff --git a/src/manage/urlutils.py b/src/manage/urlutils.py index 540fd53..f88f0a1 100644 --- a/src/manage/urlutils.py +++ b/src/manage/urlutils.py @@ -552,9 +552,11 @@ def sanitise_url(url): if ex.winerror in (12005, 12006): return url raise - p[U_USERNAME] = None + u = p[U_USERNAME] + if u and not (u.startswith("%") and u.endswith("%")): + p[U_USERNAME] = None pw = p[U_PASSWORD] - if pw and not (pw.startswith("%") and pw.startswith("%")): + if pw and not (pw.startswith("%") and pw.endswith("%")): p[U_PASSWORD] = None return winhttp_urlunsplit(*p) diff --git a/tests/test_urlutils.py b/tests/test_urlutils.py index 926bfe1..5ec537b 100644 --- a/tests/test_urlutils.py +++ b/tests/test_urlutils.py @@ -11,6 +11,8 @@ ("https://example.com/", "https://example.com/"), ("https://user@example.com/", "https://example.com/"), ("https://user:placeholder@example.com/", "https://example.com/"), + ("https://%placeholder%@example.com/", "https://%placeholder%@example.com/"), + ("https://%user%:%placeholder%@example.com/", "https://%user%:%placeholder%@example.com/"), ]]) def test_urlsanitise(url, expect): assert expect == UU.sanitise_url(url) @@ -30,10 +32,23 @@ def test_urlunsanitise(): assert None == UU.unsanitise_url(url, candidates) +def test_urlunsanitise_encoded(): + candidates = ["https://user%40example.com:place%40holder@example.com/"] + url = "https://example.com/my_path" + expect = "https://user%40example.com:place%40holder@example.com/my_path" + actual = UU.unsanitise_url(url, candidates) + print(actual) + print(UU.winhttp_urlsplit(actual)) + assert expect == UU.unsanitise_url(url, candidates) + + def test_extract_url_auth(): assert "1", "2" == UU.extract_url_auth("https://1:2@example.com") assert "1", "" == UU.extract_url_auth("https://1@example.com") + assert "1", "2" == UU.extract_url_auth("https://%31:%32@example.com") + assert "1", "" == UU.extract_url_auth("https://%31@example.com") + os.environ["PYMANAGER_TEST_VALUE"] = v = str(time.time()) assert "1", v == UU.extract_url_auth("https://1:%PYMANAGER_TEST_VALUE%@example.com") @@ -48,6 +63,11 @@ def test_extract_url_auth(): ("https://example.com/A/B/C", "//EXAMPLE.COM/A", True, "https://EXAMPLE.COM/A"), ("https://example.com/A/B/C", "//EXAMPLE.COM/", None, "https://EXAMPLE.COM/"), + # We are intentionally blind to encoded chars. + ("https://example.com/A/B/C", "%2fD", False, "https://example.com/A/B/C/%2fD"), + ("https://example.com/A/B/C", "%2f%2fD", False, "https://example.com/A/B/C/%2f%2fD"), + ("https://example.com/A/B%2fC", "D", True, "https://example.com/A/D"), + ("file:///C:/local/path", "file.json", False, "file:///C:/local/path/file.json"), ("file:///C:/local/path", "file.json", True, "file:///C:/local/file.json"), ("file:///C:/local/path", ".\\dir\\file.json", False, "file:///C:/local/path/dir/file.json"), From 235e8166e1450bbd44271ebe64c074e7dce27bd4 Mon Sep 17 00:00:00 2001 From: Steve Dower Date: Fri, 29 May 2026 12:44:48 +0100 Subject: [PATCH 2/3] Rename to escape --- src/_native/winhttp.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/_native/winhttp.cpp b/src/_native/winhttp.cpp index 3aea574..8d7d580 100644 --- a/src/_native/winhttp.cpp +++ b/src/_native/winhttp.cpp @@ -129,7 +129,7 @@ static int crack_url(const wchar_t *url, URL_COMPONENTS *parts) { return 1; } -static wchar_t *_recode_url_part(bool encode, const wchar_t *url_part, DWORD cch, bool allow_env=false) +static wchar_t *_escape_url_part(bool encode, const wchar_t *url_part, DWORD cch, bool allow_env=false) { if (!url_part) { return NULL; @@ -175,14 +175,14 @@ static wchar_t *_recode_url_part(bool encode, const wchar_t *url_part, DWORD cch return result; } -static wchar_t *encode_url_part(const wchar_t *url_part, DWORD cch, bool allow_env=false) +static wchar_t *escape_url_part(const wchar_t *url_part, DWORD cch, bool allow_env=false) { - return _recode_url_part(true, url_part, cch, allow_env); + return _escape_url_part(true, url_part, cch, allow_env); } -static wchar_t *decode_url_part(const wchar_t *url_part, DWORD cch, bool allow_env=false) +static wchar_t *unescape_url_part(const wchar_t *url_part, DWORD cch, bool allow_env=false) { - return _recode_url_part(false, url_part, cch, allow_env); + return _escape_url_part(false, url_part, cch, allow_env); } @@ -308,11 +308,11 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { if (!crack_url(url, &url_parts)) { goto exit; } - hostname = decode_url_part(url_parts.lpszHostName, url_parts.dwHostNameLength); + hostname = unescape_url_part(url_parts.lpszHostName, url_parts.dwHostNameLength); if (!hostname) { goto exit; } - urlpath = decode_url_part(url_parts.lpszUrlPath, url_parts.dwUrlPathLength); + urlpath = unescape_url_part(url_parts.lpszUrlPath, url_parts.dwUrlPathLength); if (!urlpath) { goto exit; } @@ -381,11 +381,11 @@ PyObject *winhttp_urlopen(PyObject *, PyObject *args, PyObject *kwargs) { )); if (url_parts.dwUserNameLength || url_parts.dwPasswordLength) { - user = decode_url_part(url_parts.lpszUserName, url_parts.dwUserNameLength); + user = unescape_url_part(url_parts.lpszUserName, url_parts.dwUserNameLength); if (!user) { goto exit; } - pass = decode_url_part(url_parts.lpszPassword, url_parts.dwPasswordLength); + pass = unescape_url_part(url_parts.lpszPassword, url_parts.dwPasswordLength); if (!pass) { goto exit; } @@ -568,8 +568,8 @@ PyObject *winhttp_urlsplit(PyObject *, PyObject *args, PyObject *kwargs) { // Deliberately not decoding host or path. We never use a blacklist, we only // match against values specified by the user, or pass it to. If they want // to provide the same URL with different encoding, that's their fault. - wchar_t *user = decode_url_part(url_parts.lpszUserName, url_parts.dwUserNameLength, true); - wchar_t *pass = decode_url_part(url_parts.lpszPassword, url_parts.dwPasswordLength, true); + wchar_t *user = unescape_url_part(url_parts.lpszUserName, url_parts.dwUserNameLength, true); + wchar_t *pass = unescape_url_part(url_parts.lpszPassword, url_parts.dwPasswordLength, true); PyObject *r = Py_BuildValue("(u#u#u#u#nu#u#)", url_parts.lpszScheme, (Py_ssize_t)url_parts.dwSchemeLength, user, (Py_ssize_t)(user ? wcslen(user) : 0), @@ -605,11 +605,11 @@ PyObject *winhttp_urlunsplit(PyObject *, PyObject *args, PyObject *kwargs) { } DWORD cch = 0; PyObject *r = NULL; - url.lpszUserName = encode_url_part(user, user ? wcslen(user) : 0, true); + url.lpszUserName = escape_url_part(user, user ? wcslen(user) : 0, true); if (user && !url.lpszUserName) { goto exit; } - url.lpszPassword = encode_url_part(pass, pass ? wcslen(pass) : 0, true); + url.lpszPassword = escape_url_part(pass, pass ? wcslen(pass) : 0, true); if (pass && !url.lpszPassword) { goto exit; } From e2bbd16d8b1c61927a1717f61e988c09c6d255e3 Mon Sep 17 00:00:00 2001 From: Steve Dower Date: Fri, 29 May 2026 12:56:19 +0100 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- tests/test_urlutils.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/test_urlutils.py b/tests/test_urlutils.py index 5ec537b..289856a 100644 --- a/tests/test_urlutils.py +++ b/tests/test_urlutils.py @@ -36,9 +36,6 @@ def test_urlunsanitise_encoded(): candidates = ["https://user%40example.com:place%40holder@example.com/"] url = "https://example.com/my_path" expect = "https://user%40example.com:place%40holder@example.com/my_path" - actual = UU.unsanitise_url(url, candidates) - print(actual) - print(UU.winhttp_urlsplit(actual)) assert expect == UU.unsanitise_url(url, candidates) @@ -46,8 +43,8 @@ def test_extract_url_auth(): assert "1", "2" == UU.extract_url_auth("https://1:2@example.com") assert "1", "" == UU.extract_url_auth("https://1@example.com") - assert "1", "2" == UU.extract_url_auth("https://%31:%32@example.com") - assert "1", "" == UU.extract_url_auth("https://%31@example.com") + assert ("1", "2") == UU.extract_url_auth("https://%31:%32@example.com") + assert ("1", "") == UU.extract_url_auth("https://%31@example.com") os.environ["PYMANAGER_TEST_VALUE"] = v = str(time.time()) assert "1", v == UU.extract_url_auth("https://1:%PYMANAGER_TEST_VALUE%@example.com")