简单写一个上传文件程序,要求同时上传文件数量可以人为控制
下面是一个使用 **Python** 编写的简单文件上传程序,支持**人为控制同时上传的文件数量**(即并发数)。这个示例使用了 `concurrent.futures.ThreadPoolExecutor` 来实现多线程上传,你可以根据需要调整并发数。 --- ### ✅ 功能说明: - 支持指定最大并发上传数量(如 5 个文件同时上传)。 - 可以选择要上传的文件路径(例如:`./files/` 目录下的所有 `.txt` 文件)。 - 每个文件上传完成后打印状态信息。 --- ### 🧾 示例代码(Python) ```python import os import time from concurrent.futures import ThreadPoolExecutor, as_completed # 模拟上传文件的函数(实际可替换为真实上传逻辑) def upload_file(file_path): file_name = os.path.basename(file_path) print(f"[{time.ctime()}] 开始上传: {file_name}") time.sleep(2) # 模拟上传耗时 print(f"[{time.ctime()}] 完成上传: {file_name}") return file_name def main(): # 设置上传目录和最大并发数 upload_dir = "./files" # 替换为你的文件目录 max_concurrent = 5 # 同时上传的文件数量 # 获取所有要上传的文件(这里以 .txt 文件为例) files_to_upload = [os.path.join(upload_dir, f) for f in os.listdir(upload_dir) if f.endswith('.txt')] if not files_to_upload: print("没有找到需要上传的文件。") return print(f"找到 {len(files_to_upload)} 个文件待上传,最大并发数: {max_concurrent}") # 使用线程池控制并发数 with ThreadPoolExecutor(max_workers=max_concurrent) as executor: futures = [executor.submit(upload_file, file) for file in files_to_upload] for future in as_completed(futures): try: result = future.result() print(f"✅ 上传完成: {result}") except Exception as e: print(f"❌ 上传失败: {e}") if __name__ == "__main__": main() ``` --- ### 📁 文件结构示例: ``` your_project/ │ ├── files/ │ ├── file1.txt │ ├── file2.txt │ └── file3.txt │ └── upload_script.py ``` --- ### 🔄 运行说明: 1. 将上述代码保存为 `upload_script.py`。 2. 在 `files/` 目录下放置一些 `.txt` 文件。 3. 运行脚本: ```bash python upload_script.py ``` --- ### 🛠️ 扩展建议: - 如果你想要上传到网络服务器,可以将 `upload_file` 函数改为使用 `requests` 或 `ftplib` 等库进行实际上传。 - 可以添加日志记录、错误重试、进度条等功能。 --- 如果你有特定的上传目标(比如 FTP、HTTP、S3 等),我可以帮你进一步完善代码。欢迎继续提问!