# -*- coding: utf-8 -*-
# MIT License
# Copyright (c) 2019 Saranraj Nambusubramaniyan
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import division
import numpy as np
import os
import random
import logging
from multiprocessing import Pool
from tqdm import tqdm
from sorn.callbacks import *
try:
from sorn.utils import Initializer
except:
from utils import Initializer
# Configure the root logger at import time: INFO level, timestamped records,
# written both to the file "sorn.log" (relative to the current working
# directory) and to the default stream handler.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s:%(levelname)s:%(message)s",
handlers=[logging.FileHandler("sorn.log"), logging.StreamHandler()],
)
class Sorn(object):
    """Hold the global SORN hyperparameters and the state initializers.

    The hyperparameters are class attributes, so every class in this module
    reads the same values through ``Sorn.<name>``. The ``run`` methods of the
    simulator and trainer overwrite them in place with ``setattr(Sorn, ...)``,
    which means an override persists for the rest of the process.
    """

    nu = 10  # Number of input units
    ne = 200  # Number of excitatory units
    ni = int(0.2 * ne)  # Number of inhibitory units (20% of ne)
    eta_stdp = 0.004  # STDP learning rate
    eta_inhib = 0.001  # iSTDP learning rate
    eta_ip = 0.01  # Intrinsic plasticity learning rate
    te_max = 1.0  # Upper bound of the excitatory thresholds
    ti_max = 0.5  # Upper bound of the inhibitory thresholds
    ti_min = 0.0  # Lower bound of the inhibitory thresholds
    te_min = 0.0  # Lower bound of the excitatory thresholds
    mu_ip = 0.1  # Mean target firing rate
    # NOTE(review): presumably the spread of the target firing rate; it is
    # not read anywhere in this file - confirm before relying on it.
    sigma_ip = 0.0
    network_type_ee = "Sparse"  # Connectivity type of the E->E weights
    network_type_ei = "Sparse"  # Connectivity type of the "EI" weights
    network_type_ie = "Dense"  # Connectivity type of the "IE" weights
    lambda_ee = 20  # Mean number of connections per neuron, EE
    lambda_ei = 40  # Mean number of connections per neuron, EI
    lambda_ie = 100  # Mean number of connections per neuron, IE

    @staticmethod
    def initialize_weight_matrix(
        network_type: str, synaptic_connection: str, self_connection: str, lambd_w: int
    ):
        """Create an initial weight matrix for one synaptic pathway.

        Args:
            network_type (str): "Sparse" or "Dense"
            synaptic_connection (str): "EE", "EI" or "IE". Only used by the
                sparse initializer; the dense matrix always has shape (ne, ni)
            self_connection (str): Must be the string "False" (not a bool);
                no other value is supported
            lambd_w (int): Average number of incoming and outgoing
                connections per neuron (sparse networks only)

        Returns:
            weight_matrix (array): Array of connection strengths

        Raises:
            AssertionError: If ``lambd_w`` exceeds ``Sorn.ne`` for a sparse network
            ValueError: If the network_type/self_connection pair is unsupported
        """
        if self_connection == "False":
            if network_type == "Sparse":
                # Sparse matrix with, on average, lambd_w connections per neuron
                assert (
                    lambd_w <= Sorn.ne
                ), "Number of connections per unit (lambda) should be less than number of units(Ne) in the pool and also Ne should be greater than 25"
                return Initializer.generate_lambd_connections(
                    synaptic_connection, Sorn.ne, Sorn.ni, lambd_w, lambd_std=1
                )
            if network_type == "Dense":
                # Fully connected; weights drawn uniformly from [0.0, 0.1).
                # The array already has shape (ne, ni), so no reshape is needed.
                return np.random.uniform(0.0, 0.1, (Sorn.ne, Sorn.ni))

        # Previously this fell through to an unbound local variable.
        raise ValueError(
            "Unsupported combination: network_type={!r}, self_connection={!r}".format(
                network_type, self_connection
            )
        )

    @staticmethod
    def initialize_threshold_matrix(
        te_min: float, te_max: float, ti_min: float, ti_max: float
    ):
        """Draw uniformly distributed firing thresholds for both populations.

        Args:
            te_min (float): Min threshold value for excitatory units
            te_max (float): Max threshold value for excitatory units
            ti_min (float): Min threshold value for inhibitory units
            ti_max (float): Max threshold value for inhibitory units

        Returns:
            te (array): Threshold values for excitatory units, shape (ne, 1)
            ti (array): Threshold values for inhibitory units, shape (ni, 1)
        """
        te = np.random.uniform(te_min, te_max, (Sorn.ne, 1))
        ti = np.random.uniform(ti_min, ti_max, (Sorn.ni, 1))
        return te, ti

    @staticmethod
    def initialize_activity_vector(ne: int, ni: int):
        """Create all-zero activity buffers X and Y.

        Each buffer has two columns; the rest of this module reads column 0 as
        the activity at the previous time step and column 1 as the current one.

        Args:
            ne (int): Number of excitatory neurons
            ni (int): Number of inhibitory neurons

        Returns:
            x (array): Zero array of shape (ne, 2) for the excitatory population
            y (array): Zero array of shape (ni, 2) for the inhibitory population
        """
        x = np.zeros((ne, 2))
        y = np.zeros((ni, 2))
        return x, y
