Custom Blocks

This example demonstrates how to create custom neural network blocks and integrate them into the PyNAS architecture search framework. You’ll learn to extend the existing block vocabulary and implement domain-specific architectural components.

Overview

PyNAS provides a flexible framework for defining custom blocks that can be used in architecture search. This includes:

  • Custom Convolution Blocks: Specialized convolutions for specific tasks

  • Attention Mechanisms: Self-attention and cross-attention blocks

  • Domain-Specific Modules: Blocks tailored for specific applications

  • Block Integration: Adding custom blocks to the search space

Creating Custom Convolution Blocks

Basic Custom Block Structure

import torch
import torch.nn as nn
import torch.nn.functional as F
from pynas.blocks.activations import ReLU
from pynas.blocks.convolutions import ConvBnAct

class SeparableConvBlock(nn.Module):
    """
    Depthwise separable convolution block.

    This block implements a depthwise separable convolution, which is more
    parameter-efficient than a standard convolution.

    Args:
        in_channels (int): Number of input channels
        out_channels (int): Number of output channels
        kernel_size (int): Kernel size for depthwise convolution
        stride (int): Stride for convolution
        padding (int): Padding for convolution
        activation (nn.Module): Activation function class
    """

    def __init__(self, in_channels, out_channels, kernel_size=3,
                 stride=1, padding=1, activation=ReLU):
        super(SeparableConvBlock, self).__init__()

        # Depthwise convolution
        self.depthwise = nn.Conv2d(
            in_channels, in_channels, kernel_size=kernel_size,
            stride=stride, padding=padding, groups=in_channels, bias=False
        )
        self.bn1 = nn.BatchNorm2d(in_channels)

        # Pointwise convolution
        self.pointwise = nn.Conv2d(
            in_channels, out_channels, kernel_size=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.activation = activation()

    def forward(self, x):
        """Forward pass through separable convolution."""
        # Depthwise
        x = self.depthwise(x)
        x = self.bn1(x)
        x = self.activation(x)

        # Pointwise
        x = self.pointwise(x)
        x = self.bn2(x)
        x = self.activation(x)

        return x

class DilatedConvBlock(nn.Module):
    """
    Dilated convolution block for capturing multi-scale features.

    Args:
        in_channels (int): Number of input channels
        out_channels (int): Number of output channels
        dilation_rates (list): List of dilation rates to use
        activation (nn.Module): Activation function class
    """

    def __init__(self, in_channels, out_channels,
                 dilation_rates=[1, 2, 4], activation=ReLU):
        super(DilatedConvBlock, self).__init__()

        # Each branch produces out_channels // len(dilation_rates) channels.
        branch_channels = out_channels // len(dilation_rates)

        self.dilated_convs = nn.ModuleList()

        for dilation in dilation_rates:
            conv = nn.Sequential(
                nn.Conv2d(
                    in_channels, branch_channels,
                    kernel_size=3, padding=dilation, dilation=dilation, bias=False
                ),
                nn.BatchNorm2d(branch_channels),
                activation()
            )
            self.dilated_convs.append(conv)

        # Final 1x1 conv to combine the concatenated branch features. The
        # concatenated width equals out_channels only when out_channels is
        # divisible by len(dilation_rates), so use the actual width here.
        self.combine = nn.Sequential(
            nn.Conv2d(branch_channels * len(dilation_rates), out_channels,
                      kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            activation()
        )

    def forward(self, x):
        """Forward pass through dilated convolution block."""
        # Apply different dilated convolutions
        features = []
        for conv in self.dilated_convs:
            features.append(conv(x))

        # Concatenate features
        combined = torch.cat(features, dim=1)

        # Final combination
        output = self.combine(combined)

        return output
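
A quick shape check before wiring these blocks into the search space (an illustrative smoke test, assuming the classes above are defined in the current session):

# Illustrative smoke test for the custom convolution blocks.
x = torch.randn(2, 32, 64, 64)

sep = SeparableConvBlock(in_channels=32, out_channels=64)
print(sep(x).shape)   # torch.Size([2, 64, 64, 64])

# out_channels is chosen divisible by len(dilation_rates) so the branch
# channels split evenly.
dil = DilatedConvBlock(in_channels=32, out_channels=48, dilation_rates=[1, 2, 4])
print(dil(x).shape)   # torch.Size([2, 48, 64, 64])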

Attention Mechanism Blocks

Self-Attention Block

import math

class SelfAttentionBlock(nn.Module):
    """
    Self-attention block for capturing long-range dependencies.

    Args:
        in_channels (int): Number of input channels
        reduction_ratio (int): Reduction ratio for attention computation
        activation (nn.Module): Activation function class
    """

    def __init__(self, in_channels, reduction_ratio=8, activation=ReLU):
        super(SelfAttentionBlock, self).__init__()

        self.in_channels = in_channels
        self.reduction_ratio = reduction_ratio
        # Keep at least one channel in the reduced query/key projections
        self.reduced_channels = max(in_channels // reduction_ratio, 1)

        # Query, Key, Value projections
        self.query_conv = nn.Conv2d(in_channels, self.reduced_channels, 1)
        self.key_conv = nn.Conv2d(in_channels, self.reduced_channels, 1)
        self.value_conv = nn.Conv2d(in_channels, in_channels, 1)

        # Output projection
        self.output_conv = nn.Conv2d(in_channels, in_channels, 1)

        # Layer normalization
        self.layer_norm = nn.GroupNorm(1, in_channels)

        self.activation = activation()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Forward pass through self-attention block."""
        batch_size, channels, height, width = x.size()

        # Store residual
        residual = x

        # Generate queries, keys, values
        queries = self.query_conv(x).view(batch_size, -1, height * width)
        keys = self.key_conv(x).view(batch_size, -1, height * width)
        values = self.value_conv(x).view(batch_size, -1, height * width)

        # Compute attention weights
        attention_weights = torch.bmm(queries.transpose(1, 2), keys)
        attention_weights = attention_weights / math.sqrt(self.reduced_channels)
        attention_weights = self.softmax(attention_weights)

        # Apply attention to values
        attended_values = torch.bmm(values, attention_weights.transpose(1, 2))
        attended_values = attended_values.view(batch_size, channels, height, width)

        # Output projection
        output = self.output_conv(attended_values)

        # Residual connection and layer norm
        output = residual + output
        output = self.layer_norm(output)

        return output

class ChannelAttentionBlock(nn.Module):
    """
    Channel attention block (Squeeze-and-Excitation-style channel attention
    with optional spatial attention, similar to CBAM).

    Args:
        in_channels (int): Number of input channels
        reduction_ratio (int): Reduction ratio for channel attention
        use_spatial (bool): Whether to include spatial attention
        activation (nn.Module): Activation function class
    """

    def __init__(self, in_channels, reduction_ratio=16,
                 use_spatial=True, activation=ReLU):
        super(ChannelAttentionBlock, self).__init__()

        self.in_channels = in_channels
        self.use_spatial = use_spatial

        # Channel attention
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        reduced_channels = max(in_channels // reduction_ratio, 1)

        self.channel_attention = nn.Sequential(
            nn.Linear(in_channels, reduced_channels, bias=False),
            activation(),
            nn.Linear(reduced_channels, in_channels, bias=False)
        )

        # Spatial attention (optional)
        if use_spatial:
            self.spatial_attention = nn.Sequential(
                nn.Conv2d(2, 1, kernel_size=7, padding=3, bias=False),
                nn.Sigmoid()
            )

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Forward pass through channel attention block."""
        batch_size, channels, height, width = x.size()

        # Channel attention
        avg_pooled = self.avg_pool(x).view(batch_size, channels)
        max_pooled = self.max_pool(x).view(batch_size, channels)

        avg_attention = self.channel_attention(avg_pooled)
        max_attention = self.channel_attention(max_pooled)

        channel_attention = self.sigmoid(avg_attention + max_attention)
        channel_attention = channel_attention.view(batch_size, channels, 1, 1)

        # Apply channel attention
        x = x * channel_attention

        # Spatial attention (if enabled)
        if self.use_spatial:
            avg_spatial = torch.mean(x, dim=1, keepdim=True)
            max_spatial, _ = torch.max(x, dim=1, keepdim=True)
            spatial_input = torch.cat([avg_spatial, max_spatial], dim=1)

            spatial_attention = self.spatial_attention(spatial_input)
            x = x * spatial_attention

        return x
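
Both attention blocks preserve the input shape, so they can be dropped between convolutional stages without changing the channel bookkeeping. A quick illustrative check:

# Illustrative check: attention blocks keep the (N, C, H, W) shape unchanged.
x = torch.randn(2, 64, 32, 32)

sa = SelfAttentionBlock(in_channels=64, reduction_ratio=8)
ca = ChannelAttentionBlock(in_channels=64, reduction_ratio=16, use_spatial=True)

print(sa(x).shape)  # torch.Size([2, 64, 32, 32])
print(ca(x).shape)  # torch.Size([2, 64, 32, 32])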

Domain-Specific Blocks

Remote Sensing Block

class RemoteSensingBlock(nn.Module):
    """
    Specialized block for remote sensing applications.

    This block is designed to handle multi-spectral imagery and
    capture both spectral and spatial features effectively.

    Args:
        in_channels (int): Number of input channels (spectral bands)
        out_channels (int): Number of output channels
        spectral_reduction (int): Reduction factor for spectral processing
        activation (nn.Module): Activation function class
    """

    def __init__(self, in_channels, out_channels,
                 spectral_reduction=4, activation=ReLU):
        super(RemoteSensingBlock, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels

        # The three branches below produce out_channels // 2 + 2 * (out_channels // 4)
        # channels, so out_channels must be divisible by 4 for the fusion conv to match.
        assert out_channels % 4 == 0, "out_channels must be divisible by 4"

        # Spectral feature extraction (keep at least one channel after reduction)
        spectral_channels = max(in_channels // spectral_reduction, 1)
        self.spectral_conv = nn.Sequential(
            nn.Conv2d(in_channels, spectral_channels, 1),
            nn.BatchNorm2d(spectral_channels),
            activation(),
            nn.Conv2d(spectral_channels, out_channels // 2, 1),
            nn.BatchNorm2d(out_channels // 2),
            activation()
        )

        # Spatial feature extraction with different scales
        self.spatial_conv_3x3 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels // 4, 3, padding=1),
            nn.BatchNorm2d(out_channels // 4),
            activation()
        )

        self.spatial_conv_5x5 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels // 4, 5, padding=2),
            nn.BatchNorm2d(out_channels // 4),
            activation()
        )

        # Feature fusion
        self.fusion = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, 1),
            nn.BatchNorm2d(out_channels),
            activation()
        )

        # Adaptive feature weighting
        self.feature_weighting = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(out_channels, out_channels // 4, 1),
            activation(),
            nn.Conv2d(out_channels // 4, out_channels, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Forward pass through remote sensing block."""
        # Extract different types of features
        spectral_features = self.spectral_conv(x)
        spatial_features_3x3 = self.spatial_conv_3x3(x)
        spatial_features_5x5 = self.spatial_conv_5x5(x)

        # Combine features
        combined_features = torch.cat([
            spectral_features,
            spatial_features_3x3,
            spatial_features_5x5
        ], dim=1)

        # Feature fusion
        fused_features = self.fusion(combined_features)

        # Adaptive weighting
        weights = self.feature_weighting(fused_features)
        output = fused_features * weights

        return output

class EdgeDetectionBlock(nn.Module):
    """
    Edge detection block using learned filters.

    Args:
        in_channels (int): Number of input channels
        out_channels (int): Number of output channels
        edge_types (list): Types of edges to detect ('horizontal', 'vertical', 'diagonal')
        activation (nn.Module): Activation function class
    """

    def __init__(self, in_channels, out_channels,
                 edge_types=['horizontal', 'vertical', 'diagonal'],
                 activation=ReLU):
        super(EdgeDetectionBlock, self).__init__()

        self.edge_types = edge_types
        self.edge_detectors = nn.ModuleList()

        # Each detector outputs out_channels // len(edge_types) channels, so
        # out_channels must be divisible by the number of edge types for the
        # combiner below to match.
        assert out_channels % len(edge_types) == 0, \
            "out_channels must be divisible by len(edge_types)"

        # Create edge detection filters for each type
        for edge_type in edge_types:
            if edge_type == 'horizontal':
                # Horizontal edge detection
                detector = nn.Conv2d(
                    in_channels, out_channels // len(edge_types),
                    kernel_size=3, padding=1, bias=False
                )
                # Initialize with horizontal edge filter
                with torch.no_grad():
                    detector.weight.fill_(0)
                    detector.weight[:, :, 0, :] = -1
                    detector.weight[:, :, 2, :] = 1

            elif edge_type == 'vertical':
                # Vertical edge detection
                detector = nn.Conv2d(
                    in_channels, out_channels // len(edge_types),
                    kernel_size=3, padding=1, bias=False
                )
                with torch.no_grad():
                    detector.weight.fill_(0)
                    detector.weight[:, :, :, 0] = -1
                    detector.weight[:, :, :, 2] = 1

            elif edge_type == 'diagonal':
                # Diagonal edge detection
                detector = nn.Conv2d(
                    in_channels, out_channels // len(edge_types),
                    kernel_size=3, padding=1, bias=False
                )
                with torch.no_grad():
                    detector.weight.fill_(0)
                    detector.weight[:, :, 0, 0] = -1
                    detector.weight[:, :, 2, 2] = 1

            else:
                raise ValueError(f"Unsupported edge type: {edge_type}")

            self.edge_detectors.append(nn.Sequential(
                detector,
                nn.BatchNorm2d(out_channels // len(edge_types)),
                activation()
            ))

        # Feature combination
        self.combiner = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, 1),
            nn.BatchNorm2d(out_channels),
            activation()
        )

    def forward(self, x):
        """Forward pass through edge detection block."""
        edge_features = []

        for detector in self.edge_detectors:
            edge_feature = detector(x)
            edge_features.append(edge_feature)

        # Combine edge features
        combined = torch.cat(edge_features, dim=1)
        output = self.combiner(combined)

        return output
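
An illustrative check with a 10-band multi-spectral input (the band count is an arbitrary example). Note that out_channels should be divisible by 4 for RemoteSensingBlock and by the number of edge types for EdgeDetectionBlock:

# Illustrative smoke test for the domain-specific blocks.
x = torch.randn(2, 10, 64, 64)

rs = RemoteSensingBlock(in_channels=10, out_channels=32, spectral_reduction=2)
edge = EdgeDetectionBlock(in_channels=10, out_channels=24)

print(rs(x).shape)    # torch.Size([2, 32, 64, 64])
print(edge(x).shape)  # torch.Size([2, 24, 64, 64])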

Integrating Custom Blocks into PyNAS

Extending the Block Vocabulary

# File: custom_vocabulary.py

from pynas.core.vocabulary import (
    convolution_layer_vocabulary,
    layer_parameters
)

# Extend the vocabulary with custom blocks
custom_convolution_vocabulary = {
    **convolution_layer_vocabulary,
    'sep': 'SeparableConvBlock',
    'dil': 'DilatedConvBlock',
    'sa': 'SelfAttentionBlock',
    'ca': 'ChannelAttentionBlock',
    'rs': 'RemoteSensingBlock',
    'edge': 'EdgeDetectionBlock'
}

# Add parameters for custom blocks
custom_layer_parameters = {
    **layer_parameters,
    'SeparableConvBlock': [
        'out_channels_coefficient', 'kernel_size', 'stride', 'padding', 'activation'
    ],
    'DilatedConvBlock': [
        'out_channels_coefficient', 'dilation_rates', 'activation'
    ],
    'SelfAttentionBlock': [
        'reduction_ratio', 'activation'
    ],
    'ChannelAttentionBlock': [
        'reduction_ratio', 'use_spatial', 'activation'
    ],
    'RemoteSensingBlock': [
        'out_channels_coefficient', 'spectral_reduction', 'activation'
    ],
    'EdgeDetectionBlock': [
        'out_channels_coefficient', 'edge_types', 'activation'
    ]
}
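
A quick way to sanity-check the extended vocabulary is to resolve a gene token to its block name and the block's parameter list:

# Resolve a gene token through the extended vocabulary.
token = 'sep'
block_name = custom_convolution_vocabulary[token]
print(block_name)                           # SeparableConvBlock
print(custom_layer_parameters[block_name])  # ['out_channels_coefficient', 'kernel_size', ...]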

Custom Block Registry

# File: custom_blocks_registry.py

from typing import Dict, Any, Type

import torch.nn as nn

# The custom block classes from the previous section are assumed to live in a
# local module (named custom_blocks here) so the registry can import them.
from custom_blocks import (SeparableConvBlock, DilatedConvBlock, SelfAttentionBlock,
                           ChannelAttentionBlock, RemoteSensingBlock, EdgeDetectionBlock)

class CustomBlockRegistry:
    """Registry for managing custom blocks in PyNAS."""

    def __init__(self):
        self.blocks = {}
        self.parameter_configs = {}

        # Register built-in custom blocks
        self._register_builtin_blocks()

    def register_block(self, name: str, block_class: Type[nn.Module],
                      parameters: list, default_config: Dict[str, Any] = None):
        """
        Register a custom block.

        Args:
            name: Name identifier for the block
            block_class: PyTorch Module class
            parameters: List of parameter names for the block
            default_config: Default parameter values
        """
        self.blocks[name] = block_class
        self.parameter_configs[name] = {
            'parameters': parameters,
            'defaults': default_config or {}
        }

        print(f"Registered custom block: {name}")

    def get_block(self, name: str) -> Type[nn.Module]:
        """Get block class by name."""
        if name not in self.blocks:
            raise ValueError(f"Block '{name}' not found in registry")
        return self.blocks[name]

    def get_parameters(self, name: str) -> list:
        """Get parameter list for a block."""
        if name not in self.parameter_configs:
            raise ValueError(f"Block '{name}' not found in registry")
        return self.parameter_configs[name]['parameters']

    def get_defaults(self, name: str) -> Dict[str, Any]:
        """Get default configuration for a block."""
        if name not in self.parameter_configs:
            raise ValueError(f"Block '{name}' not found in registry")
        return self.parameter_configs[name]['defaults']

    def list_blocks(self) -> list:
        """List all registered blocks."""
        return list(self.blocks.keys())

    def _register_builtin_blocks(self):
        """Register built-in custom blocks."""
        # Register the custom blocks we defined
        self.register_block(
            'SeparableConvBlock',
            SeparableConvBlock,
            ['in_channels', 'out_channels', 'kernel_size', 'stride', 'padding', 'activation'],
            {'kernel_size': 3, 'stride': 1, 'padding': 1}
        )

        self.register_block(
            'DilatedConvBlock',
            DilatedConvBlock,
            ['in_channels', 'out_channels', 'dilation_rates', 'activation'],
            {'dilation_rates': [1, 2, 4]}
        )

        self.register_block(
            'SelfAttentionBlock',
            SelfAttentionBlock,
            ['in_channels', 'reduction_ratio', 'activation'],
            {'reduction_ratio': 8}
        )

        self.register_block(
            'ChannelAttentionBlock',
            ChannelAttentionBlock,
            ['in_channels', 'reduction_ratio', 'use_spatial', 'activation'],
            {'reduction_ratio': 16, 'use_spatial': True}
        )

        self.register_block(
            'RemoteSensingBlock',
            RemoteSensingBlock,
            ['in_channels', 'out_channels', 'spectral_reduction', 'activation'],
            {'spectral_reduction': 4}
        )

        self.register_block(
            'EdgeDetectionBlock',
            EdgeDetectionBlock,
            ['in_channels', 'out_channels', 'edge_types', 'activation'],
            {'edge_types': ['horizontal', 'vertical', 'diagonal']}
        )

# Global registry instance
custom_block_registry = CustomBlockRegistry()
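
Beyond the built-ins, user code can register additional blocks at runtime. The sketch below registers a trivial pass-through block; IdentityBlock is purely illustrative and is not part of PyNAS or of the evolution example later on this page:

# Registering a user-defined block (illustrative only).
class IdentityBlock(nn.Module):
    """Trivial pass-through block used to demonstrate registration."""

    def __init__(self, in_channels):
        super().__init__()
        self.in_channels = in_channels

    def forward(self, x):
        return x

custom_block_registry.register_block(
    'IdentityBlock',
    IdentityBlock,
    ['in_channels'],
    {}
)
print(custom_block_registry.list_blocks())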

Custom Architecture Builder

# File: custom_architecture_builder.py

import configparser

from pynas.core.architecture_builder import ArchitectureBuilder
from pynas.core.generic_unet import build_layer, parse_conv_params

# custom_block_registry is used as the default registry in __init__ below.
from custom_blocks_registry import custom_block_registry

class CustomArchitectureBuilder(ArchitectureBuilder):
    """Extended architecture builder with custom block support."""

    def __init__(self, custom_registry=None):
        super().__init__()
        self.custom_registry = custom_registry or custom_block_registry

        # Load custom configuration
        self.custom_config = self._load_custom_config()

    def _load_custom_config(self):
        """Load configuration for custom blocks."""
        config = configparser.ConfigParser()

        # Add configurations for custom blocks
        config.add_section('SeparableConvBlock')
        config.set('SeparableConvBlock', 'min_kernel_size', '3')
        config.set('SeparableConvBlock', 'max_kernel_size', '5')
        config.set('SeparableConvBlock', 'default_kernel_size', '3')
        config.set('SeparableConvBlock', 'min_out_channels_coefficient', '4')
        config.set('SeparableConvBlock', 'max_out_channels_coefficient', '12')
        config.set('SeparableConvBlock', 'default_out_channels_coefficient', '8')

        config.add_section('DilatedConvBlock')
        config.set('DilatedConvBlock', 'min_out_channels_coefficient', '4')
        config.set('DilatedConvBlock', 'max_out_channels_coefficient', '12')
        config.set('DilatedConvBlock', 'default_out_channels_coefficient', '8')

        config.add_section('SelfAttentionBlock')
        config.set('SelfAttentionBlock', 'min_reduction_ratio', '4')
        config.set('SelfAttentionBlock', 'max_reduction_ratio', '16')
        config.set('SelfAttentionBlock', 'default_reduction_ratio', '8')

        config.add_section('ChannelAttentionBlock')
        config.set('ChannelAttentionBlock', 'min_reduction_ratio', '8')
        config.set('ChannelAttentionBlock', 'max_reduction_ratio', '32')
        config.set('ChannelAttentionBlock', 'default_reduction_ratio', '16')

        config.add_section('RemoteSensingBlock')
        config.set('RemoteSensingBlock', 'min_out_channels_coefficient', '4')
        config.set('RemoteSensingBlock', 'max_out_channels_coefficient', '12')
        config.set('RemoteSensingBlock', 'default_out_channels_coefficient', '8')
        config.set('RemoteSensingBlock', 'min_spectral_reduction', '2')
        config.set('RemoteSensingBlock', 'max_spectral_reduction', '8')
        config.set('RemoteSensingBlock', 'default_spectral_reduction', '4')

        config.add_section('EdgeDetectionBlock')
        config.set('EdgeDetectionBlock', 'min_out_channels_coefficient', '4')
        config.set('EdgeDetectionBlock', 'max_out_channels_coefficient', '12')
        config.set('EdgeDetectionBlock', 'default_out_channels_coefficient', '8')

        return config

    def build_custom_layer(self, layer_config, current_channels,
                          current_height, current_width, get_activation_fn):
        """Build a custom layer from configuration."""
        layer_type = layer_config['layer_type']

        if layer_type not in self.custom_registry.blocks:
            # Fall back to default builder
            return build_layer(
                layer_config, self.custom_config, current_channels,
                current_height, current_width, 0, get_activation_fn
            )

        # Get block class and parameters
        block_class = self.custom_registry.get_block(layer_type)

        # Parse parameters specific to this block type
        params = self._parse_custom_params(layer_config, layer_type, current_channels)

        # Create the layer instance
        layer_instance = block_class(**params)

        # Calculate output channels and dimensions
        out_channels = params.get('out_channels', current_channels)

        return layer_instance, out_channels, current_height, current_width

    def _parse_custom_params(self, layer_config, layer_type, current_channels):
        """Parse parameters for custom blocks."""
        params = {}
        defaults = self.custom_registry.get_defaults(layer_type)

        # Common parameters
        if 'out_channels_coefficient' in layer_config:
            coeff = layer_config['out_channels_coefficient']
            params['out_channels'] = int(current_channels * coeff)
        elif layer_type in self.custom_config and self.custom_config.has_option(
                layer_type, 'default_out_channels_coefficient'):
            default_coeff = self.custom_config.getfloat(
                layer_type, 'default_out_channels_coefficient'
            )
            params['out_channels'] = int(current_channels * default_coeff)

        params['in_channels'] = current_channels

        # Block-specific parameters
        if layer_type == 'SeparableConvBlock':
            params['kernel_size'] = layer_config.get('kernel_size', 3)
            params['stride'] = layer_config.get('stride', 1)
            params['padding'] = layer_config.get('padding', 1)

        elif layer_type == 'DilatedConvBlock':
            params['dilation_rates'] = layer_config.get(
                'dilation_rates', defaults['dilation_rates']
            )

        elif layer_type == 'SelfAttentionBlock':
            params['reduction_ratio'] = layer_config.get('reduction_ratio', 8)
            # Attention blocks preserve the channel count and do not accept an
            # out_channels argument, so drop it if it was set above.
            params.pop('out_channels', None)

        elif layer_type == 'ChannelAttentionBlock':
            params['reduction_ratio'] = layer_config.get('reduction_ratio', 16)
            params['use_spatial'] = layer_config.get('use_spatial', True)
            params.pop('out_channels', None)

        elif layer_type == 'RemoteSensingBlock':
            params['spectral_reduction'] = layer_config.get('spectral_reduction', 4)

        elif layer_type == 'EdgeDetectionBlock':
            params['edge_types'] = layer_config.get(
                'edge_types', defaults['edge_types']
            )

        # Activation function
        if 'activation' in layer_config:
            from pynas.blocks import activations
            activation_name = layer_config['activation']
            params['activation'] = getattr(activations, activation_name)

        return params
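
A minimal sketch of building a single custom layer through the extended builder. The layer_config values are illustrative; get_activation_fn is only used when falling back to the stock PyNAS builder, so it is passed as None here:

# Build one SeparableConvBlock layer from a layer_config dictionary.
builder = CustomArchitectureBuilder()

layer_config = {
    'layer_type': 'SeparableConvBlock',
    'out_channels_coefficient': 2,
    'kernel_size': 3,
    'activation': 'ReLU',
}

layer, out_ch, height, width = builder.build_custom_layer(
    layer_config, current_channels=32, current_height=64, current_width=64,
    get_activation_fn=None
)
print(layer.__class__.__name__, out_ch)  # SeparableConvBlock 64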

Evolution with Custom Blocks

Custom Block Evolution Example

# File: custom_block_evolution.py

import random
import torch
from pynas.core.population import Population
from pynas.core.individual import Individual
from custom_blocks_registry import custom_block_registry
from custom_architecture_builder import CustomArchitectureBuilder

class CustomBlockEvolution:
    """Evolution specifically using custom blocks."""

    def __init__(self, dataset, config):
        self.dataset = dataset
        self.config = config
        self.custom_builder = CustomArchitectureBuilder()

        # Define custom architecture templates
        self.custom_templates = [
            "sep2r_ca_sep2r_C",          # Separable + Channel Attention
            "rs3g_sa_dil2r_C",           # Remote Sensing + Self Attention + Dilated
            "edge1r_sep2r_ca_C",         # Edge Detection + Separable + Channel Attention
            "dil2g_sa_sep2r_C",          # Dilated + Self Attention + Separable
            "rs2r_edge1r_ca_sep1r_C",    # Complex multi-block architecture
        ]

    def create_custom_population(self):
        """Create population using custom blocks."""
        population = []

        for template in self.custom_templates:
            for variation in range(self.config['population_size'] // len(self.custom_templates)):
                individual = self._create_custom_individual(template, variation)
                population.append(individual)

        # Fill remaining spots with random custom architectures
        while len(population) < self.config['population_size']:
            random_individual = self._create_random_custom_individual()
            population.append(random_individual)

        return Population(individuals=population, config=self.config)

    def _create_custom_individual(self, template, variation):
        """Create individual from custom template with variations."""
        # Parse template into layers
        parsed_arch = self._parse_custom_template(template)

        # Apply variations
        if variation > 0:
            parsed_arch = self._apply_variations(parsed_arch, variation)

        return Individual(
            genome=parsed_arch,
            task=self.config.get('task', 'classification'),
            input_shape=self.config.get('input_shape', (3, 224, 224)),
            num_classes=self.dataset.num_classes
        )

    def _parse_custom_template(self, template):
        """Parse custom template string into architecture."""
        # Extended parsing for custom blocks
        custom_vocab = {
            'sep': 'SeparableConvBlock',
            'dil': 'DilatedConvBlock',
            'sa': 'SelfAttentionBlock',
            'ca': 'ChannelAttentionBlock',
            'rs': 'RemoteSensingBlock',
            'edge': 'EdgeDetectionBlock',
            'r': 'ReLU',
            'g': 'GELU',
            'C': 'Classifier'
        }

        parsed_layers = []
        i = 0

        while i < len(template):
            if template[i:i+4] == 'edge':
                # Edge detection block
                i += 4
                count = int(template[i]) if i < len(template) and template[i].isdigit() else 1
                i += 1
                activation = template[i] if i < len(template) else 'r'
                i += 1

                for _ in range(count):
                    parsed_layers.append({
                        'layer_type': 'EdgeDetectionBlock',
                        'activation': 'ReLU' if activation == 'r' else 'GELU',
                        'out_channels_coefficient': 8,
                        'edge_types': ['horizontal', 'vertical', 'diagonal']
                    })

            elif template[i:i+3] == 'sep':
                # Separable convolution block
                i += 3
                count = int(template[i]) if i < len(template) and template[i].isdigit() else 1
                i += 1
                activation = template[i] if i < len(template) else 'r'
                i += 1

                for _ in range(count):
                    parsed_layers.append({
                        'layer_type': 'SeparableConvBlock',
                        'activation': 'ReLU' if activation == 'r' else 'GELU',
                        'out_channels_coefficient': 8,
                        'kernel_size': 3
                    })

            elif template[i:i+3] == 'dil':
                # Dilated convolution block
                i += 3
                count = int(template[i]) if i < len(template) and template[i].isdigit() else 1
                i += 1
                activation = template[i] if i < len(template) else 'r'
                i += 1

                for _ in range(count):
                    parsed_layers.append({
                        'layer_type': 'DilatedConvBlock',
                        'activation': 'ReLU' if activation == 'r' else 'GELU',
                        'out_channels_coefficient': 8,
                        'dilation_rates': [1, 2, 4]
                    })

            elif template[i:i+2] == 'rs':
                # Remote sensing block
                i += 2
                count = int(template[i]) if i < len(template) and template[i].isdigit() else 1
                i += 1
                activation = template[i] if i < len(template) else 'r'
                i += 1

                for _ in range(count):
                    parsed_layers.append({
                        'layer_type': 'RemoteSensingBlock',
                        'activation': 'ReLU' if activation == 'r' else 'GELU',
                        'out_channels_coefficient': 8,
                        'spectral_reduction': 4
                    })

            elif template[i:i+2] == 'sa':
                # Self attention block
                i += 2
                parsed_layers.append({
                    'layer_type': 'SelfAttentionBlock',
                    'activation': 'ReLU',
                    'reduction_ratio': 8
                })

            elif template[i:i+2] == 'ca':
                # Channel attention block
                i += 2
                parsed_layers.append({
                    'layer_type': 'ChannelAttentionBlock',
                    'activation': 'ReLU',
                    'reduction_ratio': 16,
                    'use_spatial': True
                })

            elif template[i] == '_':
                # Separator
                i += 1

            elif template[i] == 'C':
                # Classifier head
                parsed_layers.append({
                    'layer_type': 'Classifier',
                    'activation': 'ReLU'
                })
                i += 1

            else:
                i += 1

        return parsed_layers

    def _apply_variations(self, parsed_arch, variation):
        """Apply variations to the base architecture."""
        varied_arch = parsed_arch.copy()

        for i, layer in enumerate(varied_arch):
            if variation == 1:
                # Increase channel coefficients
                if 'out_channels_coefficient' in layer:
                    layer['out_channels_coefficient'] = min(
                        layer['out_channels_coefficient'] + 2, 12
                    )

            elif variation == 2:
                # Change activation functions
                if layer.get('activation') == 'ReLU':
                    layer['activation'] = 'GELU'

            elif variation == 3:
                # Modify attention parameters
                if layer.get('layer_type') == 'SelfAttentionBlock':
                    layer['reduction_ratio'] = 4
                elif layer.get('layer_type') == 'ChannelAttentionBlock':
                    layer['reduction_ratio'] = 8

        return varied_arch

    def _create_random_custom_individual(self):
        """Create random architecture using custom blocks."""
        custom_blocks = [
            'SeparableConvBlock', 'DilatedConvBlock', 'SelfAttentionBlock',
            'ChannelAttentionBlock', 'RemoteSensingBlock', 'EdgeDetectionBlock'
        ]

        num_layers = random.randint(3, 6)
        parsed_arch = []

        for _ in range(num_layers):
            block_type = random.choice(custom_blocks)

            layer = {
                'layer_type': block_type,
                'activation': random.choice(['ReLU', 'GELU']),
            }

            if block_type in ['SeparableConvBlock', 'DilatedConvBlock',
                            'RemoteSensingBlock', 'EdgeDetectionBlock']:
                layer['out_channels_coefficient'] = random.randint(6, 10)

            if block_type == 'SeparableConvBlock':
                layer['kernel_size'] = random.choice([3, 5])

            elif block_type == 'SelfAttentionBlock':
                layer['reduction_ratio'] = random.choice([4, 8, 16])

            elif block_type == 'ChannelAttentionBlock':
                layer['reduction_ratio'] = random.choice([8, 16, 32])
                layer['use_spatial'] = random.choice([True, False])

            elif block_type == 'RemoteSensingBlock':
                layer['spectral_reduction'] = random.choice([2, 4, 8])

            parsed_arch.append(layer)

        # Add classifier head
        parsed_arch.append({
            'layer_type': 'Classifier',
            'activation': 'ReLU'
        })

        return Individual(
            genome=parsed_arch,
            task=self.config.get('task', 'classification'),
            input_shape=self.config.get('input_shape', (3, 224, 224)),
            num_classes=self.dataset.num_classes
        )

    def evolve_with_custom_blocks(self):
        """Run evolution using custom blocks."""
        population = self.create_custom_population()

        print(f"Starting evolution with {len(custom_block_registry.list_blocks())} custom blocks")
        print(f"Available blocks: {custom_block_registry.list_blocks()}")

        best_individuals = []

        for generation in range(self.config['max_iterations']):
            print(f"\n=== Generation {generation + 1} ===")

            # Evaluate population
            for individual in population.individuals:
                try:
                    model = individual.build_model()
                    if model is not None:
                        # Simple fitness evaluation (accuracy-based)
                        fitness = self._evaluate_model(model)
                        individual.fitness = fitness
                    else:
                        individual.fitness = 0.0
                except Exception as e:
                    print(f"Error building model: {e}")
                    individual.fitness = 0.0

            # Track best individual
            best_idx = max(range(len(population.individuals)),
                         key=lambda i: population.individuals[i].fitness)
            best_individual = population.individuals[best_idx]

            print(f"Best Individual Fitness: {best_individual.fitness:.4f}")
            print(f"Architecture: {self._format_architecture(best_individual.genome)}")

            best_individuals.append(best_individual.copy())

            # Evolution step
            if generation < self.config['max_iterations'] - 1:
                population = self._custom_evolution_step(population)

        return best_individuals

    def _evaluate_model(self, model):
        """Simple model evaluation (placeholder)."""
        # In a real scenario, this would evaluate on validation data
        # For this example, we'll use a simple heuristic
        param_count = sum(p.numel() for p in model.parameters())

        # Reward smaller models with reasonable complexity
        if param_count < 100000:
            return 0.5 + random.random() * 0.3
        elif param_count < 500000:
            return 0.6 + random.random() * 0.3
        else:
            return 0.4 + random.random() * 0.2

    def _format_architecture(self, genome):
        """Format architecture for display."""
        formatted = []
        for layer in genome:
            layer_type = layer.get('layer_type', 'Unknown')
            if layer_type == 'Classifier':
                formatted.append('C')
            else:
                formatted.append(layer_type[:3])
        return ' -> '.join(formatted)

    def _custom_evolution_step(self, population):
        """Evolution step preserving custom block characteristics."""
        # Selection (top 50%)
        population.individuals.sort(key=lambda x: x.fitness, reverse=True)
        selected = population.individuals[:len(population.individuals)//2]

        # Generate offspring
        offspring = []
        while len(offspring) < self.config['population_size']:
            parent1 = random.choice(selected)
            parent2 = random.choice(selected)

            child = self._custom_crossover(parent1, parent2)
            child = self._custom_mutation(child)

            offspring.append(child)

        return Population(individuals=offspring, config=self.config)

    def _custom_crossover(self, parent1, parent2):
        """Crossover preserving custom block structures."""
        # Simple single-point crossover
        p1_genome = parent1.genome[:-1]  # Exclude classifier
        p2_genome = parent2.genome[:-1]

        if len(p1_genome) > 0 and len(p2_genome) > 0:
            crossover_point = random.randint(1, min(len(p1_genome), len(p2_genome)))
            child_genome = p1_genome[:crossover_point] + p2_genome[crossover_point:]
        else:
            child_genome = p1_genome if len(p1_genome) > 0 else p2_genome

        # Add classifier
        child_genome.append({
            'layer_type': 'Classifier',
            'activation': 'ReLU'
        })

        return Individual(
            genome=child_genome,
            task=parent1.task,
            input_shape=parent1.input_shape,
            num_classes=parent1.num_classes
        )

    def _custom_mutation(self, individual):
        """Mutation respecting custom block constraints."""
        if random.random() < 0.3:  # 30% mutation rate
            genome = individual.genome[:-1]  # Exclude classifier

            if len(genome) > 0:
                # Choose mutation type
                mutation_type = random.choice(['modify', 'add', 'remove'])

                if mutation_type == 'modify':
                    # Modify existing layer
                    idx = random.randint(0, len(genome) - 1)
                    layer = genome[idx].copy()

                    if 'out_channels_coefficient' in layer:
                        layer['out_channels_coefficient'] = random.randint(6, 12)

                    if layer.get('layer_type') == 'SelfAttentionBlock':
                        layer['reduction_ratio'] = random.choice([4, 8, 16])

                    genome[idx] = layer

                elif mutation_type == 'add' and len(genome) < 6:
                    # Add new custom block
                    new_block_type = random.choice([
                        'SeparableConvBlock', 'ChannelAttentionBlock', 'DilatedConvBlock'
                    ])

                    new_layer = {
                        'layer_type': new_block_type,
                        'activation': random.choice(['ReLU', 'GELU']),
                        'out_channels_coefficient': random.randint(6, 10)
                    }

                    genome.append(new_layer)

                elif mutation_type == 'remove' and len(genome) > 1:
                    # Remove a layer
                    genome.pop(random.randint(0, len(genome) - 1))

            # Add classifier back
            genome.append({
                'layer_type': 'Classifier',
                'activation': 'ReLU'
            })

            individual.genome = genome

        return individual
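
A quick, illustrative check of the template parser. Since _parse_custom_template does not use self, it can be called without constructing the full evolution object:

# Parse a short custom template into its layer configurations.
layers = CustomBlockEvolution._parse_custom_template(None, "sep2r_ca_C")
for layer in layers:
    print(layer['layer_type'])
# SeparableConvBlock
# SeparableConvBlock
# ChannelAttentionBlock
# Classifier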

Complete Custom Block Example

Main Execution Pipeline

def main_custom_blocks_example():
    """Complete example using custom blocks in PyNAS."""

    # Setup
    torch.manual_seed(42)

    # Mock dataset for example
    class MockDataset:
        def __init__(self):
            self.num_classes = 10

    dataset = MockDataset()

    config = {
        'population_size': 16,
        'max_iterations': 8,
        'task': 'classification',
        'input_shape': (3, 224, 224)
    }

    # Show registered custom blocks
    print("=== Custom Block Registry ===")
    print(f"Available blocks: {custom_block_registry.list_blocks()}")

    for block_name in custom_block_registry.list_blocks():
        params = custom_block_registry.get_parameters(block_name)
        print(f"{block_name}: {params}")

    # Run evolution with custom blocks
    print(f"\n=== Starting Evolution with Custom Blocks ===")
    evolution = CustomBlockEvolution(dataset, config)
    best_individuals = evolution.evolve_with_custom_blocks()

    # Show results
    print(f"\n=== Evolution Results ===")
    final_best = best_individuals[-1]
    print(f"Final best fitness: {final_best.fitness:.4f}")
    print(f"Final architecture:")

    for i, layer in enumerate(final_best.genome):
        print(f"  Layer {i+1}: {layer}")

    # Test building the model
    print(f"\n=== Building Final Model ===")
    try:
        final_model = final_best.build_model()
        if final_model is not None:
            param_count = sum(p.numel() for p in final_model.parameters())
            print(f"Model built successfully!")
            print(f"Parameter count: {param_count:,}")
            print(f"Model architecture:")
            print(final_model)
        else:
            print("Failed to build model")
    except Exception as e:
        print(f"Error building model: {e}")

    return best_individuals

if __name__ == "__main__":
    best_individuals = main_custom_blocks_example()

Expected Output

The custom blocks example will produce output similar to:

=== Custom Block Registry ===
Available blocks: ['SeparableConvBlock', 'DilatedConvBlock', 'SelfAttentionBlock', 'ChannelAttentionBlock', 'RemoteSensingBlock', 'EdgeDetectionBlock']
SeparableConvBlock: ['in_channels', 'out_channels', 'kernel_size', 'stride', 'padding', 'activation']
DilatedConvBlock: ['in_channels', 'out_channels', 'dilation_rates', 'activation']
SelfAttentionBlock: ['in_channels', 'reduction_ratio', 'activation']
ChannelAttentionBlock: ['in_channels', 'reduction_ratio', 'use_spatial', 'activation']
RemoteSensingBlock: ['in_channels', 'out_channels', 'spectral_reduction', 'activation']
EdgeDetectionBlock: ['in_channels', 'out_channels', 'edge_types', 'activation']

=== Starting Evolution with Custom Blocks ===
Starting evolution with 6 custom blocks
Available blocks: ['SeparableConvBlock', 'DilatedConvBlock', 'SelfAttentionBlock', 'ChannelAttentionBlock', 'RemoteSensingBlock', 'EdgeDetectionBlock']

=== Generation 1 ===
Best Individual Fitness: 0.7234
Architecture: Sep -> Cha -> Sep -> C

=== Generation 8 ===
Best Individual Fitness: 0.8456
Architecture: Rem -> Sel -> Dil -> Cha -> C

=== Evolution Results ===
Final best fitness: 0.8456
Final architecture:
  Layer 1: {'layer_type': 'RemoteSensingBlock', 'activation': 'ReLU', 'out_channels_coefficient': 8, 'spectral_reduction': 4}
  Layer 2: {'layer_type': 'SelfAttentionBlock', 'activation': 'ReLU', 'reduction_ratio': 8}
  Layer 3: {'layer_type': 'DilatedConvBlock', 'activation': 'GELU', 'out_channels_coefficient': 10, 'dilation_rates': [1, 2, 4]}
  Layer 4: {'layer_type': 'ChannelAttentionBlock', 'activation': 'ReLU', 'reduction_ratio': 16, 'use_spatial': True}
  Layer 5: {'layer_type': 'Classifier', 'activation': 'ReLU'}

=== Building Final Model ===
Model built successfully!
Parameter count: 234,567
Model architecture:
Sequential(
  (0): RemoteSensingBlock(...)
  (1): SelfAttentionBlock(...)
  (2): DilatedConvBlock(...)
  (3): ChannelAttentionBlock(...)
  (4): Classifier(...)
)

Best Practices for Custom Blocks

  1. Modular Design: Make blocks self-contained and composable

  2. Parameter Validation: Include proper input validation and error handling (see the sketch after this list)

  3. Configuration Flexibility: Support configurable parameters for evolution

  4. Memory Efficiency: Consider memory usage for large-scale search

  5. Documentation: Provide clear docstrings and usage examples

  6. Testing: Test blocks individually before integration

  7. Compatibility: Ensure blocks work with PyTorch’s standard operations

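For example, the parameter-validation practice can be as simple as a few constructor checks that fail fast when the search samples an invalid configuration (ValidatedConvBlock below is a toy sketch, not part of PyNAS):

import torch.nn as nn

class ValidatedConvBlock(nn.Module):
    """Toy block illustrating constructor-time parameter validation."""

    def __init__(self, in_channels, out_channels, kernel_size=3):
        super().__init__()
        if in_channels <= 0 or out_channels <= 0:
            raise ValueError("in_channels and out_channels must be positive")
        if kernel_size % 2 == 0:
            raise ValueError("kernel_size must be odd so padding preserves spatial size")
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size,
                              padding=kernel_size // 2)

    def forward(self, x):
        return self.conv(x)
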
This example demonstrates how PyNAS can be extended with custom blocks, providing researchers with the flexibility to explore domain-specific architectures while leveraging the framework’s evolutionary search capabilities.