-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain_no_sync.py
More file actions
207 lines (178 loc) · 10.6 KB
/
main_no_sync.py
File metadata and controls
207 lines (178 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import gradio as gr
import numpy as np
from PIL import Image
import requests
import os
import tempfile
import subprocess
# This function is no longer directly used for the primary "图像解码" tab functionality
# It might be useful if there was a separate audio effect/info display, but for reconstruction,
# we will use a new function.
# def decode_audio_file(audio_filepath):
# if audio_filepath is None:
# return "请先上传一个WAV音频文件。", None
# try:
# filename = os.path.basename(audio_filepath)
# info = f"音频文件: {filename}\n"
# info += "解码信息 (伪代码):\n"
# info += " - 步骤1: 加载音频数据...\n"
# info += " - 步骤2: 分析音频特征...\n"
# info += " - 步骤3: 提取隐藏信息或应用效果...\n"
# info += " - 步骤4: 解码完成。结果:'Audio Decoded Successfully!'\n"
# return info, audio_filepath
# except Exception as e:
# return f"处理音频时出错: {str(e)}", None
# New function to send audio to server and get a reconstructed image
def reconstruct_image_from_audio(audio_filepath):
if audio_filepath is None:
return "请先上传一个WAV音频文件。", None
temp_reconstructed_image_path = None
# Server URL for audio decoding to image
server_url = "http://jscc.wille.homjay.com:55000/decode_audio"
# Placeholder for the path where the decoded image will be saved (locally, before Gradio handles it)
# decoded_image_path = "temp_decoded_image.png" # Using NamedTemporaryFile is better
try:
print(f"[*] 正在将音频解码为图像:{audio_filepath} → Server")
with open(audio_filepath, 'rb') as audio_file_obj:
files = {'audio': (os.path.basename(audio_filepath), audio_file_obj, 'audio/wav')}
response = requests.post(server_url, files=files, timeout=60) # Increased timeout for potentially larger data
response.raise_for_status()
# Save the received image content to a temporary file
# Assuming the server returns a common image format like PNG or JPEG
# Forcing .png as a common, safe default. Server should ideally send Content-Type.
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_image_file_obj:
tmp_image_file_obj.write(response.content)
temp_reconstructed_image_path = tmp_image_file_obj.name
print(f"[+] 解码完成,重建图像文件已临时保存为:{temp_reconstructed_image_path}")
return f"音频解码成功!图像已重建。", temp_reconstructed_image_path
except requests.exceptions.Timeout:
return f"调用解码服务器超时 ({server_url}). 请检查服务器状态或网络连接。", None
except requests.exceptions.HTTPError as e:
error_detail = f"服务器响应内容: {e.response.text[:200]}" if e.response else "无详细响应内容"
return f"解码服务器返回错误: {e.response.status_code} - {e.response.reason}. {error_detail}", None
except requests.exceptions.RequestException as e:
return f"调用解码服务器时发生网络错误: {str(e)}", None
except Exception as e:
import traceback
print(f"处理音频解码时发生未知错误: {traceback.format_exc()}")
return f"处理音频解码时发生未知错误: {str(e)}", None
finally:
# Gradio will make a copy of temp_reconstructed_image_path if it's returned.
# So, we can attempt to clean it up here.
if temp_reconstructed_image_path and os.path.exists(temp_reconstructed_image_path):
try:
os.remove(temp_reconstructed_image_path)
print(f"[+] 已清理临时重建图像: {temp_reconstructed_image_path}")
except Exception as e_clean:
print(f"[!] 清理临时重建图像失败 {temp_reconstructed_image_path}: {e_clean}")
# encode_image_with_server 函数 - 输入UI将允许摄像头
def encode_image_with_server(image_pil_for_encode):
if image_pil_for_encode is None:
# 更新了提示信息以反映摄像头选项
return "请先上传图片或使用摄像头拍照。", None
temp_image_path = None
temp_audio_path = None
server_url_encode = "http://jscc.wille.homjay.com:55000/encode_image"
try:
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_image_file_obj:
image_pil_for_encode.save(tmp_image_file_obj.name, "PNG")
temp_image_path = tmp_image_file_obj.name
print(f"[*] 正在将图像编码为音频:{temp_image_path} → {server_url_encode}")
with open(temp_image_path, 'rb') as img_file:
files = {'image': (os.path.basename(temp_image_path), img_file, 'image/png')}
response = requests.post(server_url_encode, files=files, timeout=30)
response.raise_for_status()
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio_file_obj:
tmp_audio_file_obj.write(response.content)
temp_audio_path = tmp_audio_file_obj.name
print(f"[+] 编码完成,临时音频文件:{temp_audio_path}")
return f"图像编码成功!音频已生成,请在下方播放或下载。", temp_audio_path
except requests.exceptions.Timeout:
return f"调用编码服务器超时 ({server_url_encode}). 请检查服务器状态或网络连接。", None
except requests.exceptions.HTTPError as e:
error_detail = f"服务器响应内容: {e.response.text[:200]}" if e.response else "无详细响应内容"
return f"编码服务器返回错误: {e.response.status_code} - {e.response.reason}. {error_detail}", None
except requests.exceptions.RequestException as e:
return f"调用编码服务器时发生网络错误: {str(e)}", None
except Exception as e:
import traceback
print(f"处理图片编码时发生未知错误: {traceback.format_exc()}")
return f"处理图片编码时发生未知错误: {str(e)}", None
finally:
if temp_image_path and os.path.exists(temp_image_path):
try:
os.remove(temp_image_path)
print(f"[+] 已清理临时图片: {temp_image_path}")
except Exception as e_clean:
print(f"[!] 清理临时图片失败 {temp_image_path}: {e_clean}")
if temp_audio_path and os.path.exists(temp_audio_path):
print(f"[*] 音频文件已保存为:{temp_audio_path}")
# 在后台运行指定的 Python 脚本
def run_audio_script_background():
script_to_run = "test.py" # 假设 test.py 在同一目录或 Python 的 PATH 中
# 尝试找到 Python 解释器。通常 'python' 即可,但显式指定可能更可靠。
# 如果用户有特定的 venv, gradiorunner 激活的 python 可能是首选。
# 为简单起见,先用 'python'。
python_executable = "python"
command = [python_executable, script_to_run]
try:
# 使用 Popen 进行非阻塞后台执行
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 不等待 process.communicate() 或 process.wait() 以保持非阻塞
return f"音频文件已发送成功!"
except FileNotFoundError:
return f"错误: 无法找到 '{python_executable}' 命令或脚本 '{script_to_run}'。请确保 Python 已安装并在 PATH 中,且脚本存在。"
except Exception as e:
return f"启动脚本 '{script_to_run}' 时发生错误: {str(e)}"
# 创建 Gradio Blocks 界面
with gr.Blocks() as demo:
gr.Markdown("# 基于深度学习的图像传输系统") # 用户更新的应用标题
with gr.Tab("发送端"): # 用户更新的选项卡名称
gr.Markdown("上传一张图片或使用摄像头拍摄一张照片,然后点击\"图像编码\"按钮,将得到对图像进行编码后的音频文件。") # 用户更新的描述
# 第一行:图像输入和编码后的音频输出
with gr.Row():
with gr.Column(scale=3): # 左列:图像输入
encode_image_input = gr.Image(type="pil", label="上传图片或拍照 (编码)", sources=["upload", "webcam"])
with gr.Column(scale=3): # 右列:编码后的音频
encode_audio_output = gr.Audio(label="编码后的音频", type="filepath")
# 第二行:按钮和状态文本框
with gr.Row():
# 左下控制列:图像编码按钮和其状态
with gr.Column(scale=1, min_width=200):
encode_button = gr.Button("图像编码")
encode_output_text = gr.Textbox(label="编码状态", lines=5, interactive=False)
# 右下控制列:发送音频按钮和其状态
with gr.Column(scale=1, min_width=200):
run_script_button = gr.Button("发送音频") # 文本来自用户更新
# 标签来自用户更新,lines=5 以便与 encode_output_text 高度对齐
script_status_output = gr.Textbox(label="发送状态", lines=5, interactive=False)
# 事件处理器
encode_button.click(
fn=encode_image_with_server,
inputs=encode_image_input,
outputs=[encode_output_text, encode_audio_output]
)
run_script_button.click(
fn=run_audio_script_background,
inputs=None,
outputs=script_status_output
)
with gr.Tab("接收端"): # 用户更新的选项卡名称
gr.Markdown("上传接收到的WAV音频文件,然后点击\"图像重建\"按钮查看效果。")
with gr.Row():
with gr.Column(scale=1):
# Audio 输入, 仅允许上传
decode_audio_input = gr.Audio(type="filepath", label="上传WAV音频文件 (解码)", sources=["upload"])
decode_button = gr.Button("图像重建")
with gr.Column(scale=1):
# 更新了标签以更准确地反映其内容
decode_output_text = gr.Textbox(label="解码状态", lines=5, interactive=False)
# 将原来的 Audio 输出更改为 Image 输出以显示重建的图像
reconstructed_image_output = gr.Image(label="重建后的图像", type="filepath")
decode_button.click(
fn=reconstruct_image_from_audio, # 使用新的后端函数
inputs=decode_audio_input,
outputs=[decode_output_text, reconstructed_image_output] # 更新输出组件
)
if __name__ == "__main__":
demo.launch()