class Plasticity(Sorn):
    """Plasticity mechanisms of the network.

    Subclass of Sorn. On construction it snapshots the current values of the
    Sorn class attributes, so an instance keeps using the configuration that
    was active when it was created.
    """

    def __init__(self):
        super().__init__()
        self.nu = Sorn.nu  # Number of input units
        self.ne = Sorn.ne  # Number of excitatory units
        # STDP plasticity learning rate constant
        self.eta_stdp = Sorn.eta_stdp
        # Intrinsic plasticity learning rate constant
        self.eta_ip = Sorn.eta_ip
        # iSTDP learning rate constant
        self.eta_inhib = Sorn.eta_inhib
        self.h_ip = 2 * Sorn.nu / Sorn.ne  # Target firing rate
        self.mu_ip = Sorn.mu_ip  # Mean target firing rate
        # Number of inhibitory units in the network
        self.ni = int(0.2 * Sorn.ne)
        # Total time steps of simulation. Sorn.timesteps only exists once a
        # run() has set it, so fall back to None instead of raising.
        self.timesteps = getattr(Sorn, "timesteps", None)
        self.te_min = Sorn.te_min  # Excitatory minimum threshold
        self.te_max = Sorn.te_max  # Excitatory maximum threshold

    def stdp(self, wee: np.array, x: np.array, cutoff_weights: list):
        """Apply the STDP rule to the existing excitatory connections.

        ``wee[j][i]`` is the weight from pre-synaptic unit j to post-synaptic
        unit i. Only non-zero entries (existing connections) are updated.

        Args:
            wee (array): Weight matrix
            x (array): Excitatory network activity; column 0 is the previous
                step, column 1 the current step
            cutoff_weights (list): (lower, upper) bounds applied to the weights

        Returns:
            wee (array): Updated copy of the weight matrix
        """
        x = np.asarray(x)
        xt_1 = x[:, 0]
        xt = x[:, 1]
        wee = np.asarray(wee)

        # delta[j, i] = eta * (x_i(t) * x_j(t-1) - x_i(t-1) * x_j(t)),
        # computed for all pairs at once instead of a Python double loop.
        delta = self.eta_stdp * (np.outer(xt_1, xt) - np.outer(xt, xt_1))
        wee_t = np.where(wee != 0.0, wee + delta, wee)

        # Apply the lower cutoff weight
        wee_t = Initializer.reset_min(wee_t, cutoff_weights[0])
        # Apply the upper cutoff weight
        wee_t = Initializer.reset_max(wee_t, cutoff_weights[1])
        return wee_t

    def ip(self, te: np.array, x: np.array):
        """Apply the intrinsic plasticity rule to the excitatory thresholds.

        An active unit raises its threshold and an inactive unit lowers it,
        relative to the target rate ``h_ip``.

        Args:
            te (array): Threshold vector of excitatory units, shape (ne, 1)
            x (array): Excitatory network activity; column 1 is the current step

        Returns:
            te (array): Updated thresholds, bounded via te_min and te_max
        """
        xt = np.asarray(x)[:, 1]
        te_update = te + self.eta_ip * (xt.reshape(self.ne, 1) - self.h_ip)
        # Apply the lower, then the upper threshold bound
        te_update = Initializer.reset_min(te_update, self.te_min)
        te_update = Initializer.reset_max(te_update, self.te_max)
        return te_update

    @staticmethod
    def ss(wee: np.array):
        """Synaptic scaling: normalize every column of the matrix to sum to 1.

        Columns whose sum is exactly zero are left as they are instead of
        being turned into NaN by a division by zero.

        Args:
            wee (array): Weight matrix

        Returns:
            wee (array): Scaled weight matrix
        """
        wee = np.asarray(wee)
        column_sums = np.sum(wee, axis=0)
        safe_sums = np.where(column_sums == 0, 1.0, column_sums)
        return wee / safe_sums

    def istdp(self, wei: np.array, x: np.array, y: np.array, cutoff_weights: list):
        """Apply the iSTDP rule to the existing inhibitory connections.

        ``wei[j][i]`` is indexed with j over the inhibitory (pre-synaptic)
        units and i over the excitatory (post-synaptic) units. Only non-zero
        entries are updated.

        Args:
            wei (array): Synaptic strengths from inhibitory to excitatory
            x (array): Excitatory network activity; column 1 is the current step
            y (array): Inhibitory network activity; column 0 is the previous step
            cutoff_weights (list): (lower, upper) bounds applied to the weights

        Returns:
            wei (array): Updated copy of the weight matrix
        """
        # Excitatory network activity at the current step
        xt = np.asarray(x)[:, 1]
        # Inhibitory network activity at the previous step
        yt_1 = np.asarray(y)[:, 0]
        wei = np.asarray(wei)

        # delta[j, i] = -eta * y_j(t-1) * (1 - x_i(t) * (1 + 1 / mu_ip))
        delta = np.outer(-self.eta_inhib * yt_1, 1 - xt * (1 + 1 / self.mu_ip))
        wei_t = np.where(wei != 0.0, wei + delta, wei)

        # Apply the lower cutoff weight
        wei_t = Initializer.reset_min(wei_t, cutoff_weights[0])
        # Apply the upper cutoff weight
        wei_t = Initializer.reset_max(wei_t, cutoff_weights[1])
        return wei_t

    @staticmethod
    def structural_plasticity(wee: np.array):
        """With probability 0.1, create one new weak excitatory connection.

        A position is drawn at random from the candidates reported by
        ``Initializer.inactive_synapses`` (self-connections excluded) and set
        to 0.001. The matrix is modified in place. If there is no eligible
        candidate the matrix is returned unchanged.

        Args:
            wee (array): Weight matrix

        Returns:
            wee (array): The same weight matrix object
        """
        # randint(0, 10) == 0 happens once in ten draws: p_c = 0.1
        if np.random.randint(0, 10) != 0:
            return wee

        # Filter up front so that an empty or diagonal-only candidate list
        # cannot cause an IndexError or an endless retry loop.
        candidates = [
            idx for idx in Initializer.inactive_synapses(wee) if idx[0] != idx[1]
        ]
        if not candidates:
            return wee

        idx_rand = random.choice(candidates)
        wee[idx_rand[0]][idx_rand[1]] = 0.001
        return wee

    @staticmethod
    def initialize_plasticity():
        """Build the initial network state from the current Sorn configuration.

        Takes no arguments; everything is read from the Sorn class attributes.

        Returns:
            tuple(array): Weight matrices wee, wei, wie, threshold matrices
            te, ti and the initial activity buffers x, y (in that order)
        """
        wee = Sorn.initialize_weight_matrix(
            network_type=Sorn.network_type_ee,
            synaptic_connection="EE",
            self_connection="False",
            lambd_w=Sorn.lambda_ee,
        )
        wei = Sorn.initialize_weight_matrix(
            network_type=Sorn.network_type_ei,
            synaptic_connection="EI",
            self_connection="False",
            lambd_w=Sorn.lambda_ei,
        )
        wie = Sorn.initialize_weight_matrix(
            network_type=Sorn.network_type_ie,
            synaptic_connection="IE",
            self_connection="False",
            lambd_w=Sorn.lambda_ie,
        )

        wee = Initializer.zero_sum_incoming_check(wee)
        wei = Initializer.zero_sum_incoming_check(wei)
        wie = Initializer.zero_sum_incoming_check(wie)

        logging.info("Network Initialized")
        logging.info(
            "Number of connections in Wee %s , Wei %s, Wie %s",
            np.count_nonzero(wee),
            np.count_nonzero(wei),
            np.count_nonzero(wie),
        )
        logging.info(
            "Shapes Wee %s Wei %s Wie %s", wee.shape, wei.shape, wie.shape
        )

        # Normalize the incoming weights
        wee = Initializer.normalize_weight_matrix(wee)
        wei = Initializer.normalize_weight_matrix(wei)
        wie = Initializer.normalize_weight_matrix(wie)

        te, ti = Sorn.initialize_threshold_matrix(
            Sorn.te_min, Sorn.te_max, Sorn.ti_min, Sorn.ti_max
        )
        x, y = Sorn.initialize_activity_vector(Sorn.ne, Sorn.ni)
        return wee, wei, wie, te, ti, x, y
