(Project) 20251113 Big Data Project Implementation, Part 2: Spring Boot Script Backend

II. Wrapping the model retraining + redeployment interface, with status monitoring and integration with the frontend and FastGPT

Concrete tasks:

  • Write scripts that wrap the model retraining + redeployment interface: fetch retraining data from the business system's database, and automatically redeploy the models once training completes;
  • Implement status monitoring for the retrain + redeploy pipeline, tracking progress in real time (e.g., fetching data, training, deploying, completed / failed); a state-machine sketch follows this list;
  • Provide two frontend-facing endpoints: a command endpoint that triggers retraining and redeployment, and a notification endpoint that pushes status to the frontend (or lets the frontend poll it);
  • Ensure that once redeployment finishes, the four prediction APIs called by the FastGPT workflow (import price / import quantity / export price / export quantity) predict with the newly trained models.
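
The status values above map naturally onto a small state machine. A minimal sketch in Python (all names here are illustrative assumptions, not the final implementation):

from enum import Enum

class RetrainStatus(Enum):
    """Lifecycle of one retrain + redeploy run (illustrative names)."""
    PENDING = 'pending'            # command accepted, nothing started yet
    FETCHING_DATA = 'fetching'     # pulling retraining data from the business database
    TRAINING = 'training'          # retraining the four models
    DEPLOYING = 'deploying'        # redeploying the Flask prediction service
    COMPLETED = 'completed'        # new models are live for the FastGPT workflow
    FAILED = 'failed'              # any step errored; details go to the status endpoint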

There are currently three pieces of code with different functions:

1. Trade_Transformer_LSTM_price.py

import logging
import os
from datetime import datetime

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import winsound
from sklearn.metrics import r2_score
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import pickle

from tqdm import tqdm

# Step 1: load and preprocess the data
file_path = r'F:\Python\Transformer\demo\data\进口\merged_output.csv'
#file_path = r'F:\Python\Transformer\demo\data\出口\merged_output.csv'
df = pd.read_csv(file_path, encoding='UTF-8', thousands=',', low_memory=False)

# Extract year/month columns
df['year'] = df['数据年月'].astype(str).str[:4].astype(int)
df['month'] = df['数据年月'].astype(str).str[4:].astype(int)

# Feature columns
categorical_cols = ['贸易伙伴编码', '商品编码', '贸易方式编码', '注册地编码', '计量单位']
continuous_cols = ['year', 'month', '数量', '人民币']  # continuous features (year/month included)
target_cols = ['单价']  # original target variable

# Clean the data
df[target_cols] = df[target_cols].replace({'-': np.nan, '': np.nan}, regex=False)
df[target_cols] = df[target_cols].apply(pd.to_numeric, errors='coerce')

# After cleaning and before standardization:
df[target_cols] = df[target_cols].clip(lower=0)  # ensure non-negative first
df['log_单价'] = np.log1p(df['单价'])
target_cols_log = ['log_单价']

# Standardize the log-transformed target variable
scaler_y = StandardScaler()
df[target_cols_log] = scaler_y.fit_transform(df[target_cols_log])

# Clean invalid values in the continuous features
for col in continuous_cols:
    df[col] = df[col].replace({'-': np.nan, '': np.nan}, regex=False)
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Drop rows containing NaN (categorical, continuous and target columns)
df.dropna(subset=categorical_cols + continuous_cols + target_cols_log, inplace=True)

# Label-encode the categorical features
label_encoders = {}
for col in categorical_cols:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col].astype(str))  # force to string to avoid errors
    label_encoders[col] = le

# Standardize the continuous features (including year and month)
scaler_X = StandardScaler()
df[continuous_cols] = scaler_X.fit_transform(df[continuous_cols])

# Build time series grouped by commodity
grouped = df.groupby('商品编码')
sequences = []
seq_length = 12  # history window length

for name, group in grouped:
    group = group.sort_values(['year', 'month'])
    if len(group) < seq_length + 1:  # make sure there are enough data points
        continue

    # Build sequences
    X_cat_group = group[categorical_cols].values
    X_cont_group = group[continuous_cols].values
    y_group = group[target_cols_log].values

    for i in range(len(X_cat_group) - seq_length):
        x_cat_seq = X_cat_group[i:i + seq_length]
        x_cont_seq = X_cont_group[i:i + seq_length]
        y_val = y_group[i + seq_length]
        sequences.append((x_cat_seq, x_cont_seq, y_val))

# Convert to arrays
if sequences:
    X_cat, X_cont, y = zip(*sequences)
    X_cat = np.array(X_cat)
    X_cont = np.array(X_cont)
    y = np.array(y)
else:
    raise ValueError("没有足够数据创建序列,请检查数据或减小seq_length")

# Split train/test sets (no shuffling)
X_cat_train, X_cat_test, X_cont_train, X_cont_test, y_train, y_test = train_test_split(
    X_cat, X_cont, y, test_size=0.2, shuffle=False
)

# Convert to PyTorch tensors
X_cat_train_tensor = torch.tensor(X_cat_train, dtype=torch.long)
X_cont_train_tensor = torch.tensor(X_cont_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)

X_cat_test_tensor = torch.tensor(X_cat_test, dtype=torch.long)
X_cont_test_tensor = torch.tensor(X_cont_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Create DataLoaders
train_dataset = torch.utils.data.TensorDataset(X_cat_train_tensor, X_cont_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # larger batch size
test_dataset = torch.utils.data.TensorDataset(X_cat_test_tensor, X_cont_test_tensor, y_test_tensor)
test_loader = DataLoader(
    test_dataset,
    batch_size=16,  # small validation batches
    shuffle=False,
    num_workers=0,
    pin_memory=False if not torch.cuda.is_available() else True
)

# Early stopping
class EarlyStopping:
    def __init__(self, patience=10, verbose=False, delta=0, path='F:\\Python\\Transformer\\demo\\model\\trade_Transformer_LSTM\\in\\checkpoint_price.pth'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

# Option 1: learnable positional encoding
class LearnablePositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.position_embeddings = nn.Embedding(max_len, d_model)

    def forward(self, x):
        positions = torch.arange(0, x.size(1), device=x.device).expand(x.size(0), -1)
        x = x + self.position_embeddings(positions)
        return x

# Hybrid LSTM + Transformer model
class LSTMTransformer(nn.Module):
    def __init__(self, num_embeddings_list, continuous_dim, model_dim=64, hidden_size=64,
                 num_heads=4, num_layers=2, dropout=0.3):
        super().__init__()
        self.cat_embeddings = nn.ModuleList([
            nn.Embedding(num_emb, model_dim) for num_emb in num_embeddings_list
        ])
        self.cont_proj = nn.Linear(continuous_dim, model_dim)
        self.pos_encoder = LearnablePositionalEncoding(model_dim)

        self.lstm = nn.LSTM(model_dim, hidden_size, batch_first=True, bidirectional=False)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.norm = nn.LayerNorm(hidden_size)
        self.fc_out = nn.Linear(hidden_size, 1)

    def forward(self, cat_inputs, cont_inputs):
        # Embeddings
        embedded_cat = torch.stack([emb(cat_inputs[:, :, i]) for i, emb in enumerate(self.cat_embeddings)], dim=0).sum(dim=0)
        embedded_cont = self.cont_proj(cont_inputs)
        x = embedded_cat + embedded_cont
        x = self.pos_encoder(x)

        # LSTM
        x, _ = self.lstm(x)

        # Transformer
        x = x.permute(1, 0, 2)  # [S, B, D]
        x = self.transformer(x)
        x = x.mean(dim=0)  # average over the time dimension
        x = self.norm(x)
        return self.fc_out(x)

# Step 4: initialize the model & train
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of unique values per categorical feature
num_embeddings_list = [df[col].nunique() for col in categorical_cols]

# Option 2: hybrid LSTM + Transformer model
model = LSTMTransformer(
    num_embeddings_list=[df[col].nunique() for col in categorical_cols],
    continuous_dim=len(continuous_cols),
    model_dim=128,
    hidden_size=128,
    num_heads=64,
    num_layers=10,
    dropout=0.6
).to(device)

criterion = nn.MSELoss()  # the original passed 0.5 positionally ("lower weight for amounts"), but PyTorch reads that as the deprecated size_average flag, not a weight
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)

epochs = 1000
best_loss = float('inf')
losses = []
val_losses = []

early_stopping = EarlyStopping(patience=100, verbose=True)  # stop after this many consecutive epochs without improvement

print("开始训练...")
for epoch in range(epochs):
    model.train()
    total_loss = 0
    loop = tqdm(train_loader, desc=f"Epoch [{epoch + 1}/{epochs}]")

    for x_cat_batch, x_cont_batch, y_batch in loop:
        x_cat_batch, x_cont_batch, y_batch = x_cat_batch.to(device), x_cont_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(x_cat_batch, x_cont_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # guard against exploding gradients
        optimizer.step()

        total_loss += loss.item()
        loop.set_postfix(loss=loss.item())

    avg_train_loss = total_loss / len(train_loader)
    losses.append(avg_train_loss)

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for x_cat_val, x_cont_val, y_val in test_loader:
            x_cat_val, x_cont_val, y_val = x_cat_val.to(device), x_cont_val.to(device), y_val.to(device)
            val_outputs = model(x_cat_val, x_cont_val)
            loss = criterion(val_outputs, y_val)
            val_loss += loss.item() * x_cat_val.size(0)

    val_loss /= len(test_loader.dataset)

    scheduler.step(val_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch + 1}/{epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {val_loss:.4f}")
    logging.basicConfig(filename='training_Trade_Transformer_Price.log', level=logging.INFO)
    logging.info(f"Epoch {epoch + 1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}")
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Save the model
save_path = 'F:\\Python\\Transformer\\demo\\model\\trade_Transformer_LSTM\\in\\'
os.makedirs(os.path.dirname(save_path), exist_ok=True)

torch.save(model.state_dict(), save_path + 'model_price.pth')

# Step 5: evaluate the best model
print("开始验证...")
model.load_state_dict(torch.load(save_path + 'model_price.pth'))
model.eval()

all_preds = []
all_true = []

with torch.no_grad():
    for x_cat_batch, x_cont_batch, y_batch in test_loader:
        x_cat_batch = x_cat_batch.to(device)
        x_cont_batch = x_cont_batch.to(device)

        outputs = model(x_cat_batch, x_cont_batch)

        all_preds.append(outputs.cpu().numpy())
        all_true.append(y_batch.cpu().numpy())

# Concatenate results from all batches
y_pred = np.concatenate(all_preds, axis=0)
y_true = np.concatenate(all_true, axis=0)

# Inverse standardization + exp
y_pred_unscaled = np.expm1(scaler_y.inverse_transform(y_pred))
y_true_unscaled = np.expm1(scaler_y.inverse_transform(y_true))

pred_renminbi = y_pred_unscaled

true_renminbi = y_true_unscaled


# Compute metrics
def evaluate(name, true, pred):
    r2 = r2_score(true, pred)
    mae = np.mean(np.abs(true - pred))
    mse = np.mean((true - pred) ** 2)
    print(f"\n{name} 评估:")
    print(f"R² Score: {r2:.4f}")
    print(f"MSE: {mse:.4f}")
    print(f"MAE: {mae:.2f}")
    return r2, mse, mae

# Evaluate the unit price
r2_q, mse_q, mae_q = evaluate("单价", true_renminbi, pred_renminbi)
logging.basicConfig(filename='training_Trade_Transformer_Price.log', level=logging.INFO)
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.info(f"datetime:{time}, R2:{r2_q}, MSE:{mse_q}, MAE:{mae_q}")  # was {datetime}: logged the class, not the timestamp


# Print a few samples
print("\n部分预测值 VS 真实值:")
for i in range(5):
    print(f"预测: 单价={pred_renminbi[i].item():.2f} "
          f"真实: 单价={true_renminbi[i].item():.2f}")

# --- Plotting ---
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# Predicted vs true curve
plt.figure(figsize=(12, 6))
plt.plot(true_renminbi, label='真实单价', color='blue')
plt.plot(pred_renminbi, label='预测单价', color='red', linestyle='--')
plt.title("单价:预测值 vs 真实值")
plt.xlabel("样本索引")
plt.ylabel("单价")
plt.legend()
plt.grid(True)
plt.annotate(f'R2={r2_q:.3f}', xy=(0.05, 0.9), xycoords='axes fraction', fontsize=12, color='green')
plt.annotate(f'MSE={mse_q:.3f}', xy=(0.05, 0.8), xycoords='axes fraction', fontsize=12, color='green')
plt.annotate(f'MAE={mae_q:.3f}', xy=(0.05, 0.7), xycoords='axes fraction', fontsize=12, color='green')
plt.tight_layout()
plt.savefig(save_path + 'predicted_vs_true_price.png')
plt.close()

# Error distribution histogram
errors = y_true.flatten() - y_pred.flatten()
plt.figure(figsize=(8, 5))
plt.hist(errors, bins=30, color='skyblue', edgecolor='black')
plt.axvline(0, color='red', linestyle='dashed', linewidth=1)
plt.title("预测误差分布", fontsize=14)
plt.xlabel("误差 = 真实值 - 预测值", fontsize=12)
plt.ylabel("频数", fontsize=12)
plt.grid(True)
plt.tight_layout()
plt.savefig(save_path + 'error_distribution_price.png')
plt.close()

# Loss curves (losses recorded during training)
plt.figure(figsize=(10, 6))
plt.plot(losses, label='Training Loss', color='blue')
plt.plot(val_losses, label='Validation Loss', color='red')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.title('训练损失曲线', fontsize=14)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(save_path + 'training_loss_curve_price.png')
plt.close()

# Save the scalers and label encoders
with open(save_path + 'scaler_X_price.pkl', 'wb') as f:
    pickle.dump(scaler_X, f)

with open(save_path + 'scaler_y_price.pkl', 'wb') as f:
    pickle.dump(scaler_y, f)

with open(save_path + 'label_encoders_price.pkl', 'wb') as f:
    pickle.dump(label_encoders, f)

print("✅ 模型和预处理器已成功保存!")
# Synchronous playback (the program blocks until playback finishes)
winsound.PlaySound("F:\\Python\\训练完成.wav", winsound.SND_FILENAME)

2. Trade_Transformer_LSTM_quantity.py

import logging
import os
from datetime import datetime

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import winsound
from sklearn.metrics import r2_score
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import pickle

from tqdm import tqdm

# Step 1: load and preprocess the data
#file_path = r'F:\Python\Transformer\demo\data\进口\merged_output.csv'
file_path = r'F:\Python\Transformer\demo\data\出口\merged_output.csv'
#file_path = 'E:\\admin\\Desktop\\2.csv'
df = pd.read_csv(file_path, encoding='UTF-8', thousands=',', low_memory=False)

# Extract year/month columns
df['year'] = df['数据年月'].astype(str).str[:4].astype(int)
df['month'] = df['数据年月'].astype(str).str[4:].astype(int)

# Feature columns
categorical_cols = ['贸易伙伴编码', '商品编码', '贸易方式编码', '注册地编码', '计量单位']
continuous_cols = ['year', 'month', '数量', '人民币']  # continuous features (year/month included)
target_cols = ['数量']  # original target variable

# Clean the data
df[target_cols] = df[target_cols].replace({'-': np.nan, '': np.nan}, regex=False)
df[target_cols] = df[target_cols].apply(pd.to_numeric, errors='coerce')

# After cleaning and before standardization:
df[target_cols] = df[target_cols].clip(lower=0)  # ensure non-negative first
df['log_数量'] = np.log1p(df['数量'])
target_cols_log = ['log_数量']

# Standardize the log-transformed target variable
scaler_y = StandardScaler()
df[target_cols_log] = scaler_y.fit_transform(df[target_cols_log])

# Clean invalid values in the continuous features
for col in continuous_cols:
    df[col] = df[col].replace({'-': np.nan, '': np.nan}, regex=False)
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Drop rows containing NaN (categorical, continuous and target columns)
df.dropna(subset=categorical_cols + continuous_cols + target_cols_log, inplace=True)

# Label-encode the categorical features
label_encoders = {}
for col in categorical_cols:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col].astype(str))  # force to string to avoid errors
    label_encoders[col] = le

# Standardize the continuous features (including year and month)
scaler_X = StandardScaler()
df[continuous_cols] = scaler_X.fit_transform(df[continuous_cols])

# Build time series grouped by commodity
grouped = df.groupby('商品编码')
sequences = []
seq_length = 12  # history window length

for name, group in grouped:
    group = group.sort_values(['year', 'month'])
    if len(group) < seq_length + 1:  # make sure there are enough data points
        continue

    # Build sequences
    X_cat_group = group[categorical_cols].values
    X_cont_group = group[continuous_cols].values
    y_group = group[target_cols_log].values

    for i in range(len(X_cat_group) - seq_length):
        x_cat_seq = X_cat_group[i:i + seq_length]
        x_cont_seq = X_cont_group[i:i + seq_length]
        y_val = y_group[i + seq_length]
        sequences.append((x_cat_seq, x_cont_seq, y_val))

# Convert to arrays
if sequences:
    X_cat, X_cont, y = zip(*sequences)
    X_cat = np.array(X_cat)
    X_cont = np.array(X_cont)
    y = np.array(y)
else:
    raise ValueError("没有足够数据创建序列,请检查数据或减小seq_length")

# Split train/test sets (no shuffling)
X_cat_train, X_cat_test, X_cont_train, X_cont_test, y_train, y_test = train_test_split(
    X_cat, X_cont, y, test_size=0.2, shuffle=False
)

# Convert to PyTorch tensors
X_cat_train_tensor = torch.tensor(X_cat_train, dtype=torch.long)
X_cont_train_tensor = torch.tensor(X_cont_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)

X_cat_test_tensor = torch.tensor(X_cat_test, dtype=torch.long)
X_cont_test_tensor = torch.tensor(X_cont_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Create DataLoaders
train_dataset = torch.utils.data.TensorDataset(X_cat_train_tensor, X_cont_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # larger batch size
test_dataset = torch.utils.data.TensorDataset(X_cat_test_tensor, X_cont_test_tensor, y_test_tensor)
test_loader = DataLoader(
    test_dataset,
    batch_size=32,  # validation batches
    shuffle=False,
    num_workers=0,
    pin_memory=False if not torch.cuda.is_available() else True
)

CheckPointModelPath = 'F:\\Python\\Transformer\\demo\\model\\trade_Transformer_LSTM\\out\\checkpoint_quantity.pth'
#CheckPointModelPath = 'F:\\Python\\Transformer\\demo\\model\\temp\\in\\checkpoint_quantity.pth'
# Early stopping
class EarlyStopping:
    def __init__(self, patience=10, verbose=False, delta=0, path=CheckPointModelPath):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

# Option 1: learnable positional encoding
class LearnablePositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.position_embeddings = nn.Embedding(max_len, d_model)

    def forward(self, x):
        positions = torch.arange(0, x.size(1), device=x.device).expand(x.size(0), -1)
        x = x + self.position_embeddings(positions)
        return x

# Hybrid LSTM + Transformer model
class LSTMTransformer(nn.Module):
    def __init__(self, num_embeddings_list, continuous_dim, model_dim=64, hidden_size=64,
                 num_heads=4, num_layers=2, dropout=0.3):
        super().__init__()
        self.cat_embeddings = nn.ModuleList([
            nn.Embedding(num_emb, model_dim) for num_emb in num_embeddings_list
        ])
        self.cont_proj = nn.Linear(continuous_dim, model_dim)
        self.pos_encoder = LearnablePositionalEncoding(model_dim)

        self.lstm = nn.LSTM(model_dim, hidden_size, batch_first=True, bidirectional=False)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.norm = nn.LayerNorm(hidden_size)
        self.fc_out = nn.Linear(hidden_size, 1)

    def forward(self, cat_inputs, cont_inputs):
        # Embeddings
        embedded_cat = torch.stack([emb(cat_inputs[:, :, i]) for i, emb in enumerate(self.cat_embeddings)], dim=0).sum(dim=0)
        embedded_cont = self.cont_proj(cont_inputs)
        x = embedded_cat + embedded_cont
        x = self.pos_encoder(x)

        # LSTM
        x, _ = self.lstm(x)

        # Transformer
        x = x.permute(1, 0, 2)  # [S, B, D]
        x = self.transformer(x)
        x = x.mean(dim=0)  # average over the time dimension
        x = self.norm(x)
        return self.fc_out(x)

# Step 4: initialize the model & train
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of unique values per categorical feature
num_embeddings_list = [df[col].nunique() for col in categorical_cols]

model = LSTMTransformer(
    num_embeddings_list=[df[col].nunique() for col in categorical_cols],
    continuous_dim=len(continuous_cols),
    model_dim=128,    # input embedding dimension
    hidden_size=128,  # hidden size
    num_heads=16,     # number of attention heads
    num_layers=5,     # number of Transformer layers
    dropout=0.6
).to(device)

criterion = nn.MSELoss()  # the original passed 1.0 positionally, which PyTorch reads as the deprecated size_average flag; equivalent to the default
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)

epochs = 150
best_loss = float('inf')
losses = []
val_losses = []

early_stopping = EarlyStopping(patience=100, verbose=True)  # stop after this many consecutive epochs without improvement

print("开始训练...")
for epoch in range(epochs):
    model.train()
    total_loss = 0
    loop = tqdm(train_loader, desc=f"Epoch [{epoch + 1}/{epochs}]")

    for x_cat_batch, x_cont_batch, y_batch in loop:
        x_cat_batch, x_cont_batch, y_batch = x_cat_batch.to(device), x_cont_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(x_cat_batch, x_cont_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # guard against exploding gradients
        optimizer.step()

        total_loss += loss.item()
        loop.set_postfix(loss=loss.item())

    avg_train_loss = total_loss / len(train_loader)
    losses.append(avg_train_loss)

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for x_cat_val, x_cont_val, y_val in test_loader:
            x_cat_val, x_cont_val, y_val = x_cat_val.to(device), x_cont_val.to(device), y_val.to(device)
            val_outputs = model(x_cat_val, x_cont_val)
            loss = criterion(val_outputs, y_val)
            val_loss += loss.item() * x_cat_val.size(0)

    val_loss /= len(test_loader.dataset)

    scheduler.step(val_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch + 1}/{epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {val_loss:.4f}")
    logging.basicConfig(filename='training_Trade_Transformer_Quantity.log', level=logging.INFO)
    logging.info(f"Epoch {epoch + 1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}")
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Save the model
save_path = 'F:\\Python\\Transformer\\demo\\model\\trade_Transformer_LSTM\\out\\'
#save_path = 'F:\\Python\\Transformer\\demo\\model\\temp\\in\\'
os.makedirs(os.path.dirname(save_path), exist_ok=True)

torch.save(model.state_dict(), save_path + 'model_quantity.pth')

# Step 5: evaluate the best model
print("开始验证...")
model.load_state_dict(torch.load(save_path + 'model_quantity.pth'))
model.eval()

all_preds = []
all_true = []

with torch.no_grad():
    for x_cat_batch, x_cont_batch, y_batch in test_loader:
        x_cat_batch = x_cat_batch.to(device)
        x_cont_batch = x_cont_batch.to(device)

        outputs = model(x_cat_batch, x_cont_batch)

        all_preds.append(outputs.cpu().numpy())
        all_true.append(y_batch.cpu().numpy())

# Concatenate results from all batches
y_pred = np.concatenate(all_preds, axis=0)
y_true = np.concatenate(all_true, axis=0)

# Inverse standardization + exp
y_pred_unscaled = np.expm1(scaler_y.inverse_transform(y_pred))
y_true_unscaled = np.expm1(scaler_y.inverse_transform(y_true))

pred_quantity = y_pred_unscaled

true_quantity = y_true_unscaled

# Compute metrics
def evaluate(name, true, pred):
    r2 = r2_score(true, pred)
    mae = np.mean(np.abs(true - pred))
    mse = np.mean((true - pred) ** 2)
    print(f"\n{name} 评估:")
    print(f"R² Score: {r2:.4f}")
    print(f"MSE: {mse:.4f}")
    print(f"MAE: {mae:.2f}")
    return r2, mse, mae

# Evaluate the quantity
r2_q, mse_q, mae_q = evaluate("数量", true_quantity, pred_quantity)
logging.basicConfig(filename='training_Trade_Transformer_Quantity.log', level=logging.INFO)
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.info(f"datetime:{time}, R2:{r2_q}, MSE:{mse_q}, MAE:{mae_q}")  # was {datetime}: logged the class, not the timestamp


# Print a few samples
print("\n部分预测值 VS 真实值:")
for i in range(5):
    print(f"预测: 数量={pred_quantity[i].item():.2f} "
          f"真实: 数量={true_quantity[i].item():.2f}")

# --- Plotting ---
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# Predicted vs true curve
plt.figure(figsize=(12, 6))
plt.plot(true_quantity, label='真实数量', color='blue')
plt.plot(pred_quantity, label='预测数量', color='red', linestyle='--')
plt.title("数量:预测值 vs 真实值")
plt.xlabel("样本索引")
plt.ylabel("数量")
plt.legend()
plt.grid(True)
plt.annotate(f'R2={r2_q:.3f}', xy=(0.05, 0.9), xycoords='axes fraction', fontsize=12, color='green')
plt.annotate(f'MSE={mse_q:.3f}', xy=(0.05, 0.8), xycoords='axes fraction', fontsize=12, color='green')
plt.annotate(f'MAE={mae_q:.3f}', xy=(0.05, 0.7), xycoords='axes fraction', fontsize=12, color='green')
plt.tight_layout()
plt.savefig(save_path + 'predicted_vs_true_quantity.png')
plt.close()

# Error distribution histogram
errors = y_true.flatten() - y_pred.flatten()
plt.figure(figsize=(8, 5))
plt.hist(errors, bins=30, color='skyblue', edgecolor='black')
plt.axvline(0, color='red', linestyle='dashed', linewidth=1)
plt.title("预测误差分布", fontsize=14)
plt.xlabel("误差 = 真实值 - 预测值", fontsize=12)
plt.ylabel("频数", fontsize=12)
plt.grid(True)
plt.tight_layout()
plt.savefig(save_path + 'error_distribution_quantity.png')
plt.close()

# Loss curves (losses recorded during training)
plt.figure(figsize=(10, 6))
plt.plot(losses, label='Training Loss', color='blue')
plt.plot(val_losses, label='Validation Loss', color='red')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.title('训练损失曲线', fontsize=14)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(save_path + 'training_loss_curve_quantity.png')
plt.close()

# Save the scalers and label encoders
with open(save_path + 'scaler_X_quantity.pkl', 'wb') as f:
    pickle.dump(scaler_X, f)

with open(save_path + 'scaler_y_quantity.pkl', 'wb') as f:
    pickle.dump(scaler_y, f)

with open(save_path + 'label_encoders_quantity.pkl', 'wb') as f:
    pickle.dump(label_encoders, f)

print("✅ 模型和预处理器已成功保存!")
# Synchronous playback (the program blocks until playback finishes)
winsound.PlaySound("F:\\Python\\训练完成.wav", winsound.SND_FILENAME)

3. app.py / Trade_FlaskApp_Transformer.py

import pickle
from datetime import datetime

import pandas as pd
import pymysql
from flask import Flask, request, jsonify
import torch
import torch.nn as nn
import numpy as np
from sklearn.preprocessing import StandardScaler

app = Flask(__name__)

# Model artifact paths
# Import
# Unit price
IN_PRICE_MODEL_PATH = 'model/trade_Transformer/in/model_price.pth'
IN_PRICE_SCALER_X_PATH = 'model/trade_Transformer/in/scaler_X_price.pkl'
IN_PRICE_SCALER_Y_PATH = 'model/trade_Transformer/in/scaler_y_price.pkl'
IN_PRICE_LABEL_ENCODERS_PATH = 'model/trade_Transformer/in/label_encoders_price.pkl'

# Quantity
IN_QUANTITY_MODEL_PATH = 'model/trade_Transformer/in/model_quantity.pth'
IN_QUANTITY_SCALER_X_PATH = 'model/trade_Transformer/in/scaler_X_quantity.pkl'
IN_QUANTITY_SCALER_Y_PATH = 'model/trade_Transformer/in/scaler_y_quantity.pkl'
IN_QUANTITY_LABEL_ENCODERS_PATH = 'model/trade_Transformer/in/label_encoders_quantity.pkl'

# Export
# Unit price
OUT_PRICE_MODEL_PATH = 'model/trade_Transformer/out/model_price.pth'
OUT_PRICE_SCALER_X_PATH = 'model/trade_Transformer/out/scaler_X_price.pkl'
OUT_PRICE_SCALER_Y_PATH = 'model/trade_Transformer/out/scaler_y_price.pkl'
OUT_PRICE_LABEL_ENCODERS_PATH = 'model/trade_Transformer/out/label_encoders_price.pkl'

# Quantity
OUT_QUANTITY_MODEL_PATH = 'model/trade_Transformer/out/model_quantity.pth'
OUT_QUANTITY_SCALER_X_PATH = 'model/trade_Transformer/out/scaler_X_quantity.pkl'
OUT_QUANTITY_SCALER_Y_PATH = 'model/trade_Transformer/out/scaler_y_quantity.pkl'
OUT_QUANTITY_LABEL_ENCODERS_PATH = 'model/trade_Transformer/out/label_encoders_quantity.pkl'

# Categorical and continuous feature columns (must match training)
categorical_cols = ['贸易伙伴编码', '商品编码', '贸易方式编码', '注册地编码', '计量单位']
continuous_cols = ['year', 'month', '数量', '人民币']
seq_length = 12  # sequence length, must match training

# Load the scalers and label encoders
with open(IN_PRICE_SCALER_X_PATH, 'rb') as f:
    in_price_scaler_X = pickle.load(f)
with open(IN_PRICE_SCALER_Y_PATH, 'rb') as f:
    in_price_scaler_y = pickle.load(f)
with open(IN_PRICE_LABEL_ENCODERS_PATH, 'rb') as f:
    in_price_label_encoders = pickle.load(f)

with open(IN_QUANTITY_SCALER_X_PATH, 'rb') as f:
    in_quantity_scaler_X = pickle.load(f)
with open(IN_QUANTITY_SCALER_Y_PATH, 'rb') as f:
    in_quantity_scaler_y = pickle.load(f)
with open(IN_QUANTITY_LABEL_ENCODERS_PATH, 'rb') as f:
    in_quantity_label_encoders = pickle.load(f)

with open(OUT_PRICE_SCALER_X_PATH, 'rb') as f:
    out_price_scaler_X = pickle.load(f)
with open(OUT_PRICE_SCALER_Y_PATH, 'rb') as f:
    out_price_scaler_y = pickle.load(f)
with open(OUT_PRICE_LABEL_ENCODERS_PATH, 'rb') as f:
    out_price_label_encoders = pickle.load(f)

with open(OUT_QUANTITY_SCALER_X_PATH, 'rb') as f:
    out_quantity_scaler_X = pickle.load(f)
with open(OUT_QUANTITY_SCALER_Y_PATH, 'rb') as f:
    out_quantity_scaler_y = pickle.load(f)
with open(OUT_QUANTITY_LABEL_ENCODERS_PATH, 'rb') as f:
    out_quantity_label_encoders = pickle.load(f)

# Model definitions (must match training)
class LearnablePositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.position_embeddings = nn.Embedding(max_len, d_model)

    def forward(self, x):
        positions = torch.arange(0, x.size(1), device=x.device).expand(x.size(0), -1)
        x = x + self.position_embeddings(positions)
        return x


class LSTMTransformer(nn.Module):
    def __init__(self, num_embeddings_list, continuous_dim, model_dim=64, hidden_size=64,
                 num_heads=4, num_layers=2, dropout=0.3):
        super().__init__()
        self.cat_embeddings = nn.ModuleList([
            nn.Embedding(num_emb, model_dim) for num_emb in num_embeddings_list
        ])
        self.cont_proj = nn.Linear(continuous_dim, model_dim)
        self.pos_encoder = LearnablePositionalEncoding(model_dim)

        self.lstm = nn.LSTM(model_dim, hidden_size, batch_first=True, bidirectional=False)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.norm = nn.LayerNorm(hidden_size)
        self.fc_out = nn.Linear(hidden_size, 1)

    def forward(self, cat_inputs, cont_inputs):
        # Embeddings
        embedded_cat = torch.stack([emb(cat_inputs[:, :, i]) for i, emb in enumerate(self.cat_embeddings)], dim=0).sum(dim=0)
        embedded_cont = self.cont_proj(cont_inputs)
        x = embedded_cat + embedded_cont
        x = self.pos_encoder(x)

        # LSTM
        x, _ = self.lstm(x)

        # Transformer
        x = x.permute(1, 0, 2)  # [S, B, D]
        x = self.transformer(x)
        x = x.mean(dim=0)  # average over the time dimension
        x = self.norm(x)
        return self.fc_out(x)


# Instantiate the models and load weights
num_embeddings_list_in_price = [in_price_label_encoders[col].classes_.shape[0] for col in categorical_cols]
model_in_price = LSTMTransformer(
    num_embeddings_list=num_embeddings_list_in_price,
    continuous_dim=len(continuous_cols),
    model_dim=128,
    hidden_size=64,
    num_heads=8,
    num_layers=4,
    dropout=0.6
)
model_in_price.load_state_dict(torch.load(IN_PRICE_MODEL_PATH, map_location='cpu'))
model_in_price.eval()

num_embeddings_list_in_quantity = [in_quantity_label_encoders[col].classes_.shape[0] for col in categorical_cols]
model_in_quantity = LSTMTransformer(
    num_embeddings_list=num_embeddings_list_in_quantity,
    continuous_dim=len(continuous_cols),
    model_dim=128,    # input embedding dimension
    hidden_size=128,  # hidden size
    num_heads=16,     # number of attention heads
    num_layers=5,     # number of Transformer layers
    dropout=0.6
)
model_in_quantity.load_state_dict(torch.load(IN_QUANTITY_MODEL_PATH, map_location='cpu'))
model_in_quantity.eval()

num_embeddings_list_out_price = [out_price_label_encoders[col].classes_.shape[0] for col in categorical_cols]
model_out_price = LSTMTransformer(
    num_embeddings_list=num_embeddings_list_out_price,
    continuous_dim=len(continuous_cols),
    model_dim=64,
    hidden_size=64,
    num_heads=8,
    num_layers=4,
    dropout=0.6
)
model_out_price.load_state_dict(torch.load(OUT_PRICE_MODEL_PATH, map_location='cpu'))
model_out_price.eval()

num_embeddings_list_out_quantity = [out_quantity_label_encoders[col].classes_.shape[0] for col in categorical_cols]
model_out_quantity = LSTMTransformer(
    num_embeddings_list=num_embeddings_list_out_quantity,
    continuous_dim=len(continuous_cols),
    model_dim=64,    # input embedding dimension
    hidden_size=64,  # hidden size
    num_heads=8,     # number of attention heads
    num_layers=4,    # number of Transformer layers
    dropout=0.6
)
model_out_quantity.load_state_dict(torch.load(OUT_QUANTITY_MODEL_PATH, map_location='cpu'))
model_out_quantity.eval()

# Database connection settings
# Development environment
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': '751225hzx',
    'database': 'trade',
    'charset': 'utf8mb4'
}

# Production environment
# db_config = {
#     'host': 'localhost',
#     'user': 'trade',
#     'password': '123456',
#     'database': 'trade',
#     'charset': 'utf8mb4'
# }

def db_process(direction):
    request_data = request.get_json(force=True)
    friend = request_data.get('贸易伙伴名称')
    good = request_data.get('商品名称')
    trade = request_data.get('贸易方式')
    register = request_data.get('注册地名称')
    year = request_data.get('year')
    month = request_data.get('month')

    # Build the target date
    try:
        target_date = datetime(year=int(year), month=int(month), day=1).strftime('%Y-%m-%d')
    except (TypeError, ValueError):
        return None, None  # signal an invalid year/month to the caller

    # SQL query (the original set `query` only for "in"/"out", leaving it
    # undefined for any other value; an else branch avoids that)
    if direction == "in":
        query = """
            SELECT *
            FROM trade_in
            WHERE 贸易伙伴名称 = %s
              AND 商品名称 = %s
              AND 贸易方式名称 = %s
              AND 注册地名称 = %s
              AND 数据年月 < %s
            ORDER BY 数据年月 DESC
            LIMIT 12
        """
    else:
        query = """
            SELECT *
            FROM trade_out
            WHERE 贸易伙伴名称 = %s
              AND 商品名称 = %s
              AND 贸易方式名称 = %s
              AND 注册地名称 = %s
              AND 数据年月 < %s
            ORDER BY 数据年月 DESC
            LIMIT 12
        """

    # Connect to the database and run the query
    connection = pymysql.connect(**db_config)
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (friend, good, trade, register, target_date))
            result = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
    finally:
        connection.close()
    return result, columns


@app.route('/predict_in_price', methods=['POST'])
def predict_in_price():
    warning_msg = None
    result, columns = db_process("in")
    if columns is None:  # invalid year/month in the request
        return jsonify({'error': '无效的年/月'}), 400
    # Convert to a DataFrame
    df = pd.DataFrame(result, columns=columns)

    # Guard against an empty result set (the padding logic below would divide by zero)
    if df.empty:
        return jsonify({'error': '未查询到历史数据'}), 404

    if len(df) < 12:
        warning_msg = "历史数据不足12条,预测结果准确度受影响"
        # Duplicate rows to pad up to 12
        df = pd.concat([df] * ((12 // len(df)) + 1), ignore_index=True).head(12)

    # Extract and preprocess the 数据年月 field first
    if '数据年月' not in df.columns:
        return jsonify({'error': '缺少 数据年月 字段'}), 400

    # Make sure the 数据年月 column is a string
    df['数据年月'] = df['数据年月'].astype(str)
    # Handle "YYYY-MM-DD" values whose day part is 00
    df['数据年月'] = df['数据年月'].str.replace('-00', '-01', regex=False)

    # Convert to datetime; unparseable values become NaT
    df['数据年月'] = pd.to_datetime(df['数据年月'], errors='coerce')

    # Check for invalid dates
    invalid_dates = df[df['数据年月'].isna()]
    if not invalid_dates.empty:
        return jsonify({
            'error': f'存在无法解析的日期,请检查以下记录:\n{invalid_dates.head().to_dict()}'
        }), 400

    # Extract year/month
    df['year'] = df['数据年月'].dt.year.astype(int)
    df['month'] = df['数据年月'].dt.month.astype(int)

    # Check that all categorical feature columns exist
    missing_cols = [col for col in categorical_cols if col not in df.columns]
    if missing_cols:
        return jsonify({'error': f'缺失以下类别特征列: {", ".join(missing_cols)}'}), 400

    processed_rows = []

    for _, row in df.iterrows():
        sample_df = pd.DataFrame([row])

        # Encode categorical features
        for col in categorical_cols:
            le = in_price_label_encoders[col]
            sample_df[col] = le.transform(sample_df[col].astype(str))

        # Standardize continuous features
        sample_cont = sample_df[continuous_cols].copy()
        sample_cont_scaled = in_price_scaler_X.transform(sample_cont)

        # Keep the processed row
        processed_rows.append({
            'cat': sample_df[categorical_cols].values[0],
            'cont': sample_cont_scaled[0]
        })

    # Build the input tensors
    cat_array = np.array([r['cat'] for r in processed_rows])
    cont_array = np.array([r['cont'] for r in processed_rows])

    cat_tensor = torch.tensor(cat_array, dtype=torch.long).unsqueeze(0)
    cont_tensor = torch.tensor(cont_array, dtype=torch.float32).unsqueeze(0)

    # Model inference
    with torch.no_grad():
        output = model_in_price(cat_tensor, cont_tensor)

    # Inverse transforms
    pred_unscaled = in_price_scaler_y.inverse_transform(output.numpy())
    pred_original = np.expm1(pred_unscaled)  # because log1p was used

    unit = '人民币/' + df['计量单位'][0]
    response = {
        'predicted_in_price': float(pred_original[0][0]),
        'unit': unit
    }
    if warning_msg is not None:
        response['warning'] = warning_msg

    return jsonify(response)

@app.route('/predict_in_quantity', methods=['POST'])
def predict_in_quantity():
    warning_msg = None
    result, columns = db_process("in")
    if columns is None:  # invalid year/month in the request
        return jsonify({'error': '无效的年/月'}), 400
    # Convert to a DataFrame
    df = pd.DataFrame(result, columns=columns)

    # Guard against an empty result set (the padding logic below would divide by zero)
    if df.empty:
        return jsonify({'error': '未查询到历史数据'}), 404

    if len(df) < 12:
        warning_msg = "历史数据不足12条,预测结果准确度受影响"
        # Duplicate rows to pad up to 12
        df = pd.concat([df] * ((12 // len(df)) + 1), ignore_index=True).head(12)

    # Extract and preprocess the 数据年月 field first
    if '数据年月' not in df.columns:
        return jsonify({'error': '缺少 数据年月 字段'}), 400

    # Make sure the 数据年月 column is a string
    df['数据年月'] = df['数据年月'].astype(str)
    # Handle "YYYY-MM-DD" values whose day part is 00
    df['数据年月'] = df['数据年月'].str.replace('-00', '-01', regex=False)

    # Convert to datetime; unparseable values become NaT
    df['数据年月'] = pd.to_datetime(df['数据年月'], errors='coerce')

    # Check for invalid dates
    invalid_dates = df[df['数据年月'].isna()]
    if not invalid_dates.empty:
        return jsonify({
            'error': f'存在无法解析的日期,请检查以下记录:\n{invalid_dates.head().to_dict()}'
        }), 400

    # Extract year/month
    df['year'] = df['数据年月'].dt.year.astype(int)
    df['month'] = df['数据年月'].dt.month.astype(int)

    # Check that all categorical feature columns exist
    missing_cols = [col for col in categorical_cols if col not in df.columns]
    if missing_cols:
        return jsonify({'error': f'缺失以下类别特征列: {", ".join(missing_cols)}'}), 400

    processed_rows = []

    for _, row in df.iterrows():
        sample_df = pd.DataFrame([row])

        # Encode categorical features
        for col in categorical_cols:
            le = in_quantity_label_encoders[col]
            sample_df[col] = le.transform(sample_df[col].astype(str))

        # Standardize continuous features
        sample_cont = sample_df[continuous_cols].copy()
        sample_cont_scaled = in_quantity_scaler_X.transform(sample_cont)

        # Keep the processed row
        processed_rows.append({
            'cat': sample_df[categorical_cols].values[0],
            'cont': sample_cont_scaled[0]
        })

    # Build the input tensors
    cat_array = np.array([r['cat'] for r in processed_rows])
    cont_array = np.array([r['cont'] for r in processed_rows])

    cat_tensor = torch.tensor(cat_array, dtype=torch.long).unsqueeze(0)
    cont_tensor = torch.tensor(cont_array, dtype=torch.float32).unsqueeze(0)

    # Model inference
    with torch.no_grad():
        output = model_in_quantity(cat_tensor, cont_tensor)

    # Inverse transforms
    pred_unscaled = in_quantity_scaler_y.inverse_transform(output.numpy())
    pred_original = np.expm1(pred_unscaled)  # because log1p was used

    response = {
        'predicted_in_quantity': float(pred_original[0][0]),
        'unit': df['计量单位'][0]
    }
    if warning_msg is not None:
        response['warning'] = warning_msg

    return jsonify(response)

@app.route('/predict_out_price', methods=['POST'])
def predict_out_price():
    warning_msg = None
    result, columns = db_process("out")
    if columns is None:  # invalid year/month in the request
        return jsonify({'error': '无效的年/月'}), 400
    # Convert to a DataFrame
    df = pd.DataFrame(result, columns=columns)

    # Guard against an empty result set (the padding logic below would divide by zero)
    if df.empty:
        return jsonify({'error': '未查询到历史数据'}), 404

    if len(df) < 12:
        warning_msg = "历史数据不足12条,预测结果准确度受影响"
        # Duplicate rows to pad up to 12
        df = pd.concat([df] * ((12 // len(df)) + 1), ignore_index=True).head(12)

    # Extract and preprocess the 数据年月 field first
    if '数据年月' not in df.columns:
        return jsonify({'error': '缺少 数据年月 字段'}), 400

    # Make sure the 数据年月 column is a string
    df['数据年月'] = df['数据年月'].astype(str)
    # Handle "YYYY-MM-DD" values whose day part is 00
    df['数据年月'] = df['数据年月'].str.replace('-00', '-01', regex=False)

    # Convert to datetime; unparseable values become NaT
    df['数据年月'] = pd.to_datetime(df['数据年月'], errors='coerce')

    # Check for invalid dates
    invalid_dates = df[df['数据年月'].isna()]
    if not invalid_dates.empty:
        return jsonify({
            'error': f'存在无法解析的日期,请检查以下记录:\n{invalid_dates.head().to_dict()}'
        }), 400

    # Extract year/month
    df['year'] = df['数据年月'].dt.year.astype(int)
    df['month'] = df['数据年月'].dt.month.astype(int)

    # Check that all categorical feature columns exist
    missing_cols = [col for col in categorical_cols if col not in df.columns]
    if missing_cols:
        return jsonify({'error': f'缺失以下类别特征列: {", ".join(missing_cols)}'}), 400

    processed_rows = []

    for _, row in df.iterrows():
        sample_df = pd.DataFrame([row])

        # Encode categorical features
        for col in categorical_cols:
            le = out_price_label_encoders[col]
            sample_df[col] = le.transform(sample_df[col].astype(str))

        # Standardize continuous features
        sample_cont = sample_df[continuous_cols].copy()
        sample_cont_scaled = out_price_scaler_X.transform(sample_cont)

        # Keep the processed row
        processed_rows.append({
            'cat': sample_df[categorical_cols].values[0],
            'cont': sample_cont_scaled[0]
        })

    # Build the input tensors
    cat_array = np.array([r['cat'] for r in processed_rows])
    cont_array = np.array([r['cont'] for r in processed_rows])

    cat_tensor = torch.tensor(cat_array, dtype=torch.long).unsqueeze(0)
    cont_tensor = torch.tensor(cont_array, dtype=torch.float32).unsqueeze(0)

    # Model inference
    with torch.no_grad():
        output = model_out_price(cat_tensor, cont_tensor)

    # Inverse transforms
    pred_unscaled = out_price_scaler_y.inverse_transform(output.numpy())
    pred_original = np.expm1(pred_unscaled)  # because log1p was used

    unit = '人民币/' + df['计量单位'][0]
    response = {
        'predicted_out_price': float(pred_original[0][0]),
        'unit': unit
    }
    if warning_msg is not None:
        response['warning'] = warning_msg

    return jsonify(response)

@app.route('/predict_out_quantity', methods=['POST'])
def predict_out_quantity():
    warning_msg = None
    result, columns = db_process("out")
    if columns is None:  # invalid year/month in the request
        return jsonify({'error': '无效的年/月'}), 400
    # Convert to a DataFrame
    df = pd.DataFrame(result, columns=columns)

    # Guard against an empty result set (the padding logic below would divide by zero)
    if df.empty:
        return jsonify({'error': '未查询到历史数据'}), 404

    if len(df) < 12:
        warning_msg = "历史数据不足12条,预测结果准确度受影响"
        # Duplicate rows to pad up to 12
        df = pd.concat([df] * ((12 // len(df)) + 1), ignore_index=True).head(12)

    # Extract and preprocess the 数据年月 field first
    if '数据年月' not in df.columns:
        return jsonify({'error': '缺少 数据年月 字段'}), 400

    # Make sure the 数据年月 column is a string
    df['数据年月'] = df['数据年月'].astype(str)
    # Handle "YYYY-MM-DD" values whose day part is 00
    df['数据年月'] = df['数据年月'].str.replace('-00', '-01', regex=False)

    # Convert to datetime; unparseable values become NaT
    df['数据年月'] = pd.to_datetime(df['数据年月'], errors='coerce')

    # Check for invalid dates
    invalid_dates = df[df['数据年月'].isna()]
    if not invalid_dates.empty:
        return jsonify({
            'error': f'存在无法解析的日期,请检查以下记录:\n{invalid_dates.head().to_dict()}'
        }), 400

    # Extract year/month
    df['year'] = df['数据年月'].dt.year.astype(int)
    df['month'] = df['数据年月'].dt.month.astype(int)

    # Check that all categorical feature columns exist
    missing_cols = [col for col in categorical_cols if col not in df.columns]
    if missing_cols:
        return jsonify({'error': f'缺失以下类别特征列: {", ".join(missing_cols)}'}), 400

    processed_rows = []

    for _, row in df.iterrows():
        sample_df = pd.DataFrame([row])

        # Encode categorical features
        for col in categorical_cols:
            le = out_quantity_label_encoders[col]
            sample_df[col] = le.transform(sample_df[col].astype(str))

        # Standardize continuous features
        sample_cont = sample_df[continuous_cols].copy()
        sample_cont_scaled = out_quantity_scaler_X.transform(sample_cont)

        # Keep the processed row
        processed_rows.append({
            'cat': sample_df[categorical_cols].values[0],
            'cont': sample_cont_scaled[0]
        })

    # Build the input tensors
    cat_array = np.array([r['cat'] for r in processed_rows])
    cont_array = np.array([r['cont'] for r in processed_rows])

    cat_tensor = torch.tensor(cat_array, dtype=torch.long).unsqueeze(0)
    cont_tensor = torch.tensor(cont_array, dtype=torch.float32).unsqueeze(0)

    # Model inference
    with torch.no_grad():
        output = model_out_quantity(cat_tensor, cont_tensor)

    # Inverse transforms
    pred_unscaled = out_quantity_scaler_y.inverse_transform(output.numpy())
    pred_original = np.expm1(pred_unscaled)  # because log1p was used

    response = {
        'predicted_out_quantity': float(pred_original[0][0]),
        'unit': df['计量单位'][0]
    }
    if warning_msg is not None:
        response['warning'] = warning_msg

    return jsonify(response)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
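
One gap relative to the redeployment requirement: the script above loads all four models once at import time, so retrained weights only take effect after the Flask process is restarted. A hedged sketch of an in-process alternative, a hypothetical /reload_models endpoint appended to app.py (not part of the original code; it assumes the architectures are unchanged, and the scalers/label encoders would need the same treatment if retraining refits them):

# Hypothetical addition to app.py: swap in freshly trained weights without
# restarting the process, so the FastGPT-facing APIs serve the new models.
@app.route('/reload_models', methods=['POST'])
def reload_models():
    try:
        model_in_price.load_state_dict(torch.load(IN_PRICE_MODEL_PATH, map_location='cpu'))
        model_in_quantity.load_state_dict(torch.load(IN_QUANTITY_MODEL_PATH, map_location='cpu'))
        model_out_price.load_state_dict(torch.load(OUT_PRICE_MODEL_PATH, map_location='cpu'))
        model_out_quantity.load_state_dict(torch.load(OUT_QUANTITY_MODEL_PATH, map_location='cpu'))
        for m in (model_in_price, model_in_quantity, model_out_price, model_out_quantity):
            m.eval()
        return jsonify({'status': 'reloaded'})
    except Exception as e:
        return jsonify({'status': 'failed', 'error': str(e)}), 500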

Code functionality and logic

1. File 1: import/export trade "unit price" training script

Core purpose

  • Trains a hybrid LSTM + Transformer model to predict the trade unit price (单价) for a future month; as checked in, it runs on the import data.

Key flow

  1. Data processing: load the CSV → extract year/month features → clean missing/invalid values → label-encode categorical features (LabelEncoder) → standardize continuous features (StandardScaler) → log-transform the target variable (unit price).
  2. Sequence building: group by 商品编码 and slide a 12-month window over each group (input: the previous 12 months of features; output: the 13th month's unit price).
  3. Model training: define the hybrid LSTM + Transformer model (with a learnable positional encoding) → train with MSE loss and the Adam optimizer → early stopping against overfitting → save the model weights and preprocessing components (scalers, label encoders).
  4. Evaluation and visualization: compute R², MSE, and MAE → plot predicted-vs-true curves, the error distribution, and the training loss curve.

2. File 2: import/export trade "quantity" training script

Core purpose

  • Trains the same hybrid LSTM + Transformer architecture to predict the trade quantity (数量); as checked in, it runs on the export data.

Key flow (differences from File 1)

  1. Data differences: loads the export CSV, the target variable is 数量, and the output paths point to the export model directory.
  2. Model parameter differences: File 1 uses a heavier configuration (num_heads=64, num_layers=10, up to 1000 epochs), while File 2 uses num_heads=16, num_layers=5, and at most 150 epochs. (The 0.5 originally passed to nn.MSELoss in File 1 was not a loss weight; a positional argument there is read as the deprecated size_average flag.)
  3. Outputs: the quantity model and its preprocessing components; metrics and plots are produced for 数量.

3. File 3: Flask API deployment script

Core purpose

  • Wraps the four trained models (import price/quantity, export price/quantity) as HTTP endpoints that query historical data from the database and serve real-time predictions.

Key flow

  1. Model loading: load the four sets of pretrained weights plus the matching scalers and label encoders, and rebuild the model structures (they must match training).
  2. Database access: a db_process helper queries the latest 12 import/export history rows for the request parameters (trade partner, commodity, date, etc.).
  3. Endpoint design: four POST endpoints (/predict_in_price, /predict_in_quantity, /predict_out_price, /predict_out_quantity) accepting JSON requests.
  4. Prediction flow:
    • Preprocess exactly as during training (encode categorical features, standardize continuous features).
    • Pad the history by duplication when fewer than 12 rows are available.
    • Run inference and invert the target transforms (undo standardization, then the log transform; see the round-trip check after this list).
    • Return a JSON result (prediction, unit, optional warning).
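
The inverse transform in the last step simply undoes the two training-time target transforms in reverse order. A self-contained round-trip check (toy numbers, same transforms as the scripts above):

import numpy as np
from sklearn.preprocessing import StandardScaler

# All four models train on standardized log1p(target) and invert with
# inverse_transform followed by expm1.
y = np.array([[120.0], [80.0], [455.0]])      # toy unit prices
scaler_y = StandardScaler().fit(np.log1p(y))

y_scaled = scaler_y.transform(np.log1p(y))    # what the network actually regresses
y_back = np.expm1(scaler_y.inverse_transform(y_scaled))
assert np.allclose(y, y_back)                 # original values recovered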

Relationship between the three files and the overall flow

graph TD
A[File 1: unit-price training] -->|outputs: price model + preprocessing components| C[File 3: API deployment]
B[File 2: quantity training] -->|outputs: quantity model + preprocessing components| C
C -->|receive request → query DB → preprocess → run inference| D[Frontend / other systems calling the prediction APIs]


Core summary

File   | Core goal                             | Input data                    | Outputs
File 1 | Train the unit-price prediction model | Import/export trade CSV       | Price model, scalers/encoders, plots
File 2 | Train the quantity prediction model   | Import/export trade CSV       | Quantity model, scalers/encoders, plots
File 3 | Serve the prediction APIs             | HTTP requests + database data | JSON predictions (price / quantity)

Generating the backend interface with the Spring Boot framework

In practice this was done with Cursor:

Prompt description:

Based on the Spring Boot framework, develop two interfaces:
Main interface: called by the frontend; executes the three Python scripts in order (first A, import/export quantity training; then B, import/export unit-price training; finally C, API deployment). It must support configuring the import/export CSV data paths (replacing the manual comment toggling) and the model output paths in code, so that A automatically reads the import/export CSVs and writes the import/export model components, B does the same, and C correctly references the outputs of A and B.
Status interface: returns the script execution status to the frontend (running, completed, failed, etc.).
Core optimization: modify File 1 and File 2 so the import/export CSV input paths and model output paths are switched automatically / specified separately via parameters or configuration, with no manual comment edits. Put the complete project code into a new directory.
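
The natural mechanism for that optimization is command-line arguments: the Spring Boot service passes the CSV path and output directory when it launches each script. A minimal sketch of the pattern (flag names here are assumptions; the generated scripts may use different ones):

import argparse

# Parameterization sketch for the two training scripts: the hard-coded,
# comment-toggled paths become required CLI arguments.
parser = argparse.ArgumentParser(description='Trade model training')
parser.add_argument('--data-path', required=True,
                    help='import or export merged_output.csv')
parser.add_argument('--model-dir', required=True,
                    help='directory for model weights, scalers and encoders')
args = parser.parse_args()

file_path = args.data_path  # replaces the hard-coded file_path
save_path = args.model_dir  # replaces the hard-coded save_path

The Java side then only varies the arguments between the import and export runs, e.g. python Trade_Transformer_LSTM_price.py --data-path ...\进口\merged_output.csv --model-dir ...\in\.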

Project completion status

Project structure

TradeSpringBoot/
├── src/
│   └── main/
│       ├── java/com/trade/
│       │   ├── TradeApplication.java            # main application class
│       │   ├── config/
│       │   │   └── PythonScriptProperties.java  # Python script configuration class
│       │   ├── controller/
│       │   │   └── TrainingController.java      # REST controller
│       │   ├── service/
│       │   │   ├── PythonScriptService.java     # Python script execution service
│       │   │   └── TrainingTaskService.java     # training task management service
│       │   ├── model/
│       │   │   ├── ScriptExecutionResult.java   # script execution result model
│       │   │   └── TrainingTaskStatus.java      # training task status model
│       │   └── enums/
│       │       └── ScriptStatus.java            # script status enum
│       └── resources/
│           └── application.yml                  # configuration file
├── python-scripts/
│   ├── Trade_Transformer_LSTM_price.py          # unit-price training script
│   ├── Trade_Transformer_LSTM_quantity.py       # quantity training script
│   └── app.py                                   # Flask API deployment script (copy from the original project)
├── pom.xml                                      # Maven build file
└── README.md                                    # project README

Core files created

  1. Spring Boot main application
    • TradeApplication.java - main application class
    • application.yml - configuration file

  2. Configuration management
    • PythonScriptProperties.java - configuration properties class that reads settings from YAML

  3. REST API controller
    • TrainingController.java - endpoints for starting training tasks and querying their status

  4. Service layer
    • PythonScriptService.java - Python script execution service
    • TrainingTaskService.java - training task management service

  5. Data models
    • ScriptExecutionResult.java - script execution result model
    • TrainingTaskStatus.java - training task status model
    • ScriptStatus.java - status enum

  6. Python scripts
    • Trade_Transformer_LSTM_price.py - unit-price training script (parameterized)
    • Trade_Transformer_LSTM_quantity.py - quantity training script (parameterized)

  7. Documentation
    • README.md - project README
    • QUICKSTART.md - quick-start guide
    • PROJECT_STRUCTURE.md - project structure description

Core features

  1. Automated training pipeline: runs the four model trainings in sequence (import quantity, import price, export quantity, export price)
  2. Parameterized configuration: CSV paths and model output paths are set in application.yml, with no code changes
  3. Status tracking: tracks training task state in real time and supports progress queries
  4. Asynchronous execution: training tasks run asynchronously without blocking the main thread

API endpoints

  • POST /api/training/start - start a training task

  • GET /api/training/status/{taskId} - query the status of one task

  • GET /api/training/status - query the status of all tasks
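
A usage sketch from the caller's side (response field names such as taskId and status are assumptions based on the models listed above, not verified output):

import time
import requests

BASE = 'http://localhost:8080'

# Kick off the sequential training pipeline
task = requests.post(f'{BASE}/api/training/start').json()
task_id = task['taskId']  # field name assumed from TrainingTaskStatus

# Poll the status endpoint until the task leaves its non-terminal states
while True:
    status = requests.get(f'{BASE}/api/training/status/{task_id}').json()
    print(status)
    if status.get('status') not in ('PENDING', 'RUNNING'):
        break
    time.sleep(10)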

Configuration

Configured in application.yml:

  • Python interpreter path

  • CSV data file paths (import / export)

  • Model output paths (import / export)

  • Script file paths

Usage steps

  1. Edit the path settings in application.yml

  2. Run mvn spring-boot:run to start the service

  3. Call POST /api/training/start to kick off training

  4. Query status with GET /api/training/status/{taskId}

All of the code has been created under the TradeSpringBoot directory and can be used directly. The Python training scripts already support configuration via command-line arguments, so no manual code edits are needed.