Instagram Engineering

I’m a big fan of startups and of new technologies in general, particularly cool stuff I’ve never even heard of. One business that’s been kicking ass and taking names is Instagram. Their engineering prowess is impressive if for no other reason than that they explain it in a fundamental, easy-to-understand way for those who have been around technology for a while and understand the core concepts. One could make the argument, though, that by telling the planet what your back-end looks like, you open yourself up to easy competition. Granted, the details are missing and, like anything else, it’s not as easy as it looks, but just knowing what tools work well together is a HUGE migraine you won’t have to worry about. At any rate, Instagram engineering irregularly posted these design snippets to a Tumblr blog.

A few days ago, Instagram, after a mere 2 years in operation, was acquired by Facebook for a paltry 1 billion US dollars. I don’t know if they’re headed toward corporate assimilation, or if, as Zuckerberg claimed, they will stay independent. I’m not taking any chances that the very cool write-ups they did will eventually be wiped from the Internet. What follows are all the entries (as of today) from http://instagram-engineering.tumblr.com/ for the simple fact that I hate digging for cool articles that have been removed.

Good luck guys. I hope it was a good decision.

Keeping Instagram up with over a million new users in twelve hours

On Tuesday we launched Instagram for Android, and it’s had a fantastic response so far. The last few weeks (on the infrastructure side) have been all about capacity planning and preparation to get everything in place, but on launch day itself the challenge is to find problems quickly, get to the bottom of them, and roll out fixes ASAP. Here are some tools & techniques we used to tackle problems as they arose:

statsd

We love statsd at Instagram. Written by Etsy, it’s a network daemon that aggregates and rolls up data into Graphite. At its core, it has two types of statistics: counters and timers. We use the counters to track everything from number of signups per second to number of likes, and we use timers to time generation of feeds, how long it takes to follow users, and any other major action.

The single biggest reason we love statsd is how quickly stats show up and get updated in Graphite. Stats are basically realtime (in our system, they’re about 10 seconds delayed), which allows us to evaluate system and code changes immediately. Stats can be added at will, so if we discover a new metric to track, we can have it up and running very quickly. You can specify a sample rate, so we sprinkle logging calls throughout the web application at relatively low sample rates, without affecting performance.
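To make the counter, timer, and sample-rate ideas concrete, here is a minimal sketch of a client for statsd’s plain-text UDP protocol. The function names, metric names, and the host and port defaults are assumptions for illustration; this is not Instagram’s code.

```python
import random
import socket


def format_metric(name, value, kind, sample_rate=1.0):
    """Build a statsd datagram: '<name>:<value>|<type>[|@<rate>]'."""
    packet = f"{name}:{value}|{kind}"
    if sample_rate < 1.0:
        # Tell the daemon the rate so it can scale counts back up.
        packet += f"|@{sample_rate}"
    return packet


def send_metric(name, value, kind, sample_rate=1.0,
                host="127.0.0.1", port=8125, rng=random.random):
    """Send a metric over UDP, skipping most calls at low sample rates.

    Returns the datagram that was sent, or None if this call was
    sampled out and nothing went over the network.
    """
    if sample_rate < 1.0 and rng() >= sample_rate:
        return None
    packet = format_metric(name, value, kind, sample_rate)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.sendto(packet.encode("ascii"), (host, port))
    finally:
        sock.close()
    return packet


counter_packet = format_metric("signups", 1, "c")
timer_packet = format_metric("feed.generate", 320, "ms", sample_rate=0.1)
```

Because the transport is fire-and-forget UDP and most calls are dropped before any network work happens, a sampled call adds very little overhead to a request.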

Takeaway: having realtime stats that can be added dynamically lets you diagnose and firefight without having to wait to receive new data.

Dogslow

Written by Bitbucket, Dogslow is a piece of Django middleware that will watch your running processes, and if it notices any taking longer than N seconds, will snapshot the current process and write the file to disk. We’ve found it’s too intrusive to run all the time, but when trying to identify bottlenecks that may have cropped up, it’s very useful (we’ve added a switch to enable it in our web servers).
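The general idea of a slow-request watchdog can be sketched in a few lines. This is not Dogslow’s implementation: the names below are hypothetical, and the sketch collects stack traces in a list rather than writing files to disk.

```python
import sys
import threading
import time
import traceback
from contextlib import contextmanager


@contextmanager
def watch_for_slowness(timeout_seconds, reports):
    """Append a stack trace to `reports` if the block runs too long."""
    watched_thread_id = threading.get_ident()

    def snapshot():
        # Look up what the watched thread is executing right now.
        frame = sys._current_frames().get(watched_thread_id)
        if frame is not None:
            reports.append("".join(traceback.format_stack(frame)))

    timer = threading.Timer(timeout_seconds, snapshot)
    timer.daemon = True
    timer.start()
    try:
        yield
    finally:
        # Fast requests cancel the timer before it ever fires.
        timer.cancel()


def slow_view():
    time.sleep(0.3)  # stand-in for a request stuck on a slow backend call


reports = []
with watch_for_slowness(0.05, reports):
    slow_view()

fast_reports = []
with watch_for_slowness(1.0, fast_reports):
    pass
```

After this runs, `reports` holds one trace that names `slow_view`, which is the kind of evidence that points at the call a request is stuck in.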

We found, halfway through launch day, that processes that were taking over 1.5s to return a response were often stuck in memcached set() and get_many(). Switching over to Munin, which we use to track our machine stats over time, we saw that our memcached boxes were pushing 50k req/s, and though they weren’t maxing out the CPU, they were busy enough to slow down the application servers.

Takeaway: it’s often one piece of the backend infrastructure that becomes a bottleneck, and figuring out the point at which your real, live appservers get stuck can help surface the issue.

Replication & Read-slaves

Two of our main data backends—Redis and PostgreSQL—both support easy replication and read-slaving. When one of our Redis DBs crossed 40k req/s, and started becoming a bottleneck, bringing up another machine, SYNCing to the master, and sending read queries to it took less than 20 minutes. For machines we knew would be busy ahead of time, we’d brought up read-slaves, but in a couple of cases, machines reacted differently under load than we’d projected, and it was useful to split reads off quickly.
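Splitting reads off to a new replica amounts to a small routing decision in the client. The sketch below is a hypothetical illustration of that decision, with made-up class and host names; it does not talk to Redis or PostgreSQL.

```python
import itertools


class ReadWriteRouter:
    """Send writes to the master and spread reads across replicas."""

    def __init__(self, master, replicas=()):
        self.master = master
        self.replicas = list(replicas)
        self._counter = itertools.count()

    def add_replica(self, replica):
        """Put a freshly synced replica into the read rotation."""
        self.replicas.append(replica)

    def for_write(self):
        return self.master

    def for_read(self):
        # Fall back to the master until at least one replica is available.
        if not self.replicas:
            return self.master
        return self.replicas[next(self._counter) % len(self.replicas)]


router = ReadWriteRouter("redis-master")
before = router.for_read()             # no replicas yet, so reads hit the master
router.add_replica("redis-replica-1")  # new machine has finished syncing
after = [router.for_read() for _ in range(3)]
```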

For Postgres, we use a combination of Streaming Replication and Amazon EBS Snapshots to bring up a new read-slave quickly. All of our master DBs stream to backup slaves that take frequent EBS snapshots; from these snapshots, we can have a new read-slave up and running, and caught up to the master, in around 20 minutes. Having our machines in an easily scriptable environment like AWS makes provisioning and deploying new read-slaves a quick command-line task.

Takeaway: if read capacity is likely to be a concern, bringing up read-slaves ahead of time and getting them in rotation is ideal; if any new read issues crop up, however, know ahead of time what your options are for bringing more read capacity into rotation.

PGFouine

PGFouine is a tool that analyzes PostgreSQL query logs and generates a page of analytics on their impact on your database; sliced by the “heaviest”, or most frequent, or slowest queries. To ease running it, we’ve created a Fabric script that will connect to a database, set it to log every query, wait 30 seconds, then download the file and run a pgfouine analysis on it; it’s available as a gist. PGFouine is our core tool in analyzing database performance and figuring out which queries could use memcached in front of them, which ones are fetching more data than is necessary, etc; as DBs showed signs of stress on launch day, we would run PGFouine, deploy targeted code improvement to relieve hotspots, and then run it again to make sure those changes had the correct effect.

It’s important to know what a “normal” day looks like for your databases, too, for a baseline, so we run PGFouine periodically to gather statistics on non-stressed-out database instances, too.

Takeaway: database log analysis (especially coupled with a tight iteration loop on optimizing queries and caching what’s needed) lets you find and relieve database hotspots quickly.

One more thing

Another tool that helped us get through the first day was one we wrote ourselves—node2dm, a node.js server for delivering push notifications to Android’s C2DM service. It’s handled over 5 million push notifications for us so far.

We surveyed the different options for C2DM servers, but didn’t find any open source ones that looked like they were being actively maintained, or fully supported the Google service. We’re open sourcing node2dm today; feel free to fork and pull-request if you have any suggestions for improvements.

Interested?

If all of this is interesting/exciting to you, and you’d like to chat more about working with us, drop us a note; we’d love to hear from you.

You can discuss this post at Hacker News.

Mike Krieger, co-founder

What Powers Instagram: Hundreds of Instances, Dozens of Technologies

One of the questions we always get asked at meet-ups and conversations with other engineers is, “what’s your stack?” We thought it would be fun to give a sense of all the systems that power Instagram, at a high-level; you can look forward to more in-depth descriptions of some of these systems in the future. This is how our system has evolved in the just-over-1-year that we’ve been live, and while there are parts we’re always re-working, this is a glimpse of how a startup with a small engineering team can scale to our 14 million+ users in a little over a year. Our core principles when choosing a system are:

  • Keep it very simple
  • Don’t re-invent the wheel
  • Go with proven and solid technologies when you can

We’ll go from top to bottom:

OS / Hosting

We run Ubuntu Linux 11.04 (“Natty Narwhal”) on Amazon EC2. We’ve found previous versions of Ubuntu had all sorts of unpredictable freezing episodes on EC2 under high traffic, but Natty has been solid. We’ve only got 3 engineers, and our needs are still evolving, so self-hosting isn’t an option we’ve explored too deeply yet, though it is something we may revisit in the future given the unparalleled growth in usage.

Load Balancing

Every request to Instagram servers goes through load balancing machines; we used to run 2 nginx machines and DNS Round-Robin between them. The downside of this approach is the time it takes for DNS to update in case one of the machines needs to get decommissioned. Recently, we moved to using Amazon’s Elastic Load Balancer, with 3 nginx instances behind it that can be swapped in and out (and are automatically taken out of rotation if they fail a health check). We also terminate our SSL at the ELB level, which lessens the CPU load on nginx. We use Amazon’s Route53 for DNS, which they’ve recently added a pretty good GUI tool for in the AWS console.

Application Servers

Next up come the application servers that handle our requests. We run Django on Amazon High-CPU Extra-Large machines, and as our usage grows we’ve gone from just a few of these machines to over 25 of them (luckily, this is one area that’s easy to horizontally scale as they are stateless). We’ve found that our particular work-load is very CPU-bound rather than memory-bound, so the High-CPU Extra-Large instance type provides the right balance of memory and CPU.

We use Gunicorn (http://gunicorn.org/) as our WSGI server; we used to use mod_wsgi and Apache, but found Gunicorn was much easier to configure, and less CPU-intensive. To run commands on many instances at once (like deploying code), we use Fabric, which recently added a useful parallel mode so that deploys take a matter of seconds.

Data storage

Most of our data (users, photo metadata, tags, etc.) lives in PostgreSQL; we’ve previously written about how we shard across our different Postgres instances. Our main shard cluster involves 12 Quadruple Extra-Large memory instances (and twelve replicas in a different zone).

We’ve found that Amazon’s network disk system (EBS) doesn’t support enough disk seeks per second, so having all of our working set in memory is extremely important. To get reasonable IO performance, we set up our EBS drives in a software RAID using mdadm.

As a quick tip, we’ve found that vmtouch is a fantastic tool for managing what data is in memory, especially when failing over from one machine to another where there is no active memory profile already. Here is the script we use to parse the output of a vmtouch run on one machine and print out the corresponding vmtouch command to run on another system to match its current memory status.

All of our PostgreSQL instances run in a master-replica setup using Streaming Replication, and we use EBS snapshotting to take frequent backups of our systems. We use XFS as our file system, which lets us freeze & unfreeze the RAID arrays when snapshotting, in order to guarantee a consistent snapshot (our original inspiration came from ec2-consistent-snapshot). To get streaming replication started, our favorite tool is repmgr by the folks at 2ndQuadrant.

To connect to our databases from our app servers, one decision we made early on that had a huge impact on performance was using Pgbouncer to pool our connections to PostgreSQL. We found Christophe Pettus’s blog to be a great resource for Django, PostgreSQL and Pgbouncer tips.

The photos themselves go straight to Amazon S3, which currently stores several terabytes of photo data for us. We use Amazon CloudFront as our CDN, which helps with image load times from users around the world (like in Japan, our second most-popular country).

We also use Redis extensively; it powers our main feed, our activity feed, our sessions system (here’s our Django session backend), and other related systems. All of Redis’ data needs to fit in memory, so we end up running several Quadruple Extra-Large Memory instances for Redis, too, and occasionally shard across a few Redis instances for any given subsystem. We run Redis in a master-replica setup, and have the replicas constantly saving the DB out to disk, and finally use EBS snapshots to backup those DB dumps (we found that dumping the DB on the master was too taxing). Since Redis allows writes to its replicas, it makes for very easy online failover to a new Redis machine, without requiring any downtime.

For our geo-search API, we used PostgreSQL for many months, but once our Media entries were sharded, moved over to using Apache Solr. It has a simple JSON interface, so as far as our application is concerned, it’s just another API to consume.

Finally, like any modern Web service, we use Memcached for caching, and currently have 6 Memcached instances, which we connect to using pylibmc & libmemcached. Amazon has recently launched its ElastiCache service, but it’s not any cheaper than running our own instances, so we haven’t pushed ourselves to switch quite yet.

Task Queue & Push Notifications

When a user decides to share out an Instagram photo to Twitter or Facebook, or when we need to notify one of our Real-time subscribers of a new photo posted, we push that task into Gearman, a task queue system originally written at Danga. Doing it asynchronously through the task queue means that media uploads can finish quickly, while the ‘heavy lifting’ can run in the background. We have about 200 workers (all written in Python) consuming the task queue at any given time, split between the services we share to. We also do our feed fan-out in Gearman, so posting is as responsive for a new user as it is for a user with many followers.

For doing push notifications, the most cost-effective solution we found was pyapns (https://github.com/samuraisam/pyapns), an open-source Twisted service that has handled over a billion push notifications for us, and has been rock-solid.

Monitoring

With 100+ instances, it’s important to keep on top of what’s going on across the board. We use Munin to graph metrics across all of our systems, and also to alert us if anything is outside of its normal range. We write a lot of custom Munin plugins, building on top of Python-Munin, to graph metrics that aren’t system-level (for example, signups per minute, photos posted per second, etc). We use Pingdom for external monitoring of the service, and PagerDuty for handling notifications and incidents.

For Python error reporting, we use Sentry, an awesome open-source Django app written by the folks at Disqus. At any given time, we can sign-on and see what errors are happening across our system, in real time.

You?

If this description of our systems interests you, or if you’re hopping up and down ready to tell us all the things you’d change in the system, we’d love to hear from you. We’re looking for a DevOps person to join us and help us tame our EC2 instance herd.

Instagram Engineering Challenge: The Unshredder

In our office, we have a pretty amazing paper shredder. Seriously, the thing shreds just about anything. It even has a special slot for credit cards (why anyone would want to regularly shred credit cards is beyond me, but I digress…).

One day, after shredding some paper, I thought to myself: shredding paper is a pretty insecure way of destroying important stuff. I figured, it’s a small set of shreds that are all relatively uniform in width and could be pieced back together algorithmically in a fraction of a second.

So, I sat down and thought about what approach I’d use to piece the document back together. It’s unlike a regular puzzle in that all the pieces are exactly the same size, so you can’t rely upon the spatial domain to solve piecing shreds together. However, if you think about it, there’s a pretty simple approach that would allow you to find matches in a different domain. That is, imagine you’re sitting there trying to find a match between two pieces. What are you looking for to decide whether they’re a fit or not?

Anyway, we got really excited about writing a script to take in an image of shreds of paper and piece them back into an original document. It’s an interesting challenge that marries image processing with an interesting algorithmic challenge as well.

THE CHALLENGE

Your challenge, if you choose to accept it, is to write a simple script that takes a shredded image in as input:

and outputs an unshredded and reconstituted image. That is, imagine if you took an image, divided it into an even number of columns and shuffled those columns randomly to produce a shredded image. Then, take that image into the script and output the original image:

We tackled this, and our solution took a few hours plus another few hours for the bonus challenge (more on that later).

THE REWARD

Due to overwhelming response, we’ve run out of our entire stock of tee-shirts! With future challenges we’ll be offering a reward for the first group of people who respond.

GUIDELINES

1) Choose a scripting language of your choice. We chose Python for its relative ease of prototyping and the availability of the Python Imaging Library (PIL), which allowed us to do the image stuff we wanted to do. You can easily use something like C++ or Ruby for this as well.

2) Produce a script that reads in a shredded image (like the one below) and produces the original image. For this image, you can assume shreds are 32 pixels wide and uniformly spaced across the image horizontally. These shreds are scattered at random and if rearranged, will yield the original image.

Use this image as the source image – it’s 640 pixels wide and 359 pixels high.

3) Your solution should algorithmically unshred the image. This means it should work on arbitrarily shredded images we feed your script that are shredded in the same manner.

4) BONUS CHALLENGE: We went the extra mile and made our script even spiffier by auto-detecting how wide the uniform strips are. Extra bonus points to anyone who works this into their solution. But first, we’d recommend getting your script to work assuming 32 pixel-wide shreds. For this you can assume shreds will never end up next to each other correctly in the source image.

5) The key to this problem is being able to access pixel data in the image. We used Python Imaging Library – PIL (http://www.pythonware.com/products/pil/) which made it very easy to parse. See the PIL tips below. If you’re using Ruby, check out RMagick (http://rmagick.rubyforge.org/) which is a gem that serves the same purpose as PIL. C++ has the boost libraries and included is “GIL” which will help you. If you’re using another language, there are most certainly equivalents of PIL, RMagick, and GIL.

SUBMIT YOUR SOLUTION

We’re no longer offering the tee-shirt reward but if you’re still interested in working with us, please submit your information & a link to your solution here: http://bit.ly/unshredder

PIL TIPS

from PIL import Image

image = Image.open("file.jpg")
width, height = image.size
data = image.getdata()  # This gets pixel data

# Access an arbitrary pixel. Data is stored as a flat sequence in which rows
# are sequential. Each element is a tuple of channel values, for example
# (red, green, blue) for an RGB image, with alpha added for RGBA.

def get_pixel_value(x, y):
    pixel = data[y * width + x]
    return pixel

print(get_pixel_value(20, 30))

# Create a new image of the same size as the original
# and copy a region into the new image
NUMBER_OF_COLUMNS = 5
unshredded = Image.new(image.mode, image.size)  # same mode, so JPEG save works
shred_width = unshredded.size[0] // NUMBER_OF_COLUMNS
shred_number = 1
x1, y1 = shred_width * shred_number, 0
x2, y2 = x1 + shred_width, height
source_region = image.crop((x1, y1, x2, y2))  # crop() takes one 4-tuple box
destination_point = (0, 0)
unshredded.paste(source_region, destination_point)
# Output the new image
unshredded.save("unshredded.jpg", "JPEG")

TIPS

1) Don’t overthink it. Use of really complex algorithms isn’t needed. Our solution WITH the bonus exercise comes in at just over 150 lines of python.

2) Think about how you would quantify whether or not two shreds ‘fit’ together by using pixel data

3) Assume you’re using the source image, or other normal photographs without edge-case patterns.

4) There are edge cases where the script we wrote with our approach will not work because of repeating patterns. This is OK in your script as well. Don’t worry about special cases – focus on making the sample images work that we’ve provided.

5) Bonus Challenge: If you decide you want to auto-detect how many columns there are in an image, you should remember that there are a finite number of column counts that are possible given an image of a certain width if you assume columns are evenly distributed and uniformly sized.
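As a starting point for the tips above, here is one possible way to score whether two shreds fit, plus the list of strip widths worth trying for the bonus challenge. This is a sketch and not necessarily the approach we used: the function names are made up, and strips are plain nested lists of (r, g, b) tuples so the example runs without an imaging library.

```python
def edge_distance(left_strip, right_strip):
    """Score how well right_strip fits to the right of left_strip.

    Each strip is a list of rows, and each row is a list of (r, g, b)
    tuples. Lower scores mean the touching edges look more alike.
    """
    total = 0
    for left_row, right_row in zip(left_strip, right_strip):
        left_pixel = left_row[-1]   # rightmost pixel of the left strip
        right_pixel = right_row[0]  # leftmost pixel of the right strip
        total += sum(abs(a - b) for a, b in zip(left_pixel, right_pixel))
    return total


def candidate_shred_widths(image_width):
    """List every strip width that divides the image width evenly."""
    return [w for w in range(1, image_width + 1) if image_width % w == 0]


# Three tiny two-row strips: b continues a smoothly, c does not.
a = [[(10, 10, 10), (20, 20, 20)], [(10, 10, 10), (20, 20, 20)]]
b = [[(21, 21, 21), (30, 30, 30)], [(21, 21, 21), (30, 30, 30)]]
c = [[(200, 0, 0), (0, 0, 0)], [(200, 0, 0), (0, 0, 0)]]

good_fit = edge_distance(a, b)
bad_fit = edge_distance(a, c)
widths_for_640 = candidate_shred_widths(640)
```

A low score for one pairing and a high score for another is the signal you would use to decide which shred goes next to which.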

SHREDDER

If you’d like to produce your own sample images, you can use our simple script here to generate some:

from PIL import Image
from random import shuffle

SHREDS = 10
image = Image.open("sample.png")
shredded = Image.new("RGBA", image.size)
width, height = image.size
shred_width = width // SHREDS  # integer pixel width of each shred
sequence = list(range(SHREDS))  # shuffle() needs a mutable list
shuffle(sequence)

for i, shred_index in enumerate(sequence):
    shred_x1, shred_y1 = shred_width * shred_index, 0
    shred_x2, shred_y2 = shred_x1 + shred_width, height
    region = image.crop((shred_x1, shred_y1, shred_x2, shred_y2))
    shredded.paste(region, (shred_width * i, 0))

shredded.save("sample_shredded.png")

Storing hundreds of millions of simple key-value pairs in Redis

When transitioning systems, sometimes you have to build a little scaffolding. At Instagram, we recently had to do just that: for legacy reasons, we need to keep around a mapping of about 300 million photos back to the user ID that created them, in order to know which shard to query (see more info about our sharding setup). While eventually all clients and API applications will have been updated to pass us the full information, there are still plenty who have old information cached. We needed a solution that would:

  1. Look up keys and return values very quickly
  2. Fit the data in memory, and ideally within one of the EC2 high-memory types (the 17GB or 34GB, rather than the 68GB instance type)
  3. Fit well into our existing infrastructure
  4. Be persistent, so that we wouldn’t have to re-populate it if a server died

One simple solution to this problem would be to simply store them as a bunch of rows in a database, with “Media ID” and “User ID” columns. However, a SQL database seemed like overkill given that these IDs were never updated (only inserted), didn’t need to be transactional, and didn’t have any relations with other tables.

Instead, we turned to Redis, an advanced key-value store that we use extensively here at Instagram (for example, it powers our main feed). Redis is a key-value swiss-army knife; rather than just normal “Set key, get key” mechanics like Memcached, it provides powerful aggregate types like sorted sets and lists. It has a configurable persistence model, where it background saves at a specified interval, and can be run in a master-slave setup. All of our Redis deployments run in master-slave, with the slave set to save to disk about every minute.

At first, we decided to use Redis in the simplest way possible: for each ID, the key would be the media ID, and the value would be the user ID:

SET media:1155315 939
GET media:1155315
> 939

While prototyping this solution, however, we found that Redis needed about 70 MB to store 1,000,000 keys this way. Extrapolating to the 300,000,000 we would eventually need, it was looking to be around 21GB worth of data—already bigger than the 17GB instance type on Amazon EC2.

We asked the always-helpful Pieter Noordhuis, one of Redis’ core developers, for input, and he suggested we use Redis hashes. Hashes in Redis are dictionaries that can be encoded in memory very efficiently; the Redis setting ‘hash-zipmap-max-entries’ configures the maximum number of entries a hash can have while still being encoded efficiently. We found this setting was best around 1000; any higher and the HSET commands would cause noticeable CPU activity. For more details, you can check out the zipmap source file.

To take advantage of the hash type, we bucket all our Media IDs into buckets of 1000 (we just take the ID, divide by 1000 and discard the remainder). That determines which key we fall into; next, within the hash that lives at that key, the Media ID is the lookup key *within* the hash, and the user ID is the value. For example, a Media ID of 1155315 falls into bucket 1155 (1155315 / 1000 = 1155, discarding the remainder):

HSET "mediabucket:1155" "1155315" "939"
HGET "mediabucket:1155" "1155315"
> "939"
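In application code, the bucketing step is a single integer division. The sketch below is illustrative: `hash_location` and the in-memory `FakeRedis` class are hypothetical names, and the fake store only mimics HSET and HGET so the example runs without a Redis server.

```python
BUCKET_SIZE = 1000


def hash_location(media_id):
    """Return the (hash key, field) pair for a media ID."""
    bucket = media_id // BUCKET_SIZE  # integer division drops the remainder
    return f"mediabucket:{bucket}", str(media_id)


class FakeRedis:
    """In-memory stand-in that mimics HSET/HGET for this example only."""

    def __init__(self):
        self._hashes = {}

    def hset(self, key, field, value):
        self._hashes.setdefault(key, {})[field] = value

    def hget(self, key, field):
        return self._hashes.get(key, {}).get(field)


store = FakeRedis()
key, field = hash_location(1155315)
store.hset(key, field, "939")
owner = store.hget(key, field)
```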

The size difference was pretty striking; with our 1,000,000 key prototype (encoded into 1,000 hashes of 1,000 sub-keys each), Redis only needs 16MB to store the information. Expanding to 300 million keys, the total is just under 5GB, which in fact even fits in the much cheaper m1.large instance type on Amazon, at about 1/3 of the cost of the larger instance we would have needed otherwise. Best of all, lookups in hashes are still O(1), making them very quick.
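The two projections are straight linear extrapolations from the per-million-key measurements. A short sketch of the arithmetic, using decimal units (1GB = 1000MB) to match the round figures in the text; the function name is made up for illustration:

```python
MB_PER_GB = 1000  # decimal units, matching the round figures in the text


def projected_gb(mb_per_million_keys, total_keys):
    """Scale a per-million-keys measurement linearly to total_keys."""
    millions = total_keys / 1_000_000
    return mb_per_million_keys * millions / MB_PER_GB


plain_keys_gb = projected_gb(70, 300_000_000)    # one Redis key per media ID
hash_buckets_gb = projected_gb(16, 300_000_000)  # hashes of 1,000 entries each
```

That gives roughly 21GB for plain keys against roughly 4.8GB for bucketed hashes, which is the difference between outgrowing the 17GB instance type and fitting comfortably inside it.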

If you’re interested in trying these combinations out, the script we used to run these tests is available as a Gist on GitHub (we also included Memcached in the script, for comparison; it took about 52MB for the million keys). And if you’re interested in working on these sorts of problems with us, drop us a note, we’re hiring!
Mike Krieger, co-founder

Simplifying EC2 SSH Connections

Here at Instagram, we run our infrastructure on Amazon Web Services, running instances on their Elastic Compute Cloud (EC2). Since we’re often spinning up new machines and changing details of our infrastructure, there’s an ever-growing list of machines that we SSH into.

To authenticate with our instances, we use public key authentication (the recommended way of doing SSH log-ins to EC2 machines), but we need to figure out what host we’re connecting to first. A common way of connecting to an EC2 instance is via its public hostname (ec2-x-x-x-x.compute-1.amazonaws.com). However, managing a list of these hostnames once there are several of them is tedious. Because of this, we wrote EC2-SSH, a set of Python scripts that help easily connect to EC2 instances, and that we’re open sourcing for the community’s benefit.

EC2-SSH takes advantage of the EC2 instance tagging. If you’re using the web console, when launching a new instance, Amazon prompts you to provide an optional value for the (pre-filled) “Name” tag. Tags can also be edited using the available EC2 command line tools.

Assuming you’ve already tagged all of your instances with names, using EC2-SSH is as easy as using regular SSH with hostnames— because, behind the scenes, that’s all it’s really doing. Better illustrated with an example: Let’s assume you have an instance tagged with the name “nginx3”; using EC2-SSH you could connect to the instance by typing `ec2-ssh nginx3` into your terminal.

EC2-SSH first calls the Amazon EC2 web service, resolving the tag name (in this case “nginx3”) to the public DNS address. It then substitutes out the tag name with the hostname and sends it, along with any other arguments and parameters, to `/usr/bin/ssh`.
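The substitution step can be sketched as follows. This is not the EC2-SSH source: the function name is hypothetical, a plain dictionary stands in for the call to the EC2 web service, and the sketch assumes the instance name is the first argument.

```python
def build_ssh_command(args, name_to_host, ssh_path="/usr/bin/ssh"):
    """Replace the first argument (an instance name) with its hostname."""
    if not args:
        raise ValueError("expected an instance name")
    name, rest = args[0], list(args[1:])
    try:
        host = name_to_host[name]
    except KeyError:
        raise ValueError(f"no running instance tagged {name!r}") from None
    # Everything after the name is passed through to ssh untouched.
    return [ssh_path, host] + rest


# Hypothetical lookup table; the real tool asks the EC2 API for this.
hosts = {"nginx3": "ec2-x-x-x-x.compute-1.amazonaws.com"}
command = build_ssh_command(["nginx3", "date"], hosts)
```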

The `ec2-ssh` script is a small shell script that calls another Python script, `ec2-host`, that eventually calls `/usr/bin/ssh`. Let’s detail out the process in depth:

The Python script `ec2-host` is distributed in the EC2-SSH Python package and can be used unaccompanied; you might find it rather useful, in fact. Let’s take a look at its usage output:

% ec2-host --help
Usage: ec2-host [-k KEY] [-s SECRET] [NAME]
Prints server host name.

      --help                 display this help and exit
  -k, --aws-key KEY          EC2 Key, defaults to ENV[AWS_ACCESS_KEY_ID]
  -s, --aws-secret SECRET    EC2 Secret, defaults to ENV[AWS_SECRET_ACCESS_KEY]

By default, with no arguments, `ec2-host` will return a list of all running EC2 instances and their associated public host names. I often use `ec2-host` this way, combined with grep, to filter out and identify a specific instance or set of instances. Here’s an example:

% ec2-host | grep django
django1 ec2-x-x-x-x.amazonaws.com
django2 ec2-x-x-x-x.amazonaws.com
....

When passing the value of an instance’s “Name” tag as an argument, ec2-host will return the associated public hostname. This is exactly what the `ec2-ssh` shell script does. Here’s an example:

% ec2-host nginx2
ec2-x-x-x-x.compute-1.amazonaws.com

You may be asking how `ec2-host` is able to enumerate your running EC2 instances; that’s a valid question. Before you start using `ec2-ssh` or `ec2-host` you have to provide your AWS key and secret. You can pass them via command line arguments to `ec2-host` like this:

% ec2-host --aws-key AKJASKSA1234JDSJ8123 --aws-secret B3JDJRYQ1234QWRHFJ1234AJJDAH1kjd1234

To save time, you can also set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

Putting it all together, if you issue the following command, you’ll be signed in to the instance in your system with the specified name:

% ec2-ssh nginx3

Any arguments after the instance name will be passed into ssh:

% ec2-ssh nginx5 date
Thu Oct 13 17:25:22 UTC 2011

We’ve made these tools available via PyPI, so you can issue an `easy_install ec2-ssh` or a `pip install ec2-ssh` to install the tools. If you’d like to contribute, you can also fork the code on GitHub, or discuss this post on Hacker News. And if you’re interested in helping us scale our systems, we’d love to hear from you.

Shayne Sweeney, Mobile & Server Engineer

Sharding & IDs at Instagram

With more than 25 photos & 90 likes every second, we store a lot of data here at Instagram. To make sure all of our important data fits into memory and is available quickly for our users, we’ve begun to shard our data—in other words, place the data in many smaller buckets, each holding a part of the data.

Our application servers run Django with PostgreSQL as our back-end database. Our first question after deciding to shard out our data was whether PostgreSQL should remain our primary data-store, or whether we should switch to something else. We evaluated a few different NoSQL solutions, but ultimately decided that the solution that best suited our needs would be to shard our data across a set of PostgreSQL servers.

Before writing data into this set of servers, however, we had to solve the issue of how to assign unique identifiers to each piece of data in the database (for example, each photo posted in our system). The typical solution that works for a single database—just using a database’s natural auto-incrementing primary key feature—no longer works when data is being inserted into many databases at the same time. The rest of this blog post addresses how we tackled this issue.

Before starting out, we listed out what features were essential in our system:

  1. Generated IDs should be sortable by time (so a list of photo IDs, for example, could be sorted without fetching more information about the photos)
  2. IDs should ideally be 64 bits (for smaller indexes, and better storage in systems like Redis)
  3. The system should introduce as few new ‘moving parts’ as possible—a large part of how we’ve been able to scale Instagram with very few engineers is by choosing simple, easy-to-understand solutions that we trust.

Existing solutions

Many existing solutions to the ID generation problem exist; here are a few we considered:

Generate IDs in web application

This approach leaves ID generation entirely up to your application, and not up to the database at all. One example is MongoDB’s ObjectId, which is 12 bytes long and encodes the timestamp as the first component. Another popular approach is to use UUIDs.

Pros:

  1. Each application thread generates IDs independently, minimizing points of failure and contention for ID generation
  2. If you use a timestamp as the first component of the ID, the IDs remain time-sortable

Cons:

  1. Generally requires more storage space (96 bits or higher) to make reasonable uniqueness guarantees
  2. Some UUID types are completely random and have no natural sort
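As a rough illustration of the timestamp-first idea, here is a minimal Python sketch of an application-generated 96-bit ID. This is not MongoDB’s actual ObjectId layout; the function name `make_app_id` and the split into 4 timestamp bytes plus 8 random bytes are assumptions made for the example.

```python
import os
import struct
import time


def make_app_id(now_seconds=None):
    """Return a 12-byte ID: 4-byte big-endian timestamp + 8 random bytes.

    Hypothetical layout, for illustration only.
    """
    if now_seconds is None:
        now_seconds = int(time.time())
    # Big-endian, so byte-wise comparison orders IDs by timestamp first.
    return struct.pack(">I", now_seconds) + os.urandom(8)


# IDs created in later seconds compare greater, so sorting by ID sorts by time.
older = make_app_id(1_315_000_000)
newer = make_app_id(1_315_000_001)
```

Two IDs generated within the same second fall back to comparing their random bytes, so ordering is only guaranteed down to the timestamp’s resolution.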

Generate IDs through dedicated service

Ex: Twitter’s Snowflake, a Thrift service that uses Apache ZooKeeper to coordinate nodes and then generates 64-bit unique IDs

Pros:

  1. Snowflake IDs are 64 bits, half the size of a UUID
  2. Can use time as first component and remain sortable
  3. Distributed system that can survive nodes dying

Cons:

  1. Would introduce additional complexity and more ‘moving parts’ (ZooKeeper, Snowflake servers) into our architecture

DB Ticket Servers

Uses the database’s auto-incrementing abilities to enforce uniqueness. Flickr uses this approach, but with two ticket DBs (one on odd numbers, the other on even) to avoid a single point of failure.

Pros:

  1. DBs are well understood and have pretty predictable scaling factors

Cons:

  1. Can eventually become a write bottleneck (though Flickr reports that, even at huge scale, it’s not an issue).
  2. An additional couple of machines (or EC2 instances) to admin
  3. If using a single DB, it becomes a single point of failure. If using multiple DBs, you can no longer guarantee that IDs are sortable over time.
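The last point can be seen with a small in-memory simulation. This is only a sketch: the `TicketServer` class is a hypothetical stand-in for an auto-incrementing database, not Flickr’s implementation.

```python
import itertools


class TicketServer:
    """In-memory stand-in for one auto-incrementing ticket database."""

    def __init__(self, start, step):
        self._counter = itertools.count(start, step)

    def next_id(self):
        return next(self._counter)


# Two servers with interleaved sequences: one hands out odd IDs, one even.
odd_server = TicketServer(start=1, step=2)
even_server = TicketServer(start=2, step=2)

# Requests arrive in this order; the odd server happens to get more traffic.
request_order = [odd_server, odd_server, odd_server, even_server]
issued = [server.next_id() for server in request_order]
# issued is [1, 3, 5, 2]: every ID is unique, but the most recent one
# is not the largest, so ID order no longer matches creation order.
```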

Of all the approaches above, Twitter’s Snowflake came the closest, but the additional complexity required to run an ID service was a point against it. Instead, we took a conceptually similar approach, but brought it inside PostgreSQL.

Our solution

Our sharded system consists of several thousand ‘logical’ shards that are mapped in code to far fewer physical shards. Using this approach, we can start with just a few database servers, and eventually move to many more, simply by moving a set of logical shards from one database to another, without having to re-bucket any of our data. We used Postgres’ schemas feature to make this easy to script and administrate.

Schemas (not to be confused with the SQL schema of an individual table) are a logical grouping feature in Postgres. Each Postgres DB can have several schemas, each of which can contain one or more tables. Table names must only be unique per-schema, not per-DB, and by default Postgres places everything in a schema named ‘public’.

Each ‘logical’ shard is a Postgres schema in our system, and each sharded table (for example, likes on our photos) exists inside each schema.
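To make the logical-to-physical mapping concrete, here is a minimal Python sketch. Everything in it is an assumption for illustration: the host names, the range-based `physical_map`, the `locate` helper, sharding by user ID, and naming schemas `insta<N>` after the example schema `insta5` used later in this post.

```python
NUM_LOGICAL_SHARDS = 2000  # same count as the example later in this post

# Hypothetical mapping: ranges of logical shards -> physical database host.
physical_map = {
    range(0, 1000): "db-a",
    range(1000, 2000): "db-b",
}


def logical_shard_for(user_id):
    """The logical shard is derived from the user ID and never changes."""
    return user_id % NUM_LOGICAL_SHARDS


def locate(user_id, table):
    """Return (host, schema-qualified table name) for a user's rows."""
    shard = logical_shard_for(user_id)
    for shard_range, host in physical_map.items():
        if shard in shard_range:
            return host, f"insta{shard}.{table}"
    raise LookupError(f"no physical host for logical shard {shard}")


before = locate(31341, "likes")

# Adding capacity: move logical shards 1000-1499 to a new host. Only the
# mapping changes; the user's logical shard (and schema name) stays put.
physical_map = {
    range(0, 1000): "db-a",
    range(1000, 1500): "db-c",
    range(1500, 2000): "db-b",
}
after = locate(31341, "likes")
```

Because a row’s logical shard never changes, moving a schema to another server only requires updating the mapping, not re-bucketing the data.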

We’ve delegated ID creation to each table inside each shard, by using PL/PGSQL, Postgres’ internal programming language, and Postgres’ existing auto-increment functionality.

Each of our IDs consists of:

  • 41 bits for time in milliseconds (gives us about 69 years of IDs with a custom epoch)
  • 13 bits that represent the logical shard ID
  • 10 bits that represent an auto-incrementing sequence, modulo 1024. This means we can generate 1024 IDs, per shard, per millisecond
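The bit budget can be checked with a little arithmetic. A minimal sketch in Python (the constant names are illustrative):

```python
TIME_BITS, SHARD_BITS, SEQ_BITS = 41, 13, 10

total_bits = TIME_BITS + SHARD_BITS + SEQ_BITS   # fills a 64-bit integer
max_logical_shards = 2 ** SHARD_BITS             # 8192 distinct shard IDs
ids_per_shard_per_ms = 2 ** SEQ_BITS             # 1024 sequence values

# How long 41 bits of milliseconds last before the time field wraps.
MS_PER_YEAR = 1000 * 60 * 60 * 24 * 365.25
years_of_ids = 2 ** TIME_BITS / MS_PER_YEAR      # a little under 70 years
```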

Let’s walk through an example: let’s say it’s September 9th, 2011, at 5:00pm and our ‘epoch’ begins on January 1st, 2011. There have been 21747600000 milliseconds since the beginning of our epoch (251 days and 17 hours), so to start our ID, we fill the left-most 41 bits with this value with a left-shift:

id = 21747600000 << (64-41)

Next, we take the shard ID for this particular piece of data we’re trying to insert. Let’s say we’re sharding by user ID, and there are 2000 logical shards; if our user ID is 31341, then the shard ID is 31341 % 2000 -> 1341. We fill the next 13 bits with this value:

id |= 1341 << (64-41-13)

Finally, we take the next value of our auto-increment sequence (this sequence is unique to each table in each schema) and fill out the remaining bits. Let’s say we’d generated 5,000 IDs for this table already; our next value is 5,001, which we mod by 1024 (so it fits in 10 bits) and include too:

id |= (5001 % 1024)

We now have our ID, which we can return to the application server using the RETURNING keyword as part of the INSERT.
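The three steps of the walkthrough can be put together in a short Python sketch. The function name `make_id` is an assumption, and the elapsed milliseconds are computed from the two dates with naive datetimes (time zones and daylight saving are ignored) rather than hard-coded.

```python
from datetime import datetime

TIME_BITS, SHARD_BITS, SEQ_BITS = 41, 13, 10


def make_id(elapsed_ms, shard_id, seq_value):
    """Pack (time, shard, sequence) into one 64-bit integer."""
    new_id = elapsed_ms << (64 - TIME_BITS)               # top 41 bits
    new_id |= shard_id << (64 - TIME_BITS - SHARD_BITS)   # next 13 bits
    new_id |= seq_value % (1 << SEQ_BITS)                 # low 10 bits
    return new_id


# Walkthrough inputs: epoch of January 1st, 2011; "now" is September 9th, 5:00pm.
epoch = datetime(2011, 1, 1)
now = datetime(2011, 9, 9, 17, 0)
elapsed_ms = int((now - epoch).total_seconds() * 1000)

shard_id = 31341 % 2000  # user ID 31341 across 2000 logical shards
photo_id = make_id(elapsed_ms, shard_id, 5001)

# One millisecond later, even shard 0 with sequence 0 yields a larger ID,
# which is what makes the IDs sortable by time.
later_id = make_id(elapsed_ms + 1, 0, 0)
```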

Here’s the PL/PGSQL that accomplishes all this (for an example schema insta5):

CREATE OR REPLACE FUNCTION insta5.next_id(OUT result bigint) AS $$
DECLARE
    our_epoch bigint := 1314220021721;
    seq_id bigint;
    now_millis bigint;
    shard_id int := 5;
BEGIN
    -- Low 10 bits: per-table sequence value, wrapped to 0..1023
    SELECT nextval('insta5.table_id_seq') % 1024 INTO seq_id;
    -- Current wall-clock time in milliseconds since the Unix epoch
    SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
    -- High 41 bits: milliseconds since our custom epoch
    result := (now_millis - our_epoch) << 23;
    -- Next 13 bits: logical shard ID
    result := result | (shard_id << 10);
    result := result | (seq_id);
END;
$$ LANGUAGE PLPGSQL;

And when creating the table, we do:

CREATE TABLE insta5.our_table (
    "id" bigint NOT NULL DEFAULT insta5.next_id(),
    ...rest of table schema...
)

And that’s it! Primary keys that are unique across our application (and as a bonus, contain the shard ID in them for easier mapping). We’ve been rolling this approach into production and are happy with the results so far. Interested in helping us figure out these problems at scale? We’re hiring!
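Since the shard ID and timestamp are packed into every key, they can be recovered with a few shifts and masks. A minimal Python sketch, assuming the 41/13/10 layout described above (the `decode_id` helper and the sample inputs are hypothetical):

```python
SHARD_BITS, SEQ_BITS = 13, 10


def decode_id(packed_id, epoch_ms):
    """Split a packed ID back into (Unix time in ms, shard ID, sequence)."""
    seq = packed_id & ((1 << SEQ_BITS) - 1)
    shard_id = (packed_id >> SEQ_BITS) & ((1 << SHARD_BITS) - 1)
    elapsed_ms = packed_id >> (SHARD_BITS + SEQ_BITS)
    return epoch_ms + elapsed_ms, shard_id, seq


# Build an ID the same way insta5.next_id() does, using made-up inputs.
our_epoch = 1314220021721          # epoch constant from the function above
now_millis = our_epoch + 123_456   # hypothetical "current time"
example_id = ((now_millis - our_epoch) << 23) | (5 << 10) | 42

created_ms, shard_id, seq = decode_id(example_id, our_epoch)
```

With the shard ID in hand, the application can work out which schema a row lives in from its primary key alone, without an extra lookup.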

Mike Krieger, co-founder