class MatrixCollection(Sorn):
    """Per-time-step history of all network matrices for one run.

    Every history (Wee, Wei, Wie, Te, Ti, X, Y) is a list with
    ``Sorn.timesteps + 1`` slots; slot 0 holds the initial state and slot
    ``i + 1`` the state after step ``i``.

    Args:
        phase(str): "plasticity" or "training"
        state(dict): Initial matrices under the keys "Wee", "Wei", "Wie",
            "Te", "Ti", "X" and "Y". Optional for the plasticity phase (a
            fresh network is initialized when omitted), required for training

    Raises:
        ValueError: If ``phase`` is unknown, or the phase is "training" and
            no ``state`` is given
    """

    # Order matches the tuple returned by Plasticity.initialize_plasticity()
    _STATE_KEYS = ("Wee", "Wei", "Wie", "Te", "Ti", "X", "Y")

    def __init__(self, phase: str, state: dict = None):
        super().__init__()
        self.phase = phase
        self.state = state

        if phase not in ("plasticity", "training"):
            raise ValueError("Phase can be either 'plasticity' or 'training'")
        if phase == "training" and state is None:
            raise ValueError("The training phase requires an initial 'state'")

        # One extra slot for the initial state
        self.timesteps = Sorn.timesteps + 1
        self.Wee, self.Wei, self.Wie, self.Te, self.Ti, self.X, self.Y = (
            [0] * self.timesteps for _ in self._STATE_KEYS
        )

        if state is None:
            # Fresh network for the plasticity phase
            initial = dict(zip(self._STATE_KEYS, Plasticity.initialize_plasticity()))
        else:
            # Continue from the matrices handed in by the caller
            initial = {key: state[key] for key in self._STATE_KEYS}

        # Assign the initial matrices to slot 0 of the histories
        for key in self._STATE_KEYS:
            getattr(self, key)[0] = initial[key]

    def weight_matrix(self, wee: np.array, wei: np.array, wie: np.array, i: int):
        """Store the weight matrices resulting from time step ``i``.

        Args:
            wee(array): Excitatory-Excitatory weight matrix
            wei(array): Inhibitory-Excitatory weight matrix
            wie(array): Excitatory-Inhibitory weight matrix
            i(int): Time step; the matrices are written to slot ``i + 1``

        Returns:
            tuple(list): The complete histories Wee, Wei, Wie
        """
        self.Wee[i + 1] = wee
        self.Wei[i + 1] = wei
        self.Wie[i + 1] = wie
        return self.Wee, self.Wei, self.Wie

    def threshold_matrix(self, te: np.array, ti: np.array, i: int):
        """Store the threshold matrices resulting from time step ``i``.

        Args:
            te(array): Excitatory threshold
            ti(array): Inhibitory threshold
            i(int): Time step; the matrices are written to slot ``i + 1``

        Returns:
            tuple(list): The complete histories Te and Ti
        """
        self.Te[i + 1] = te
        self.Ti[i + 1] = ti
        return self.Te, self.Ti

    def network_activity_t(
        self, excitatory_net: np.array, inhibitory_net: np.array, i: int
    ):
        """Store the network activity resulting from time step ``i``.

        Args:
            excitatory_net(array): Excitatory network activity
            inhibitory_net(array): Inhibitory network activity
            i(int): Time step; the activity is written to slot ``i + 1``

        Returns:
            tuple(list): The complete histories X and Y
        """
        self.X[i + 1] = excitatory_net
        self.Y[i + 1] = inhibitory_net
        return self.X, self.Y

    def network_activity_t_1(self, x: np.array, y: np.array, i: int):
        """Place the given activity at slot ``i`` of two fresh histories.

        The instance's own X and Y histories are not touched.

        Args:
            x(array): Excitatory network activity
            y(array): Inhibitory network activity
            i(int): Time step

        Returns:
            tuple(list): New lists of length ``self.timesteps``, zero
            everywhere except slot ``i``
        """
        x_1, y_1 = [0] * self.timesteps, [0] * self.timesteps
        x_1[i] = x
        y_1[i] = y
        return x_1, y_1
