用Python3实现一个简单的P2P文件和消息传输小工具

AKHYui2025-05-24 07:22:00工具搭建
# 信令服务器搭建

信令服务器搭建需要一个具有公网IP的服务器(VPS),信令服务器的功能包括以下几点:

  1. 节点注册 (/register/{peer_id}): 允许一个 P2P 节点(peer)注册其 ID 和它打算用于 P2P 通信的端口。服务器会自动记录请求来源的公网 IP 地址。

  2. 节点查询 (/query/{peer_id}): 允许一个节点查询另一个已注册节点的信息(公网 IP 和 P2P 端口)。

  3. 列出所有节点 (/list_peers): 显示当前所有已注册的节点信息(主要用于调试)。

  4. 节点注销 (/unregister/{peer_id}): 允许节点从服务器注销。


from fastapi import FastAPI, Request, HTTPException

from pydantic import BaseModel # 用于数据验证和序列化

import uvicorn # ASGI 服务器

import time # 用于记录时间戳

from typing import Dict, Any, Optional



# 初始化 FastAPI 应用

app = FastAPI(

    title="Simple P2P Signaling Server",

    description="A basic signaling server for P2P peer discovery.",

    version="0.1.0"

)



# 使用 Pydantic 定义请求体模型

class PeerRegisterInfo(BaseModel):

    p2p_port: int # Peer 宣称其用于 P2P UDP 通信的本地端口



# 使用 Pydantic 定义响应体模型

class PeerDetailsResponse(BaseModel):

    peer_id: str

    public_ip: str  # 服务器看到的该 Peer 的公网 IP

    p2p_port: int   # Peer 注册的 P2P 端口

    registered_at: float # 注册时的时间戳



# 简单的内存数据库来存储 Peer 信息

# 格式: { "peer_id": {"public_ip": "x.x.x.x", "p2p_port": 12345, "registered_at": timestamp} }

peers_db: Dict[str, Dict[str, Any]] = {}





@app.get("/")

async def read_root():

    """

    根路径,确认服务器正在运行。

    """

    return {"message": "P2P Signaling Server is running. Access /docs for API documentation."}





@app.post("/register/{peer_id}", status_code=201, response_model=PeerDetailsResponse)

async def register_peer(peer_id: str, peer_info: PeerRegisterInfo, request: Request):

    """

    注册或更新一个 Peer 的信息。

    - `peer_id`: Peer 的唯一标识符。

    - `peer_info`: 包含 `p2p_port`,即 Peer 用于 P2P 通信的端口。

    服务器会自动从请求中获取 Peer 的公网 IP (`request.client.host`)。

    """

    client_host = request.client.host  # 这是 FastAPI 服务器看到的客户端 IP 地址

                                     # 注意: 如果服务器在反向代理后,需要配置代理以传递正确的客户端IP



    current_time = time.time()

    peers_db[peer_id] = {

        "public_ip": client_host,

        "p2p_port": peer_info.p2p_port,

        "registered_at": current_time

    }

    print(f"[INFO] Peer registered/updated: {peer_id} -> IP: {client_host}, P2P Port: {peer_info.p2p_port}")

    

    return PeerDetailsResponse(

        peer_id=peer_id,

        public_ip=client_host,

        p2p_port=peer_info.p2p_port,

        registered_at=current_time

    )





@app.get("/query/{peer_id}", response_model=PeerDetailsResponse)

async def query_peer(peer_id: str):

    """

    查询指定 Peer ID 的注册信息。

    """

    if peer_id not in peers_db:

        print(f"[WARN] Peer query failed: {peer_id} not found.")

        raise HTTPException(status_code=404, detail=f"Peer '{peer_id}' not found")

    

    peer_data = peers_db[peer_id]

    print(f"[INFO] Peer query successful for: {peer_id} -> {peer_data}")

    return PeerDetailsResponse(

        peer_id=peer_id,

        public_ip=peer_data["public_ip"],

        p2p_port=peer_data["p2p_port"],

        registered_at=peer_data["registered_at"]

    )





@app.get("/list_peers")

async def list_all_peers():

    """

    列出所有当前已注册的 Peers 及其信息。

    主要用于调试。

    """

    if not peers_db:

        return {"message": "No peers currently registered."}

    return peers_db





@app.delete("/unregister/{peer_id}", status_code=200)

async def unregister_peer(peer_id: str):

    """

    从服务器注销一个 Peer。

    """

    if peer_id in peers_db:

        del peers_db[peer_id]

        print(f"[INFO] Peer unregistered: {peer_id}")

        return {"message": "Peer unregistered successfully", "peer_id": peer_id}

    else:

        print(f"[WARN] Attempt to unregister non-existent peer: {peer_id}")

        raise HTTPException(status_code=404, detail=f"Peer '{peer_id}' not found, cannot unregister")



# --- 如何运行此服务器 ---

# 1. 将此代码保存为 `signaling_server.py`。

# 2. 安装必要的库:

#    pip install fastapi "uvicorn[standard]" pydantic

# 3. 在终端中运行服务器:

#    uvicorn signaling_server:app --host 0.0.0.0 --port 8000 --reload

#

#    - `uvicorn signaling_server:app`: 告诉 Uvicorn 运行 `signaling_server.py` 文件中的 `app` FastAPI 实例。

#    - `--host 0.0.0.0`: 使服务器监听所有网络接口,这样局域网内或公网(如果端口已转发)的其他机器可以访问。

#    - `--port 8000`: 指定服务器监听的端口。

#    - `--reload`: (可选,用于开发) 当代码文件发生更改时,服务器会自动重新加载。

#

# 你的 P2P 客户端程序随后可以向 `http://<服务器的IP地址>:8000` 发送 HTTP 请求。

# 例如:

#   - 注册 peer "alice" 在 P2P 端口 12345:

#     POST http://<server_ip>:8000/register/alice  (Body: {"p2p_port": 12345})

#   - 查询 peer "alice" 的信息:

#     GET http://<server_ip>:8000/query/alice



if __name__ == "__main__":

    # 这部分通常不会直接运行,因为 FastAPI 应用推荐使用 Uvicorn 从命令行启动。

    # 但为了完整性,这里可以添加一行来提示如何运行。

    print("To run this FastAPI application, use the command:")

    print("uvicorn signaling_server:app --host 0.0.0.0 --port 8000 --reload")

    # uvicorn.run(app, host="0.0.0.0", port=8000) # 也可以取消注释这行来直接运行,但不推荐用于生产

如何使用信令服务器

如何使用这个信令服务器 (P2P 客户端的视角):

Peer A 启动:

Peer A 决定一个用于 P2P UDP 通信的本地端口 (例如,随机选择一个或用户指定一个,如 55555)。

Peer A 向信令服务器发送一个 POST 请求到 /register/peerA,请求体为 {"p2p_port": 55555}。

信令服务器收到请求,记录下 Peer A 的公网 IP (例如 1.2.3.4,这是服务器看到的 Peer A 的 IP) 和 P2P 端口 55555。

Peer B 启动:

Peer B 执行类似操作,注册其 P2P 端口 (例如 66666) 到 /register/peerB。

信令服务器记录 Peer B 的公网 IP (例如 5.6.7.8) 和 P2P 端口 66666。

Peer A 想要连接 Peer B:

Peer A 向信令服务器发送一个 GET 请求到 /query/peerB。

服务器响应 Peer B 的信息:{"public_ip": "5.6.7.8", "p2p_port": 66666, ...}。

Peer B 想要连接 Peer A:

Peer B 向信令服务器发送一个 GET 请求到 /query/peerA。

服务器响应 Peer A 的信息:{"public_ip": "1.2.3.4", "p2p_port": 55555, ...}。

尝试打洞:

现在 Peer A 知道了 Peer B 的公网端点 (5.6.7.8:66666)。

Peer B 知道了 Peer A 的公网端点 (1.2.3.4:55555)。

双方现在可以尝试向对方的公网端点发送 UDP 包以进行 NAT 打洞。

重要注意事项:

极其简化: 这是一个非常基础的信令服务器。

内存存储: 所有 Peer 数据都存储在内存中,服务器重启后数据会丢失。生产环境需要使用数据库(如 Redis, PostgreSQL, MongoDB 等)。

无身份验证/授权: 任何人都可以注册或查询任何 Peer ID。生产环境需要安全机制。

无错误恢复/重试: 客户端需要自己处理网络请求的失败和重试。

无 Peer 超时: 过期的 Peer 信息会一直保留,除非手动注销。可以添加时间戳并定期清理不活跃的 Peer。

公网 IP 地址 (request.client.host):

request.client.host 获取的是直接连接到 FastAPI 服务器的客户端的 IP 地址。

如果你的 FastAPI 服务器直接暴露在公网,这通常就是 Peer 的公网 IP。

如果 FastAPI 服务器部署在反向代理(如 Nginx、Traefik)之后,你需要确保反向代理正确设置了 X-Forwarded-For 或类似的头部,并且 FastAPI (或其 ASGI 服务器 Uvicorn) 被配置为信任这些头部来获取真实的客户端 IP。否则,request.client.host 可能会是反向代理的 IP 地址。

*不处理 NAT 类型: *这个信令服务器不扮演 STUN 服务器的角色,它不帮助客户端检测其 NAT 类型。它只是记录和交换客户端报告的 P2P 端口以及服务器观察到的客户端公网 IP。

HTTP 通信: 信令是通过 HTTP 进行的。P2P 客户端需要 HTTP 请求库(如 Python 的 requests 或 httpx)来与此服务器交互。

客户端服务

客户端需要实现以下基本功能:

  1. 向信令服务器注册自己的 peer_id 和一个宣称的 P2P 通信端口。

  2. 向信令服务器查询其他 peer_id 的网络信息。

  3. 列出所有已注册的节点。

  4. 从信令服务器注销自己。


import requests

import json

import random

import socket

import threading

import time

import queue

import os # 用于文件操作

import math # 用于计算块数



# --- 配置和全局变量 (与之前类似,增加文件传输相关) ---

SIGNALING_SERVER_URL = "http://信令服务器IP地址:8000"

current_peer_id = None

current_p2p_port = None



p2p_udp_socket = None

p2p_target_peer_id = None

p2p_target_reported_addr = None

p2p_target_actual_addr = None # 打洞成功后对方的 (ip, port)

p2p_listener_thread = None

p2p_stop_listener = threading.Event()

received_messages_q = queue.Queue() # 用于文本消息和控制消息



# 文件传输相关状态

CHUNK_SIZE = 8192  # bytes

FILE_TRANSFER_TIMEOUT = 5 # 秒,等待ACK的超时

FILE_TRANSFER_RETRIES = 3 # 重传次数



# 使用字典来管理不同类型事件的Event对象,用于线程同步

ack_events = {} # key: chunk_sq_num, value: threading.Event()

file_offer_response_event = threading.Event() # 用于等待对方接受/拒绝文件

file_offer_accepted = False # 标记对方是否接受文件

file_transfer_active = False # 标记当前是否有文件传输在进行

file_transfer_send_thread = None # 文件发送线程



# --- 信令服务器交互函数 (register_peer, query_peer, list_all_peers, unregister_peer) ---

# (请从您之前的 p2p_client.py 中复制这些函数过来,确保 unregister_peer 会调用 stop_current_p2p_activity)

# <在此处粘贴之前的信令函数>

# --- 信令服务器交互函数 (与之前类似,这里省略,假设它们已存在) ---

# register_peer, query_peer, list_all_peers, unregister_peer

# (请从您之前的 p2p_client.py 中复制这些函数过来)



