\u200E

• 项目地址

https://aistudio.baidu.com/aistudio/projectdetail/5663515

U-Net 网络模型

```

as nn

as F

import weight_norm

# 创建基础卷积层

def create_layer(in_channels, out_channels, kernel_size, wn=True, bn=True,                 activation=nn.ReLU, convolution=nn.Conv2D):

assert kernel_size %
2 ==
1
layer = [ ]
conv = convolution(in_channels, out_channels, kernel_size, padding=kernel_size //
2)

if wn:
conv = weight_norm(conv)
layer.append(conv)

if activation
is
not
None:
layer.append(activation())

if bn:
layer.append(nn.BatchNorm2D(out_channels))

return nn.Sequential(*layer)

# 创建Encoder中的单个块

def create_encoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True,                         activation=nn.ReLU, layers=2):
encoder = [ ]

for i
in range(layers):
_in = out_channels
_out = out_channels

if i ==
0:
_in = in_channels
encoder.append(create_layer(_in, _out, kernel_size, wn, bn, activation, nn.Conv2D))

return nn.Sequential(*encoder)

# 创建Decoder中的单个块

def create_decoder_block(in_channels, out_channels, kernel_size, wn=True, bn=True,                         activation=nn.ReLU, layers=2, final_layer=False):
decoder = [ ]

for i
in range(layers):
_in = in_channels
_out = in_channels
_bn = bn
_activation = activation

if i ==
0:
_in = in_channels *
2

if i == layers -
1:
_out = out_channels

if final_layer:
_bn =
False
_activation =
None
decoder.append(create_layer(_in, _out, kernel_size, wn, _bn, _activation, nn.Conv2DTranspose))

return nn.Sequential(*decoder)

# 创建Encoder

def create_encoder(in_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2):
encoder = [ ]

for i
in range(len(filters)):

if i ==
0:
encoder_layer = create_encoder_block(in_channels, filters[i], kernel_size, wn, bn, activation, layers)

else:
encoder_layer = create_encoder_block(filters[i -
1], filters[i], kernel_size, wn, bn, activation, layers)
encoder = encoder + [encoder_layer]

return nn.Sequential(*encoder)

# 创建Decoder

def create_decoder(out_channels, filters, kernel_size, wn=True, bn=True, activation=nn.ReLU, layers=2):
decoder = []

for i
in range(len(filters)):

if i ==
0:
decoder_layer = create_decoder_block(filters[i], out_channels, kernel_size, wn, bn, activation, layers,
final_layer=
True)

else:
decoder_layer = create_decoder_block(filters[i], filters[i -
1], kernel_size, wn, bn, activation, layers,
final_layer=
False)
decoder = [decoder_layer] + decoder

return nn.Sequential(*decoder)

# 创建网络

class UNetEx(nn.Layer):

def __init__(self, in_channels, out_channels, kernel_size=3, filters=[16, 32, 64], layers=3,                 weight_norm=True, batch_norm=True, activation=nn.ReLU, final_activation=None):
super().__init__()

assert len(filters) >
0
self.final_activation = final_activation
self.encoder = create_encoder(in_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers)
decoders = [ ]

# for i in range(out_channels):
decoders.append(create_decoder(out_channels, filters, kernel_size, weight_norm, batch_norm, activation, layers))
self.decoders = nn.Sequential(*decoders)

def encode(self, x):
tensors = [ ]
indices = [ ]
sizes = [ ]

for encoder
in self.encoder:
x = encoder(x)
sizes.append(x.shape)
tensors.append(x)
x, ind = F.max_pool2d(x,
2,
True)
indices.append(ind)

return x, tensors, indices, sizes

def decode(self, _x, _tensors, _indices, _sizes):
y = [ ]

for _decoder
in self.decoders:
x = _x
tensors = _tensors[:]
indices = _indices[:]
sizes = _sizes[:]

for decoder
in _decoder:
tensor = tensors.pop()
size = sizes.pop()
ind = indices.pop()

# 反池化操作，为上采样
x = F.max_unpool2d(x, ind,
2,
2, output_size=size)
1)
x = decoder(x)
y.append(x)

1)

def forward(self, x):
x, tensors, indices, sizes = self.encode(x)
x = self.decode(x, tensors, indices, sizes)

if self.final_activation
is
not
None:
x = self.final_activation(x)

return x

```

```

# 训练方法

def train(model, train_dataset, criterion, optimizer, device, num_epochs):
loss_history = [ ]
epoch_loss =
0

# 遍历批次

for epoch
in range(num_epochs):

for batch_id
in range(len(train_dataset)
-1):
inputs = train_dataset[batch_id]
outputs_true = train_dataset[batch_id+
1]

inputs = T.ToTensor()(inputs)
0)
outputs_true = T.ToTensor()(outputs_true)
0)

# 训练
outputs = model(inputs)

# 计算损失值
loss = criterion(outputs, outputs_true)

if batch_id %
10 ==
0:
print(
'epoch:',epoch,
'batch_id:',batch_id,
'loss:',loss.numpy())
loss.backward()

epoch_loss += loss.item()
optimizer.step()
epoch_loss /= len(train_dataset)

loss_history.append(epoch_loss)
print(
"Epoch [{}/{}], Loss: {:.8f}".format(epoch +
1, num_epochs, loss.numpy()[
0]))

# 保存模型

if epoch %
10 ==
0:
save_model(model,
'/home/aistudio/pollution_model.pdparams')

print(
"Training complete.")

return loss_history

```

```

def test():

# 初始化结果列表
results = [ ]

# 测试集合起始点
inputs = test_dataset[
0]
inputs = T.ToTensor()(inputs)
0)

# 是否supervise
flag_supervise =
True

'gpu'
else
'cpu')

# 加载模型
model = UNetEx(
3,
3,
3)
'/home/aistudio/pollution_model.pdparams',device)

for num
in range(
1,
10):

# 进行预测
outputs = model(inputs)

outputs_np = outputs.numpy()
outputs_np = np.squeeze(outputs_np, axis=
0)
# 去除第一个维度（batch_size）
outputs_np = np.transpose(outputs_np, (
1,
2,
0))
# 将通道维度调整为最后一个维度
outputs_np = (
255 * np.clip(outputs_np,
0,
1)).astype(
'uint8')

#outputs_np = outputs_np.transpose([1, 2, 0])

#outputs_np_uint8 = (outputs_np * 255).astype(np.uint8)

# 将预测结果添加到结果列表中
results.append(outputs_np)

if flag_supervise ==
False:

# 将预测结果作为下一帧的输入
inputs = outputs

else:

# 使用真实数据预测
inputs = test_dataset[num+
1]
inputs = T.ToTensor()(inputs)
0)

return results

results = test()

```

• 基于前一时刻的污染物浓度云图，预测后十个时刻、二十个时刻，四十个时刻的污染物浓度云图；

• 尝试用多时刻预测多时刻。

• 尝试引入更先进的网络架构，如 transformer；
• 对于网络层数和每层网络的神经元个数，尝试进行敏感性分析和误差分析；
• 尝试引入更多种类的激活函数如 tanh，silu 等；
• 尝试对 learning rate、batch size 等超参数进行调整实验。

• 尝试引入物理先验知识，对建筑、边界位置施加 loss 软约束；
• 尝试利用流体 NS 方程对模型进行修正。

• 尝试引入更多参数作为输入：如污染源位置、污染源初始浓度等提高模型的适应能力；
• 增加模型参数量级，探索大模型对复杂多态问题的处理能力；
• 尝试和传统流体求解方法进行融合。

• 飞桨 AI for Science 共创计划