# HMM PySpark Implementation: A Zalando Hack Week Project

Training models using larger amounts of data for a great Zalando Hack Week project.

Every year, Zalando’s Hack Week gives us the opportunity to join together in cross-disciplinary teams to solve a wide variety of problems (you can check this year’s amazing winners here). The projects come from any point of the organization and we are encouraged to band together with other employees across locations and business units.

For our 2016 edition of Hack Week, we implemented a PySpark version of Hidden Markov Model (HMM). HMM has been extensively used in a wide variety of problems such as speech recognition, stock market prediction or behavioural analysis.

Our approach was based on the Python library *hmmlearn*. It provides a complete set of tools for multinomial (discrete), Gaussian and Gaussian mixture HMM. *hmmlearn* was previously part of the well-known machine learning library scikit-learn (http://scikit-learn.org/stable/). This implementation is limited by two major factors:

- Memory: it constrains the amount of data we can use for training. Traditionally, the datasets used for training an HMM contained just a few thousand of samples and could be easily fitted in a single machine. Nowadays, we cannot limit ourselves to just one single machine if we want to harness the power of Big Data.
- CPU: as we will see later, training an HMM model is a very expensive process. If we want to integrate HMM into live systems, we need to reduce the time required to update our models.

Since prediction using HMM is easily parallelizable (the model can be applied independently to each input sequence), we focused our efforts on training. An HMM model is trained using the Expectation-Maximization (EM) algorithm. During the expectation phase, the current model parameters are used to compute the expected value of the likelihood function. In the maximization (M) step, the model parameters are updated to maximize the likelihood function.

The EM is an iterative algorithm. The initial value of the model’s parameters may be chosen randomly. Then, on each iteration, the parameters are recomputed and the likelihood function increases. EM always converges but unfortunately, it is not possible to guarantee that the algorithm will find the global maximum. Since each iteration uses the values of the parameters computed in the previous one, it is not possible to parallelize the overall process. Our efforts were thus aimed to distribute the computations required on each one of the iterations.

Assuming the input sequences are stored in a PySpark data frame, each distributed iteration is made up of the following steps:

**1.** The values of the transition matrix, emission probabilities and start probabilities are broadcasted:

```
transmat_broadcast = sparkSession.sparkContext.broadcast(self.transmat_)
emissionprob_broadcast = sparkSession.sparkContext.broadcast(self.emissionprob_)
startprob_broadcast = sparkSession.sparkContext.broadcast(self.startprob_)
```

**2.** Complete the E step. It can be parallelized easily by computing the necessary stats independently of each input sequence (using the map method in the rdd).

```
partial_rdd = df.rdd.map(lambda row: _process_sequence(row['sequence']))
```

It consists of the following steps:

- Compute the logarithmic likelihood according to the distribution of the input sequences (multinomial in our case).

```
framelogprob = np.log(emissionprob_broadcast.value)[:, X].T
n_samples, n_components = framelogprob.shape
```

- Calculate the forward and backward lattice.

```
# _BaseHMM._do_forward_pass()
fwdlattice = np.zeros((n_samples, n_components))
_hmmc._forward(n_samples, n_components,
log_mask_zero(startprob_broadcast.value),
log_mask_zero(transmat_broadcast.value),
framelogprob, fwdlattice)
logprob = logsumexp(fwdlattice[-1])
# _BaseHMM._do_backward_pass()
bwdlattice = np.zeros((n_samples, n_components))
_hmmc._backward(n_samples, n_components,
log_mask_zero(startprob_broadcast.value),
log_mask_zero(transmat_broadcast.value),
framelogprob, bwdlattice)
```

- Calculate the posteriors.

```
log_gamma = fwdlattice + bwdlattice
log_normalize(log_gamma, axis=1)
posteriors = np.exp(log_gamma)
```

- Compute the changes on the transition matrix, emission and start probabilities using the previously obtained values.

```
# accumulate sufficient statistics variables
# - start probabilities
start = posteriors[0]
# - transition probabilities
if n_samples > 1:
lneta = np.zeros((n_samples - 1, n_components, n_components))
_hmmc._compute_lneta(n_samples, n_components, fwdlattice,
np.log(self.transmat_),
bwdlattice, framelogprob, lneta)
trans = np.exp(logsumexp(lneta, axis=0))
else:
trans = np.zeros((n_samples - 1, n_components, n_components))
# - emission probabilities
obs = np.zeros((n_components, self.n_features))
for t, symbol in enumerate(X):
obs[:, symbol] += posteriors[t]
return [start.tolist(), trans.tolist(), obs.tolist(), float(logprob)]
```

Aggregate the partial results of the previous step.

```
aggregation = partial_df.groupBy().agg(
array(*[sum(col("start").getItem(i)) for i in range(self.n_components)]).alias("start"),
array(*[
array(*[sum(col("trans").getItem(i).getItem(j)) for j in range(self.n_components)])
for i in range(self.n_components)]).alias("trans"),
array(*[
array(*[sum(col("obs").getItem(i).getItem(j)) for j in range(self.n_features)])
for i in range(self.n_components)]).alias("obs"),
sum(col("logprob")).alias("logprob"),
count('*')
).collect()[0]
```

**3.** Complete the M step. The values of the transition matrix, emission probabilities and start probabilities are updated with the aggregated values of the stats computed in the previous step.

**4.** Convergence monitor. If the update of the model parameters is smaller than the training tolerance, or if we have reached the maximum number of iterations, then stop or go back to the first step.

Our distributed HMM implementation was tested on EMR 5.2.0 using Python 3.4 and Spark 2.0.2. We compared the training phase of the standalone version of the algorithm on an m4.2xlarge instance versus the distributed version running on a cluster made up of 15 m4.2xlarge nodes. The training data was made up of integer sequences drawn from a multinomial distribution. The training times were computed using datasets containing 1,000 sequences and up to 10 million sequences.

The figure below contains the results we obtained from our experiments. The standalone version was only faster on the 1,000-sequences dataset, where the cost of communication between executors overwhelmed the gain of using distributed computing. The standalone version hitt its limits on the 500,000-sequences dataset, with a training time of 91.6 minutes. Our distributed implementation completed the training in only 6.4 minutes! This means an improvement in speed of a factor of 14.3 (really close to 15, the maximum we could reach since we were using 15 nodes in our cluster). We also experimented with very large datasets. The distributed algorithm was able to train a model on a dataset containing 10 million sequences in 102 minutes.

Thanks to our HMM PySpark implementation, we can now train models using larger amounts of data. It translates into a better understanding of the requirements of our customers and into services tailored to their needs. Our next steps are: (i) explore how we can share our project with the open source community and (ii) extend the implementation to other emission probabilities (e.g. Gaussian or Poisson). And… we are already thinking about new cool ideas for this year’s Hack Week!

If you have some questions about our HMM PySpark implementation, please feel free to reach out via Twitter at @S_Gonzalez_S. We’d be happy to hear from you.

**Footnotes:**

Rabiner, L. and Juang, B., 1986. An introduction to hidden Markov models. IEEE ASSP magazine, 3(1), pp.4-16.

Moon, T.K., 1996. The expectation-maximization algorithm. IEEE Signal processing magazine, 13(6), pp.47-60.

*We're hiring! Do you like working in an ever evolving organization such as Zalando? Consider joining our teams as a Data Engineer!*