Using Akka Streaming “saving alerts” – part 1

Full disclosure: this post was initially published on the Bonial tech blog here – one of my favorite companies, at the heart of Berlin, where I have been fortunate enough to be working for 2+ years as a freelance Data Engineer. If you are looking for positions in tech, I can't help but recommend checking their career page.

Overview

Some months ago I was working on an internal project at Bonial using Akka Streaming (in Scala) to provide additional features to our current push notification system. The main goal of the project was to improve the speed at which the client is able to notify its end users of available discount coupons. In this case, we wanted to notify users in real time of coupons available in store, so that they could use them more effectively on the spot and save money. Hence our project code name: "saving alerts"!

After some architectural discussions where we compared several technical options, we decided to give akka streaming a go. It has been a fun ride, so we thought we might as well share some of the lessons learned.

This post has been divided into two parts:

  1. Part 1 – we provide an overview about the main tech building blocks being used in this project (mainly focusing on Akka)
  2. Part 2 – details how the system was designed and implemented

 

Without further ado, let us start with an overview of our “saving-alerts” application:

[Figure: overview of the "saving alerts" application]

Logically speaking, one can group the tasks executed by our application into three main stages:

  1. Read events from a given Kafka topic (these events are collected by our own tracking API; each event belongs to a unique user and is triggered by our mobile app) and perform basic validation;
  2. Query an internal service – the so-called "Coupon API" – to check whether there are any available coupons for that given user;
  3. Apply a set of business logic rules – which at the moment are defined by our Product Managers – that determine whether, in the end, to send or not to send a push notification to that user's mobile app.

Besides these main logical tasks, we also run some "offline" scheduled housekeeping processes, namely: loading updated versions of retailer metadata and of the business logic rules from an S3 bucket into memory, renewing an auth token used to talk internally to the client's APIs, and, obviously, logging application statistics for monitoring purposes.

In terms of tech stack, what is relevant for this project is simply the Akka actor system, a dedicated single-node Redis instance, and some dedicated S3 buckets. All the rest – such as the tracking API, the Kafka queue, the Authentication API, the Coupon API, the monitoring service and the Push Notification API – is viewed as an external service from the application's point of view, even though most of these live inside the company.

Though not particularly relevant for this project, the whole Akka application was deployed on AWS EC2 instances. As we state in our final conclusion notes, a good fit for this application would also be a Docker container orchestration service such as Kubernetes.

Before we dive deep into how we implemented this system, let us first review the main technical building block concepts used in the project.

System Architecture

The main building block of the current application is the Akka framework. Hopefully this section will walk you through some of the rationale behind our decisions, and ideally make clear why we chose Akka for this project.

About Akka

Why Akka

Let's start from the very basics: building concurrent and distributed applications is far from being a trivial task. In short, Akka comes to the rescue for this exact problem: it is an open source project that provides a simple, high-level abstraction layer in the form of the Actor model to greatly simplify building concurrent, distributed and fault-tolerant applications on the JVM. Here is a summary of Akka's purpose:

  • provide concurrency (scale up) and remoting (scale out)
  • easily get rid of race conditions and multi-threading locking problems, such as deadlocks (“[…] when several participants are waiting on each other to reach a specific state to be able to progress”), starvation (“[…] when there are participants that can make progress, but there might be one or more that cannot”), and livelocks (when several participants are granting each other a given resource and none ends up using it) – please see akka documentation, which does a fantastic job explaining it;
  • provide an easy programming model to code a multitude of complex tasks
  • allow you to scale your application horizontally

 

There are three main building blocks of the Akka framework that we use in this project:

  • Akka actors
  • Akka remote & clustering
  • Akka streaming

Akka Actors

The actor system provides an asynchronous, non-blocking, highly performant, message-driven programming model for distributed environments. This is achieved via the Actor concept.

An Actor is a sort of safety container – a lightweight, isolated computation unit which encapsulates state and behaviour.

In fact, actor methods are effectively private – one cannot call methods on an actor from the outside. The only way to interact with an actor is by sending messages – this also holds true for inter-actor communication. Moreover, as stated in the Akka documentation: "This method [the receive method, which has to be overridden by every actor implementation] is responsible for receiving and handling one message. Let's reiterate what we already said: each and every actor can handle at most one message at a time, thus the receive method is never called concurrently. If the actor is already handling some message, the remaining messages are kept in a queue dedicated to each actor. Thanks to this rigorous rule, we can avoid any synchronization inside actors, which are always thread-safe."

Here is an example of a basic Actor class (written in Scala), retrieved from Akka's documentation and changed in minor details:
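A minimal sketch along the lines of that documentation example (the class name MyActor and the messages "test" and "mutate" follow the description below) could look like this:

import akka.actor.{Actor, ActorLogging}

class MyActor extends Actor with ActorLogging {
  // internal state, safely encapsulated inside the actor
  var counter = 0

  def receive: Receive = {
    case "test"   => log.info("received test")  // just log the message
    case "mutate" => counter += 1               // mutate local state by incrementing one
    case other    => log.info(s"received unknown message: $other")
  }
}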

 

The receive method

In order to create an actor, one needs to extend the Actor trait (similar to a Java interface), which mandates the implementation of the "receive" method – basically where all the action happens. In our example, in case "MyActor" receives the message "test", the actor will log "received test", and if it receives the message "mutate", it will mutate its local variable by incrementing it by one. As each message is handled sequentially, and there is no other way to interact with an actor, it follows that you do not have to synchronize access to internal actor state variables, as they are protected from state corruption via isolation – this is what is meant when one says that actors encapsulate state and behaviour.

As mentioned before, the receive method needs to be implemented by every actor. The receive method is a PartialFunction which accepts the "Any" type and returns nothing (Unit). You can confirm this in Akka's source code, namely the Actor object implementation:
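The relevant definition lives in the Actor companion object and boils down to a type alias roughly like this:

object Actor {
  // a receive block accepts Any and returns Unit, i.e. no compile-time type safety on messages
  type Receive = PartialFunction[Any, Unit]
  // (other members omitted)
}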

By the way, as a side note, the receive method being a PartialFunction is also one of the main criticisms raised by Akka Streaming proponents, due to the lack of type safety.

In the provided example we are using simple strings as messages ("test" and "mutate"). However, usually one uses Scala case classes to send messages since, as a best practice, messages should be immutable objects which do not hold anything mutable. Akka will take care of serialization in the background. However, you can also implement your own custom serializers, which is especially recommended in the case of remoting, in order to optimize performance or to handle complex cases. Here is an example of how two actors can communicate with each other:
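The following is a sketch (the DudeA/DudeB names and the "hallo" message match the description below; the wiring in the Demo object is just illustrative):

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}

class DudeA(buddy: ActorRef) extends Actor with ActorLogging {
  // fire and forget: send "hallo" to the buddy as soon as this actor starts
  override def preStart(): Unit = buddy ! "hallo"

  def receive: Receive = {
    case msg => log.info(s"DudeA got: $msg")
  }
}

class DudeB extends Actor with ActorLogging {
  def receive: Receive = {
    case "hallo" => log.info("DudeB received a greeting")
  }
}

object Demo extends App {
  val system = ActorSystem("demo")
  val dudeB  = system.actorOf(Props[DudeB](), "dudeB")
  val dudeA  = system.actorOf(Props(new DudeA(dudeB)), "dudeA")
}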

To send a message one can use the exclamation mark "!" notation, as above. This is a "fire and forget" way of sending a message (which means there is no acknowledgement from the receiver that the message was indeed received). In order to get an acknowledgement (or a reply) back as a Future, one can instead use the ask pattern, with the interrogation mark "?".

Also note that to retrieve the underlying message sender we call the "sender" method, which returns a so-called "ActorRef" – a reference to the address of the sender actor. Thus, if actor DudeB receives the message "hallo" from actor DudeA, it is able to reply just by calling the sender() method, which is provided by the Actor trait:
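For instance, DudeB's receive block from the sketch above could reply along these lines:

import akka.actor.{Actor, ActorLogging}

class DudeB extends Actor with ActorLogging {
  def receive: Receive = {
    case "hallo" =>
      // sender() is the ActorRef of whoever sent the message currently being processed
      sender() ! "hallo to you too"
  }
}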

Finally, messages are stored in the recipient's Mailbox – a queue that holds messages. Though there are exceptions (such as routers, which we will see later), every actor has its own dedicated Mailbox.

Message ordering

Important to note is that message order is not guaranteed across senders. That is, if Actor B sends a message to Actor A at a given point in time, and later Actor C sends a message to the same Actor A, Akka does not provide any guarantee that Actor B's message will be delivered before Actor C's message (even though Actor B sent its message before Actor C). This would be fairly difficult for Akka to control, especially when the actors are not co-located on the same server (as we will discuss later) – if Actor B is experiencing high jitter on its network, for example, it might happen that Actor C gets its message through first.

Though order between different actors is not guaranteed, Akka does guarantee that messages from the same actor to another actor will be ordered. So, if Actor B sends one message to Actor A, and then later sends a second message again to Actor A, one has the guarantee that, assuming both messages are successfully delivered, the first message will be processed before the second.

 

Besides being processed sequentially, it is also relevant to note that messages are processed asynchronously, to avoid blocking the current thread where the actor is residing. Actors are extremely lightweight – you can have several million actors per GB of heap memory – and are completely shielded from each other. This is the first fundamental advantage of Akka: providing a lightning-fast asynchronous paradigm/API for building applications.

As we will see next, Akka provides many more building blocks which enhance its capabilities. We will focus on how Akka benefits this application specifically, namely how it provides an optimized multi-threading scale-up (inside the same server) and scale-out (across several remote servers) environment for free.

Akka Routers (scale-up)

Router actors are a special kind of actor that make it easy to scale up. That is, with exactly the same code, one can simply launch an actor of a router type, and it automatically starts child actors – so-called "routees" in Akka terminology.

The child actors will have their own Mailboxes; however, the router itself will not. A router serves as a fast proxy, which just forwards messages to its own routees according to a given algorithm. For example, in our application we simply use a round-robin policy. However, other (more complex) algorithms could be used, such as load balancing by routee CPU and memory statistics, a scatter-gather approach (for low-latency requirements, for example), or even simply broadcasting to all routees.

[Figure: Akka router architecture – a router forwarding messages to its routees]

The main advantage of routers is that they provide a very easy way to scale up the multi-threading environment. With the same class code, and simply changing the way we instantiate the actor, we can transform an actor into a router.
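For example, a sketch using the round-robin pool mentioned above (reusing the MyActor class defined earlier; the pool size of 5 is arbitrary):

import akka.actor.{ActorSystem, Props}
import akka.routing.RoundRobinPool

object RouterDemo extends App {
  val system = ActorSystem("saving-alerts")

  // a single, plain actor instance
  val single = system.actorOf(Props[MyActor](), "single-worker")

  // the same Props wrapped in a router: 5 routees, messages forwarded round-robin
  val router = system.actorOf(RoundRobinPool(5).props(Props[MyActor]()), "worker-pool")

  router ! "test" // handled by one of the 5 routees
}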

Akka Remote & clustering modules (scale-out)

To distribute actors across different servers one has two modules available: Akka Remoting and, dependent on the first, Akka Clustering. Akka Remoting provides location transparency to actors, so that the application code barely has to change whether an actor is communicating with another actor on the same server or on a different one.

Akka Clustering, on the other hand, goes one step further and builds on top of Akka Remoting, providing failure detection and potentially failover mechanisms. The way clustering works is by having a decentralized peer-to-peer membership service with no single point of failure and no single bottleneck. Membership is maintained via a gossip protocol based on Amazon's Dynamo system (as popularized by Riak).

As we will see later, given the way we scale this application, the clustering advantages are not currently being used. That is, we are not extending a specific actor system across more than one node. However, the application is written in a way that is completely prepared for it.

Akka Streaming (back-pressure for streaming consumption)

Akka Streaming is yet another module from Akka, relatively recently released. The main point of it is to hide away the complexities of creating a streaming consumption environment, providing back-pressure for free. Akka has really good documentation explaining back-pressure in more detail, but in short, back-pressure ensures that producers slow down production in case consumers are lagging behind (for example, when some performance issue prevents them from consuming fast enough).

Last but not least, it is important to highlight that Akka Streaming works kind of like a black box (in a good way), doing all the heavy lifting in the background and freeing you to focus on business-critical logic. The API is also intuitive to use, following a nice functional programming style. However, we should warn that as the complexity of your operations graph grows, you will be forced to dive into more advanced topics.
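To give a feel for the API, here is a minimal, self-contained sketch of a stream (the stage names validate/checkCoupons are made up for illustration, and depending on your Akka version the explicit materializer may no longer be needed):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object StreamSketch extends App {
  implicit val system: ActorSystem = ActorSystem("saving-alerts")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // stand-ins for the real stages (event validation, Coupon API call, push notification)
  def validate(event: Int): Int = event
  def checkCoupons(event: Int): Future[Int] = Future(event)

  Source(1 to 1000)                          // producer
    .map(validate)                           // cheap synchronous stage
    .mapAsync(parallelism = 4)(checkCoupons) // non-blocking call to an external service
    .runWith(Sink.foreach(e => println(s"would notify user of event $e")))
  // if the downstream stages lag behind, back-pressure propagates upstream automatically
}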

About Kafka

Kafka is a complex system – more specifically, in this case, a distributed publish-subscribe messaging system – whose many use cases include messaging. It is provided to the Akka application as a service, so from the application's standpoint one does not need to care much about it. However, it is beneficial to understand how the application scales and how it ingests data. The following summary attempts to highlight some of the core characteristics that distinguish Kafka from other messaging systems, such as RabbitMQ:
  • Kafka implements a "dumb broker, smart consumer" philosophy; consumers are responsible for knowing from which offset they should consume data – Kafka does not keep track of this; this is a major distinction compared to, for example, RabbitMQ, where many sophisticated delivery checks are available to deal with dead-letter messages; in regard to our application, given Akka Streaming's back-pressure mechanism, our application will simply halt consumption in case consumers are not able to keep up with producers;
  • Persistent storage for a configurable amount of time; many clients can read from the same topic for as long as Kafka is configured to retain the data;
  • Topics can be partitioned for scalability – in practice this means that the data of a single topic can be distributed and stored among several servers;
  • Data in each partition is added in append-only mode, creating an immutable sequence of records – a structured commit log; records are stored in a key-value structure, and in any format, such as String, JSON, Avro, etc.;
  • It follows that order is only guaranteed on a per-partition basis; that is, inside the same partition, if event A was appended before event B, it will also be consumed first by the consumer assigned to that partition. However, across partitions order is not guaranteed; the following illustration, taken from Kafka's own project page, shows this concept:

 

[Figure: anatomy of a Kafka topic – a partitioned, append-only log]

  • Besides possibly being partitioned, topics can also be replicated among several nodes, thus guaranteeing HA;
  • Consumers can be organized into consumer groups: the partitions of a topic are divided among the consumers within a group, and each group consumes the topic independently;

For more detail about Kafka, we recommend Kafka's own page, which has a really good intro. Finally, if you are familiar with RabbitMQ, we recommend reading the following article comparing Kafka with RabbitMQ, especially regarding which use cases each fits best.
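To make this concrete, here is a sketch of how an Akka Streams application can consume such a topic with back-pressure, using the akka-stream-kafka (Alpakka Kafka) connector – the topic name, group id and bootstrap servers below are placeholders:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaConsumerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("saving-alerts")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")   // placeholder broker address
    .withGroupId("saving-alerts-consumers")   // placeholder consumer group

  // a back-pressured source of Kafka records, processed one by one
  Consumer
    .plainSource(settings, Subscriptions.topics("tracking-events")) // placeholder topic
    .map(record => record.value())  // the raw tracking event payload
    .runWith(Sink.foreach(println)) // stand-in for validation + business logic
}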

 


Getting through Deep Learning – Tensorflow intro (part 3)

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)
  3. Getting through Deep Learning – TensorFlow intro (part 3)

Alright, let's move on to more interesting stuff: linear regression. Since the main focus is TensorFlow, and given the abundance of online resources on the subject, I'll just assume you are familiar with linear regression.

As previously mentioned, a linear regression has the following formula:

Y = b0 + b1 * X

Where Y is the dependent variable, X is the independent variable, and b0 and b1 are the parameters we want to adjust.

Let us generate random data and feed it into a linear function. Then, as opposed to using the closed-form solution, we use an iterative algorithm to progressively get closer to the minimal cost, in this case using gradient descent to fit a linear regression.

We start by initializing the weights – b0 and b1 – with random variables, which naturally results in a poor fit. However, as one can see through the print statements as the model trains, b0 approaches the target value of 3, and b1 of 5, with the last printed step: [3.0229037, 4.9730182]

The next figure illustrates the progressive fitting of lines of the model, as weights change:

[Figure: progressive fitting of the regression line as the weights change]

 

Sources:

 

Getting through Deep Learning – Tensorflow intro (part 2)

Yes, I kind of jumped the gun on my initial Deep Learning post by going straight into CNNs. For me this learning path works best, as I dive straight into the fun part, and eventually stumble upon the fact that maybe I'm not that good a swimmer, and it might be good to practice a bit before going out into deep waters. This post attempts to be exactly that: going back to the basics.

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)

TensorFlow is a great starting point for Deep Learning/ Machine Learning, as it provides a very concise yet extremely powerful API. It is an open-source project created by Google initially with numerical computation tasks in mind, and used for Machine Learning/Deep Learning.

TensorFlow provides APIs for both Python and C++, but its backend is written in C/C++, allowing it to achieve much greater performance. Moreover, it supports CPU and GPU execution, as well as distributed computing in a cluster.

The first thing to realize is that TensorFlow uses the concept of a session. A session is nothing more than a series of operations to manipulate tensors, organized in a structure of a data flow graph. This graph building activity pretty much works like Lego building, by matching nodes and edges. Nodes represent mathematical operations, and edges multi-dimensional arrays – aka: Tensors. As the name hints, a tensor is the central data structure in TensorFlow, and is described by its shape. For example, one would characterize a 2 row by 3 columns matrix as a tensor with shape of [2,3].

Also important to note is that the graph is lazily evaluated, meaning that computation will only be triggered by an explicit run order for that session graph. OK, enough talking, let us get into coding by exemplifying how a basic graph session is built:

In the previous example, variables “a” and “b” are the nodes in the graph, and the summation is the operation connecting both of them.

A TensorFlow program is typically split into two parts: construction phase – where the graph is built – and a second one called execution phase, when actually resources (CPU and/or GPU, RAM and disk) are allocated until the session is closed.

Typically, machine learning applications strive to iteratively update model weights. So, of course, one can also specify tensors of variable type, and even combine those with constants.

By defining the dtype of a node, one can gain/lose precision, and at the same time impact memory utilization and computation time.

Note lines 35 to 37 now:

r1 = op1.eval()
r2 = op2.eval()
result = f.eval()

TensorFlow automatically detects which operations depend on each other. In this case, TensorFlow will know that op1 depends on x and y evaluation, op2 on a, b and c evaluation,  and finally that f depends on both op1 and op2. Thus internally the lazy evaluation is also aligned with the computation graph. So far so good.

However, all node values are dropped between graph runs, except for variable values, which are maintained by the session across graph runs. This has the important implication that the op1 and op2 evaluations will not be reused for the f graph run – meaning the code will evaluate op1 and op2 twice.

To overcome this limitation, one needs to instruct TensorFlow to evaluate those operations in a single graph run:

And yes, that is all for today. I want to blog more frequently, and instead of writing just once every couple of months (and in the meanwhile piling up a lot of draft posts that never see the light of day), I decided to keep it simple. See you soon 🙂

Sources:

 

 

Summary: Terraform vs Spinnaker

I recently needed to build this summary, so I thought I'd share it with more people as well. Please feel free to add any points you see fitting.

TL;DR

Rather than putting one versus the other and assuming mutual exclusivity, many companies adopt both tools simultaneously. Terraform is usually used for static cloud infrastructure setup and updates, such as networks/VLANs, firewalls, load balancers, storage buckets, etc. Spinnaker is used for setting up more complex deployment pipelines, mainly the orchestration of software packages and application code onto servers. Though there is some intersection (Spinnaker can also deploy the application environment), Terraform provides an easy and clean way to set up Infrastructure-as-Code.

Terraform

  • HashiCorp product focused on allowing you to cleanly describe and deploy infrastructure in cloud and on-premise environments;
  • Allows you to describe your infrastructure as code, as well as to deploy it. Although more complex deployments relying on server images are possible, the man-power effort starts growing exponentially;
  • The syntax used to describe resources is HashiCorp's own Configuration Language (HCL), which may be a turn-off at first sight; however, after seeing how clear and human-readable it is, you won't regret it;
  • Deployment form varies, from immutable-infrastructure-style deployments (e.g. a new update = a complete new resource) to in-place updates, depending on the nature of the resources. For example, server instances require an immutable approach, whereas firewall rules do not;
  • Failure to deploy resources does not roll back and destroy "zombie" resources; instead they are marked as tainted and left as they are. On the next execution plan run, tainted resources will be removed to give place to the new intended resources. More detail here;
  • Multi-cloud deployments, currently supporting AWS, GCP, Azure, OpenStack, Heroku, Atlas, DNSimple, CloudFlare.
  • Doesn't require server deployment, e.g. it can be launched from your own local machine. However, HashiCorp provides Atlas for remote, centralized deployments.

Spinnaker

  • Is a Netflix open source tool for managing the deployment of complex workflows (Continuous Delivery). It is kind of a second generation/evolution of Asgard.
  • Orchestrator for deploying full application environments, from the surrounding infrastructure (networks, firewalls and load balancers) to the server images;
  • Works hand-in-hand with tools such as Jenkins and Packer.
  • As an orchestrator, it focuses on pipelines: a sequence of stages. A stage is an atomic building block; examples of stages:
    • Build an image (for example an AMI in the case of AWS) in a specific region
    • Deploy the image
    • Run a bash script
    • Resize a server group
  • Multi-cloud deployments, currently supporting AWS, GCP, Azure, OpenStack, Kubernetes, and CloudFoundry.
  • It is itself a product with several functionalities, requiring its own server deployment. Images for easy deployment are available for AWS, Azure, GCP, and Kubernetes;
  • Provides a web GUI for full pipeline configuration, as well as an API
  • Allows you to set up webhooks for notifications via email, SMS and chat clients (HipChat/Slack)

 

More Resources

Getting Started with Spark (part 3) – UDFs & Window functions

This post attempts to continue the previous introductory series "Getting started with Spark in Python" with the topics of UDFs and window functions. Unfortunately it stayed marinating in my WordPress drafts for quite a while (it was already more or less 70% complete), and only now have I had the opportunity to complete it.

Note: For this post I'm using Spark 1.6.1. There are some minor differences in comparison to the upcoming Spark 2.0, such as using a SparkSession object to initialize the Spark context, instead of the HiveContext as I do here. Nonetheless, the important parts are common to both.

HiveContext

Up until Spark 1.6.2, the only way for you to enrich your SQL queries in this way would be to use a HiveContext instead of the Spark SQLContext.

With a HiveContext you get the same features as with a SQLContext, plus some additional advantages, such as the ability to use window functions, access to Hive UDFs, and the ability to read data from Hive tables. For more detail, please refer here for a concise, well-explained answer on the differences between SQLContext and HiveContext.

OK, let us start by importing all required dependencies for this tutorial:

# python dependencies
import sys
from datetime import datetime as dt
# pyspark dependencies
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.dataframe import DataFrame

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

… and initialize the HiveContext:

conf = SparkConf().setMaster("local[4]").setAppName("window-demo") \
      .set("spark.driver.memory", "4g")
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc)

As a reminder, our (extremely) dummy dataset is comprised of the following (nonsense) data:

# note: to simplify, not providing the schema, Spark Df api will infer it
customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27",  "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])

 

What if we wanted to answer a question such as: What is the cumulative sum of spending of each customer throughout time?
A way of thinking of a cumulative sum is as a recursive call where, for every new period, you sum the current value plus all the previously accumulated values. One way to solve this is by using window functions, a functionality added back in Spark 1.4. However, let us start by adding a column with the amount spent, using Spark User Defined Functions (UDFs) for that. These functions basically apply a given function to every row, on one or more columns.


# create the general function
def amount_spent(quantity, price):
   """
   Calculates the product between two variables
   :param quantity: (float/int)
   :param price: (float/int)
   :return:
           (float/int)
   """
   return quantity * price

# create the general UDF
amount_spent_udf = udf(amount_spent, DoubleType())
# Note: DoubleType in Java/Scala is equal to Python float; thus you can alternatively specify FloatType()

# Apply our UDF to the dataframe
customers02 = customers.withColumn('amount_spent', amount_spent_udf(customers['quantity'], customers['price'])).cache()
customers02.show(3, truncate=False)

+-------------+----------+--------+------------+--------+-----+------------+
|customer_name|date      |category|product_name|quantity|price|amount_spent|
+-------------+----------+--------+------------+--------+-----+------------+
|Geoffrey     |2016-04-22|A       |apples      |1       |50.0 |50.0        |
|Geoffrey     |2016-05-03|B       |Lamp        |2       |38.0 |76.0        |
|Geoffrey     |2016-05-03|D       |Solar Pannel|1       |29.0 |29.0        |
+-------------+----------+--------+------------+--------+-----+------------+
only showing top 3 rows

To compute a cumulative sum over time, we need to build a window object and specify how it should be partitioned (that is, which column determines the groups over which the aggregation is computed), how it is ordered, and optionally the row interval of the window.

window_01 = Window.partitionBy("customer_name").orderBy("date", "category").rowsBetween(-sys.maxsize, 0)
# note: func was the name given to functions, a Spark API for a suite of convenience functions 
win_customers01 = customers02.withColumn("cumulative_sum", func.sum(customers02['amount_spent']).over(window_01))
win_customers01.show(10, truncate=False)

+-------------+----------+--------+--------------------------+--------+-----+------------+--------------+
|customer_name|date      |category|product_name              |quantity|price|amount_spent|cumulative_sum|
+-------------+----------+--------+--------------------------+--------+-----+------------+--------------+
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0       |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |76.0        |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |
|Yann         |2016-04-13|E       |Book: The noose           |5       |125.0|625.0       |1366.0        |
|Yann         |2016-04-22|B       |Lamp                      |1       |38.0 |38.0        |1404.0        |
|Yann         |2016-04-27|D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |
|Yann         |2016-05-01|Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |
|Yann         |2016-05-03|C       |Rice                      |15      |15.0 |225.0       |1842.0        |
|Yann         |2016-05-03|D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |
|Yann         |2016-05-03|Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |
+-------------+----------+--------+--------------------------+--------+-----+------------+--------------+
only showing top 10 rows

The cumulative sum calculation is partitioned by each customer, over an interval from the beginning (-sys.maxsize effectively means: start at the very first row of that partition) until the current row (which, as the aggregation window slides, has an index of zero), and is ordered by date and category ascending (the default).
So, just to be sure we're perfectly clear:
cumulative_sum row zero = (amount_spent row zero);
cumulative_sum row one = (cumulative_sum row zero + amount_spent row one);
Also, as a side note, instead of defining a UDF we could apply the sum function directly over the product of the two columns (func.sum is itself a column function provided by Spark).

# Note: instead of defining an UDF, you could alternatively specify directly 
window_01 = Window.partitionBy("customer_name").orderBy("date").rowsBetween(-sys.maxsize, 0)
win_customers01_B = customers.withColumn("cumulative_sum", func.sum(customers['price']*customers['quantity']).over(window_01))
win_customers01_B.show(3, truncate=False)

+-------------+----------+--------+--------------------------+--------+-----+--------------+
|customer_name|date      |category|product_name              |quantity|price|cumulative_sum|
+-------------+----------+--------+--------------------------+--------+-----+--------------+
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|741.0         |
+-------------+----------+--------+--------------------------+--------+-----+--------------+
only showing top 3 rows

If the rowsBetween() method still smells a bit funky, no worries, it will become clearer in the next example.

What if we want to understand how much customers spend on average overall/in total (not grouped by product) throughout time? In other words, how does the average spending vary across a given time periodicity – aka a moving average?


window_02 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-3, 0)
win_customers02 = win_customers01.withColumn("movingAvg", func.avg(customers02['amount_spent']).over(window_02) )
win_customers02.show()

+-------------+----------+--------+--------------------+--------+-----+------------+--------------+-----------------+
|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|        movingAvg|
+-------------+----------+--------+--------------------+--------+-----+------------+--------------+-----------------+
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|            165.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|            120.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|            247.0|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|            341.5|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|           309.75|
|         Yann|2016-04-27|       D|        Solar Pannel|       5| 29.0|       145.0|        1549.0|            327.0|
|         Yann|2016-05-01|       Y|         Motor skate|       1| 68.0|        68.0|        1617.0|            219.0|
|         Yann|2016-05-03|       C|                Rice|      15| 15.0|       225.0|        1842.0|            119.0|
|         Yann|2016-05-03|       D|         Recycle bin|       5| 27.0|       135.0|        1977.0|           143.25|
|         Yann|2016-05-03|       Y|         Motor skate|       1| 68.0|        68.0|        2045.0|            124.0|
|         Yann|2016-05-27|       A|             bananas|       3| 55.0|       165.0|        2210.0|           148.25|
|         Yann|2016-05-27|       D|         Recycle bin|       5| 27.0|       135.0|        2345.0|           125.75|
|         Yann|2016-06-07|       Z|          space ship|       1|227.0|       227.0|        2572.0|           148.75|
|       Yoshua|2016-02-07|       Z|          space ship|       2|227.0|       454.0|         454.0|            454.0|
|       Yoshua|2016-02-14|       A|             bananas|       9| 55.0|       495.0|         949.0|            474.5|
|       Yoshua|2016-02-14|       A|              apples|      10| 55.0|       550.0|        1499.0|499.6666666666667|
|       Yoshua|2016-02-14|       B|                Lamp|       2| 38.0|        76.0|        1575.0|           393.75|
|       Yoshua|2016-03-07|       Z|          space ship|       5|227.0|      1135.0|        2710.0|            564.0|
|       Yoshua|2016-04-07|       A|             bananas|       9| 55.0|       495.0|        3205.0|            564.0|
|       Yoshua|2016-04-07|       C|                Rice|       5| 15.0|        75.0|        3280.0|           445.25|
+-------------+----------+--------+--------------------+--------+-----+------------+--------------+-----------------+
only showing top 20 rows

Before explaining how this works, let us revisit the rowsBetween() method. Note that here we specify an interval of at most 3 rows behind the current one (plus the current row itself). Alternatively we could specify, for example, an interval of two rows behind and two ahead:

window_03 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-2, 2)
win_customers03 = win_customers01.withColumn("movingAvg", func.avg(customers02['amount_spent']).over(window_03) )
win_customers03.show(5)

+-------------+----------+--------+--------------------+--------+-----+------------+--------------+---------+
|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|movingAvg|
+-------------+----------+--------+--------------------+--------+-----+------------+--------------+---------+
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|    247.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|    341.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|    280.8|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|    276.8|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|    275.2|
+-------------+----------+--------+--------------------+--------+-----+------------+--------------+---------+
only showing top 5 rows

The first moving-average value (247.0) is simply the current value plus the next two (the only rows available in the window for the first row), divided by the number of rows in the window:
(165.0 + 76.0 + 500.0)/3 = 247.0

Simple, right?

Going back to how the computation is partitioned, the way we structured this is to compute a moving average per customer, iterating over each event.
However, let's say we want to know how customer spending varies on average on a daily/weekly/monthly basis. For that, let's extract those periods from our date column.

win_customers01.printSchema()
root
 |-- customer_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- amount_spent: double (nullable = true)
 |-- cumulative_sum: double (nullable = true)

Spark automatically inferred the type of our date column as String (as we did not specify the schema when we created the DataFrame). Let's use a UDF to cast it to a date (using an anonymous function – a lambda):

from datetime import datetime as dt
# create the general UDF
string_to_datetime = udf(lambda x: dt.strptime(x, '%Y-%m-%d'), DateType())

# Create a new column called datetime, and drop the date column
win_customers01_B = win_customers01.withColumn('datetime', string_to_datetime( win_customers01['date'])).drop('date')
# Add month and Week columns
win_customers01_C = win_customers01_B.withColumn('year', func.year( win_customers01_B['datetime'] )) \
.withColumn('month', func.month( win_customers01_B['datetime'] )) \
.withColumn('week', func.weekofyear( win_customers01_B['datetime']))
win_customers01_C.show(10, truncate=False)

+-------------+--------+--------------------------+--------+-----+------------+--------------+----------+----+-----+----+
|customer_name|category|product_name              |quantity|price|amount_spent|cumulative_sum|datetime  |year|month|week|
+-------------+--------+--------------------------+--------+-----+------------+--------------+----------+----+-----+----+
|Yann         |A       |bananas                   |3       |55.0 |165.0       |165.0         |2016-04-02|2016|4    |13  |
|Yann         |B       |Lamp                      |2       |38.0 |76.0        |241.0         |2016-04-02|2016|4    |13  |
|Yann         |E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |2016-04-03|2016|4    |13  |
|Yann         |E       |Book: The noose           |5       |125.0|625.0       |1366.0        |2016-04-13|2016|4    |15  |
|Yann         |B       |Lamp                      |1       |38.0 |38.0        |1404.0        |2016-04-22|2016|4    |16  |
|Yann         |D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |2016-04-27|2016|4    |17  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |2016-05-01|2016|5    |17  |
|Yann         |C       |Rice                      |15      |15.0 |225.0       |1842.0        |2016-05-03|2016|5    |18  |
|Yann         |D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |2016-05-03|2016|5    |18  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |2016-05-03|2016|5    |18  |
+-------------+--------+--------------------------+--------+-----+------------+--------------+----------+----+-----+----+
only showing top 10 rows

Let us group customer spending by day:

# 
customer_grp_by_day = win_customers01_C.groupBy('customer_name', 'datetime', 'year') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'datetime')
customer_grp_by_day.show(20)

+-------------+----------+----+------------+
|customer_name|  datetime|year|amount_spent|
+-------------+----------+----+------------+
|     Geoffrey|2016-04-22|2016|        50.0|
|     Geoffrey|2016-05-03|2016|       330.0|
|     Geoffrey|2016-06-05|2016|       525.0|
|     Geoffrey|2016-06-15|2016|       601.0|
|       Jurgen|2016-05-01|2016|       502.0|
|       Jurgen|2016-05-08|2016|       343.0|
|       Jurgen|2016-06-05|2016|       621.0|
|         Yann|2016-04-02|2016|       241.0|
|         Yann|2016-04-03|2016|       500.0|
|         Yann|2016-04-13|2016|       625.0|
|         Yann|2016-04-22|2016|        38.0|
|         Yann|2016-04-27|2016|       145.0|
|         Yann|2016-05-01|2016|        68.0|
|         Yann|2016-05-03|2016|       428.0|
|         Yann|2016-05-27|2016|       300.0|
|         Yann|2016-06-07|2016|       227.0|
|       Yoshua|2016-02-07|2016|       454.0|
|       Yoshua|2016-02-14|2016|      1121.0|
|       Yoshua|2016-03-07|2016|      1135.0|
|       Yoshua|2016-04-07|2016|       977.0|
+-------------+----------+----+------------+

Next, let's check, per customer visit (assuming each customer does not visit the store more than once a day), how the customer's spending progresses, with a moving average over the current and up to 7 previous visits:

window_04 = Window.partitionBy("customer_name").orderBy("customer_name", "datetime").rowsBetween(-7, 0)
win_customers04 = customer_grp_by_day.withColumn("movingAvg", func.avg(customer_grp_by_day['amount_spent']).over(window_04))
win_customers04.show(30)
+-------------+----------+----+------------+------------------+
|customer_name|  datetime|year|amount_spent|         movingAvg|
+-------------+----------+----+------------+------------------+
|         Yann|2016-04-02|2016|       241.0|             241.0|
|         Yann|2016-04-03|2016|       500.0|             370.5|
|         Yann|2016-04-13|2016|       625.0| 455.3333333333333|
|         Yann|2016-04-22|2016|        38.0|             351.0|
|         Yann|2016-04-27|2016|       145.0|             309.8|
|         Yann|2016-05-01|2016|        68.0|             269.5|
|         Yann|2016-05-03|2016|       428.0|292.14285714285717|
|         Yann|2016-05-27|2016|       300.0|           293.125|
|         Yann|2016-06-07|2016|       227.0|           291.375|
|       Yoshua|2016-02-07|2016|       454.0|             454.0|
|       Yoshua|2016-02-14|2016|      1121.0|             787.5|
|       Yoshua|2016-03-07|2016|      1135.0| 903.3333333333334|
|       Yoshua|2016-04-07|2016|       977.0|            921.75|
|     Geoffrey|2016-04-22|2016|        50.0|              50.0|
|     Geoffrey|2016-05-03|2016|       330.0|             190.0|
|     Geoffrey|2016-06-05|2016|       525.0| 301.6666666666667|
|     Geoffrey|2016-06-15|2016|       601.0|             376.5|
|       Jurgen|2016-05-01|2016|       502.0|             502.0|
|       Jurgen|2016-05-08|2016|       343.0|             422.5|
|       Jurgen|2016-06-05|2016|       621.0| 488.6666666666667|
+-------------+----------+----+------------+------------------+

Let us group customers by weekly spending:

customer_grp_by_week = win_customers01_C.groupBy('customer_name', 'year', 'week') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'week')
customer_grp_by_week.show(20)
+-------------+----+----+------------+
|customer_name|year|week|amount_spent|
+-------------+----+----+------------+
|     Geoffrey|2016|  16|        50.0|
|     Geoffrey|2016|  18|       330.0|
|     Geoffrey|2016|  22|       525.0|
|     Geoffrey|2016|  24|       601.0|
|       Jurgen|2016|  17|       502.0|
|       Jurgen|2016|  18|       343.0|
|       Jurgen|2016|  22|       621.0|
|         Yann|2016|  13|       741.0|
|         Yann|2016|  15|       625.0|
|         Yann|2016|  16|        38.0|
|         Yann|2016|  17|       213.0|
|         Yann|2016|  18|       428.0|
|         Yann|2016|  21|       300.0|
|         Yann|2016|  23|       227.0|
|       Yoshua|2016|   5|       454.0|
|       Yoshua|2016|   6|      1121.0|
|       Yoshua|2016|  10|      1135.0|
|       Yoshua|2016|  14|       977.0|
+-------------+----+----+------------+

And computing the weekly moving average:

window_05 = Window.partitionBy('customer_name').orderBy('customer_name', 'week', 'year').rowsBetween(-4, 0)
win_customers05 = customer_grp_by_week.withColumn("movingAvg", func.avg(customer_grp_by_week['amount_spent']).over(window_05))
win_customers05.show(30)
+-------------+----+----+------------+-----------------+
|customer_name|year|week|amount_spent|        movingAvg|
+-------------+----+----+------------+-----------------+
|         Yann|2016|  13|       741.0|            741.0|
|         Yann|2016|  15|       625.0|            683.0|
|         Yann|2016|  16|        38.0|            468.0|
|         Yann|2016|  17|       213.0|           404.25|
|         Yann|2016|  18|       428.0|            409.0|
|         Yann|2016|  21|       300.0|            320.8|
|         Yann|2016|  23|       227.0|            241.2|
|       Yoshua|2016|   5|       454.0|            454.0|
|       Yoshua|2016|   6|      1121.0|            787.5|
|       Yoshua|2016|  10|      1135.0|903.3333333333334|
|       Yoshua|2016|  14|       977.0|           921.75|
|     Geoffrey|2016|  16|        50.0|             50.0|
|     Geoffrey|2016|  18|       330.0|            190.0|
|     Geoffrey|2016|  22|       525.0|301.6666666666667|
|     Geoffrey|2016|  24|       601.0|            376.5|
|       Jurgen|2016|  17|       502.0|            502.0|
|       Jurgen|2016|  18|       343.0|            422.5|
|       Jurgen|2016|  22|       621.0|488.6666666666667|
+-------------+----+----+------------+-----------------+

Finally, let us move on to monthly grouping and calculations.

customer_grp_by_month = win_customers01_C.groupBy('customer_name', 'year', 'month')\
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'month')
customer_grp_by_month.show(20)
+-------------+----+-----+------------+
|customer_name|year|month|amount_spent|
+-------------+----+-----+------------+
|     Geoffrey|2016|    4|        50.0|
|     Geoffrey|2016|    5|       330.0|
|     Geoffrey|2016|    6|      1126.0|
|       Jurgen|2016|    5|       845.0|
|       Jurgen|2016|    6|       621.0|
|         Yann|2016|    4|      1549.0|
|         Yann|2016|    5|       796.0|
|         Yann|2016|    6|       227.0|
|       Yoshua|2016|    2|      1575.0|
|       Yoshua|2016|    3|      1135.0|
|       Yoshua|2016|    4|       977.0|
+-------------+----+-----+------------+

 

# This shows how the customer's spending progresses across months, with a moving average over the current and up to 3 previous months

window_06 = Window.partitionBy('customer_name').orderBy('customer_name', 'month', 'year').rowsBetween(-3, 0)
win_customers06 = customer_grp_by_month.withColumn("movingAvg", func.avg(customer_grp_by_month['amount_spent']).over(window_06))
win_customers06.show(30)
+-------------+----+-----+------------+-----------------+
|customer_name|year|month|amount_spent|        movingAvg|
+-------------+----+-----+------------+-----------------+
|         Yann|2016|    4|      1549.0|           1549.0|
|         Yann|2016|    5|       796.0|           1172.5|
|         Yann|2016|    6|       227.0|857.3333333333334|
|       Yoshua|2016|    2|      1575.0|           1575.0|
|       Yoshua|2016|    3|      1135.0|           1355.0|
|       Yoshua|2016|    4|       977.0|           1229.0|
|     Geoffrey|2016|    4|        50.0|             50.0|
|     Geoffrey|2016|    5|       330.0|            190.0|
|     Geoffrey|2016|    6|      1126.0|            502.0|
|       Jurgen|2016|    5|       845.0|            845.0|
|       Jurgen|2016|    6|       621.0|            733.0|
+-------------+----+-----+------------+-----------------+

As usual, I suggest further checking these sources:

 

 

Getting through Deep Learning – CNNs (part 1)

The number of available open source libraries making Deep Learning easier to use is growing fast as the hype continues to build. However, without understanding the background principles, it just feels like poking around a black box.

In this post (or several, most likely) I will try to give an introduction to Convolutional Neural Networks (CNNs). Note that, for the sake of brevity, I assume that you already know the basics about Neural Networks. If not, I would suggest you go through the following introduction.

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)
  3. Getting through Deep Learning – TensorFlow intro (part 3)

Disclaimer: this post uses images and formulas from distinct sources. I would suggest to have a look over the complete list of sources at the end of the post, as usual.

Inspiration

In 1958 and 1959 David H. Hubel and Torsten Wiesel performed a series of experiments, whereby they concluded that many neurons in the visual cortex focus on a limited region of the visual field.

This insight provided the notion of a local receptive field – a narrow sub-region of the whole visual field which serves as input – thus giving rise to a different architecture from the previous fully connected neural network architecture.

Basics – Convolution Layer

The first thing to realize is that convolutional networks are simply the application of "mini-neural networks" to segments of the input space. In the case of images, this means that neurons in the first convolutional layer are not connected to every single pixel of the input image, but only to the pixels within their Receptive Field (RF). The following image (source) shows an illustration of how a convolution layer is built using an image from the famous MNIST dataset – where the goal consists in identifying the digits in pictures of handwritten numbers.

[Figure: building a convolutional layer on top of an MNIST image]

 

OK, let's break this down. In the MNIST dataset, each image is 28 by 28 pixels – respectively height and width. For a fully connected neural network, the input space for the first layer would be 28 x 28 = 784 pixels, if we were only to include height and width.

However, in a so-called convolution, you would instead apply a mini-neural network to just a single portion/segment of the image – let's say a 3×3 px (width and height) rectangle. This 3×3 receptive field is also often referred to as a filter or kernel in Deep Learning (DL) lingo.

Next you would slide that kernel over the image, let us say 1 px to the right in each step until we reach the far right end, and then 1 px down until we reach the lower bound. The following animated illustration – credits go to Vincent Dumoulin and Francesco Visin, with an awesome CNN architectural overview available here – shows, step by step, the building of a convolution layer using this 3×3 receptive field/filter/kernel, producing what is called a Feature Map – a layer full of neurons using the same filter.

[Figure: a 3×3 kernel sliding over the input – no padding, stride 1]

 

Thus a Feature Map can also be thought of as a multitude of these mini-neural networks – whereby each filter – the dark blue rectangle – has its own weights, bias term, and activation function, and produces one output (the darker green square).

The following illustration shows a detailed view of the progressive application of these mini-neural networks across the initial input space – again credits go to Vincent Dumoulin and Francesco Visin – producing a feature map.

[Figure: detailed view of the convolution computation producing a feature map]

Note that the illustrations used an iterative sliding of a 3×3 kernel with a step of 1 px. However, this is not a requirement, as one can use, for example, a step size of 2, reducing the dimension of the output space. By now you should be realizing that this step size – called the stride in DL lingo – is yet another hyper-parameter, just like the filter size.

OK, we are almost done with the basic concepts related to CNNs: we take a local receptive field – which we call a filter/kernel – slide it over an image with a given step size – which we call the stride – and produce a set of mini-neural networks – which we call a feature map.

The missing detail that builds upon the previous knowledge is the fact that we can simultaneously apply different filters to the same input space, thus producing several feature maps as a result. The only restriction is that all feature maps in a given layer have the same dimensions. The following illustration from the excellent book "Hands-On Machine Learning with Scikit-Learn and TensorFlow" gives more insight into the intuition of stacking multiple feature maps.

[Figure: stacking multiple feature maps in a convolutional layer]

So far we have represented a convolution layer as a thin 2-D output space (a single feature map). However, if one produces several feature maps in one layer, the result is a 3-D volume. And this is where the notion of a convolution volume comes into play: the output of the convolution will have a new height, width and depth.

[Figure: a convolutional layer as a 3-D volume (height × width × depth)]

 

It is thus frequent to see convolutional networks illustrated as "tall and skinny" boxes (i.e. with large height/width and small depth), progressively getting "shorter and fatter" (smaller height and bigger depth).

 

[Figure: stacked convolutional layers becoming shorter and fatter through the network]

Basics – Pooling Layer

Before we go into more detail about the different architectures a convolutional layer may have (in the next post of this series), it is important to cover some basic aspects. To start, you are not restricted to using convolutional layers when creating a new hidden layer. In fact there are two more main types of layers: the Pooling Layer (which we will cover in just a bit) and the Fully-Connected Layer (exactly as in a regular Neural Network).

Also note that, similarly to regular Neural Networks, where you can have as many hidden layers as you want, the same goes for CNNs. That is, you can build convolutions that serve as the input space for a next convolution layer or a pooling layer, for example, and so on.

Finally, and again similarly to Neural Networks, the last fully-connected layer will always contain as many neurons as the number of classes to be predicted.

Typical Activation functions

Again, to be perfectly clear: what we do at each step of the sliding filter in a convolution is apply the dot product of a given set of weights (which we are trying to tune during training), plus a given bias. This is effectively a linear function of the following form:

linear_regression_trimmed

where the weights are a vector represented by W, the Xi are (in our example) the pixel values inside a given filter, and b is the bias. What usually happens (except for pooling) is that we pass the output of this function to a neuron, which then applies a given activation function. The typical activation functions are:

Softmax – a generalized form of the logistic/sigmoid function, which turns a vector of outputs into probabilities (each lying in the interval between 0 and 1, and summing to 1).

softmax_trimmed

ReLU – the Rectified Linear Unit outputs the input directly when it is positive and zero otherwise (i.e. max(0, x)), so results are never negative.

relu

Tanh – the hyperbolic tangent function, which squashes activations into the range -1 to +1.

tanh
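For reference, here is how these three activations might look in plain NumPy (a rough sketch of the formulas above):

import numpy as np

def softmax(z):
    e = np.exp(z - np.max(z))          # shift for numerical stability
    return e / e.sum()                 # probabilities that sum to 1

def relu(z):
    return np.maximum(0, z)            # zero for negative inputs, identity otherwise

def tanh(z):
    return np.tanh(z)                  # squashes values into (-1, 1)

print(softmax(np.array([2.0, 1.0, 0.1])))   # roughly [0.66, 0.24, 0.10]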

Typical Cost Functions

As you know, cost functions are what make the whole training of models possible. Here are three of the main ones, of which cross entropy is probably the most frequently used.

Mean Squared Error –  used to train linear regression models

mean_squared_error

Hinge Loss – used to train Support Vector Machines (SVM) models

hinge_loss

 

Cross entropy – used to train logistic regression models

cross_entropy_trimmed
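Just for intuition, here is a minimal NumPy sketch of these three cost functions (y being the true labels and the second argument the model output):

import numpy as np

def mean_squared_error(y, y_hat):
    return np.mean((y - y_hat) ** 2)

def hinge_loss(y, scores):                 # y in {-1, +1}, scores are raw margins
    return np.mean(np.maximum(0, 1 - y * scores))

def cross_entropy(y, p, eps=1e-12):        # y in {0, 1}, p are predicted probabilities
    p = np.clip(p, eps, 1 - eps)
    return -np.mean(y * np.log(p) + (1 - y) * np.log(1 - p))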

Regularization

Beyond the activation functions applied to convolutions, there are also some other useful tricks used when building a Deep Neural Network (DNN), which address the well-known problem of over-fitting.

Pooling Layer – bottom line, pooling layers are used to reduce dimensionality. They sample from the input space, also using a filter/kernel of a given dimension, and simply apply a reduce function – usually max() (so-called max pooling) or mean() (average pooling).

For example, if one of the 2×2 pooling windows covered the values [[1, 2], [3, 4]], then max pooling would yield 4 as output, whereas average pooling would yield 2.5.
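In code, that toy example looks like this (a quick NumPy sketch):

import numpy as np

window = np.array([[1, 2],
                   [3, 4]])       # one 2x2 pooling window
print(window.max())               # max pooling     -> 4
print(window.mean())              # average pooling -> 2.5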

Dropout – dropout’s goal is exactly the same as regularization’s (it is, after all, a regularization technique); that is, it is intended to reduce over-fitting on out-of-sample data. The idea is to simply turn off a portion of the neurons’ outputs at every iteration during training. That is, instead of passing every neuron’s output on to the next layer, we randomly (with a given, specifiable probability) decide not to pass a given subset of them.

In case you are wondering whether this is a similar trick to the bagging that Random Forests apply to Decision Trees, the answer is: not quite. Averaging over lots of Decision Trees (which have a high propensity to over-fit the data), each trained on a sample drawn with replacement, is computationally doable (by today’s standards). However, the same does not hold for training many distinct DNNs. So the dropout technique is a practical way to average outputs internally, within a single network.
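Here is a bare-bones NumPy sketch of what (inverted) dropout does during training – real frameworks take care of these details for you:

import numpy as np

def dropout(activations, keep_prob=0.5):
    # Randomly zero out activations and rescale the survivors,
    # so the expected value of the layer output stays the same.
    mask = np.random.rand(*activations.shape) < keep_prob
    return activations * mask / keep_prob

layer_output = np.random.rand(4)
print(dropout(layer_output, keep_prob=0.8))   # ~20% of the values are dropped (set to 0)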

Final Notes

In part 2 I plan to get into more detail about convolution architectures, as well as provide a coding example in order to bring all these concepts home.

However, I didn’t want to terminate this post without any code snippet. So, even though I am not going to walk through it, and just as a demonstration that the concepts covered above already give you some intuition, here is a code snippet defining a training session with two convolution layers using Google’s deep learning library TensorFlow:
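(What follows is a minimal sketch along those lines, written with TensorFlow’s Keras API and trained on the MNIST digits dataset; the layer sizes and hyperparameters are purely illustrative.)

import tensorflow as tf

# Toy dataset: 28x28 grayscale digits, 10 classes
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train[..., None] / 255.0      # add the channel dimension and normalize
x_test = x_test[..., None] / 255.0

model = tf.keras.Sequential([
    # First convolution layer: 32 filters of 3x3, stride 1, ReLU activation
    tf.keras.layers.Conv2D(32, kernel_size=3, activation="relu", input_shape=(28, 28, 1)),
    tf.keras.layers.MaxPooling2D(pool_size=2),        # 2x2 max pooling
    # Second convolution layer: 64 filters of 3x3
    tf.keras.layers.Conv2D(64, kernel_size=3, activation="relu"),
    tf.keras.layers.MaxPooling2D(pool_size=2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.5),                     # dropout for regularization
    tf.keras.layers.Dense(10, activation="softmax"),  # one neuron per class
])

model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy", # cross entropy cost function
              metrics=["accuracy"])

model.fit(x_train, y_train, epochs=2, batch_size=128,
          validation_data=(x_test, y_test))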

As usual, here are the sources used for this post:

Spark 2.0: From quasi to proper-streaming?

This post looks at the relatively recent Spark release – Spark 2.0 – and tries to understand what changes regarding Streaming applications. Why streaming in particular, you may ask? Well, streaming is the ideal scenario most companies would like to use, and the competitive landscape is definitely heating up, especially with Apache Flink and Apache Beam (which originated at Google).

Why is streaming so difficult

There are three main problems when it comes to building real time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, it is possible that at any given point in time some nodes have processed events that other nodes have not, even though those events might actually have occurred before the others. For example, you may see a Logout event without a Login event, a close-app event without an open-app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure, processing engines may try to reprocess an event that had already been processed by the failed node, leading to duplicate records and/or inaccurate metrics.
  • Dealing with late/out-of-order data: either because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync its data hours later, developers must tackle this challenge in application code.

See this post for an excellent detailed explanation.

Rewind to Spark – 1.6.2

Unlike Apache Flink, where batch jobs are the exception and streaming the default, Apache Spark follows the exact opposite logic: streaming is the exception and batch the default. To make this clearer, Spark Streaming works by dividing live data streams into batches (sometimes called micro-batches). The continuous stream is represented as a high-level abstraction called a discretized stream, or DStream, which internally is represented as a sequence of RDDs.

One thing DStreams have in common with RDDs is lazy execution: DStreams are only executed by their output operations – the equivalent of Actions on RDDs. For example, in the classic word count example, the print method would be the output operation, as the sketch below illustrates.
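For illustration, here is a minimal PySpark sketch of that classic word count on a DStream (in the Python API the output operation is pprint(), the counterpart of print()):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="StreamingWordCount")
ssc = StreamingContext(sc, batchDuration=5)          # 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)      # a DStream = a sequence of RDDs
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

counts.pprint()          # the output operation - nothing runs until one is declared

ssc.start()              # start the computation
ssc.awaitTermination()   # wait for it to be stopped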

Nonetheless, even though the main abstraction to work with was DStreams on top of RDDs, it was already possible to work with DataFrames (and thus SQL) on streaming data before Spark 2.0.

 

Changes in Spark 2.0 – introducing “Structured Streaming”

So the new kid on the block is structured streaming, a feature still in alpha as of Spark 2.0. Its marketing slogan: “Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming”. In other words, this is an attempt to simplify the lives of all developers struggling with the three main general problems of streaming, while bringing a streaming API to DataFrames and Datasets.

Note that the streaming API has now been unified with the batch API, which means that you should expect (almost always) to use the same method calls exactly as you would with the normal batch API. Of course, some minor exceptions, such as reading from and writing to files, require an extra method call. Here is a copy-paste of the examples shown at Spark Summit in the “A Deep Dive into Structured Streaming” presentation (highly recommended).

Syntax for batch mode:

input = spark.read.format("json").load("source-path")
result = input.select("device", "signal").where("signal > 15")
result.write.format("parquet").save("dest-path")

Now for structured streaming:

input = spark.read.format("json").stream("source-path")
result = input.select("device", "signal").where("signal > 15")
result.write.format("parquet").startStream("dest-path") 

In case you didn’t notice the difference (which is good in this case): instead of load()-ing a file once, you continuously stream() it, and the same logic applies to writing with startStream(). Pretty cool, right?

Also note that the basic premise of Spark streaming discussed before still holds; that is, micro-batching is still there.

Repeated Queries on input table

So besides unifying the API, where’s the magic sauce? The main concept is a new abstraction where streaming data is viewed much like a table in a relational DB. In other words, new incoming data is treated as a new row appended to an unbounded input table.

The developer has three tasks in the middle of this whole streaming party.

  1. The first is to define a query that acts repeatedly on this input table and produces a new table – the result table – that will be written to an output sink. This query is analyzed under the hood by Spark’s query planner (as explained below) into an execution plan, and Spark will figure out on its own what needs to be maintained from the input table in order to update the result table.
  2. The second is to specify the triggers that control when to update the results.
  3. The third is to define an output mode, so that Spark knows how it should write to the external system (such as S3, HDFS or a database). There are three alternatives: append (only new rows of the result table are written), complete (the full result table is flushed) and update (only rows updated since the last trigger are changed). The sketch right after this list puts the three pieces together.
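Here is a rough sketch of these three pieces in the Python API – note that by the time structured streaming stabilized, the entry points became readStream/writeStream rather than the pre-release stream()/startStream() shown above, and "source-path"/"checkpoint-path" below are just placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("StructuredStreamingSketch").getOrCreate()

schema = StructType().add("device", StringType()).add("signal", IntegerType())

# 1. A query that runs repeatedly over the unbounded input table
events = spark.readStream.schema(schema).format("json").load("source-path")
per_device = events.groupBy("device").count()        # the result table

# 2 + 3. The trigger and the output mode (plus a checkpoint location backing the WAL)
query = (per_device.writeStream
         .outputMode("complete")                     # append | complete | update
         .trigger(processingTime="10 seconds")       # when to update the result table
         .option("checkpointLocation", "checkpoint-path")
         .format("console")
         .start())

query.awaitTermination()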

Structured Streaming under the hood

Under the hood (and when I say “under the hood” I mean what is done for you that you don’t need to implement in your code – but yes, you should nonetheless care about it), structured streaming is made possible by some considerable changes. The first is that the query planner (the same one used for batch mode) generates a continuous series of incremental execution plans, one for each processing of the next chunk of streaming data (for example, when polling from Kafka this would mean the new range of offsets).

Now this can get particularly tricky, because each execution plan needs to take into account the aggregations held by previous executions. The state of these continuous aggregations is therefore maintained internally in memory and backed by a Write-Ahead Log (WAL) in the file system (for Fault Tolerance). So yes, the query planner is fault-tolerant: it keeps track of the offsets of each execution in a distributed file system such as HDFS.

Still on FT, sinks are also expected to be idempotent, meaning they should handle re-executions without double-committing the output.

Finally, before you get all excited, I do want to emphasize that at the moment, this is still in alpha mode, and is explicitly not recommended for production environments.

Not enough? Bring on more resources: