Source code for sorn.utils

# MIT License

# Copyright (c) 2019 Saranraj Nambusubramaniyan

# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from __future__ import division
import numpy as np
from scipy.stats import norm
import random
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.optimize import curve_fit
from scipy import stats
import networkx as nx
import pandas as pd
from mpl_toolkits.axes_grid1.inset_locator import InsetPosition


class Initializer(object):
    """
    Helper class to initialize the matrices for the SORN
    """

    def __init__(self):
        pass

    @staticmethod
    def generate_strong_inp(length: int, reservoir_size: int):
        """Generate strong one-hot vector of input. Random neurons in the reservoir acts as inputs

        Args:
            length (int): Number of input neurons

        Returns:
            inp (array): Input vector of length equals the number of neurons in the reservoir
                  with randomly chosen neuron set active

            idx (list): List of chosen input neurons"""

        inp = [0] * reservoir_size
        x = [0] * length
        idx = np.random.choice(length, np.random.randint(reservoir_size))

        for i in idx:
            x[i] = 1.0e4

        inp[: len(x)] = x

        return inp, idx

    # Generate multi-node one-hot strong inputs

    @staticmethod
    def multi_one_hot_inp(ne: int, inputs: list, n_nodes_per_inp: int):
        """Generate multi(n_nodes_per_inp) one hot vector for each input.
        For each input, set n_nodes_per_inp equals one and the rest of
        neurons in the pool recieves no external stimuli

        Args:
          ne (int): Number of excitatory units in sorn

          inputs (list): input labels

          n_nodes_per_inp(int): Number of target units in pool that receives single input

        Returns:
          one_hot_vector for each label with length equals ne

        """

        one_hot = np.zeros((ne, len(inputs)))

        idxs = []

        for _ in range(n_nodes_per_inp):
            idxs.append(random.sample(range(0, ne), len(inputs)))

        idxs = list(zip(*idxs))

        j = 0  # Max(j) = len(inputs)
        for idx_list in idxs:
            for i in idx_list:
                one_hot[i][j] = 1
            j += 1

        return one_hot, idxs

    @staticmethod
    def generate_gaussian_inputs(length: int, reservoir_size: int):

        """Generate external stimuli sampled from Gaussian distribution.
        Randomly neurons in the reservoir receives this input at each timestep

        Args:
            length (int): Number of input neurons

        Returns:
            out (array): Input vector of length equals the number of neurons in the reservoir
                  with randomly chosen neuron set active

            idx (int): List of chosen input neurons
        """

        out = [0] * reservoir_size
        x = [0] * length
        idx = np.random.choice(length, np.random.randint(reservoir_size))
        inp = np.random.normal(length)

        for i in idx:
            x[i] = inp[i]

        out[: len(x)] = x

        return out, idx

    @staticmethod
    def normalize_weight_matrix(weight_matrix: np.array):

        # Applied only while initializing the weight. During simulation, Synaptic scaling applied on weight matrices

        """Normalize the weights in the matrix such that incoming connections to a neuron sum up to 1

        Args:
            weight_matrix (array): Incoming Weights from W_ee or W_ei or W_ie

        Returns:
            weight_matrix (array): Normalized weight matrix"""

        normalized_weight_matrix = weight_matrix / np.sum(weight_matrix, axis=0)

        return normalized_weight_matrix

    @staticmethod
    def generate_lambd_connections(
        synaptic_connection: str, ne: int, ni: int, lambd_w: int, lambd_std: int
    ):

        """Generate lambda incoming connections for Excitatory neurons and outgoing connections per Inhibitory neuron

        Args:
            synaptic_connection (str):  Type of sysnpatic connection (EE,EI or IE)

            ne (int): Number of excitatory units

            ni (int): Number of inhibitory units

            lambd_w (int): Average number of incoming connections

            lambd_std (int): Standard deviation of average number of connections per neuron

        Returns:
            connection_weights (array) - Weight matrix

        """

        if synaptic_connection == "EE":

            """Choose random lamda connections per neuron"""

            # Draw normally distributed ne integers with mean lambd_w

            lambdas_incoming = norm.ppf(
                np.random.random(ne), loc=lambd_w, scale=lambd_std
            ).astype(int)

            # lambdas_outgoing = norm.ppf(np.random.random(ne), loc=lambd_w, scale=lambd_std).astype(int)

            # List of neurons

            list_neurons = list(range(ne))

            # Connection weights

            connection_weights = np.zeros((ne, ne))

            # For each lambd value in the above list,
            # generate weights for incoming and outgoing connections

            # -------------Gaussian Distribution of weights --------------

            # weight_matrix = np.random.randn(Sorn.ne, Sorn.ni) + 2 # Small random values from gaussian distribution
            # Centered around 2 to make all values positive

            # ------------Uniform Distribution --------------------------
            global_incoming_weights = np.random.uniform(0.0, 0.1, sum(lambdas_incoming))

            # Index Counter
            global_incoming_weights_idx = 0

            # Choose the neurons in order [0 to 199]

            for neuron in list_neurons:

                # Choose ramdom unique (lambdas[neuron]) neurons from  list_neurons
                possible_connections = list_neurons.copy()

                possible_connections.remove(
                    neuron
                )  # Remove the selected neuron from possible connections i!=j

                # Choose random presynaptic neurons
                possible_incoming_connections = random.sample(
                    possible_connections, lambdas_incoming[neuron]
                )

                incoming_weights_neuron = global_incoming_weights[
                    global_incoming_weights_idx : global_incoming_weights_idx
                    + lambdas_incoming[neuron]
                ]

                # ---------- Update the connection weight matrix ------------

                # Update incoming connection weights for selected 'neuron'

                for incoming_idx, incoming_weight in enumerate(incoming_weights_neuron):
                    connection_weights[possible_incoming_connections[incoming_idx]][
                        neuron
                    ] = incoming_weight

                global_incoming_weights_idx += lambdas_incoming[neuron]

            return connection_weights

        if synaptic_connection == "EI":

            """Choose random lamda connections per neuron"""

            # Draw normally distributed ni integers with mean lambd_w
            lambdas = norm.ppf(
                np.random.random(ni), loc=lambd_w, scale=lambd_std
            ).astype(int)

            # List of neurons

            list_neurons = list(range(ni))  # Each i can connect with random ne neurons

            # Initializing connection weights variable

            connection_weights = np.zeros((ni, ne))

            # ------------Uniform Distribution -----------------------------
            global_outgoing_weights = np.random.uniform(0.0, 0.1, sum(lambdas))

            # Index Counter
            global_outgoing_weights_idx = 0

            # Choose the neurons in order [0 to 40]

            for neuron in list_neurons:

                # Choose random unique (lambdas[neuron]) neurons from  list_neurons
                possible_connections = list(range(ne))

                possible_outgoing_connections = random.sample(
                    possible_connections, lambdas[neuron]
                )  # possible_outgoing connections to the neuron

                # Update weights
                outgoing_weights = global_outgoing_weights[
                    global_outgoing_weights_idx : global_outgoing_weights_idx
                    + lambdas[neuron]
                ]

                # ---------- Update the connection weight matrix ------------

                # Update outgoing connections for the neuron

                for outgoing_idx, outgoing_weight in enumerate(
                    outgoing_weights
                ):  # Update the columns in the connection matrix
                    connection_weights[neuron][
                        possible_outgoing_connections[outgoing_idx]
                    ] = outgoing_weight

                # Update the global weight values index
                global_outgoing_weights_idx += lambdas[neuron]

            return connection_weights

    @staticmethod
    def get_incoming_connection_dict(weights: np.array):
        """Get the non-zero entries in columns is the incoming connections for the neurons

        Args:
            weights (np.array): Connection/Synaptic weights

        Returns:
            dict : Dictionary of incoming connections to each neuron
        """

        # Indices of nonzero entries in the columns
        connection_dict = dict.fromkeys(range(1, len(weights) + 1), 0)

        for i in range(len(weights[0])):  # For each neuron
            connection_dict[i] = list(np.nonzero(weights[:, i])[0])

        return connection_dict

    @staticmethod
    def get_outgoing_connection_dict(weights: np.array):
        """Get the non-zero entries in rows is the outgoing connections for the neurons

        Args:
            weights (np.array): Connection/Synaptic weights

        Returns:
            dict : Dictionary of outgoing connections from each neuron
        """

        # Indices of nonzero entries in the rows
        connection_dict = dict.fromkeys(range(1, len(weights) + 1), 1)

        for i in range(len(weights[0])):  # For each neuron
            connection_dict[i] = list(np.nonzero(weights[i, :])[0])

        return connection_dict

    @staticmethod
    def reset_min(z: np.array, cutoff_val: float):
        """Prune the connections/thresholds with negative connection strength. The weights less than cutoff_weight set to 0

        Args:
            z (np.array): Synaptic strengths or neuron threshold values

            cutoff_val (float): Lower threshold

        Returns:
            array: Connections weights or unit thresholds with values less than cutoff_val set to 0
        """

        z[z <= cutoff_val] = cutoff_val

        return z

    @staticmethod
    def reset_max(z: np.array, cutoff_val: float):
        """Set cutoff limit for the values in given array

        Args:
            z (np.array): Synaptic strengths or neuron threshold values

            cutoff_val (float): Higher threshold

        Returns:
            array: Connections weights or unit thresolds with values greater than cutoff_val set to 1
        """

        z[z > cutoff_val] = cutoff_val

        return z

    @staticmethod
    def inactive_synapses(wee: np.array):
        """Helper function for Structural plasticity to randomly select the unconnected units

        Args:
            wee (array):  Weight matrix

        Returns:
            list (indices): (row_idx,col_idx)"""

        i, j = np.where(wee <= 0.0)
        indices = list(zip(i, j))

        self_conn_removed = []
        for i, idxs in enumerate(indices):

            if idxs[0] != idxs[1]:
                self_conn_removed.append(indices[i])

        return self_conn_removed

    @staticmethod
    def white_gaussian_noise(mu: float, sigma: float, t: int):

        """Generates white gaussian noise with mean mu, standard deviation sigma and
        the noise length equals t

        Args:
            mu (float): Mean value of Gaussian noise

            sigma (float): Standard deviation of Gaussian noise

            t (int): Length of noise vector

        Returns:
            array: White gaussian noise of length t
        """

        noise = np.random.normal(mu, sigma, t)

        return np.expand_dims(noise, 1)

    @staticmethod
    def zero_sum_incoming_check(weights: np.array):
        """Make sure, each neuron in the pool has atleast 1 incoming connection

        Args:
            weights (array): Synaptic strengths

        Returns:
            array: Synaptic weights of neurons with atleast one positive (non-zero) incoming connection strength
        """
        zero_sum_incomings = np.where(np.sum(weights, axis=0) == 0.0)
        if len(zero_sum_incomings[-1]) == 0:
            return weights
        else:
            for zero_sum_incoming in zero_sum_incomings[-1]:

                rand_indices = np.random.randint(int(weights.shape[0] * 0.2), size=2)
                rand_values = np.random.uniform(0.0, 0.1, 2)

                for i, idx in enumerate(rand_indices):
                    weights[:, zero_sum_incoming][idx] = rand_values[i]

        return weights