class NetworkState(Plasticity):
    """Compute the binary network state for one time step.

    Args:
        v_t(array): External input/stimuli for this step; must have
            ``Sorn.nu`` entries. It is zero-padded to ``Sorn.ne`` entries and
            stored as a column vector.

    Returns:
        instance(object): NetworkState instance
    """

    def __init__(self, v_t: np.array):
        super().__init__()
        self.v_t = v_t
        n_inputs = len(self.v_t)
        assert (
            Sorn.nu == n_inputs
        ), "Input units and input size mismatch: {} != {}".format(Sorn.nu, n_inputs)
        if Sorn.nu != Sorn.ne:
            # Units without an input line receive zero external drive
            padding = [0.0] * (Sorn.ne - Sorn.nu)
            self.v_t = list(self.v_t) + padding
        self.v_t = np.expand_dims(self.v_t, 1)

    def incoming_drive(self, weights: np.array, activity_vector: np.array):
        """Sum the weighted pre-synaptic activity arriving at every unit.

        Args:
            weights(array): Synaptic strengths, one row per pre-synaptic unit
            activity_vector(array): Column vector with the activity of the
                pre-synaptic units

        Returns:
            incoming(array): Summed drive per post-synaptic unit (column sums)
        """
        weighted = weights * activity_vector
        return np.array(weighted.sum(axis=0))

    @staticmethod
    def _threshold(total_drive):
        """Heaviside step: 1.0 where the drive is strictly positive, else 0.0."""
        fired = np.expand_dims([0.0] * len(total_drive), 1)
        fired[total_drive > 0] = 1.0
        return fired

    def _net_recurrent_input(self, wee, wei, x, y):
        """Excitatory minus inhibitory drive onto the excitatory units."""
        x_now = x[:, 1].reshape(self.ne, 1)
        y_now = y[:, 1].reshape(self.ni, 1)
        excitation = np.expand_dims(
            self.incoming_drive(weights=wee, activity_vector=x_now), 1
        )
        inhibition = np.expand_dims(
            self.incoming_drive(weights=wei, activity_vector=y_now), 1
        )
        return excitation - inhibition

    def excitatory_network_state(
        self,
        wee: np.array,
        wei: np.array,
        te: np.array,
        x: np.array,
        y: np.array,
        white_noise_e: np.array,
    ):
        """Next state of the excitatory units, including the external input.

        Args:
            wee(array): Excitatory-Excitatory weight matrix
            wei(array): Inhibitory-Excitatory weight matrix
            te(array): Excitatory threshold
            x(array): Excitatory network activity; column 1 is used
            y(array): Inhibitory network activity; column 1 is used
            white_noise_e(array): Noise added to the drive

        Returns:
            x(array): Binary column vector with the new excitatory activity
        """
        total = (
            self._net_recurrent_input(wee, wei, x, y)
            + white_noise_e
            + np.asarray(self.v_t)
            - te
        )
        return self._threshold(total)

    def inhibitory_network_state(
        self, wie: np.array, ti: np.array, y: np.array, white_noise_i: np.array
    ):
        """Next state of the inhibitory units.

        Args:
            wie(array): Excitatory-Inhibitory weight matrix
            ti(array): Inhibitory threshold
            y(array): Activity buffer of the driving population. Column 1 is
                reshaped to ``(Sorn.ne, 1)``, so despite its name this must
                hold ``Sorn.ne`` rows; the run loops in this file pass the
                excitatory activity here
            white_noise_i(array): Noise added to the drive

        Returns:
            y(array): Binary column vector with the new inhibitory activity
        """
        wie = np.asarray(wie)
        presynaptic = y[:, 1].reshape(Sorn.ne, 1)
        drive = np.expand_dims(
            self.incoming_drive(weights=wie, activity_vector=presynaptic), 1
        )
        return self._threshold(drive + white_noise_i - ti)

    def recurrent_drive(
        self,
        wee: np.array,
        wei: np.array,
        te: np.array,
        x: np.array,
        y: np.array,
        white_noise_e: np.array,
    ):
        """State of the excitatory units from recurrent drive alone.

        Same computation as ``excitatory_network_state`` but without the
        external input term.

        Args:
            wee(array): Excitatory-Excitatory weight matrix
            wei(array): Inhibitory-Excitatory weight matrix
            te(array): Excitatory threshold
            x(array): Excitatory network activity; column 1 is used
            y(array): Inhibitory network activity; column 1 is used
            white_noise_e(array): Noise added to the drive

        Returns:
            xt(array): Binary column vector with the recurrent network state
        """
        total = self._net_recurrent_input(wee, wei, x, y) + white_noise_e - te
        return self._threshold(total)
