from typing import List, Optional
import torch
import torch.nn.functional as F
from torch.functional import Tensor
from torch.nn import Linear, Dropout, Sequential, ModuleList
from torch_geometric.utils import to_dense_batch, unbatch, degree
from ..utils.utility import setup_linear_nn, setup_conv_layers, setup_LRL_nn, setup_cnn_layers, setup_maxpool_layers
from ..utils.constants import ACTIVATION_LAYERS
[docs]class GraphSim(torch.nn.Module):
r"""
End to end implementation of GraphSim from the `"Learning-based Efficient Graph Similarity Computation via Multi-Scale
Convolutional Set Matching" <https://arxiv.org/abs/1809.04440>`_ paper.
NOTE: Model assumes that node features of input graph data are arranged according to Breadth-First Search of the graph
TODO: Provide description of implementation and differences from paper if any
Args:
input_dim (int): Input dimension of node feature vectors.
gnn (str, optional): Type of Graph Neural Network to use to embed the node features (:obj:`"Neuro-PNA"` or
:obj:`"PNA"` or :obj:`"GCN"` or :obj:`"GAT"`or :obj:`"SAGE"` or :obj:`"GIN"`
or :obj:`"graph"` or :obj:`"gated"`).
(default: :obj:`'GCN'`)
gnn_filters ([int], optional): Number of hidden neurons in each layer of
the GNN for embedding input node features.
(default: :obj:`[64,32,16]`)
conv_filters (torch.nn.ModuleList, optional): List of Convolution Filters
to be applied to each similarity matrix generated from each GNN pass.
(default: :obj:`None`)
mlp_neurons ([int], optional): Number of hidden neurons in each layer of decoder MLP
(default: :obj:`[32,16,8,4,1]`)
padding_correction (bool, optional): Flag whether to include padding correction as specified in the paper
which is voided due to batching of graphs
(default: :obj:`True`)
resize_dim (int, optional): Dimension to resize the similarity image matrices to.
(default: :obj:`10`)
resize_mode (str, optional): Interpolation method to resize the similarity images
(:obj:`nearest'` | :obj:`'linear'` | :obj:`'bilinear'` | :obj:`'bicubic'` | :obj:`'trilinear'` |
:obj:`'area'` | :obj:`'nearest-exact'`).
(default: :obj:`'bilinear'`)
gnn_activation (str, optional): Activation to be used in the GNN layers
(default: :obj:`relu`)
mlp_activation (str, optional): Activation to be used in the MLP decoder layers
(default: :obj:`relu`)
activation_slope (int, optional): Slope of negative part in case of :obj:`"leaky_relu"` activation
(default: :obj:`0.1`)
"""
def __init__(self, input_dim: int, gnn: str = "GCN", gnn_filters: List[int] = [64, 32, 16], conv_filters: ModuleList = None,
mlp_neurons: List[int] = [32,16,8,4,1], padding_correction: bool = True, resize_dim: int = 10,
resize_mode = "bilinear", gnn_activation: str = "relu", mlp_activation: str = "relu", gnn_dropout_p: float = 0.5,
activation_slope: Optional[float] = 0.1):
super(GraphSim, self).__init__()
# GNN Arguments
self.input_dim = input_dim
self.gnn_type = gnn
self.gnn_filters = gnn_filters
self.gnn_activation = gnn_activation
self.gnn_dropout_p = gnn_dropout_p
# Similarity Matrix Arguments
self.padding_correction = padding_correction
self.sim_mat_dim = resize_dim
self.resize_mode = resize_mode
# Convolution Layer
# XXX: Should users be allowed to pass torch.nn.Sequential layers for Conv directly?
# XXX: Do we need to make additional Image Conv setup utility methods?
self.conv_filters = conv_filters
# MLP Layer which takes Convolution Output as Input
self.mlp_neurons = mlp_neurons
self.mlp_activation = mlp_activation
self.setup_layers()
# self.reset_parameters()
def setup_layers(self):
# GCN Layers
self.gnn_layers = setup_conv_layers(self.input_dim, self.gnn_type, filters=self.gnn_filters)
# Fully Connected Layer
W = torch.randn(2, 1, self.sim_mat_dim, self.sim_mat_dim) # Dummy Matrix
W = self.conv_filters(W).view(2,-1)
self.mlp = setup_LRL_nn(input_dim=W.shape[1], hidden_sizes=self.mlp_neurons, activation=self.mlp_activation)
del W
# Scoring Layer to get the final Graph Similarity between (0,1)
self.scoring_layer = setup_linear_nn(input_dim=self.mlp_neurons[-1], hidden_sizes=[1,])
def reset_parameters(self):
for gnn_layer in self.gnn_layers:
gnn_layer.reset_parameter()
# TODO: Test correctness
for filter in self.conv_filters:
filter.reset_parameters()
self.mlp.reset_parameters()
for layer in self.scoring_layer:
layer.reset_parameters()
def forward(self, x_i: Tensor, x_j: Tensor, edge_index_i: Tensor, edge_index_j: Tensor, batch_i:Tensor, batch_j:Tensor):
"""
Forward pass with graphs.
:param x_i (Tensor): A (N_1+N_2+...+N_B, D) tensor containing 'i' Graphs Features.
:param x_j (Tensor): A (N_1+N_2+...+N_B, D) tensor containing 'j' Graphs Features
:param edge_index_i (Tensor) : A (2, num_edges) tensor containing edges of Graphs in 'i'
:param edge_index_j (Tensor) : A (2, num_edges) tensor containing edges of Graphs in 'j'
:param batch_i (Tensor) : A (B,) tensor containing information of the graph each node belongs to
:param batch_j (Tensor) : A (B,) tensor containing information of the graph each node belongs to
:return score (Tensor): Similarity score.
"""
# Tensor of number of nodes in each graph
N_i, N_j = degree(batch_i), degree(batch_j) # Size (B,)
N_i_j = torch.maximum(N_i, N_j) # (B,)
B = batch_i.shape[0]
# Converting Input Nodes to Similarity Matrices
sim_matrix_list = []
gnn_activation = ACTIVATION_LAYERS[self.gnn_activation]
for layer_num, gnn_layer in enumerate(self.gnn_layers):
# Pass through GNN
x_i = gnn_layer(x_i)
x_j = gnn_layer(x_j)
if layer_num != len(self.gnn_layers)-1:
x_i = gnn_activation(x_i) # Default is a ReLU activation
x_i = Dropout(x_i, p=self.gnn_dropout_p, training=self.training)
x_j = gnn_activation(x_j)
x_j = Dropout(x_j, p=self.gnn_dropout_p, training=self.training)
# Generate Similarity Matrix after (layer_num + 1)th GNN Embedding Pass
h_i, mask_i = to_dense_batch(x_i, batch_i) # (B, N_max, D), {0,1}^(B, N_max) - 1 if true node, 0 if padded
h_j, mask_j = to_dense_batch(x_j, batch_j) # (B, N_max, D), {0,1}^(B, N_max) - 1 if true node, 0 if padded
sim_matrix = torch.matmul(h_i, h_j.permute(0,2,1)) # (B, N_max_i, D) * (B, D, N_max_j) -> (B, N_max_i, N_max_j)
# XXX: Can we just collect Similarity Matrices in this pass and perform other operations outside this loop?
# Correcting Similarity Matrix Size as per Paper's Specifications
if self.padding_correction:
N_max_batch_i, N_max_batch_j = sim_matrix.shape[0], sim_matrix.shape[1]
pads_i, pads_j = N_i_j - N_max_batch_i, N_i_j - N_max_batch_j
repadded_sim_matrices = list(map(lambda x, pad_i, pad_j: F.pad(x,(0,pad_i,0,pad_j)),
list(sim_matrix), pads_i, pads_j))
resized_sim_matrices = list(map(lambda x: F.interpolate(x.unsqueeze(0), size=self.sim_mat_dim,
mode=self.resize_mode).squeeze(0), repadded_sim_matrices))
batched_resized_sim_matrices = torch.stack(resized_sim_matrices)
else:
batched_resized_sim_matrices = F.interpolate(sim_matrix, size=self.sim_mat_dim, mode=self.resize_mode)
sim_matrix_list.append(batched_resized_sim_matrices) # [(B, N_reduced, N_reduced)]
# Passing similarity images through Conv2d and MLP to get similarity score
# sim_matrix_batch = torch.stack(sim_matrix_list, dim=-1) # (B, N_reduced, N_reduced, N_gnn_layers)
# XXX: Can we use Group Convolutions instead of Looping over Convolved Multi-Scale Sim Matrices
#sim_matrix_img_batch = sim_matrix_batch.permute(0,3,1,2)
image_embedding_list = list(map(lambda x, conv_layer: conv_layer(x.unsqueeze(0)).squeeze(0),
sim_matrix_list, self.conv_filters)) # [(C,H,W),]
similarity_scores = torch.stack(image_embedding_list).view(B,-1) # (B, C*H*W)
# Passing Input to MLP
similarity_scores = self.mlp(similarity_scores)
similarity_scores = self.scoring_layer(similarity_scores)
similarity_scores = torch.nn.Sigmoid(similarity_scores)
return similarity_scores.view(-1)
[docs]class GraphSim_v2(torch.nn.Module):
r"""
A more efficient implementation of GraphSim from the `"Learning-based Efficient Graph Similarity Computation via Multi-Scale
Convolutional Set Matching" <https://arxiv.org/abs/1809.04440>`_ paper.
Uses the grouped convolution layer in :object:`PyTorch`to speed up the embedding of heirarchical similarity
image matrices by parallelizing computations. Prefer using this variant over version 1 if the convolution
network architecture is the same for all similarity image matrices.
TODO: Provide description of implementation and differences from paper if any and update argument description
Args:
input_dim (int): Input dimension of node feature vectors.
gnn (str, optional): Type of Graph Neural Network to use to embed the node features (:obj:`"Neuro-PNA"` or
:obj:`"PNA"` or :obj:`"GCN"` or :obj:`"GAT"`or :obj:`"SAGE"` or :obj:`"GIN"`
or :obj:`"graph"` or :obj:`"gated"`).
(default: :obj:`'GCN'`)
gnn_filters ([int], optional): Number of hidden neurons in each layer of
the GNN for embedding input node features.
(default: :obj:`[64,32,16]`)
conv_filters (torch.nn.ModuleList, optional): List of Convolution Filters
to be applied to each similarity matrix generated from each GNN pass.
(default: :obj:`None`)
mlp_neurons ([int], optional): Number of hidden neurons in each layer of decoder MLP
(default: :obj:`[32,16,8,4,1]`)
padding_correction (bool, optional): Flag whether to include padding correction as specified in the paper
which is voided due to batching of graphs
(default: :obj:`True`)
resize_dim (int, optional): Dimension to resize the similarity image matrices to.
(default: :obj:`10`)
resize_mode (str, optional): Interpolation method to resize the similarity images
(:obj:`nearest'` | :obj:`'linear'` | :obj:`'bilinear'` | :obj:`'bicubic'` | :obj:`'trilinear'` |
:obj:`'area'` | :obj:`'nearest-exact'`).
(default: :obj:`'bilinear'`)
gnn_activation (str, optional): Activation to be used in the GNN layers
(default: :obj:`relu`)
mlp_activation (str, optional): Activation to be used in the MLP decoder layers
(default: :obj:`relu`)
activation_slope (int, optional): Slope of negative part in case of :obj:`"leaky_relu"` activation
(default: :obj:`0.1`)
"""
def __init__(self, input_dim: int, conv_kernel_sizes, conv_in_channels, conv_out_channels,
conv_stride, maxpool_kernel_sizes, maxpool_stride, cnn_dropout_p = 0.2,
gnn: str = "GCN", gnn_filters: List[int] = [64, 32, 16],
mlp_neurons: List[int] = [32,16,8,4,1], padding_correction: bool = True, resize_dim: int = 10,
resize_mode = "bilinear", gnn_activation: str = "relu", mlp_activation: str = "relu", gnn_dropout_p: float = 0.5,
activation_slope: Optional[float] = 0.1):
super(GraphSim, self).__init__()
# GNN Arguments
self.input_dim = input_dim
self.gnn_type = gnn
self.gnn_filters = gnn_filters
self.gnn_activation = gnn_activation
self.gnn_dropout_p = gnn_dropout_p
# Similarity Matrix Arguments
self.padding_correction = padding_correction
self.sim_mat_dim = resize_dim
self.resize_mode = resize_mode
# Convolution Layer
# XXX: Should users be allowed to pass torch.nn.Sequential layers for Conv directly?
# XXX: Do we need to make additional Image Conv setup utility methods?
self.conv_kernel_sizes = conv_kernel_sizes
self.conv_in_channels = conv_in_channels
self.conv_out_channels = conv_out_channels
self.conv_stride = conv_stride
self.maxpool_kernel_sizes = maxpool_kernel_sizes
self.maxpool_stride = maxpool_stride
self.cnn_dropout_p = cnn_dropout_p
# MLP Layer which takes Convolution Output as Input
self.mlp_neurons = mlp_neurons
self.mlp_activation = mlp_activation
self.setup_layers()
# self.reset_parameters()
def setup_layers(self):
# GCN Layers
self.gnn_layers = setup_conv_layers(self.input_dim, self.gnn_type, filters=self.gnn_filters)
# CNN Layer
self.cnn_layers = setup_cnn_layers(self.conv_kernel_sizes, self.conv_in_channels, self.conv_out_channels,
self.conv_stride, groups=len(self.gnn_layers))
self.maxpool_layers = setup_maxpool_layers(self.maxpool_kernel_sizes, self.maxpool_stride)
# Fully Connected Layer
W = torch.randn(2, 1, self.sim_mat_dim, self.sim_mat_dim) # Dummy Matrix
W = self.conv_filters(W).view(2,-1)
self.mlp = setup_LRL_nn(input_dim=W.shape[1], hidden_sizes=self.mlp_neurons, activation=self.mlp_activation)
del W
# Scoring Layer to get the final Graph Similarity between (0,1)
self.scoring_layer = setup_linear_nn(input_dim=self.mlp_neurons[-1], hidden_sizes=[1,])
def reset_parameters(self):
for gnn_layer in self.gnn_layers:
gnn_layer.reset_parameter()
# TODO: Test correctness
for filter in self.conv_filters:
filter.reset_parameters()
self.mlp.reset_parameters()
for layer in self.scoring_layer:
layer.reset_parameters()
def forward(self, x_i: Tensor, x_j: Tensor, edge_index_i: Tensor, edge_index_j: Tensor, batch_i:Tensor, batch_j:Tensor):
"""
Forward pass with graphs.
:param x_i (Tensor): A (N_1+N_2+...+N_B, D) tensor containing 'i' Graphs Features.
:param x_j (Tensor): A (N_1+N_2+...+N_B, D) tensor containing 'j' Graphs Features
:param edge_index_i (Tensor) : A (2, num_edges) tensor containing edges of Graphs in 'i'
:param edge_index_j (Tensor) : A (2, num_edges) tensor containing edges of Graphs in 'j'
:param batch_i (Tensor) : A (B,) tensor containing information of the graph each node belongs to
:param batch_j (Tensor) : A (B,) tensor containing information of the graph each node belongs to
:return score (Tensor): Similarity score.
"""
# Tensor of number of nodes in each graph
N_i, N_j = degree(batch_i), degree(batch_j) # Size (B,)
N_i_j = torch.maximum(N_i, N_j) # (B,)
B = batch_i.shape[0]
# Converting Input Nodes to Similarity Matrices
sim_matrix_list = []
gnn_activation = ACTIVATION_LAYERS[self.gnn_activation]
for layer_num, gnn_layer in enumerate(self.gnn_layers):
# Pass through GNN
x_i = gnn_layer(x_i)
x_j = gnn_layer(x_j)
if layer_num != len(self.gnn_layers)-1:
x_i = gnn_activation(x_i) # Default is a ReLU activation
x_i = Dropout(x_i, p=self.gnn_dropout_p, training=self.training)
x_j = gnn_activation(x_j)
x_j = Dropout(x_j, p=self.gnn_dropout_p, training=self.training)
# Generate Similarity Matrix after (layer_num + 1)th GNN Embedding Pass
h_i, mask_i = to_dense_batch(x_i, batch_i) # (B, N_max, D), {0,1}^(B, N_max) - 1 if true node, 0 if padded
h_j, mask_j = to_dense_batch(x_j, batch_j) # (B, N_max, D), {0,1}^(B, N_max) - 1 if true node, 0 if padded
sim_matrix = torch.matmul(h_i, h_j.permute(0,2,1)) # (B, N_max_i, D) * (B, D, N_max_j) -> (B, N_max_i, N_max_j)
# XXX: Can we just collect Similarity Matrices in this pass and perform other operations outside this loop?
# Correcting Similarity Matrix Size as per Paper's Specifications
if self.padding_correction:
N_max_batch_i, N_max_batch_j = sim_matrix.shape[0], sim_matrix.shape[1]
pads_i, pads_j = N_i_j - N_max_batch_i, N_i_j - N_max_batch_j
repadded_sim_matrices = list(map(lambda x, pad_i, pad_j: F.pad(x,(0,pad_i,0,pad_j)),
list(sim_matrix), pads_i, pads_j))
resized_sim_matrices = list(map(lambda x: F.interpolate(x.unsqueeze(0), size=self.sim_mat_dim,
mode=self.resize_mode).squeeze(0), repadded_sim_matrices))
batched_resized_sim_matrices = torch.stack(resized_sim_matrices)
else:
batched_resized_sim_matrices = F.interpolate(sim_matrix, size=self.sim_mat_dim, mode=self.resize_mode)
sim_matrix_list.append(batched_resized_sim_matrices) # [(B, N_reduced, N_reduced)]
# Passing similarity images through Conv2d to get heirarchical image features
sim_matrix_batch = torch.stack(sim_matrix_list, dim=1) # (B, N_gnn_layers, N_reduced, N_reduced)
for i in range(len(self.cnn_layers)):
sim_matrix_batch = self.cnn_layers[i](sim_matrix_batch)
sim_matrix_batch = torch.nn.functional.relu(sim_matrix_batch)
sim_matrix_batch = self.maxpool_layers[i](sim_matrix_batch)
sim_matrix_batch = torch.nn.functional.dropout(sim_matrix_batch, p=self.cnn_dropout_p)
# Flattening the image features into one vector per graph batch
# (B, N_gnn_layers, N_gnn_layers*N_out, N_gnn_layers*N_out) -> (B, N_gnn_layers^3 * N_out^2)
similarity_scores = sim_matrix_batch.view(B,-1)
# Passing Input to MLP
similarity_scores = self.mlp(similarity_scores)
similarity_scores = self.scoring_layer(similarity_scores)
similarity_scores = torch.nn.Sigmoid(similarity_scores)
return similarity_scores.view(-1)