# --- 新增:信令服务器交互函数 (确保它们在这里) ---

def register_peer(peer_id: str, p2p_port: int):

    global current_peer_id, current_p2p_port

    url = f"{SIGNALING_SERVER_URL}/register/{peer_id}"

    payload = {"p2p_port": p2p_port}

    try:

        response = requests.post(url, json=payload)

        response.raise_for_status()

        data = response.json()

        print(f"[注册成功] 服务器响应: {data}")

        current_peer_id = peer_id

        current_p2p_port = p2p_port # 保存我们声明的P2P端口

        return data

    except requests.exceptions.RequestException as e:

        print(f"[注册失败] 错误: {e}")

        if hasattr(e, 'response') and e.response is not None:

            try:

                print(f"服务器错误详情: {e.response.json()}")

            except json.JSONDecodeError:

                print(f"服务器错误详情 (非JSON): {e.response.text}")

        return None



def query_peer(peer_id_to_query: str):

    if not peer_id_to_query:

        print("错误: 需要提供要查询的 peer_id。")

        return None

    url = f"{SIGNALING_SERVER_URL}/query/{peer_id_to_query}"

    try:

        response = requests.get(url)

        response.raise_for_status()

        data = response.json()

        # 我们不在这里打印,让调用者决定如何使用

        # print(f"[查询成功] Peer '{peer_id_to_query}' 的信息: {data}")

        return data

    except requests.exceptions.RequestException as e:

        print(f"[查询失败] 错误: {e}")

        if hasattr(e, 'response') and e.response is not None:

            try:

                print(f"服务器错误详情: {e.response.json()}")

            except json.JSONDecodeError:

                print(f"服务器错误详情 (非JSON): {e.response.text}")

        return None



def list_all_peers():

    url = f"{SIGNALING_SERVER_URL}/list_peers"

    try:

        response = requests.get(url)

        response.raise_for_status()

        data = response.json()

        print("[列出所有 Peers] 服务器响应:")

        if isinstance(data, dict) and data.get("message") == "No peers currently registered.":

             print("  目前没有节点注册。")

        elif isinstance(data, dict):

            for peer_id, info in data.items():

                print(f"  - {peer_id}: {info}")

        else:

            print(data)

        return data

    except requests.exceptions.RequestException as e:

        print(f"[列出 Peers 失败] 错误: {e}")

        return None



def unregister_peer(peer_id_to_unregister: str):

    global current_peer_id, current_p2p_port

    if not peer_id_to_unregister:

        print("错误: 需要提供要注销的 peer_id。")

        return None

    

    stop_current_p2p_activity() # 注销前停止所有P2P活动



    url = f"{SIGNALING_SERVER_URL}/unregister/{peer_id_to_unregister}"

    try:

        response = requests.delete(url)

        response.raise_for_status()

        data = response.json()

        print(f"[注销成功] 服务器响应: {data}")

        if peer_id_to_unregister == current_peer_id:

            current_peer_id = None

            current_p2p_port = None

        return data

    except requests.exceptions.RequestException as e:

        print(f"[注销失败] 错误: {e}")

        if hasattr(e, 'response') and e.response is not None:

            try:

                print(f"服务器错误详情: {e.response.json()}")

            except json.JSONDecodeError:

                print(f"服务器错误详情 (非JSON): {e.response.text}")

        return None

# --- END 信令函数 ---





def initialize_p2p_socket(local_port: int):

    global p2p_udp_socket

    if p2p_udp_socket:

        try:

            p2p_udp_socket.close()

        except: pass # 忽略关闭时的错误

    

    p2p_udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:

        p2p_udp_socket.bind(('0.0.0.0', local_port))

        p2p_udp_socket.settimeout(1.0) # 短超时用于打洞和ACK等待

        print(f"[P2P] UDP Socket 已在 0.0.0.0:{local_port} 上初始化并绑定。")

        return True

    except OSError as e:

        print(f"[P2P] 错误: 绑定 UDP Socket 到端口 {local_port} 失败: {e}")

        p2p_udp_socket = None

        return False



def send_p2p_control_message(sock, target_addr, message_type, sender_id, payload_dict=None):

    """发送P2P控制消息 (JSON格式)"""

    if not sock: return

    try:

        message = {"type": message_type, "sender_id": sender_id}

        if payload_dict:

            message.update(payload_dict)

        sock.sendto(json.dumps(message).encode('utf-8'), target_addr)

        # print(f"DEBUG: Sent control msg {message_type} to {target_addr}")

    except Exception as e:

        print(f"[P2P] 发送控制消息 {message_type} 失败: {e}")



def send_p2p_data_chunk(sock, target_addr, sender_id, sq_num, data_bytes):

    """发送P2P文件数据块 (头部 + 原始数据)"""

    if not sock: return

    try:

        # 头部格式: "DATA:<sender_id>:<sq_num>:" 注意末尾的冒号作为分隔符

        header = f"DATA:{sender_id}:{sq_num}:".encode('utf-8')

        packet = header + data_bytes

        sock.sendto(packet, target_addr)

        # print(f"DEBUG: Sent DATA chunk {sq_num} ({len(data_bytes)}B) to {target_addr}")

    except Exception as e:

        print(f"[P2P] 发送数据块 {sq_num} 失败: {e}")