# Simulate / Train SORN
class Simulator_(Sorn):
    """Simulate SORN with external input, from a fresh or a pretrained state.

    All plasticity mechanisms that are not listed in ``freeze`` are applied
    at every time step. Use :meth:`run` to start a simulation.
    """

    # Sorn hyperparameters that run() accepts as keyword overrides
    _CONFIGURABLE = (
        "ne",
        "nu",
        "network_type_ee",
        "network_type_ei",
        "network_type_ie",
        "lambda_ee",
        "lambda_ei",
        "lambda_ie",
        "eta_stdp",
        "eta_inhib",
        "eta_ip",
        "te_max",
        "ti_max",
        "ti_min",
        "te_min",
        "mu_ip",
        "sigma_ip",
    )

    def __init__(self):
        super().__init__()
        # The insertion order of this dict defines the positional order of
        # the values passed to update_callback_state().
        self.avail_callbacks = {
            "ExcitatoryActivation": ExcitatoryActivation,
            "InhibitoryActivation": InhibitoryActivation,
            "RecurrentActivation": RecurrentActivation,
            "WEE": WEE,
            "WEI": WEI,
            "TE": TE,
            "TI": TI,
            "EIConnectionCounts": EIConnectionCounts,
            "EEConnectionCounts": EEConnectionCounts,
        }

    def update_callback_state(self, *args) -> None:
        """Copy the requested values of the current step into callback_state.

        Args:
            *args: One value per entry of ``avail_callbacks``, in the same
                order. Only the positions selected by ``callback_mask`` are
                stored.
        """
        if not self.callbacks:
            return
        names = list(self.avail_callbacks.keys())
        (indices,) = np.array(self.callback_mask).nonzero()
        for idx in indices:
            self.callback_state[names[idx]] = args[idx]

    def _apply_plasticity(self, plasticity, wee, wei, te, x_buffer, y_buffer):
        """Run every plasticity mechanism that is not frozen, in fixed order."""
        if "stdp" not in self.freeze:
            wee = plasticity.stdp(wee, x_buffer, cutoff_weights=(0.0, 1.0))
        if "ip" not in self.freeze:
            te = plasticity.ip(te, x_buffer)
        if "sp" not in self.freeze:
            wee = plasticity.structural_plasticity(wee)
        if "istdp" not in self.freeze:
            wei = plasticity.istdp(wei, x_buffer, y_buffer, cutoff_weights=(0.0, 1.0))
        if "ss" not in self.freeze:
            wee = plasticity.ss(wee)
            wei = plasticity.ss(wei)
        return wee, wei, te

    def run(
        self,
        inputs: np.array = None,
        phase: str = "plasticity",
        state: dict = None,
        timesteps: int = None,
        noise: bool = True,
        freeze: list = None,
        callbacks: list = None,
        **kwargs,
    ):
        """Simulation/Plasticity phase

        Args:
            inputs(np.array): External stimuli; column ``i`` is the input of
                time step ``i`` and must have ``nu`` entries. Required.
            phase(str, optional): "plasticity" or "training". Defaults to "plasticity"
            state(dict, optional): Network states, connections and threshold
                matrices to start from. Defaults to None (fresh network).
            timesteps(int): Total number of time steps to simulate. Required.
            noise(bool, optional): If True, noise will be added. Defaults to True.
            freeze(list, optional): Plasticity mechanisms to turn off, any of
                "stdp", "ip", "sp", "istdp", "ss". Defaults to None.
            callbacks(list, optional): Names from ``avail_callbacks`` whose
                values are collected during the run. Defaults to None.
            **kwargs: Overrides for the Sorn hyperparameters (e.g. ``ne``,
                ``eta_stdp``). They are written to the Sorn class and persist
                after the run. Unknown names are ignored.

        Returns:
            last_state(dict): Network states, connections and threshold matrices
            callback_values(dict): Requested network parameters and
            activations; empty when no callbacks were requested

        Raises:
            ValueError: If ``phase`` is invalid or ``inputs``/``timesteps`` is missing
        """
        # The former `assert phase == "plasticity" or "training"` was always true.
        if phase not in ("plasticity", "training"):
            raise ValueError("Phase can be either 'plasticity' or 'training'")
        if inputs is None:
            raise ValueError("'inputs' is required")
        if timesteps is None:
            raise ValueError("'timesteps' is required")

        inputs = np.asarray(inputs)
        self.timesteps = timesteps
        Sorn.timesteps = timesteps
        self.phase = phase
        self.state = state
        self.freeze = [] if freeze is None else freeze
        self.callbacks = [] if callbacks is None else callbacks

        for key, value in kwargs.items():
            if key in self._CONFIGURABLE:
                setattr(Sorn, key, value)
        # ni depends on ne, which may just have been overridden
        Sorn.ni = int(0.2 * Sorn.ne)

        # The configuration is fixed from here on, so one instance suffices
        plasticity = Plasticity()

        # Initialize/Get the weight, threshold matrices and activity vectors
        matrix_collection = MatrixCollection(phase=self.phase, state=self.state)

        if self.callbacks:
            assert isinstance(self.callbacks, list), "Callbacks must be a list"
            assert all(isinstance(callback, str) for callback in self.callbacks)
            self.callback_mask = np.isin(
                list(self.avail_callbacks.keys()), self.callbacks
            ).astype(int)
            self.callback_state = dict.fromkeys(self.callbacks, None)
            # Requested values to be collected and returned
            self.dispatcher = Callbacks(
                self.timesteps, self.avail_callbacks, self.callbacks
            )

        # The history lists are updated in place, so bind them once
        Wee, Wei, Wie = (
            matrix_collection.Wee,
            matrix_collection.Wei,
            matrix_collection.Wie,
        )
        Te, Ti = matrix_collection.Te, matrix_collection.Ti
        X, Y = matrix_collection.X, matrix_collection.Y

        for i in tqdm(range(self.timesteps)):
            if noise:
                white_noise_e = Initializer.white_gaussian_noise(
                    mu=0.0, sigma=0.04, t=Sorn.ne
                )
                white_noise_i = Initializer.white_gaussian_noise(
                    mu=0.0, sigma=0.04, t=Sorn.ni
                )
            else:
                white_noise_e, white_noise_i = 0.0, 0.0

            network_state = NetworkState(inputs[:, i])

            # Number of positive connections before this step's plasticity
            ei_conn = (Wei[i] > 0.0).sum()
            ee_conn = (Wee[i] > 0.0).sum()

            # Recurrent drive (no external input)
            r = network_state.recurrent_drive(
                Wee[i], Wei[i], Te[i], X[i], Y[i], white_noise_e
            )
            # x(t+1), y(t+1) given the current weights and thresholds
            excitatory_state_xt_buffer = network_state.excitatory_network_state(
                Wee[i], Wei[i], Te[i], X[i], Y[i], white_noise_e
            )
            inhibitory_state_yt_buffer = network_state.inhibitory_network_state(
                Wie[i], Ti[i], X[i], white_noise_i
            )

            # Column 0: previous activity, column 1: new activity
            x_buffer, y_buffer = np.zeros((Sorn.ne, 2)), np.zeros((Sorn.ni, 2))
            x_buffer[:, 0] = X[i][:, 1]
            x_buffer[:, 1] = excitatory_state_xt_buffer.T
            y_buffer[:, 0] = Y[i][:, 1]
            y_buffer[:, 1] = inhibitory_state_yt_buffer.T

            # Plasticity phase
            Wee[i], Wei[i], Te[i] = self._apply_plasticity(
                plasticity, Wee[i], Wei[i], Te[i], x_buffer, y_buffer
            )

            # Assign the matrices to the matrix collections (slot i + 1)
            matrix_collection.weight_matrix(Wee[i], Wei[i], Wie[i], i)
            matrix_collection.threshold_matrix(Te[i], Ti[i], i)
            matrix_collection.network_activity_t(x_buffer, y_buffer, i)

            if self.callbacks:
                self.update_callback_state(
                    x_buffer[:, 1],
                    y_buffer[:, 1],
                    r,
                    Wee[i],
                    Wei[i],
                    Te[i],
                    Ti[i],
                    ei_conn,
                    ee_conn,
                )
                self.dispatcher.step(self.callback_state, time_step=i)

        last_state = {
            "Wee": matrix_collection.Wee[-1],
            "Wei": matrix_collection.Wei[-1],
            "Wie": matrix_collection.Wie[-1],
            "Te": matrix_collection.Te[-1],
            "Ti": matrix_collection.Ti[-1],
            "X": matrix_collection.X[-1],
            "Y": matrix_collection.Y[-1],
        }
        if self.callbacks:
            return last_state, self.dispatcher.get()
        return last_state, {}
