简单写一个上传文件程序,要求同时上传文件数量可以人为控制
下面是一个使用 **Python** 编写的简单文件上传程序,支持**人为控制同时上传的文件数量**(即并发数)。这个示例使用了 `concurrent.futures.ThreadPoolExecutor` 来实现多线程上传,你可以根据需要调整并发数。
---
### ✅ 功能说明:
- 支持指定最大并发上传数量(如 5 个文件同时上传)。
- 可以选择要上传的文件路径(例如:`./files/` 目录下的所有 `.txt` 文件)。
- 每个文件上传完成后打印状态信息。
---
### 🧾 示例代码(Python)
```python
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# 模拟上传文件的函数(实际可替换为真实上传逻辑)
def upload_file(file_path):
file_name = os.path.basename(file_path)
print(f"[{time.ctime()}] 开始上传: {file_name}")
time.sleep(2) # 模拟上传耗时
print(f"[{time.ctime()}] 完成上传: {file_name}")
return file_name
def main():
# 设置上传目录和最大并发数
upload_dir = "./files" # 替换为你的文件目录
max_concurrent = 5 # 同时上传的文件数量
# 获取所有要上传的文件(这里以 .txt 文件为例)
files_to_upload = [os.path.join(upload_dir, f) for f in os.listdir(upload_dir) if f.endswith('.txt')]
if not files_to_upload:
print("没有找到需要上传的文件。")
return
print(f"找到 {len(files_to_upload)} 个文件待上传,最大并发数: {max_concurrent}")
# 使用线程池控制并发数
with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
futures = [executor.submit(upload_file, file) for file in files_to_upload]
for future in as_completed(futures):
try:
result = future.result()
print(f"✅ 上传完成: {result}")
except Exception as e:
print(f"❌ 上传失败: {e}")
if __name__ == "__main__":
main()
```
---
### 📁 文件结构示例:
```
your_project/
│
├── files/
│ ├── file1.txt
│ ├── file2.txt
│ └── file3.txt
│
└── upload_script.py
```
---
### 🔄 运行说明:
1. 将上述代码保存为 `upload_script.py`。
2. 在 `files/` 目录下放置一些 `.txt` 文件。
3. 运行脚本:
```bash
python upload_script.py
```
---
### 🛠️ 扩展建议:
- 如果你想要上传到网络服务器,可以将 `upload_file` 函数改为使用 `requests` 或 `ftplib` 等库进行实际上传。
- 可以添加日志记录、错误重试、进度条等功能。
---
如果你有特定的上传目标(比如 FTP、HTTP、S3 等),我可以帮你进一步完善代码。欢迎继续提问!