def udp_listener():

    global p2p_udp_socket, p2p_stop_listener, received_messages_q, ack_events

    global file_offer_response_event, file_offer_accepted, file_transfer_active

    global p2p_target_actual_addr # 监听线程也可能在打洞阶段更新这个



    print("[P2P Listener] 监听线程已启动。")

    if not p2p_udp_socket:

        print("[P2P Listener] 错误: Socket未初始化。")

        return



    active_file_receive_info = {} # 用于存储正在接收的文件的信息



    while not p2p_stop_listener.is_set():

        try:

            # raw_data, addr = p2p_udp_socket.recvfrom(CHUNK_SIZE + 256) # 缓冲区稍大于块大小以容纳头部

            raw_data, addr = p2p_udp_socket.recvfrom(65536) # 确保能接收大的块

            

            # 尝试解析为JSON (控制消息)

            try:

                message_str = raw_data.decode('utf-8')

                msg_data = json.loads(message_str)

                msg_type = msg_data.get("type")

                sender_id = msg_data.get("sender_id")



                # print(f"DEBUG Listener: Received JSON msg: {msg_data} from {addr}")



                if msg_type == "PUNCH_SYN" or msg_type == "PUNCH_ACK_RESPONSE": # 来自打洞过程

                     if not p2p_target_actual_addr or p2p_target_actual_addr[0] != addr[0]:

                        print(f"[P2P Listener] 收到来自 {addr} 的打洞相关消息: {msg_type}")

                        p2p_target_actual_addr = addr # 更新对方的实际地址

                        # 如果是SYN,回复ACK_RESPONSE

                        if msg_type == "PUNCH_SYN":

                            send_p2p_control_message(p2p_udp_socket, addr, "PUNCH_ACK_RESPONSE", current_peer_id)

                     # 将打洞消息也放入队列,让主打洞逻辑感知

                     received_messages_q.put((msg_data, addr))





                elif msg_type == "MSG": # 文本消息

                    received_messages_q.put((msg_data, addr))

                

                elif msg_type == "FILE_OFFER":

                    if file_transfer_active:

                        print(f"[P2P Listener] 收到来自 {sender_id} 的文件提议,但当前正忙于另一传输,已拒绝。")

                        send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id, 

                                                 {"filename": msg_data.get("filename"), "reason": "Receiver busy"})

                        continue

                    

                    print(f"\n[P2P Listener] 收到来自 {sender_id} ({addr}) 的文件提议: ")

                    print(f"  文件名: {msg_data.get('filename')}")

                    print(f"  大小: {msg_data.get('filesize')} bytes")

                    print(f"  块大小: {msg_data.get('chunk_size')}")

                    print(f"  总块数: {msg_data.get('total_chunks')}")

                    

                    user_choice = input("接受此文件吗? (y/N): ").strip().lower()

                    if user_choice == 'y':

                        file_transfer_active = True

                        active_file_receive_info = {

                            "filename": msg_data.get('filename'),

                            "filesize": msg_data.get('filesize'),

                            "chunk_size": msg_data.get('chunk_size'),

                            "total_chunks": msg_data.get('total_chunks'),

                            "sender_id": sender_id,

                            "sender_addr": addr,

                            "output_file": None,

                            "received_chunks": set(),

                            "next_expected_chunk": 0

                        }

                        # 创建接收文件夹

                        receive_dir = "received_files_p2p"

                        if not os.path.exists(receive_dir):

                            os.makedirs(receive_dir)

                        

                        # 构建保存路径,避免路径遍历漏洞 (简单替换)

                        safe_filename = os.path.basename(active_file_receive_info["filename"])

                        filepath = os.path.join(receive_dir, safe_filename)

                        

                        try:

                            active_file_receive_info["output_file"] = open(filepath, "wb")

                            print(f"[P2P Listener] 接受文件提议,将保存到 {filepath}")

                            send_p2p_control_message(p2p_udp_socket, addr, "FILE_ACCEPT", current_peer_id, 

                                                     {"filename": active_file_receive_info["filename"]})

                        except IOError as e:

                            print(f"[P2P Listener] 错误: 无法打开文件 {filepath} 进行写入: {e}")

                            send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id, 

                                                     {"filename": active_file_receive_info["filename"], "reason": "Cannot write file"})

                            file_transfer_active = False

                            active_file_receive_info = {}



                    else:

                        print(f"[P2P Listener] 拒绝文件提议: {msg_data.get('filename')}")

                        send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id, 

                                                 {"filename": msg_data.get('filename')})



                elif msg_type == "FILE_ACCEPT":

                    if sender_id == p2p_target_peer_id: # 确保是目标peer的响应

                        print(f"[P2P Listener] {sender_id} 接受了文件 '{msg_data.get('filename')}'。")

                        file_offer_accepted = True

                        file_offer_response_event.set() # 通知发送方可以开始传输



                elif msg_type == "FILE_REJECT":

                    if sender_id == p2p_target_peer_id:

                        print(f"[P2P Listener] {sender_id} 拒绝了文件 '{msg_data.get('filename')}'. 原因: {msg_data.get('reason')}")

                        file_offer_accepted = False

                        file_offer_response_event.set()



                elif msg_type == "ACK": # ACK:<receiver_id>:<chunk_sq_num>

                    sq_num = msg_data.get("sq_num")

                    if sq_num is not None and sender_id == p2p_target_peer_id:

                        # print(f"DEBUG Listener: Received ACK for chunk {sq_num} from {sender_id}")

                        if sq_num in ack_events:

                            ack_events[sq_num].set() # 通知发送线程对应的ACK已收到

                        else:

                            # print(f"DEBUG Listener: Received unexpected ACK for chunk {sq_num} or for a non-active transfer.")

                            pass

                

                elif msg_type == "FILE_END":

                    if file_transfer_active and active_file_receive_info and \

                       sender_id == active_file_receive_info["sender_id"] and \

                       msg_data.get("filename") == active_file_receive_info["filename"]:

                        

                        print(f"[P2P Listener] {sender_id} 完成发送文件 '{active_file_receive_info['filename']}'.")

                        if active_file_receive_info["output_file"]:

                            active_file_receive_info["output_file"].close()

                        

                        # 简单校验:块数是否匹配

                        if len(active_file_receive_info["received_chunks"]) == active_file_receive_info["total_chunks"]:

                            print(f"[P2P Listener] 文件 '{active_file_receive_info['filename']}' 接收完毕且块数匹配。")

                            send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], 

                                                     "FILE_VERIFY", current_peer_id, 

                                                     {"filename": active_file_receive_info["filename"], "status": "OK"})

                        else:

                            print(f"[P2P Listener] 警告: 文件 '{active_file_receive_info['filename']}' 接收完毕但块数不匹配! "

                                  f"收到 {len(active_file_receive_info['received_chunks'])}/{active_file_receive_info['total_chunks']}")

                            send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], 

                                                     "FILE_VERIFY", current_peer_id, 

                                                     {"filename": active_file_receive_info["filename"], "status": "ERROR_CHUNK_MISMATCH"})

                        

                        file_transfer_active = False

                        active_file_receive_info = {}

                

                elif msg_type == "FILE_VERIFY":

                    if file_transfer_active and sender_id == p2p_target_peer_id: # 确保是发送方在等待这个

                        print(f"[P2P Listener] 收到来自 {sender_id} 的文件接收确认: '{msg_data.get('filename')}' 状态: {msg_data.get('status')}")

                        # 这里可以设置一个Event来通知发送线程对方已完全接收并校验

                        # 例如: file_final_verify_event.set()

                        # For now, just print.

                        file_transfer_active = False # 发送方也标记传输结束



                # 其他控制消息...



            except (json.JSONDecodeError, UnicodeDecodeError): # 不是JSON,尝试解析为数据块

                header_part = raw_data.split(b':', 3) # DATA:sender:sq_num:actual_data

                if len(header_part) == 4 and header_part[0] == b'DATA':

                    try:

                        data_sender_id = header_part[1].decode('utf-8')

                        data_sq_num = int(header_part[2].decode('utf-8'))

                        actual_data = header_part[3]



                        # print(f"DEBUG Listener: Received DATA chunk {data_sq_num} from {data_sender_id} ({len(actual_data)}B)")



                        if file_transfer_active and active_file_receive_info and \

                           data_sender_id == active_file_receive_info["sender_id"] and \

                           addr[0] == active_file_receive_info["sender_addr"][0]: # 确保来自正确的发送者IP

                            

                            # 简单停止等待:只接受期望的下一个块

                            if data_sq_num == active_file_receive_info["next_expected_chunk"]:

                                if active_file_receive_info["output_file"]:

                                    active_file_receive_info["output_file"].write(actual_data)

                                active_file_receive_info["received_chunks"].add(data_sq_num)

                                active_file_receive_info["next_expected_chunk"] += 1

                                

                                # print(f"  接收并写入块 {data_sq_num}。发送ACK。")

                                send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], "ACK", 

                                                         current_peer_id, {"sq_num": data_sq_num})

                                

                                # 打印进度 (简单版)

                                progress = (len(active_file_receive_info["received_chunks"]) / active_file_receive_info["total_chunks"]) * 100

                                print(f"\r  接收进度: {progress:.2f}% ({len(active_file_receive_info['received_chunks'])}/{active_file_receive_info['total_chunks']})", end="")





                            # 如果是重复的已接收块 (对方可能没收到我们的ACK),重发ACK

                            elif data_sq_num < active_file_receive_info["next_expected_chunk"] and \

                                 data_sq_num in active_file_receive_info["received_chunks"]:

                                # print(f"  收到重复块 {data_sq_num}。重发ACK。")

                                send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], "ACK", 

                                                         current_peer_id, {"sq_num": data_sq_num})

                            # else: (忽略乱序的块,在停止等待中)

                            #    print(f"  收到乱序块 {data_sq_num} (期望 {active_file_receive_info['next_expected_chunk']})。已忽略。")





                        # else: (不是当前活动传输的数据块,忽略)

                        #    print(f"  收到来自 {data_sender_id} 的数据块,但当前无匹配的接收任务。")



                    except Exception as e_parse:

                        print(f"[P2P Listener] 解析DATA块头部失败: {e_parse}, raw_header: {header_part[:3]}")

                # else: (无法识别的原始数据包,忽略)

                #    print(f"[P2P Listener] 收到来自 {addr} 的无法解析的原始数据: {raw_data[:50]}...")





        except socket.timeout:

            continue # 超时是正常的,继续检查 stop_event

        except OSError as e:

            if not p2p_stop_listener.is_set():

                 print(f"[P2P Listener] Socket错误: {e}")

            break

        except Exception as e:

            if not p2p_stop_listener.is_set():

                print(f"[P2P Listener] 未知错误: {e}")

            break

    print("[P2P Listener] 监听线程已停止。")