class Trainer_(Sorn):
    """Train the network with the fresh or pretrained network matrices and external stimuli.

    Plasticity is applied only when ``phase == "plasticity"``; in the
    "training" phase the weights and thresholds are carried forward unchanged
    and only the network activity evolves.
    """

    # Sorn hyperparameters that run() accepts as keyword overrides
    _CONFIGURABLE = (
        "ne",
        "nu",
        "network_type_ee",
        "network_type_ei",
        "network_type_ie",
        "lambda_ee",
        "lambda_ei",
        "lambda_ie",
        "eta_stdp",
        "eta_inhib",
        "eta_ip",
        "te_max",
        "ti_max",
        "ti_min",
        "te_min",
        "mu_ip",
        "sigma_ip",
    )

    def __init__(self):
        super().__init__()
        # The insertion order of this dict defines the positional order of
        # the values passed to update_callback_state().
        self.avail_callbacks = {
            "ExcitatoryActivation": ExcitatoryActivation,
            "InhibitoryActivation": InhibitoryActivation,
            "RecurrentActivation": RecurrentActivation,
            "WEE": WEE,
            "WEI": WEI,
            "TE": TE,
            "TI": TI,
            "EIConnectionCounts": EIConnectionCounts,
            "EEConnectionCounts": EEConnectionCounts,
        }

    def update_callback_state(self, *args) -> None:
        """Copy the requested values of the current step into callback_state.

        Args:
            *args: One value per entry of ``avail_callbacks``, in the same
                order. Only the positions selected by ``callback_mask`` are
                stored.
        """
        if not self.callbacks:
            return
        names = list(self.avail_callbacks.keys())
        (indices,) = np.array(self.callback_mask).nonzero()
        for idx in indices:
            self.callback_state[names[idx]] = args[idx]

    def _apply_plasticity(self, plasticity, wee, wei, te, x_buffer, y_buffer):
        """Run every plasticity mechanism that is not frozen, in fixed order."""
        if "stdp" not in self.freeze:
            wee = plasticity.stdp(wee, x_buffer, cutoff_weights=(0.0, 1.0))
        if "ip" not in self.freeze:
            te = plasticity.ip(te, x_buffer)
        if "sp" not in self.freeze:
            wee = plasticity.structural_plasticity(wee)
        if "istdp" not in self.freeze:
            wei = plasticity.istdp(wei, x_buffer, y_buffer, cutoff_weights=(0.0, 1.0))
        if "ss" not in self.freeze:
            wee = plasticity.ss(wee)
            wei = plasticity.ss(wei)
        return wee, wei, te

    def run(
        self,
        inputs: np.array = None,
        phase: str = "training",
        state: dict = None,
        timesteps: int = None,
        noise: bool = True,
        freeze: list = None,
        callbacks: list = None,
        **kwargs,
    ):
        """Train the network with the fresh or pretrained network matrices and external stimuli

        Args:
            inputs(np.array): External stimuli; column ``i`` is the input of
                time step ``i`` and must have ``nu`` entries. Required.
            phase(str, optional): "plasticity" or "training". Defaults to "training".
            state(dict, optional): Network states, connections and threshold
                matrices to start from. Required for the "training" phase.
            timesteps(int): Total number of time steps to simulate. Required.
            noise(bool, optional): If True, noise will be added. Defaults to True.
            freeze(list, optional): Plasticity mechanisms to turn off, any of
                "stdp", "ip", "sp", "istdp", "ss". Only relevant in the
                "plasticity" phase. Defaults to None.
            callbacks(list, optional): Names from ``avail_callbacks`` whose
                values are collected during the run. Defaults to None.
            **kwargs: Overrides for the Sorn hyperparameters (e.g. ``ne``,
                ``eta_stdp``). They are written to the Sorn class and persist
                after the run. Unknown names are ignored.

        Returns:
            last_state(dict): Network states, connections and threshold matrices
            callback_values(dict): Requested network parameters and
            activations; empty when no callbacks were requested

        Raises:
            ValueError: If ``phase`` is invalid or ``inputs``/``timesteps`` is missing
        """
        # The former `assert phase == "plasticity" or "training"` was always true.
        if phase not in ("plasticity", "training"):
            raise ValueError("Phase can be either 'plasticity' or 'training'")
        if inputs is None:
            raise ValueError("'inputs' is required")
        if timesteps is None:
            raise ValueError("'timesteps' is required")

        for key, value in kwargs.items():
            if key in self._CONFIGURABLE:
                setattr(Sorn, key, value)
        # ni depends on ne, which may just have been overridden
        Sorn.ni = int(0.2 * Sorn.ne)

        self.phase = phase
        self.state = state
        self.timesteps = timesteps
        Sorn.timesteps = timesteps
        self.inputs = np.asarray(inputs)
        self.freeze = [] if freeze is None else freeze
        self.callbacks = [] if callbacks is None else callbacks

        matrix_collection = MatrixCollection(phase=self.phase, state=self.state)

        if self.callbacks:
            assert isinstance(self.callbacks, list), "Callbacks must be a list"
            assert all(isinstance(callback, str) for callback in self.callbacks)
            self.callback_mask = np.isin(
                list(self.avail_callbacks.keys()), self.callbacks
            ).astype(int)
            self.callback_state = dict.fromkeys(self.callbacks, None)
            # Requested values to be collected and returned
            self.dispatcher = Callbacks(
                self.timesteps, self.avail_callbacks, self.callbacks
            )

        # Weights and thresholds only change in the plasticity phase; the
        # configuration is fixed from here on, so one instance suffices.
        plasticity = Plasticity() if self.phase == "plasticity" else None

        # The history lists are updated in place, so bind them once
        Wee, Wei, Wie = (
            matrix_collection.Wee,
            matrix_collection.Wei,
            matrix_collection.Wie,
        )
        Te, Ti = matrix_collection.Te, matrix_collection.Ti
        X, Y = matrix_collection.X, matrix_collection.Y

        for i in range(self.timesteps):
            if noise:
                white_noise_e = Initializer.white_gaussian_noise(
                    mu=0.0, sigma=0.04, t=Sorn.ne
                )
                white_noise_i = Initializer.white_gaussian_noise(
                    mu=0.0, sigma=0.04, t=Sorn.ni
                )
            else:
                white_noise_e, white_noise_i = 0.0, 0.0

            network_state = NetworkState(self.inputs[:, i])

            # Number of positive connections before this step's plasticity
            ei_conn = (Wei[i] > 0.0).sum()
            ee_conn = (Wee[i] > 0.0).sum()

            # Recurrent drive at t+1 used to predict the next external stimuli
            r = network_state.recurrent_drive(
                Wee[i], Wei[i], Te[i], X[i], Y[i], white_noise_e=white_noise_e
            )
            # x(t+1), y(t+1) given the current weights and thresholds
            excitatory_state_xt_buffer = network_state.excitatory_network_state(
                Wee[i], Wei[i], Te[i], X[i], Y[i], white_noise_e=white_noise_e
            )
            inhibitory_state_yt_buffer = network_state.inhibitory_network_state(
                Wie[i], Ti[i], X[i], white_noise_i=white_noise_i
            )

            # Column 0: previous activity, column 1: new activity
            x_buffer, y_buffer = np.zeros((Sorn.ne, 2)), np.zeros((Sorn.ni, 2))
            x_buffer[:, 0] = X[i][:, 1]
            x_buffer[:, 1] = excitatory_state_xt_buffer.T
            y_buffer[:, 0] = Y[i][:, 1]
            y_buffer[:, 1] = inhibitory_state_yt_buffer.T

            if plasticity is not None:
                Wee[i], Wei[i], Te[i] = self._apply_plasticity(
                    plasticity, Wee[i], Wei[i], Te[i], x_buffer, y_buffer
                )
            # Otherwise Wee[i], Wei[i] and Te[i] remain the same

            # Assign the matrices to the matrix collections (slot i + 1)
            matrix_collection.weight_matrix(Wee[i], Wei[i], Wie[i], i)
            matrix_collection.threshold_matrix(Te[i], Ti[i], i)
            matrix_collection.network_activity_t(x_buffer, y_buffer, i)

            if self.callbacks:
                self.update_callback_state(
                    x_buffer[:, 1],
                    y_buffer[:, 1],
                    r,
                    Wee[i],
                    Wei[i],
                    Te[i],
                    Ti[i],
                    ei_conn,
                    ee_conn,
                )
                self.dispatcher.step(self.callback_state, time_step=i)

        last_state = {
            "Wee": matrix_collection.Wee[-1],
            "Wei": matrix_collection.Wei[-1],
            "Wie": matrix_collection.Wie[-1],
            "Te": matrix_collection.Te[-1],
            "Ti": matrix_collection.Ti[-1],
            "X": matrix_collection.X[-1],
            "Y": matrix_collection.Y[-1],
        }
        if self.callbacks:
            return last_state, self.dispatcher.get()
        return last_state, {}
# Module-level singletons: callers use `Trainer.run(...)` / `Simulator.run(...)`
# without instantiating the classes themselves.
Trainer = Trainer_()
Simulator = Simulator_()

if __name__ == "__main__":
    pass