Sunday, June 29, 2014

Exascale in perspective: RSC's 1.2 petaflop rack

Russian supercomputing manufacturer RSC generated some buzz at ISC'14 last week when they showed their 1.2 PF-per-rack Xeon Phi-based platform.  I was aware of this system from when they first announced it a few months prior, and I referenced it in a piece of a blog post I was writing about the scarier aspects of exascale computing.  Given my impending career change, though, it is unclear whether I will ever have the time to finish that post before it becomes outdated.  Since RSC is back in the spotlight, I thought I'd post the piece I wrote up to illustrate just how wacky this 1.2 PF rack really is in terms of power consumption.  Power consumption, of course, is the limiting factor standing between today and the era of exascale computing.

So, to put a 400 kW, 1.2 PF rack into perspective, here is that piece:



The Importance of Energy Efficiency

Up through the petascale era in which we currently live, the raw performance of high-performance components--processors, RAM, and interconnect--was what limited the ultimate performance of a given high-end machine.  The first petaflop machine, Los Alamos' Roadrunner, derived most of its FLOPs from high-speed PowerXCell 8i processors pushing 3.2 GHz per core.  Similarly, the first 10 PF supercomputer, RIKEN's K computer, derived its performance from its sheer size: 864 cabinets.  Although I don't mean to diminish the work done by the engineers who actually got these systems to deliver this performance, the petascale era really was made possible by making really big systems out of really fast processors.

By contrast, exascale represents the first milestone where the limitation does not lie in making these high-performance components faster; rather, performance is limited by the amount of electricity that can be physically delivered to a processor and the amount of heat that can be extracted from it.  This limitation is what has given rise to massively parallel processors that eschew a few fast cores in favor of a larger number of low-powered ones.  By keeping clock speeds low and densely packing many compute cores (dozens or hundreds) onto a single silicon die, these massively parallel processors are now realizing power efficiencies (flops per watt) that are an order of magnitude higher than what traditional CPUs can deliver.

The technologies on the market that most closely resemble the future's exaflop machines are based on accelerators--either NVIDIA GPUs or Intel's MICs.  The goal will be to jam as many of these massively parallel processors into as small a space and with as tight an integration as possible.  Recognizing this trend, NERSC has opted to build what I would call the first "pre-exascale" machine in its NERSC-8 procurement, which will feature a homogeneous system of manycore processors.

However, such pre-exascale hardware doesn't actually exist yet, and NERSC-8 won't appear until 2016.  What does exist, though, is a product by Russia's RSC Group called PetaStream: a rack packed with 1024 current-generation Xeon Phi (Knights Corner) coprocessors that has a peak performance of 1.2 PF/rack.  While this sounds impressive, it also highlights the principal challenge of exascale computing: power consumption.  One rack of RSC PetaStream is rated for 400 kW, delivering 3 GFLOPS/watt peak.  Let's put this into perspective.
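
To be explicit about where that 3 GFLOPS/watt figure comes from, it is just the quoted peak divided by the rated power (a throwaway Python sketch using the numbers above):

peak_flops = 1.2e15                    # 1.2 PF per rack, peak
rack_power = 400.0e3                   # 400 kW per rack
print(peak_flops / rack_power / 1e9)   # prints 3.0, i.e., ~3 GFLOPS per watt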

Kilowatts, megawatts, and gigawatts in perspective

During a recent upgrade to our data center infrastructure, three MQ DCA220SS-series diesel generators were brought in for the critical systems.  Each is capable of producing 220 kVA according to the spec sheets.
Three 220 kVA diesel generators plugged in during a PM at SDSC
It would take three of these diesel generators to power a single rack of RSC's PetaStream.  Of course, these backup diesel generators aren't a very efficient way of generating commercial power, so this example is a bit skewed.

Let's look at something that is used to generate large quantities of commercial power instead.  A GE 1.5-77 wind turbine, which is GE's most popular model, is advertised as delivering 1.5 megawatts at wind speeds above 15 miles per hour.

GE 1.5 MW wind turbine.   Source: NREL
Doing the math, this means that the above pictured turbine would be able to power only three racks of RSC PetaStream on a breezy day.

To create a supercomputer with a peak capability of an exaflop using RSC's platform, you'd need over 800 racks of PetaStream and over 300 MW of power to turn it all on.  That's over 200 of the above GE wind turbines and enough electricity to power about 290,000 homes in the U.S.  Wind farms of this size do exist; for example,

300 MW Stateline Wind Farm.  Source: Wikimedia Commons
the Stateline Wind Farm, which was built on the border between Oregon and Washington, has a capacity of about 300 MW.  Of course, wind farms of this capacity cannot be built in any old place.
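
For those who want to check the arithmetic behind the racks, turbines, and homes figures above, the chain of estimates looks roughly like this (a Python sketch; the ~1.15 kW average U.S. household draw is my own rounded assumption):

racks_per_exaflop = 1.0e18 / 1.2e15                  # ~834 racks of PetaStream
watts_per_exaflop = racks_per_exaflop * 400.0e3      # ~333 MW to power them
turbines = watts_per_exaflop / 1.5e6                 # ~222 GE 1.5-77 wind turbines
homes = watts_per_exaflop / 1.15e3                   # ~290,000 homes at ~1.15 kW each (assumed)
print(racks_per_exaflop, watts_per_exaflop / 1e6, turbines, homes)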

Commercial nuclear power plants can be built in a variety of places though, and they typically generate on the order of 1 gigawatt (GW) of power per reactor.  In my home state of New Jersey, the Hope Creek Nuclear Generating Station has a single reactor that was built to deliver about 1.2 GW of power:

1.2 GW Hope Creek nuclear power station.  The actual reactor is housed in the concrete cylinder to the bottom left.  Courtesy of the Nuclear Regulatory Commission.

This is enough to power almost 4 exaflops of PetaStream.  Of course, building a nuclear reactor for every exaflop supercomputer would be an extremely expensive proposition, given the multi-billion dollar cost of reactors like this.  Clearly, the energy efficiency (flops/watt) of computing technology needs to improve substantially before we can arrive at the exascale era.

Tuesday, June 24, 2014

Perspectives on the Current State of Data-Intensive Scientific Computing

I recently had the benefit of being invited to attend two workshops in Oakland, CA, hosted by the U.S. Department of Energy (DOE), that shared the common theme of emerging trends in data-intensive computing: the Joint User Forum on Data-Intensive Computing and the High Performance Computing Operational Review.  My current employment requires that I stay abreast of all topics in data-intensive scientific computing (I wish there were an acronym to abbreviate this...DISC, perhaps?), so I didn't go in with the expectation of being exposed to a world of new information.  As it turned out, though, I did gain a very insightful perspective on how data-intensive scientific computing (DISC), and I daresay Big Data, is seen by the people who operate some of the world's largest supercomputers.

The DOE perspective is surprisingly realistic, application-oriented, and tightly integrated with high-performance computing.  There was, of course, the obligatory discussion of Hadoop and how it may be wedged into existing machines--with Magpie at LLNL, Spot Hadoop at ORNL, and myHadoop at SDSC--and there was also some discussion of real production use of Hadoop on bona fide Hadoop clusters at some of the DOE labs.  However, Hadoop played only a minor role in the grand scheme of the two meetings for all of the reasons I've outlined previously.

Rather, these two meetings had three major themes that crept into all aspects of the discussion:
  1. Scientific workflows
  2. Burst buffers
  3. Data curation
I found this to be a very interesting trend, as #1 and #2 (workflows and burst buffers) aren't topics I'd heard come up at any other DISC workshops, forums, or meetings I've attended.  The connection between DISC and workflows wasn't immediately evident to me, and burst buffers are a unique aspect of cyberinfrastructure that have only been thrust into the spotlight with the NERSC-8/LANL Trinity RFP last fall.  However, all three of these topics will become central to both data-intensive scientific computing and, by virtue of their ability to produce data, exascale supercomputers.

Scientific workflows

Workflows are one of those aspects of scientific computing that have been easy to dismiss as the toys of computer scientists because traditional problems in high-performance computing have typically been quite monolithic in how they are run.  SDSC's own Kepler and USC's Pegasus systems are perhaps the most well-known and highly engineered workflow management systems, and I have to confess that when I'd first heard of them a few years ago, I thought they seemed like a very complicated way to do very simple tasks.

As it turns out though, both data-intensive scientific computing and exascale computing (by virtue of the output size of exaflop calculations) tend to follow patterns that look an awful lot like map/reduce at a very abstract level.  This is a result of the fact that most data-intensive problems are not processing giant monoliths of tightly coupled and inter-related data; rather, they are working on large collections of generally independent data.  Consider the recent talk I gave about a large-scale genomic study on which I consulted; the general data processing flow was
  1. Receive 2,190 input files, 20 GB each, from a data-generating instrument
  2. Do some processing on each input file
  3. Combine groups of five input files into 438 files, each 100 GB in size
  4. Do more processing 
  5. Combine 438 files into 25 overlapping groups to get 100 files, each 2.5 GB in size
  6. Do more processing
  7. Combine 100 files into a single 250 GB file
  8. Perform statistical analysis on this 250 GB file for scientific insight
The natural data-parallelism inherent in the data coming off the instrument means that any collective insight to be gleaned from this data requires some sort of mapping and reduction, and the process of managing this large volume of distributed data is where scientific workflows become a necessary part of data-intensive scientific computing.  Managing terabytes or petabytes of data distributed across thousands or millions of logical records (whether they be files on a file system, rows in a database, or whatever else) very rapidly becomes a problem that nobody will want to do by hand.  Hadoop/HDFS delivers an automated framework for managing these sorts of workflows if you don't mind rewriting all of your processing steps against the Hadoop API and building out HDFS infrastructure, but if this is not the case, alternate workflow management systems begin to look very appealing.
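
To make the map/reduce shape of that pipeline concrete, here is a minimal sketch of how such a flow might be expressed as a list of tasks with explicit inputs and outputs for a workflow manager to schedule; the file names and task labels are hypothetical and not taken from any particular workflow system:

from itertools import islice

def groups_of(files, n):
    """Split a list of files into consecutive groups of n."""
    it = iter(files)
    return iter(lambda: list(islice(it, n)), [])

inputs  = ["reads_%04d.dat" % i for i in range(2190)]        # 2,190 x 20 GB instrument files
cleaned = [f + ".cleaned" for f in inputs]                   # outputs of step 2

map_tasks   = [("preprocess", [f], f + ".cleaned") for f in inputs]      # step 2: one task per file
merge_tasks = [("combine", group, "merged_%03d.dat" % i)                 # step 3: 438 groups of five
               for i, group in enumerate(groups_of(cleaned, 5))]
# ...steps 4 through 7 repeat the same process/combine pattern down to one 250 GB file...
final_task  = ("statistics", ["combined_250GB.dat"], "results.txt")      # step 8

print(len(map_tasks), "independent map tasks,", len(merge_tasks), "first-level combines")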

The core debate was not whether or not workflow management systems were a necessary component in DISC; rather, I observed two salient, open questions:
  1. The systems in use at DOE (notably Fireworks and qdo) are primarily used to work around deficiencies in current HPC schedulers (e.g., Moab and SLURM), which cannot handle scheduling hundreds of thousands of tiny jobs concurrently.  Thus, should these workflow managers be integrated into the scheduler to address these shortcomings at their source?
  2. How do we stop every user from creating his or her own workflow manager scripts and adopt an existing solution instead?  Should one workflow manager rule them all, or should a Darwinian approach be taken towards the current diverse landscape of existing software?
Question #1 is a highly technical question that has several dimensions; ultimately though, it's not clear to me that there is enough incentive for resource manager and scheduler developers to really dig into this problem.  They haven't done this yet, and I can only assume that this is a result of the perceived domain-specificity and complexity of each workflow.  In reality, a large number of workflows can be accommodated by two simple features: support for directed acyclic graphs (DAGs) of tasks and support for lightweight, fault-tolerant task scheduling within a pool of reserved resources.  Whether or not anyone will rise to the challenge of incorporating this in a usable way is an open question, but there certainly is a need for this in the emerging realm of DISC.
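
As a rough illustration of what that second feature (lightweight, fault-tolerant task scheduling within a pool of reserved resources) amounts to, here is a toy Python sketch that farms many tiny, independent tasks out to a fixed pool of workers and retries failures; the run_task function is purely hypothetical, and real tools like Fireworks and qdo do far more than this:

from concurrent.futures import ProcessPoolExecutor, as_completed
import random, time

def run_task(task_id):
    """Stand-in for one tiny unit of work (hypothetical)."""
    time.sleep(0.01)
    if random.random() < 0.01:
        raise RuntimeError("task %d failed" % task_id)
    return task_id

def run_all(task_ids, workers=32, max_retries=3):
    """Run every task within one pool of reserved cores, retrying failures."""
    attempts = {t: 0 for t in task_ids}
    done = {}
    with ProcessPoolExecutor(max_workers=workers) as pool:
        while attempts:
            futures = {pool.submit(run_task, t): t for t in attempts}
            for fut in as_completed(futures):
                t = futures[fut]
                try:
                    done[t] = fut.result()
                    del attempts[t]
                except Exception:
                    attempts[t] += 1
                    if attempts[t] > max_retries:
                        del attempts[t]        # give up on this task
    return done

if __name__ == "__main__":
    print(len(run_all(range(1000))), "tasks completed")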

Question #2 is more interesting to me since this problem of multiple people cooking up different but equivalent solutions to the same problems is pervasive throughout computational and computer science. This is in large part due to the fatal assumption held by many computer scientists that good software can be simply "thrown over the fence" to scientists and it will be adopted.  This has never worked; rather, the majority of widely adopted software technologies in HPC have been a result of the standardization of a landscape of similar but non-standard tools.  This is something I touched on in a previous post when outlining the history of MPI and OpenMP's successes.

I don't think the developers behind this menagerie of workflow managers are ready to settle on a standard, as the field is not mature enough to have a holistic understanding of all of the issues that workflows need to solve.  Despite the numerous presentations and discussions of various workflow solutions being used across DOE's user facilities, my presentation was the only one that considered optimizing workflow execution for the underlying hardware.  Given that the target audience of these talks was users of high-performance computing, the lack of consideration given to the performance aspects of workflow optimization is a testament to this immaturity.

Burst buffers

For those who haven't been following the details of one of DOE's more recent procurement rounds, the NERSC-8 and Trinity request for proposals (RFP) explicitly required that all vendor proposals include a burst buffer to absorb the tremendous amounts of data that multi-petaflop simulations can dump in very short order.  The target use case is petascale checkpoint-restart, where the memory of thousands of nodes (hundreds of terabytes of data) needs to be flushed to disk in an amount of time that doesn't dominate the overall execution time of the calculation.
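
To get a feel for the scale of the problem, here is a rough back-of-the-envelope estimate in Python; the node count, per-node memory, and acceptable checkpoint time are all illustrative assumptions rather than anyone's actual requirements:

nodes           = 10000
memory_per_node = 32 * 2**30                  # 32 GiB per node (assumed)
checkpoint_size = nodes * memory_per_node     # ~320 TiB to flush at once
target_seconds  = 5 * 60                      # acceptable checkpoint time (assumed)

required_bw = checkpoint_size / target_seconds
print("aggregate write bandwidth needed: %.0f GiB/s" % (required_bw / 2**30))

Under these assumptions the machine needs to sustain on the order of a terabyte per second of writes, which is a tall order for a disk-based parallel file system sized for capacity and is a big part of why a flash tier sitting between compute and disk looks attractive.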

The concept of what a burst buffer is remains poorly defined.  I got the sense that there are two outstanding definitions:
  • The NERSC burst buffer is something more tightly integrated on the compute side of the system and may be a resource that can be allocated on a per-job basis
  • The Argonne burst buffer is something more tightly integrated on the storage side of the system and acts in a fashion that is largely transparent to the user.  This sounded a lot like the burst buffer support being explored for Lustre.
In addition, Los Alamos National Laboratory (LANL) is exploring burst buffers for the Trinity procurement, and it wasn't clear to me whether they had chosen a definition or were exploring all angles.  One commonality is that DOE is going full-steam ahead on providing this burst buffer capability in some form or another, and solid-state storage is going to be a central enabling component.

Personally, I find the NERSC burst buffer concept a lot more interesting since it provides a more general-purpose flash-based resource that can be used in novel ways.  For example, emerging software-defined storage platforms like EMC's ViPR can potentially provide very fine-grained access to flash as needed to make better overall use of the underlying SSDs in HPC environments serving a broad user base (e.g., NERSC and the NSF centers).  Complementing these software technologies are emerging hardware technologies like DSSD's D5 product, which will be exposing flash to compute systems in innovative ways at the hardware, interconnect, and software levels.

Of course, the fact that my favorite supercomputer provides dynamically allocatable SSDs in a fashion not far removed from these NERSC burst buffers probably biases me, but we've demonstrated unique DISC successes enabled by our ability to pile tons of flash on to single compute nodes.  This isn't to say that the Argonne burst buffer is without merit; given that the Argonne Leadership Computing Facility (ALCF) caters to capability jobs rather than capacity jobs, their user base is better served by providing a uniform, transparent burst I/O capability across all nodes.  The NERSC burst buffer, by comparison, is a lot less transparent and will probably be much more susceptible to user disuse or misuse.  I suspect that when the dust settles, both takes on the burst buffer concept will make their way into production use.

A lot of the talk and technologies surrounding burst buffers are shrouded in NNSA secrecy or vendor non-disclosures, so I'm not sure what more there is to be said.  However, the good folks at HPCwire ran an insightful article on burst buffers after the NERSC-8 announcement for those who are interested in more detail.

Data curation

The final theme that bubbled just beneath the surface of the DOE workshops was the idea that we are coming upon an era where scientists can no longer save all their data from all their calculations in perpetuity.  Rather, someone will have to become the curator of the scientific data being generated by computations and figure out what is and is not worth keeping, and how or where that data should be stored and managed.  This concept of selectively retaining user data manifested in a variety of discussions ranging from in-place data sharing and publication with Globus Plus and science DMZs to transparently managing online data volumes with hierarchical storage management (HSM).  However, the common idea was that scientists are going to have to start coming to grips with data management themselves, as facilities will soon be unable to cope with the entirety of their users' data.

This was a particularly interesting problem to me because it very closely echoed the sentiments that came about from Datanami's recent LeverageBIGDATA event which had a much more industry-minded audience.  The general consensus is that several fields are far ahead of the pack in terms of addressing this issue; the high-energy physics community has been filtering data at its genesis (e.g., ignoring the data from uninteresting collision events) for years now, and enterprises seem comfortable with retaining marketing data for only as long as it is useful.  By comparison, NERSC's tape archive has not discarded user data since its inception several decades ago; each new tape system simply repacks the previous generation's tape to roll all old data forward.

All of the proposed solutions for this problem revolve around metadata.  The reality is that not all user data has equal importance, and there is a need to provide a mechanism for users (or their applications) to describe this fact.  For example, the principal use case for the aforementioned burst buffers is to store massive checkpoint-restart files; while these checkpoints are important to retain while a calculation is running, they have limited value after the calculation has completed.  Rather than rely on a user to manually recognize that these checkpoints can be deleted, the hope is that metadata attributes can be attached to these checkpoint files to tell automated curation systems that they are not critical data that must be retained forever.

The exact way this metadata would be used to manage space on a file system remains poorly defined.  A few examples of how metadata might be used to manage data volumes in data-intensive scientific computing environments include
  • tagging certain files or directories as permanent or ephemeral, signaling that the file system can purge certain files whenever a cleanup is initiated;
  • tagging certain files with a set expiration date, either as an option or by default.  When a file ages beyond a certain point, it would be deleted;
  • attributing a sliding scale of "importance" to each file, so that files of low importance can be transparently migrated to tape via HSM
Some of these concepts are already implemented, but the ability for users and applications to attach extensible metadata to files in a file system-agnostic way does not yet exist.  I think this is a significant gap in technology that will need to be filled in very short order as pre-exascale machines begin to demonstrate the ability to generate tremendous I/O loads.  Frankly, I'm surprised this issue hasn't been solved in a broadly deployable way yet.
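
As a toy illustration of the missing piece, Linux extended attributes can already carry arbitrary per-file user metadata on file systems that support them, but nothing standard consumes it; the attribute names below and the checkpoint path are entirely made up for the example:

import os, time

checkpoint = "/scratch/glock/run42/checkpoint_0005.h5"   # hypothetical checkpoint file

# Tag the file as ephemeral and give it an expiration date a week out.
# These attribute names are invented; no curation system honors them today,
# which is precisely the gap described above.
os.setxattr(checkpoint, b"user.retention", b"ephemeral")
os.setxattr(checkpoint, b"user.expires", str(int(time.time()) + 7 * 86400).encode())

# A purge or HSM tool could later walk the file system and act on the tags:
if os.getxattr(checkpoint, b"user.retention") == b"ephemeral":
    print("candidate for purge after", os.getxattr(checkpoint, b"user.expires").decode())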

The good news here is that the problem of curating digital data is not new; it is simply new to high-performance computing.  In the spirit of doing things the right way, DOE invited the director of LANL's Research Library to attend the workshops, and she provided valuable insights into how methods of digital data curation may be applied to these emerging challenges in data-intensive scientific computing.

Final Thoughts

The products of the working groups at the HPC Operational Review are being assembled into a report to be delivered to DOE's Office of Science, and it should be available online at the HPCOR 2014 website as well as the usual DOE document repository in a few months.  Hopefully it will reflect what I feel was the essence of the workshop, but at any rate, it should contain a nice perspective on how we can expect the HPC community to address the new demands emerging from the data-intensive scientific computing (DISC) community.

In the context of high-performance computing:
  • Workflow management systems will continue to gain importance as data sets become larger, more parallel, and more unwieldy.
  • Burst buffers, in one form or another, will become the hardware solution to the fact that all exascale simulations will become data-intensive problems.
  • Data curation frameworks are the final piece of the puzzle and will provide the manageability of data at rest.
None of these three legs are fully developed, and this is simply an indication of data-intensive scientific computing's immaturity relative to more traditional high-performance computing:  
  • Workflows need to converge on some sort of standardized API or feature set in order to provide the incentive to users to abandon their one-off solutions.
  • Burst buffer technology has diverged into two solutions centered at either the compute or storage side of a DISC platform; both serve different workloads, and the underlying hardware and software configurations remain unfinished.
  • Effective data curation requires a metadata management system that will allow both users and their applications to identify the importance of data to automate sensible data retention policy enforcement and HSM.
Of course, I could be way off in terms of what I took away from these meetings seeing as how I don't really know what I'm talking about.  Either way, it was a real treat to be invited out to hang out with the DOE folks for a week; I got to meet some of my personal supercomputing heroes, share war stories, and make some new pals.

I also got to spend eight days getting to know the Bay Area.  So as not to leave this post entirely without a picture,


I also learned that I have a weird fascination with streetcars.  I'm glad I was introduced to supercomputers first.

Saturday, June 7, 2014

Spark on Supercomputers: A Few Notes

I've been working with Apache Spark quite a bit lately in an effort to bring it into the fold as a viable tool for solving some of the data-intensive problems encountered in supercomputing.  I've already added support for provisioning Spark clusters to a branch of the myHadoop framework I maintain so that Slurm, Torque, and SGE users can begin playing with it, and as a result of these efforts, I've been discovering a number of interesting issues with Spark running on traditional supercomputers.

At this point in time, Spark is very rough around the edges.  The core implementation of resilient distributed datasets is all there and works wonderfully, but I've found that it doesn't take long to start discovering bugs and half-implemented features that can get very confusing very quickly.  Perhaps half of the problems I've faced are the result of the fact that I have been trying to run Spark in non-traditional ways (for example, over hosts' TCP-over-InfiniBand interfaces and with non-default config directories), and although the documentation claims to support all of the features necessary to make this possible, the reality is a bit different.

What follows are just some incoherent notes I've taken while porting Spark to the myHadoop framework.  Spark is developing rapidly and constantly improving, so I hope this post becomes outdated as the Spark developers make the framework more robust.

Control Script Problems

Hadoop and Spark both ship with "control scripts" or "cluster launch scripts" that facilitate starting and stopping the entire cluster of daemons.  At the highest level, this includes start-all.sh and stop-all.sh, which make calls to start-dfs.sh and start-yarn.sh (in Hadoop) and start-master.sh and start-slaves.sh (in Spark).  In Hadoop, these scripts work wonderfully, but Spark's implementation of these control scripts is still quite immature because they carry implicit assumptions about users' Spark configurations.

Like Hadoop, Spark supports a spark-env.sh file (located in $SPARK_CONF_DIR) which defines environment variables for all of the remote Spark workers that are spawned across the cluster.  This file is an ideal place to put the following environment variable definitions (a minimal example follows the list):
  • SPARK_MASTER_IP - the default value for this is `hostname` which is generally not a great default on most clusters.  On Rocks, we append ".ibnet" to the hostname to get Spark to operate over the InfiniBand fabric.
  • SPARK_LOCAL_IP - again, ensure that this is set up to use the correct interface on the cluster.  We append .ibnet on Rocks.
  • SPARK_HOME, SPARK_PREFIX, and SPARK_CONF_DIR should also be defined here since spark-env.sh will usually override the variables defined by spark-config.sh (see below)
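Putting those together, a minimal spark-env.sh might look something like the sketch below; the .ibnet suffix is the Rocks-specific convention mentioned above, and the master hostname, job-specific config directory, and paths are placeholders rather than a drop-in recipe:

# spark-env.sh -- a minimal sketch; hostnames and paths are placeholders
export SPARK_HOME=/home/glock/apps/spark-0.9.0
export SPARK_PREFIX=$SPARK_HOME
export SPARK_CONF_DIR=$HOME/spark-conf.$SLURM_JOBID   # non-default, per-job config dir (hypothetical)
export SPARK_MASTER_IP=master-node.ibnet              # the master's hostname on the IB fabric (placeholder)
export SPARK_LOCAL_IP=$(hostname).ibnet               # each node's own IB interface
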
$SPARK_HOME/sbin/spark-config.sh is where much of the Spark control scripts' "intelligence" comes from as far as defining the environment variables that Spark needs to launch.  In particular, spark-config.sh defines the following variables before reading spark-env.sh:
  • SPARK_PREFIX
  • SPARK_HOME
  • SPARK_CONF_DIR
The problem is that spark-config.sh will stomp all over anything the user defines for the above variables, and since spark-config.sh is called from within all of the Spark control scripts (both those invoked by the user and those invoked by sub-processes on remote hosts during the daemon spawning process), trying to get Spark to use non-default values for SPARK_CONF_DIR (e.g., exactly what myHadoop does) gets to be tedious.

The Spark developers tried to work around this by having the control scripts call spark-env.sh after spark-config.sh, meaning you should be able to define your own SPARK_CONF_DIR in spark-env.sh.  Unfortunately, this mechanism of calling spark-env.sh after spark-config.sh appears as

. "$sbin/spark-config.sh"

if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
  . "${SPARK_CONF_DIR}/spark-env.sh"
fi

That is, spark-config.sh will stomp all over any user-specified SPARK_CONF_DIR, and then use the SPARK_CONF_DIR from spark-config.sh to look for spark-env.sh.  Thus, there is no actual way to get the Spark control scripts (as of version 0.9) to honor the user-specified SPARK_CONF_DIR.  It looks like the latest commits to Spark have started to address this, but a cursory glance over the newest control scripts suggests that this remains broken.

Anyway, as a result of this, myHadoop's Spark integration eschews the Spark control scripts and handles spawning the daemons more directly using the manual method of spawning slaves (sketched after the list below).  Doing this averts the following issues:
  1. start-slaves.sh can't find any slaves because it always looks for $SPARK_HOME/etc/slaves.  This can be worked around by passing SPARK_SLAVES=$SPARK_CONF_DIR/slaves to start-slaves.sh for a non-default SPARK_CONF_DIR.
  2. stop-master.sh doesn't do anything useful because you still need to kill -9 the master process by hand.  Not sure why this is the case.
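
For reference, the manual method amounts to launching the master and worker daemons with spark-class directly, roughly along the lines of this sketch; the master hostname, the slaves file, and the default master port of 7077 are assumptions rather than a drop-in recipe:

# On the first node of the job, start the master by hand:
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master &

# Then start a worker on every node in the allocation, pointed at the master:
for node in $(cat $SPARK_CONF_DIR/slaves); do
    ssh $node "$SPARK_HOME/bin/spark-class \
        org.apache.spark.deploy.worker.Worker spark://master-node.ibnet:7077" &
done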

Deciphering Spark Errors

Here are various cryptic stack traces I've encountered while working on Spark.  I kept these mostly for myself, but I've started meeting people who hit the same problems and thought it might be worthwhile to share the diagnoses I've found.

In general, Spark seems to work best when used conservatively, but when you start doing things that do not strictly fall within the anticipated use case, things break in strange ways.  For example, if you try to write an RDD with an empty element (e.g., a text file with empty lines), you would get this really crazy error that does not actually say anything meaningful:

14/04/30 16:23:07 ERROR Executor: Exception in task ID 19
scala.MatchError: 0 (of class java.lang.Integer)
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
     at org.apache.spark.scheduler.Task.run(Task.scala:53)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)

I filed a bug report about this particular problem and the issue has been fixed, but it's just one of those edge cases where Spark will fail catastrophically (I had to look at the source code to figure out what "scala.MatchError" meant).  Usually you wouldn't be operating on empty data sets, but I discovered this error when I was trying to quickly determine if my Spark slaves were communicating with my master correctly by issuing

file = sc.textFile('hdfs://master.ibnet0/user/glock/input.txt')
file.saveAsTextFile('hdfs://master.ibnet0/user/glock/output')

That is, simply reading in a file and writing it back out with pyspark would cause catastrophic failure.  This is what I mean when I say Spark's still rough around the edges.

Here are a few more errors I've encountered.  They're not problems with Spark, but the stack traces and exceptions thrown can be a little mysterious.  I'm pasting them all here for the sake of googlers who may run into these same problems.

If you try to use Spark built against Hadoop 2 with a Hadoop 1 HDFS, you'll get this IPC error:

>>> file.saveAsTextFile('hdfs://s12ib:54310/user/glock/gutenberg.out')
Traceback (most recent call last):
  File "", line 1, in 
  File "/home/glock/apps/spark-0.9.0/python/pyspark/rdd.py", line 682, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/home/glock/apps/spark-0.9.0/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home/glock/apps/spark-0.9.0/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.saveAsTextFile.
: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
     at org.apache.hadoop.ipc.Client.call(Client.java:1070)
     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
     at $Proxy7.getProtocolVersion(Unknown Source)
     at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
     at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)


If your Pythons aren't all the same version across the nodes when Spark workers are instantiated, you might get a cryptic error like this when trying to call the count() method on an RDD:

14/04/30 16:15:11 ERROR Executor: Exception in task ID 12
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/serializers.py", line 182, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/serializers.py", line 171, in _batched
    for item in iterator:
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop1/python/pyspark/rdd.py", line 493, in func
    if acc is None:
TypeError: an integer is required

     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:131)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
     at org.apache.spark.scheduler.Task.run(Task.scala:53)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)


If you try to write an RDD to a file with mismatched Python versions, or if you were using anything earlier than Python 2.7 (e.g., 2.6) with any Spark version earlier than 1.0.0, you'd see this:

14/04/30 17:53:20 WARN scheduler.TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 677, in func
    if not isinstance(x, basestring):
SystemError: unknown opcode

     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:131)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
     at org.apache.spark.scheduler.Task.run(Task.scala:53)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)


If your HDFS URI is wrong, the error message actually makes sense.  It is buried quite deeply though.

Traceback (most recent call last):
  File "", line 1, in 
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 682, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home/glock/apps/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.saveAsTextFile.
: java.lang.IllegalArgumentException: java.net.UnknownHostException: s12ib.ibnet0
     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:418)
     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:231)
     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:139)
     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:510)
     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
     at org.apache.hadoop.mapred.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:193)
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:685)
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:572)
     at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:894)
     at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:355)
     at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:27)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
     at java.lang.reflect.Method.invoke(Method.java:597)
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
     at py4j.Gateway.invoke(Gateway.java:259)
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
     at py4j.commands.CallCommand.execute(CallCommand.java:79)
     at py4j.GatewayConnection.run(GatewayConnection.java:207)
     at java.lang.Thread.run(Thread.java:619)
Caused by: java.net.UnknownHostException: s12ib.ibnet0
     ... 29 more