def attempt_udp_hole_punch(target_id: str, target_ip: str, target_port: int):

    global p2p_udp_socket, p2p_target_actual_addr, p2p_target_peer_id, p2p_target_reported_addr

    

    if not p2p_udp_socket:

        print("[P2P] 错误: 本地UDP Socket未初始化。")

        return False



    print(f"[P2P] 开始尝试与 {target_id} ({target_ip}:{target_port}) 进行UDP打洞...")

    p2p_target_peer_id = target_id

    p2p_target_reported_addr = (target_ip, target_port)

    p2p_target_actual_addr = None # 重置



    # 发送几次打洞包

    for i in range(3): #减少次数,依赖监听线程收到的PUNCH_SYN和回复的ACK_RESPONSE

        print(f"  发送打洞包 (PUNCH_SYN) 第 {i+1} 次到 {target_ip}:{target_port}")

        send_p2p_control_message(p2p_udp_socket, (target_ip, target_port), "PUNCH_SYN", current_peer_id)

        time.sleep(0.5) 



    print(f"[P2P] 等待来自 {target_id} 的回应 (最多5秒)...")

    start_time = time.time()

    responded_correctly = False

    while time.time() - start_time < 5: 

        try:

            msg_data, addr = received_messages_q.get_nowait() # 从监听线程队列获取

            msg_type = msg_data.get("type")

            sender_id = msg_data.get("sender_id")



            if addr[0] == target_ip and sender_id == target_id: # 确保是目标peer的IP和ID

                if msg_type == "PUNCH_SYN" or msg_type == "PUNCH_ACK_RESPONSE":

                    p2p_target_actual_addr = addr # 核心:使用对方实际发包的地址

                    print(f"[P2P] 收到来自 {sender_id} ({addr}) 的 {msg_type}。打洞可能成功!")

                    print(f"      对方实际地址已更新为: {p2p_target_actual_addr}")

                    if msg_type == "PUNCH_SYN": # 如果对方也发了SYN,我们回复ACK_RESPONSE

                         send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "PUNCH_ACK_RESPONSE", current_peer_id)

                    responded_correctly = True

                    break

        except queue.Empty:

            time.sleep(0.2)

            continue

        except Exception as e:

            print(f"  处理打洞回应时出错: {e}")

            time.sleep(0.2)

            continue

            

    if responded_correctly and p2p_target_actual_addr:

        print(f"[P2P] 与 {target_id} 的UDP打洞过程完成。")

        return True

    else:

        print(f"[P2P] 未能从 {target_id} 收到有效回应,打洞可能失败。")

        p2p_target_actual_addr = None

        return False



