If you’re a programmer now, there’s one reality you’d best be getting used to. People expect you to know how to deal with big data. The kind of data that will take a while to process. The kind that will crash your program if you try to bring it all into memory at the same time. But you also want to avoid making individual SQL requests to a database to access every single row. That would be awfully slow. So how do you solve this problem? More specifically, how do you solve this problem in Haskell?

This is exactly the type of problem the Data.Conduit library exists for. This article will go through a simple example. We’ll stream some integers using a conduit “source” and add them all together. Luckily, we only ever need to see one number at a time from the source, so we have no need to bring a ton of data into memory. Let’s first take a look at how we create this source.

Getting Our Data

Suppose we are just trying to find the sum of the numbers from 1 up to 10 million. The naive way to do this would be to have our “source” of numbers be a raw list, and then we would take the sum of this list:

    myIntegerSourceBad :: [Integer]
    myIntegerSourceBad = [1..10000000]

    sumFromSource :: [Integer] -> Integer
    sumFromSource lst = sum lst

This method can end up holding the entire list in memory at the same time. Imagine if we were querying a database with tens of millions of entries. That would be problematic. Our first step toward solving this will be to use Data.Conduit to create a “Source” of integers from this list. We can do this with the sourceList function. Note that Identity is the simple Identity monad. That is, the base monad that doesn’t actually do anything:

    import Control.Monad.Identity (Identity)
    import Data.Conduit (Source)
    import Data.Conduit.List (sourceList)

    -- A source that streams the list's elements downstream one at a time
    myIntegerSource :: Source Identity Integer
    myIntegerSource = sourceList [1..10000000]

So now instead of returning a lump list of values, we’re actually getting a stream of Integer values we call a Source.
Let’s describe the meaning of some of the conduit types so we’ll have a better idea of what’s going on here.

Conduit Types

The conduit types are all built on ConduitM, which is the fundamental monad for conduits. This type has four different parameters. We’ll write out its definition like so:

    type ConduitM i o m r

You should think of each conduit as a data processing funnel. The i type is the input that you can bring into the funnel. The o type stands for output. These are the values you can push out of the funnel. The m type is the underlying monad. We'll see examples with both the Identity monad and the IO monad.

We can think of the last type, r, as the “result”. But it’s different from the output. This is what the function itself will return as its value once it’s done. We'll see a return type when we write our sink later. But otherwise, we'll generally just use (). As a result, we can rewrite some types using the Conduit type synonym. This synonym is helpful for ignoring the return type. But it is also annoying because it flips the placement of the m and o types. Don’t let this trip you up.

    type Conduit i m o = ConduitM i o m ()

Now we can define our source and sink types. A Source is a conduit that takes no input (so i is the unit type) and produces some output. Conversely, a Sink is a conduit that takes some input but produces no output (so o is the Void type):

    type Source m o = Conduit () m o
    type Sink i m r = ConduitM i Void m r

So in the above example, our myIntegerSource function is a “source”. It creates a series of integer values without taking any inputs. We’ll see how we can connect different conduits whose types match up.

Primitive Functions

There are three basic functions in the conduit library: yield, await, and leftover. Yield is how we pass a value downstream to another conduit. In other words, it is a “production” function, or a source of values. We can only yield values that fit the output type of our conduit function.
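To make yield a bit more concrete, here is a minimal sketch of a source written by hand instead of with sourceList. It assumes conduit 1.3 or later, where ConduitT is the current name for ConduitM and .| is the current spelling of the fuse operator that we cover in the section on combining conduits. The names countTo and firstFive are hypothetical, made up for this illustration.

```haskell
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: a source that yields 1..n one value at a time.
-- The input type is () because a source never awaits anything.
countTo :: Monad m => Integer -> ConduitT () Integer m ()
countTo n = go 1
  where
    go i
      | i > n = return ()                  -- nothing left to yield, so the source ends
      | otherwise = yield i >> go (i + 1)  -- push i downstream, then continue

-- Collect everything the source yields into a list (CL.consume is a sink).
firstFive :: [Integer]
firstFive = runConduitPure (countTo 5 .| CL.consume)
```

Each call to yield hands exactly one value to whatever is downstream, which is why the source never needs to build the whole list itself.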
So then how do we receive values from upstream? The answer is the await function. This operates as a sink and allows a function to remain inert while it waits to receive a value from upstream. Naturally, this value must be of the input type of the conduit. Now, the actual resulting type of await is a Maybe value. We could receive a value of Nothing. This indicates that our upstream source has terminated and we won't receive any more values. This is usually the circumstance in which we’ll let our function return.

The final function is the leftover function. This allows you to take a value that you have already pulled from upstream and put it back upstream. This way, we can consume the value again from a different sink, or a different iteration of this sink. One caveat the docs add is that you should not use leftover with values that you have created yourself. You should ONLY use values you got from upstream.

Writing Our Conduits

So above we’ve already written a source conduit for our toy program. It wraps a list of integers so we can stream them into our program one by one. Now we’ll write a sink that will take these values and add them together. This will take the form of a recursive function with an accumulator argument. Sinks often function recursively to receive their next input. So we’ll start off by awaiting some value from the upstream conduit.

    myIntegerSink :: Integer -> Sink Integer Identity Integer
    myIntegerSink accum = do
      maybeFirstVal <- await
      ...

If that value is Nothing, we’ll know there are no more values coming. We can then proceed to return whatever accumulated value we have.

    myIntegerSink :: Integer -> Sink Integer Identity Integer
    myIntegerSink accum = do
      maybeFirstVal <- await
      case maybeFirstVal of
        Nothing -> return accum
        ...

If that value contains another Integer, we’ll first calculate the new sum by adding them together.
Then we’ll make a recursive call to the sink function with the new accumulator argument:

    myIntegerSink :: Integer -> Sink Integer Identity Integer
    myIntegerSink accum = do
      maybeFirstVal <- await
      case maybeFirstVal of
        -- Upstream is finished, so the accumulator is our result
        Nothing -> return accum
        -- Add the new value and keep waiting for more input
        Just val -> do
          let newSum = val + accum
          myIntegerSink newSum

And that’s it really! Our example is simple so the code ends up being simple as well.

Combining Our Conduits

Now we’ll want to combine our conduits. We do this with the “fuse” operator, written out as =$=. Haskell libraries can be notorious for having strange operators. This library isn’t necessarily an exception. But think of conduits as a tunnel, and this operator looks like it's connecting two parts of a tunnel.

With this operator, the output type of the first conduit needs to match the input type of the second conduit. With how we’ve set up our conduits, this is the case. So now to polish things off, we use runConduitPure with our combined conduit:

    fullConduit :: Integer
    fullConduit = runConduitPure $ myIntegerSource =$= myIntegerSink 0

It’s generally quite easy to add more conduits using fuse. For instance, suppose we wanted even numbers to count double for our purposes. We could accomplish this in our source or sink, but we could also add another conduit. It will take an Integer as input and yield an Integer as output. It will wait for its input integer, check its value, and then double the value if it is even. Then we will yield the resulting value. Once again, if we haven’t seen the end of the input, we need to recursively jump back into the conduit.

    myDoublingConduit :: Conduit Integer Identity Integer
    myDoublingConduit = do
      maybeVal <- await
      case maybeVal of
        Nothing -> return ()
        Just val -> do
          -- Even values count double; odd values pass through unchanged
          let newVal = if val `mod` 2 == 0 then val * 2 else val
          yield newVal
          myDoublingConduit

Then we can stick this conduit between our other conduits with the fuse operator!
    fullConduit :: Integer
    fullConduit = runConduitPure $ myIntegerSource =$= myDoublingConduit =$= myIntegerSink 0

Vectorizing

There are also times when you’ll want to batch up certain transactions. A great example of this is when you want to insert all results from your sink into a database. You don’t want to keep sending individual insert queries down the line. You can instead wait and group a bunch of inputs together and send batch inserts.

For our toy example, we’ll gather our integers in groups of 100000 so that we can log progress updates along the way. This will require changing our conduits to live on top of the IO monad. But once we’ve made this change, we can use the conduitVector function like so:

    fullConduitIO :: IO Integer
    fullConduitIO = runConduit $ myIntegerSourceIO =$= conduitVector 100000 =$= myIntegerVectorSink 0

    myIntegerSourceIO :: Source IO Integer
    myIntegerSourceIO = sourceList [1..100000000]

    myIntegerVectorSink :: Integer -> Sink (Vector Integer) IO Integer
    myIntegerVectorSink accum = do
      maybeFirstVal <- await
      case maybeFirstVal of
        Nothing -> return accum
        Just vals -> do
          -- Each input is now a whole batch, so sum the batch first
          let newSum = (Vec.sum vals) + accum
          -- Print the running total as a progress update
          lift $ print newSum
          myIntegerVectorSink newSum

By vectorizing the conduit, we need to change the sink so that it takes Vector Integer as its input type instead of Integer. Then we get a vector from await, so we have to sum those values as well.

Summary

The Data.Conduit library allows you to deal with large amounts of data in sustainable ways. You can use it to stream data from a database or some other source. This is more efficient than bringing a large chunk of information into memory all at once. It also allows you to pass information through “tunnels”, called conduits. You can make these perform many complicated operations. You mainly compose conduit functions from the yield, await, and leftover functions. You merge conduits together into a larger conduit with the “fuse” operator =$=. You can also use the conduitVector function to batch certain operations.
This was a more advanced topic. If you’ve never written Haskell before, it’s not as scary as this article makes it out to be! Check out our Getting Started Checklist to take the first steps on your Haskell journey!

If you’ve done a little bit of Haskell before but need some more practice on the fundamentals, you should download our Recursion Workbook. It has some great material on recursion as well as 10 practice problems!

Next week we’ll keep up with some more advanced topics. We’ll look into the Data.Aeson library and how it allows us to serialize Haskell objects into JSON format! So stay tuned to Monday Morning Haskell!

Appendix

I mentioned earlier that operators can be a major pain point in Haskell. Unfortunately, so can documentation. This can be especially true when tracking down the right docs for the conduit concepts. So for reference, here are all the imports I used:

    import Conduit (conduitVector, lift)
    import Control.Monad.Identity (Identity)
    import Data.Conduit (Source, Sink, await, runConduitPure, (=$=), Conduit, yield, runConduit)
    import Data.Conduit.List (sourceList)
    import Data.Vector hiding (sum)
    import qualified Data.Vector as Vec

Note that Data.Conduit and Data.Conduit.List come from the conduit library on Hackage. However, the Conduit module actually comes from conduit-combinators. This is very deceptive.
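One more note for reference: the leftover primitive from the Primitive Functions section never shows up in the example code, so here is a rough sketch of how it might be used. It assumes conduit 1.3 or later, where ConduitT is the current name for ConduitM and .| is the current spelling of =$=. The names takePrefix and splitExample are hypothetical, made up for this illustration.

```haskell
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: consume the leading run of values that satisfy
-- the predicate and return them, leaving the rest of the stream in place.
takePrefix :: Monad m => (a -> Bool) -> ConduitT a o m [a]
takePrefix p = go []
  where
    go acc = do
      next <- await
      case next of
        Nothing -> return (reverse acc)
        Just x
          | p x -> go (x : acc)
          | otherwise -> do
              -- x came from upstream, so it is safe to put it back
              leftover x
              return (reverse acc)

-- Split a stream into its small prefix and everything after it.
splitExample :: ([Integer], [Integer])
splitExample = runConduitPure $
  CL.sourceList [1, 2, 3, 10, 11] .| do
    small <- takePrefix (< 5)
    rest <- CL.consume
    return (small, rest)
```

We only learn that the prefix has ended by pulling one value too many. Calling leftover puts that value back, so the next consumer in the same block still sees it.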