[docs]class Plotter(object): """Wrapper class to call plotting methods""" def __init__(self): pass
[docs] @staticmethod def hist_incoming_conn( weights: np.array, bin_size: int, histtype: str, savefig: bool ): """Plot the histogram of number of presynaptic connections per neuron Args: weights (array): Connection weights bin_size (int): Histogram bin size histtype (str): Same as histtype matplotlib savefig (bool): If True plot will be saved as png file in the cwd Returns: plot (matplotlib.pyplot): plot object """ num_incoming_weights = np.sum(np.array(weights) > 0, axis=0) plt.figure(figsize=(12, 5)) plt.xlabel("Number of connections") plt.ylabel("Probability") # Fit a normal distribution to the data mu, std = norm.fit(num_incoming_weights) plt.hist( num_incoming_weights, bins=bin_size, density=True, alpha=0.6, color="b" ) # PDF xmin, xmax = plt.xlim() x = np.linspace(xmin, xmax, max(num_incoming_weights)) p = norm.pdf(x, mu, std) plt.plot(x, p, "k", linewidth=2) title = "Distribution of presynaptic connections: mu = %.2f, std = %.2f" % ( mu, std, ) plt.title(title) if savefig: plt.savefig("hist_incoming_conn") return plt.show()
[docs] @staticmethod def hist_outgoing_conn( weights: np.array, bin_size: int, histtype: str, savefig: bool ): """Plot the histogram of number of incoming connections per neuron Args: weights (array): Connection weights bin_size (int): Histogram bin size histtype (str): Same as histtype matplotlib savefig (bool): If True plot will be saved as png file in the cwd Returns: plot object""" # Plot the histogram of distribution of number of incoming connections in the network num_outgoing_weights = np.sum(np.array(weights) > 0, axis=1) plt.figure(figsize=(12, 5)) plt.xlabel("Number of connections") plt.ylabel("Probability") # Fit a normal distribution to the data mu, std = norm.fit(num_outgoing_weights) plt.hist( num_outgoing_weights, bins=bin_size, density=True, alpha=0.6, color="b" ) # PDF xmin, xmax = plt.xlim() x = np.linspace(xmin, xmax, max(num_outgoing_weights)) p = norm.pdf(x, mu, std) plt.plot(x, p, "k", linewidth=2) title = "Distribution of post synaptic connections: mu = %.2f, std = %.2f" % ( mu, std, ) plt.title(title) if savefig: plt.savefig("hist_outgoing_conn") return plt.show()
[docs] @staticmethod def network_connection_dynamics(connection_counts: np.array, savefig: bool): """Plot number of positive connection in the excitatory pool Args: connection_counts (array) - 1D Array of number of connections in the network per time step savefig (bool) - If True plot will be saved as png file in the cwd Returns: plot object """ # Plot graph for entire simulation time period _, ax1 = plt.subplots(figsize=(12, 5)) ax1.plot(connection_counts, label="Connection dynamics") plt.margins(x=0) ax1.set_xticks(ax1.get_xticks()[::2]) ax1.set_title("Network connection dynamics") plt.ylabel("Number of active connections") plt.xlabel("Time step") plt.legend(loc="upper right") plt.tight_layout() if savefig: plt.savefig("connection_dynamics") return plt.show()
[docs] @staticmethod def hist_firing_rate_network(spike_train: np.array, bin_size: int, savefig: bool): """Plot the histogram of firing rate (total number of neurons spike at each time step) Args: spike_train (array): Array of spike trains bin_size (int): Histogram bin size savefig (bool): If True, plot will be saved in the cwd Returns: plot object""" fr = np.count_nonzero(spike_train.tolist(), 1) # Filter zero entries in firing rate list above fr = list(filter(lambda a: a != 0, fr)) plt.title("Distribution of population activity without inactive time steps") plt.xlabel("Spikes/time step") plt.ylabel("Count") plt.hist(fr, bin_size) if savefig: plt.savefig("hist_firing_rate_network.png") return plt.show()
[docs] @staticmethod def scatter_plot(spike_train: np.array, savefig: bool): """Scatter plot of spike trains Args: spike_train (list): Array of spike trains with_firing_rates (bool): If True, firing rate of the network will be plotted savefig (bool): If True, plot will be saved in the cwd Returns: plot object""" # Conver the list of spike train into array spike_train = np.asarray(spike_train) # Get the indices where spike_train is 1 x, y = np.argwhere(spike_train.T == 1).T plt.figure(figsize=(8, 5)) firing_rates = Statistics.firing_rate_network(spike_train).tolist() plt.plot(firing_rates, label="Firing rate") plt.legend(loc="upper left") plt.scatter(y, x, s=0.1, color="black") plt.title("Spike Trains") plt.xlabel("Time step") plt.ylabel("Neuron") plt.legend(loc="upper left") if savefig: plt.savefig("ScatterSpikeTrain.png") return plt.show()
[docs] @staticmethod def raster_plot(spike_train: np.array, savefig: bool): """Raster plot of spike trains Args: spike_train (array): Array of spike trains with_firing_rates (bool): If True, firing rate of the network will be plotted savefig (bool): If True, plot will be saved in the cwd Returns: plot object""" # Conver the list of spike train into array spike_train = np.asarray(spike_train) plt.figure(figsize=(11, 6)) firing_rates = Statistics.firing_rate_network(spike_train).tolist() plt.plot(firing_rates, label="Firing rate") plt.legend(loc="upper left") plt.title("Spike Trains") # Get the indices where spike_train is 1 x, y = np.argwhere(spike_train.T == 1).T plt.plot(y, x, "|r") plt.xlabel("Time step") plt.ylabel("Neuron") if savefig: plt.savefig("RasterSpikeTrain.png") return plt.show()
[docs] @staticmethod def correlation(corr: np.array, savefig: bool): """Plot correlation between neurons Args: corr (array): Correlation matrix savefig (bool): If true will save the plot at the current working directory Returns: matplotlib.pyplot: Neuron Correlation plot """ # Generate a mask for the upper triangle mask = np.zeros_like(corr, dtype=np.bool) mask[np.triu_indices_from(mask)] = True f, ax = plt.subplots(figsize=(11, 9)) # Custom diverging colormap cmap = sns.diverging_palette(220, 10, as_cmap=True) sns.heatmap( corr, mask=mask, cmap=cmap, xticklabels=5, yticklabels=5, vmax=0.1, center=0, square=False, linewidths=0.0, cbar_kws={"shrink": 0.9}, ) if savefig: plt.savefig("Correlation between neurons") return None
[docs] @staticmethod def isi_exponential_fit( spike_train: np.array, neuron: int, bin_size: int, savefig: bool ): """Plot Exponential fit on the inter-spike intervals during training or simulation phase Args: spike_train (array): Array of spike trains neuron (int): Target neuron bin_size (int): Spike train will be splitted into bins of size bin_size savefig (bool): If True, plot will be saved in the cwd Returns: plot object""" isi = Statistics.spike_time_intervals(spike_train[:, neuron]) y, x = np.histogram(sorted(isi), bins=bin_size) x = [int(i) for i in x] y = [float(i) for i in y] def exponential_func(y, a, b, c): return a * np.exp(-b * np.array(y)) - c # Curve fit popt, _ = curve_fit(exponential_func, x[1:bin_size], y[1:bin_size]) plt.plot( x[1:bin_size], exponential_func(x[1:bin_size], *popt), label="Exponential fit", ) plt.title("Distribution of Inter Spike Intervals and Exponential Curve Fit") plt.scatter(x[1:bin_size], y[1:bin_size], s=2.0, color="black", label="ISI") plt.xlabel("ISI") plt.ylabel("Frequency") plt.legend() if savefig: plt.savefig("isi_exponential_fit") return plt.show()
[docs] @staticmethod def weight_distribution(weights: np.array, bin_size: int, savefig: bool): """Plot the distribution of synaptic weights Args: weights (array): Connection weights bin_size (int): Spike train will be splited into bins of size bin_size savefig (bool): If True, plot will be saved in the cwd Returns: plot object""" weights = weights[ weights >= 0.01 ] # Remove the weight values less than 0.01 # As reported in article SORN 2013 y, x = np.histogram(weights, bins=bin_size) # Create histogram with bin_size plt.title("Synaptic weight distribution") plt.scatter(x[:-1], y, s=2.0, c="black") plt.xlabel("Connection strength") plt.ylabel("Frequency") if savefig: plt.savefig("weight_distribution") return plt.show()
[docs] @staticmethod def linear_lognormal_fit(weights: np.array, num_points: int, savefig: bool): """Lognormal curve fit on connection weight distribution Args: weights (array): Connection weights num_points(int): Number of points to be plotted in the x axis savefig(bool): If True, plot will be saved in the cwd Returns: plot object""" weights = np.array(weights.tolist()) weights = weights[weights >= 0.01] M = float(np.mean(weights)) # Geometric mean s = float(np.std(weights)) # Geometric standard deviation # Lognormal distribution parameters mu = float(np.mean(np.log(weights))) # Mean of log(X) sigma = float(np.std(np.log(weights))) # Standard deviation of log(X) shape = sigma # Scipy's shape parameter scale = np.exp(mu) # Scipy's scale parameter median = np.exp(mu) mode = np.exp(mu - sigma ** 2) # Note that mode depends on both M and s mean = np.exp(mu + (sigma ** 2 / 2)) # Note that mean depends on both M and s x = np.linspace(np.min(weights), np.max(weights), num=num_points) pdf = stats.lognorm.pdf(x, shape, loc=0, scale=scale) plt.figure(figsize=(12, 4.5)) plt.title("Curve fit on connection weight distribution") # Figure on linear scale plt.subplot(121) plt.plot(x, pdf) plt.vlines(mode, 0, pdf.max(), linestyle=":", label="Mode") plt.vlines( mean, 0, stats.lognorm.pdf(mean, shape, loc=0, scale=scale), linestyle="--", color="green", label="Mean", ) plt.vlines( median, 0, stats.lognorm.pdf(median, shape, loc=0, scale=scale), color="blue", label="Median", ) plt.ylim(ymin=0) plt.xlabel("Weight") plt.title("Linear scale") plt.legend() # Figure on logarithmic scale plt.subplot(122) plt.semilogx(x, pdf) plt.vlines(mode, 0, pdf.max(), linestyle=":", label="Mode") plt.vlines( mean, 0, stats.lognorm.pdf(mean, shape, loc=0, scale=scale), linestyle="--", color="green", label="Mean", ) plt.vlines( median, 0, stats.lognorm.pdf(median, shape, loc=0, scale=scale), color="blue", label="Median", ) plt.ylim(ymin=0) plt.xlabel("Weight") plt.title("Logarithmic scale") plt.legend() if savefig: plt.savefig("LinearLognormalFit") return plt.show()
[docs] @staticmethod def plot_network(corr: np.array, corr_thres: float, fig_name: str = None): """Network x graphical visualization of the network using the correlation matrix Args: corr (array): Correlation between neurons corr_thres (array): Threshold to prune the connection. Smaller the threshold, higher the density of connections fig_name (array, optional): Name of the figure. Defaults to None. Returns: matplotlib.pyplot: Plot instance """ df = pd.DataFrame(corr) links = df.stack().reset_index() links.columns = ["var1", "var2", "value"] links_filtered = links.loc[ (links["value"] > corr_thres) & (links["var1"] != links["var2"]) ] G = nx.from_pandas_edgelist(links_filtered, "var1", "var2") plt.figure(figsize=(50, 50)) nx.draw( G, with_labels=True, node_color="orange", node_size=50, linewidths=5, font_size=10, ) plt.text(0.1, 0.9, "%s" % corr_thres) plt.savefig("%s" % fig_name) plt.show()
[docs] @staticmethod def hamming_distance(hamming_dist: list, savefig: bool): """Hamming distance between true netorks states and perturbed network states Args: hamming_dist (list): Hamming distance values savefig (bool): If True, save the fig at current working directory Returns: matplotlib.pyplot: Hamming distance between true and perturbed network states """ plt.figure(figsize=(15, 6)) plt.title("Hamming distance between actual and perturbed states") plt.xlabel("Time steps") plt.ylabel("Hamming distance") plt.plot(hamming_dist) if savefig: plt.savefig("HammingDistance") return plt.show()
[docs]class Statistics(object): """Wrapper class for statistical analysis methods""" def __init__(self): pass
[docs] @staticmethod def firing_rate_neuron(spike_train: np.array, neuron: int, bin_size: int): """Measure spike rate of given neuron during given time window Args: spike_train (array): Array of spike trains neuron (int): Target neuron in the reservoir bin_size (int): Divide the spike trains into bins of size bin_size Returns: int: firing_rate""" time_period = len(spike_train[:, 0]) neuron_spike_train = spike_train[:, neuron] # Split the list(neuron_spike_train) into sub lists of length time_step samples_spike_train = [ neuron_spike_train[i : i + bin_size] for i in range(0, len(neuron_spike_train), bin_size) ] spike_rate = 0.0 for _, spike_train in enumerate(samples_spike_train): spike_rate += list(spike_train).count(1.0) spike_rate = spike_rate * bin_size / time_period return time_period, bin_size, spike_rate
[docs] @staticmethod def firing_rate_network(spike_train: np.array): """Calculate number of neurons spikes at each time step.Firing rate of the network Args: spike_train (array): Array of spike trains Returns: int: firing_rate""" firing_rate = np.count_nonzero(spike_train.tolist(), 1) return firing_rate
[docs] @staticmethod def scale_dependent_smoothness_measure(firing_rates: list): """Smoothem the firing rate depend on its scale. Smaller values corresponds to smoother series Args: firing_rates (list): List of number of active neurons per time step Returns: sd_diff (list): Float value signifies the smoothness of the semantic changes in firing rates """ diff = np.diff(firing_rates) sd_diff = np.std(diff) return sd_diff
[docs] @staticmethod def scale_independent_smoothness_measure(firing_rates: list): """Smoothem the firing rate independent of its scale. Smaller values corresponds to smoother series Args: firing_rates (list): List of number of active neurons per time step Returns: coeff_var (list):Float value signifies the smoothness of the semantic changes in firing rates""" diff = np.diff(firing_rates) mean_diff = np.mean(diff) sd_diff = np.std(diff) coeff_var = sd_diff / abs(mean_diff) return coeff_var
[docs] @staticmethod def autocorr(firing_rates: list, t: int = 2): """ Score interpretation - scores near 1 imply a smoothly varying series - scores near 0 imply that there's no overall linear relationship between a data point and the following one (that is, plot(x[-length(x)],x[-1]) won't give a scatter plot with any apparent linearity) - scores near -1 suggest that the series is jagged in a particular way: if one point is above the mean, the next is likely to be below the mean by about the same amount, and vice versa. Args: firing_rates (list): Firing rates of the network t (int, optional): Window size. Defaults to 2. Returns: array: Autocorrelation between neurons given their firing rates """ return np.corrcoef( np.array( [ firing_rates[0 : len(firing_rates) - t], firing_rates[t : len(firing_rates)], ] ) )
[docs] @staticmethod def avg_corr_coeff(spike_train: np.array): """Measure Average Pearson correlation coeffecient between neurons Args: spike_train (array): Neural activity Returns: array: Average correlation coeffecient""" corr_mat = np.corrcoef(np.asarray(spike_train).T) avg_corr = np.sum(corr_mat, axis=1) / 200 corr_coeff = ( avg_corr.sum() / 200 ) # 2D to 1D and either upper or lower half of correlation matrix. return corr_mat, corr_coeff
[docs] @staticmethod def spike_times(spike_train: np.array): """Get the time instants at which neuron spikes Args: spike_train (array): Spike trains of neurons Returns: (array): Spike time of each neurons in the pool""" times = np.where(spike_train == 1.0) return times
[docs] @staticmethod def spike_time_intervals(spike_train): """Generate spike time intervals spike_trains Args: spike_train (array): Network activity Returns: list: Inter spike intervals for each neuron in the reservoir """ spike_times = Statistics.spike_times(spike_train) isi = np.diff(spike_times[-1]) return isi
[docs] @staticmethod def hamming_distance(actual_spike_train: np.array, perturbed_spike_train: np.array): """Hamming distance between true netorks states and perturbed network states Args: actual_spike_train (np.array): True network's states perturbed_spike_train (np.array): Perturbated network's states Returns: float: Hamming distance between true and perturbed network states """ hd = [ np.count_nonzero(actual_spike_train[i] != perturbed_spike_train[i]) for i in range(len(actual_spike_train)) ] return hd
[docs] @staticmethod def fanofactor(spike_train: np.array, neuron: int, window_size: int): """Investigate whether neuronal spike generation is a poisson process Args: spike_train (np.array): Spike train of neurons in the reservoir neuron (int): Target neuron in the pool window_size (int): Sliding window size for time step ranges to be considered for measuring the fanofactor Returns: float : Fano factor of the neuron spike train """ # Choose activity of random neuron neuron_act = spike_train[:, neuron] # Divide total observations into 'tws' time windows of size 'ws' for a neuron 60 tws = np.split(neuron_act, window_size) fr = [] for i in range(len(tws)): fr.append(np.count_nonzero(tws[i])) # print('Firing rate of the neuron during each time window of size %s is %s' %(ws,fr)) mean_firing_rate = np.mean(fr) variance_firing_rate = np.var(fr) fano_factor = variance_firing_rate / mean_firing_rate return mean_firing_rate, variance_firing_rate, fano_factor
[docs] @staticmethod def spike_source_entropy(spike_train: np.array, num_neurons: int): """Measure the uncertainty about the origin of spike from the network using entropy Args: spike_train (np.array): Spike train of neurons num_neurons (int): Number of neurons in the reservoir Returns: int : Spike source entropy of the network """ # Number of spikes from each neuron during the interval n_spikes = np.count_nonzero(spike_train, axis=0) p = n_spikes / np.count_nonzero( spike_train ) # Probability of each neuron that can generate spike in next step # print(p) # Note: pi shouldn't be zero sse = np.sum([pi * np.log(pi) for pi in p]) / np.log( 1 / num_neurons ) # Spike source entropy return sse