def _send_file_in_thread(filepath, filesize, chunk_size, total_chunks, target_addr, target_id_for_ack):

    global p2p_udp_socket, ack_events, file_transfer_active

    print(f"[文件发送线程] 开始发送 '{os.path.basename(filepath)}' ({total_chunks} 块)...")

    

    try:

        with open(filepath, "rb") as f:

            for i in range(total_chunks):

                if p2p_stop_listener.is_set(): # 检查是否被外部停止

                    print("[文件发送线程] 发送被中止。")

                    break



                chunk_data = f.read(chunk_size)

                if not chunk_data: # 文件提前结束?不太可能如果filesize正确

                    print(f"[文件发送线程] 错误: 读取文件块 {i} 时数据为空。")

                    break

                

                ack_events[i] = threading.Event() # 为当前块创建ACK事件

                

                retries_left = FILE_TRANSFER_RETRIES

                chunk_sent_successfully = False

                while retries_left > 0:

                    if p2p_stop_listener.is_set(): break

                    

                    # print(f"  发送块 {i}/{total_chunks-1} (尝试 {FILE_TRANSFER_RETRIES - retries_left + 1})...")

                    send_p2p_data_chunk(p2p_udp_socket, target_addr, current_peer_id, i, chunk_data)

                    

                    # 等待ACK

                    ack_received = ack_events[i].wait(timeout=FILE_TRANSFER_TIMEOUT)

                    

                    if ack_received:

                        # print(f"  收到块 {i} 的ACK。")

                        chunk_sent_successfully = True

                        progress = ((i + 1) / total_chunks) * 100

                        print(f"\r  发送进度: {progress:.2f}% ({i+1}/{total_chunks})", end="")

                        break 

                    else:

                        print(f"\n  块 {i} ACK超时。剩余尝试 {retries_left - 1} 次。")

                        retries_left -= 1

                

                del ack_events[i] # 清理事件



                if not chunk_sent_successfully:

                    print(f"\n[文件发送线程] 块 {i} 发送失败,已达最大重试次数。中止传输。")

                    file_transfer_active = False # 标记传输失败

                    return # 直接从线程退出



            if chunk_sent_successfully: # 只有所有块都成功才发送FILE_END

                print(f"\n[文件发送线程] 所有 {total_chunks} 块已发送并确认。发送FILE_END。")

                send_p2p_control_message(p2p_udp_socket, target_addr, "FILE_END", current_peer_id, 

                                         {"filename": os.path.basename(filepath)})

                # 此处可以等待对方的 FILE_VERIFY 消息,但为简化,我们只打印

            

    except IOError as e:

        print(f"[文件发送线程] 文件读写错误: {e}")

    except Exception as e:

        print(f"[文件发送线程] 未知错误: {e}")

    finally:

        print(f"[文件发送线程] '{os.path.basename(filepath)}' 发送处理完毕。")

        # file_transfer_active = False # 应该在收到FILE_VERIFY或超时后设置



def initiate_file_transfer():

    global p2p_target_actual_addr, p2p_target_peer_id, p2p_udp_socket

    global file_offer_response_event, file_offer_accepted, file_transfer_active, file_transfer_send_thread



    if not p2p_target_actual_addr:

        print("错误: 未建立P2P连接,或对方实际地址未知。请先进行打洞。")

        return

    if file_transfer_active:

        print("错误: 当前已有文件传输正在进行中。")

        return



    filepath = input("请输入要发送的完整文件路径: ").strip()

    if not os.path.exists(filepath) or not os.path.isfile(filepath):

        print(f"错误: 文件 '{filepath}' 不存在或不是一个文件。")

        return



    try:

        filesize = os.path.getsize(filepath)

        filename = os.path.basename(filepath)

        total_chunks = math.ceil(filesize / CHUNK_SIZE)

    except Exception as e:

        print(f"获取文件信息失败: {e}")

        return



    print(f"准备发送文件: '{filename}', 大小: {filesize} bytes, 分为 {total_chunks} 块 (每块 {CHUNK_SIZE} B)。")



    # 重置事件和状态

    file_offer_response_event.clear()

    file_offer_accepted = False

    

    # 发送FILE_OFFER

    send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "FILE_OFFER", current_peer_id,

                             {"filename": filename, "filesize": filesize, 

                              "chunk_size": CHUNK_SIZE, "total_chunks": total_chunks})

    

    print(f"已向 {p2p_target_peer_id} 发送文件提议,等待响应 (最多15秒)...")

    

    # 等待对方通过监听线程设置 file_offer_response_event

    offer_responded = file_offer_response_event.wait(timeout=15.0)



    if not offer_responded:

        print("对方未在规定时间内响应提议。")

        return

    

    if file_offer_accepted:

        print(f"{p2p_target_peer_id} 已接受文件。开始传输...")

        file_transfer_active = True # 标记传输开始

        # 创建并启动文件发送线程

        file_transfer_send_thread = threading.Thread(target=_send_file_in_thread, 

                                                     args=(filepath, filesize, CHUNK_SIZE, total_chunks, 

                                                           p2p_target_actual_addr, p2p_target_peer_id),

                                                     daemon=True)

        file_transfer_send_thread.start()

        # 主线程返回菜单,发送在后台进行

        print("文件正在后台发送。您可以在主菜单进行其他操作,或等待发送完成提示。")

    else:

        print(f"{p2p_target_peer_id} 拒绝了文件提议或发生错误。")





