|
21 | 21 | import html
|
22 | 22 | import http, http.client
|
23 | 23 | import urllib.parse
|
| 24 | +import urllib.request |
24 | 25 | import tempfile
|
25 | 26 | import time
|
26 | 27 | import datetime
|
|
33 | 34 | from test.support import (
|
34 | 35 | is_apple, import_helper, os_helper, threading_helper
|
35 | 36 | )
|
| 37 | +from test.support.script_helper import kill_python, spawn_python |
| 38 | +from test.support.socket_helper import find_unused_port |
36 | 39 |
|
37 | 40 | try:
|
38 | 41 | import ssl
|
@@ -1452,6 +1455,73 @@ def test_unknown_flag(self, _):
|
1452 | 1455 | self.assertIn('error', stderr.getvalue())
|
1453 | 1456 |
|
1454 | 1457 |
|
| 1458 | +class CommandLineRunTimeTestCase(unittest.TestCase): |
| 1459 | + served_data = os.urandom(32) |
| 1460 | + served_filename = 'served_filename' |
| 1461 | + tls_cert = certdata_file('ssl_cert.pem') |
| 1462 | + tls_key = certdata_file('ssl_key.pem') |
| 1463 | + tls_password = b'somepass' |
| 1464 | + tls_password_file = 'ssl_key_password' |
| 1465 | + |
| 1466 | + def setUp(self): |
| 1467 | + super().setUp() |
| 1468 | + server_dir_context = os_helper.temp_cwd() |
| 1469 | + server_dir = self.enterContext(server_dir_context) |
| 1470 | + with open(self.served_filename, 'wb') as f: |
| 1471 | + f.write(self.served_data) |
| 1472 | + with open(self.tls_password_file, 'wb') as f: |
| 1473 | + f.write(self.tls_password) |
| 1474 | + |
| 1475 | + def fetch_file(self, path, context=None): |
| 1476 | + req = urllib.request.Request(path, method='GET') |
| 1477 | + with urllib.request.urlopen(req, context=context) as res: |
| 1478 | + return res.read() |
| 1479 | + |
| 1480 | + def parse_cli_output(self, output): |
| 1481 | + match = re.search(r'Serving (HTTP|HTTPS) on (.+) port (\d+)', output) |
| 1482 | + if match is None: |
| 1483 | + return None, None, None |
| 1484 | + return match.group(1).lower(), match.group(2), int(match.group(3)) |
| 1485 | + |
| 1486 | + def wait_for_server(self, proc, protocol, bind, port): |
| 1487 | + """Check that the server has been successfully started.""" |
| 1488 | + line = proc.stdout.readline().strip() |
| 1489 | + if support.verbose: |
| 1490 | + print() |
| 1491 | + print('python -m http.server: ', line) |
| 1492 | + return self.parse_cli_output(line) == (protocol, bind, port) |
| 1493 | + |
| 1494 | + def test_http_client(self): |
| 1495 | + bind, port = '127.0.0.1', find_unused_port() |
| 1496 | + proc = spawn_python('-u', '-m', 'http.server', str(port), '-b', bind, |
| 1497 | + bufsize=1, text=True) |
| 1498 | + self.addCleanup(kill_python, proc) |
| 1499 | + self.addCleanup(proc.terminate) |
| 1500 | + self.assertTrue(self.wait_for_server(proc, 'http', bind, port)) |
| 1501 | + res = self.fetch_file(f'http://{bind}:{port}/{self.served_filename}') |
| 1502 | + self.assertEqual(res, self.served_data) |
| 1503 | + |
| 1504 | + @unittest.skipIf(ssl is None, "requires ssl") |
| 1505 | + def test_https_client(self): |
| 1506 | + context = ssl.create_default_context() |
| 1507 | + # allow self-signed certificates |
| 1508 | + context.check_hostname = False |
| 1509 | + context.verify_mode = ssl.CERT_NONE |
| 1510 | + |
| 1511 | + bind, port = '127.0.0.1', find_unused_port() |
| 1512 | + proc = spawn_python('-u', '-m', 'http.server', str(port), '-b', bind, |
| 1513 | + '--tls-cert', self.tls_cert, |
| 1514 | + '--tls-key', self.tls_key, |
| 1515 | + '--tls-password-file', self.tls_password_file, |
| 1516 | + bufsize=1, text=True) |
| 1517 | + self.addCleanup(kill_python, proc) |
| 1518 | + self.addCleanup(proc.terminate) |
| 1519 | + self.assertTrue(self.wait_for_server(proc, 'https', bind, port)) |
| 1520 | + url = f'https://{bind}:{port}/{self.served_filename}' |
| 1521 | + res = self.fetch_file(url, context=context) |
| 1522 | + self.assertEqual(res, self.served_data) |
| 1523 | + |
| 1524 | + |
1455 | 1525 | def setUpModule():
|
1456 | 1526 | unittest.addModuleCleanup(os.chdir, os.getcwd())
|
1457 | 1527 |
|
|
0 commit comments