def start_p2p_chat_and_file_menu():

    global p2p_target_peer_id, p2p_target_reported_addr, p2p_target_actual_addr

    global p2p_listener_thread, p2p_stop_listener, received_messages_q, file_transfer_active



    if not current_peer_id or not current_p2p_port:

        print("请先注册您的 Peer ID 和 P2P 端口。")

        return



    # 如果当前已有P2P会话,先询问是否结束

    if p2p_target_peer_id:

        print(f"当前已连接到 {p2p_target_peer_id} ({p2p_target_actual_addr}).")

        choice = input("是否要结束当前P2P会话并尝试连接新的Peer? (y/N): ").strip().lower()

        if choice == 'y':

            stop_current_p2p_activity()

        else: # 进入当前会话的菜单

            pass # 下面的逻辑会处理



    # 如果没有活动P2P目标,或者用户选择连接新的,则进行查询和打洞

    if not p2p_target_actual_addr: # p2p_target_actual_addr 是打洞成功的标志

        peer_to_connect = input("请输入您想连接的 Peer ID: ").strip()

        if not peer_to_connect: return

        if peer_to_connect == current_peer_id:

            print("不能连接自己。"); return



        print(f"正在查询 {peer_to_connect} 的信息...")

        peer_info = query_peer(peer_to_connect)

        if not peer_info:

            print(f"未能获取 {peer_to_connect} 的信息。"); return

        

        print(f"获取到 {peer_to_connect} 的信息: IP={peer_info['public_ip']}, P2P Port={peer_info['p2p_port']}")

        

        stop_current_p2p_activity() # 确保旧的已停止



        p2p_stop_listener = threading.Event() # 重置

        received_messages_q = queue.Queue() # 重置



        if not initialize_p2p_socket(current_p2p_port): return



        p2p_listener_thread = threading.Thread(target=udp_listener, daemon=True)

        p2p_listener_thread.start()

        time.sleep(0.1)



        if not attempt_udp_hole_punch(peer_to_connect, peer_info['public_ip'], peer_info['p2p_port']):

            print(f"无法与 {peer_to_connect} 建立P2P连接。")

            stop_current_p2p_activity()

            return

    

    # --- P2P 会话菜单 ---

    print(f"\n--- P2P 会话已连接到 {p2p_target_peer_id} ({p2p_target_actual_addr}) ---")

    while p2p_target_actual_addr and not p2p_stop_listener.is_set(): # 只要连接有效

        print("\nP2P 操作:")

        print("1. 发送文本消息")

        print("2. 发送文件")

        print("3. 返回主菜单 (结束当前P2P会话)")

        

        # 检查接收到的消息 (只显示文本消息,文件消息由listener处理)

        try:

            msg_data, addr = received_messages_q.get_nowait()

            if p2p_target_actual_addr and addr[0] == p2p_target_actual_addr[0]:

                msg_type = msg_data.get("type")

                sender_id = msg_data.get("sender_id")

                if msg_type == "MSG":

                    text_msg = msg_data.get("text")

                    print(f"\r[{time.strftime('%H:%M:%S')}] {sender_id}: {text_msg}\n> ", end="")

                # 其他类型的消息(如打洞消息)已在listener或打洞逻辑中处理,或在此忽略

            else: # 来自未知源

                 print(f"\r收到来自未知源 {addr} 的P2P消息: {msg_data}\n> ", end="")

        except queue.Empty:

            pass

        except Exception as e:

            print(f"处理消息队列时出错: {e}")





        try:

            p2p_choice = input("> ").strip()

            if p2p_choice == '1':

                message_text = input("输入消息: ").strip()

                if message_text:

                    send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG", 

                                             current_peer_id, {"text": message_text})

                    print(f"[{time.strftime('%H:%M:%S')}] 我: {message_text}")

            elif p2p_choice == '2':

                initiate_file_transfer()

            elif p2p_choice == '3':

                print("结束当前P2P会话...")

                send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",

                                         current_peer_id, {"text": "/quit_chat"}) # 通知对方

                stop_current_p2p_activity()

                break # 退出P2P会话循环

            else:

                if p2p_choice: # 如果输入了东西但不是有效选项

                    print("无效的P2P操作选项。")

        

        except KeyboardInterrupt:

             print("\n检测到 Ctrl+C,结束P2P会话...")

             if p2p_target_actual_addr:

                send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",

                                         current_peer_id, {"text": "/quit_chat_ctrl_c"})

             stop_current_p2p_activity()

             break

        except EOFError:

             print("\n输入流结束,结束P2P会话...")

             if p2p_target_actual_addr:

                send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",

                                         current_peer_id, {"text": "/quit_chat_eof"})

             stop_current_p2p_activity()

             break

        

        # 短暂休眠,让监听线程有机会处理消息并打印

        # 避免input()完全阻塞消息的显示 (虽然不是完美的解决方案)

        time.sleep(0.1)





def stop_current_p2p_activity():

    global p2p_listener_thread, p2p_stop_listener, p2p_udp_socket

    global p2p_target_peer_id, p2p_target_actual_addr, p2p_target_reported_addr

    global file_transfer_active, file_transfer_send_thread, ack_events



    print("[系统] 正在停止所有P2P活动...")

    file_transfer_active = False # 停止任何进行中的文件传输标记



    if p2p_stop_listener:

        p2p_stop_listener.set() # 发送停止信号



    if file_transfer_send_thread and file_transfer_send_thread.is_alive():

        print("  等待文件发送线程结束...")

        file_transfer_send_thread.join(timeout=2.0)

        if file_transfer_send_thread.is_alive():

            print("  警告: 文件发送线程未能正常停止。")

    file_transfer_send_thread = None

    ack_events.clear() # 清空ACK事件



    if p2p_listener_thread and p2p_listener_thread.is_alive():

        print("  等待监听线程结束...")

        p2p_listener_thread.join(timeout=2.0) 

        if p2p_listener_thread.is_alive():

            print("  警告: 监听线程未能正常停止。")

    p2p_listener_thread = None

    

    if p2p_udp_socket:

        print("  关闭UDP Socket。")

        try:

            p2p_udp_socket.close()

        except Exception as e_close:

            print(f"  关闭socket时出错: {e_close}")

        p2p_udp_socket = None

    

    # 清理队列中的剩余消息 (可选)

    while not received_messages_q.empty():

        try:

            received_messages_q.get_nowait()

        except queue.Empty:

            break

    

    print("[系统] P2P活动已清理。")

    p2p_target_peer_id = None

    p2p_target_actual_addr = None

    p2p_target_reported_addr = None



def main_menu():

    # ... (主菜单逻辑与上一个版本类似,但选项3会调用 start_p2p_chat_and_file_menu) ...

    # 确保注销和退出时调用 stop_current_p2p_activity()

    global current_peer_id, current_p2p_port



    if not current_peer_id:

        # ... (注册逻辑,与之前相同) ...

        print("\n--- P2P 客户端 (未注册) ---")

        my_id = input("请输入你的 Peer ID (例如: alice, bob): ").strip()

        if not my_id:

            print("Peer ID 不能为空。")

            return True 



        try:

            my_port_str = input(f"请输入你用于P2P的UDP端口 (50000-65000, 回车随机): ").strip()

            if not my_port_str:

                my_port = random.randint(50000, 65000)

                print(f"已随机生成P2P端口: {my_port}")

            else:

                my_port = int(my_port_str)

                if not (1024 < my_port < 65536):

                    print("端口号不合法,将使用随机端口。")

                    my_port = random.randint(50000, 65000)

            print(f"将使用P2P端口: {my_port}")

        except ValueError:

            print("无效的端口输入,将使用随机端口。")

            my_port = random.randint(50000, 65000)

            print(f"已随机生成P2P端口: {my_port}")



        if not register_peer(my_id, my_port):

            return True 

    

    print(f"\n--- P2P 客户端 (ID: {current_peer_id}, P2P端口: {current_p2p_port}) ---")

    print("选择操作:")

    print("1. [信令] 查询其他 Peer 的信息")

    print("2. [信令] 列出所有已注册的 Peers")

    print("3. [P2P]  开始/管理 P2P 会话 (聊天/文件)")

    print("4. [信令] 注销我的 Peer ID")

    print("5. [信令] 重新注册 (使用新的ID或端口)")

    print("6. 退出")



    choice = input("请输入选项 (1-6): ").strip()



    if choice == '1':

        peer_to_query = input("请输入要查询的 Peer ID: ").strip()

        info = query_peer(peer_to_query)

        if info:

             print(f"Peer '{peer_to_query}' 的信息: {info}")

    elif choice == '2':

        list_all_peers()

    elif choice == '3':

        start_p2p_chat_and_file_menu() # 进入P2P会话管理

    elif choice == '4':

        if current_peer_id:

            confirm = input(f"确定要注销 Peer ID '{current_peer_id}'吗? (y/N): ").strip().lower()

            if confirm == 'y':

                unregister_peer(current_peer_id) 

        else:

            print("你还没有注册任何 Peer ID。")

    elif choice == '5':

        if current_peer_id:

             print(f"你当前注册为 '{current_peer_id}'。重新注册会使用新的ID和端口。")

             unregister_peer(current_peer_id) 

        current_peer_id = None 

        current_p2p_port = None

    elif choice == '6':

        print("正在退出...")

        if current_peer_id:

            unregister_peer(current_peer_id)

        return False 

    else:

        print("无效选项,请重试。")

    return True



if __name__ == "__main__":

    print("欢迎使用 P2P 客户端 Demo (包含P2P文本聊天和文件传输)!")

    # ... (与之前相同的服务器连接检查) ...

    print(f"将连接到信令服务器: {SIGNALING_SERVER_URL}")

    print("请确保信令服务器正在运行。")

    print("------------------------------------")



    try:

        ping_response = requests.get(SIGNALING_SERVER_URL, timeout=3)

        if ping_response.status_code == 200:

            print("成功连接到信令服务器。")

        else:

            print(f"警告: 信令服务器响应状态码 {ping_response.status_code}。")

    except requests.exceptions.ConnectionError:

        print(f"错误: 无法连接到信令服务器 {SIGNALING_SERVER_URL}。请检查服务器。")

        exit()

    except requests.exceptions.Timeout:

        print(f"错误: 连接信令服务器超时。")

        exit()



    try:

        while True:

            if not main_menu():

                break

    except KeyboardInterrupt:

        print("\n检测到 Ctrl+C,正在退出程序...")

    finally:

        stop_current_p2p_activity() # 确保所有P2P资源被清理

        print("客户端已退出。")

其中SIGNALING_SERVER_URL全局变量需要修改为自己的信令服务器地址,端口默认为8000。

Last Updated 9/17/2025, 7:13:55 AM