For the SQL interface, both sources and sinks are treated as tables: you SELECT FROM sources and INSERT INTO sinks. Right now it's incumbent on the user to correctly specify a source's types for deserialization. What happens if you get this wrong is a little source-dependent, since some data formats are stricter than others: Parquet will fail hard at read time, while JSON will coerce as best it can, optionally dropping the data instead of failing the job depending on the bad_data parameter: https://doc.arroyo.dev/connectors/overview#bad-data.
Currently we don't support much in the way of changing configuration in external systems, instead focusing on defining long-running pipelines.
What did you have in mind for an HTTP source? We have a polling HTTP source, as well as a WebSocket source: https://doc.arroyo.dev/connectors/polling-http https://doc.arroyo.dev/connectors/websocket
So if I'm understanding, you actually read data directly from (say) S3? It isn't copied from S3 and stored locally (i.e., as a bunch of local .arrow files)?
(Apologies if I'm ignorant of the underlying tech - I think this is really cool and am just trying to wrap my head around what happens between "I upload some data to S3" and "we get query results".)
Yep, pretty much. Right now filesystem^ sources are finite, scanning the target path at operator startup time and processing all matching files. This processing is done by opening an asynchronous reader, courtesy of the object_store crate.
^We call these Filesystem Sources/Sinks to match terminology present in other streaming systems, but I'm not in love with it.
Hi there! We actually already have a built-in Nexmark source. It's pretty useful for developing new capabilities, and available as a source out of the box.
Just read through the DBSP docs and it looks like it is working in a similar space. The biggest differences in my mind are around distribution and reliability. Arroyo works across a cluster of machines and has built in fault tolerance, while for DBSP that's still just planned for the future.
That'd be great! We have versions of most of the Nexmark queries and some internal benchmarks against Flink; we'd love to help if you're interested in benchmarking against more systems. Reach out at micah@arroyo.systems.
Those switches all occurred at the pipeline level, leaving the map-reduce platform untouched. Switching our base logs to something like Parquet, Thrift or Protobuf would be a much larger project. We do support writing and reading Parquet to allow us to interface with other big data systems.
We started developing rowfiles around 2005. Thrift wasn't open sourced until 2007. I couldn't find a date for protobuf's release, but I don't think it was in standard use outside of Google at that time. We use protobufs internally, and have a number of Rows whose field values are byte[]s containing protobufs. One big thing our rowfiles give us is fast indexing. The only other big data format I know of that provides that is Kudu, which uses the same indexing scheme.
Nope, Kudu https://kudu.apache.org/. Although from Arrow's homepage it looks like it works with Kudu. "Apache Arrow is backed by key developers of 13 major open source projects, including Calcite, Cassandra, Drill, Hadoop, HBase, Ibis, Impala, Kudu, Pandas, Parquet, Phoenix, Spark, and Storm making it the de-facto standard for columnar in-memory analytics."
Kudu was designed to be a columnar data store. Which means that if you had a schema like table PERSON(name STRING, age INT), you would store all the names together, and then all the ages together. This lets you do fancy tricks like run-length encoding on the age fields, and so forth.
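As a toy illustration of why that layout helps (the column values here are made up, and this isn't Kudu's actual encoder), run-length encoding a contiguous age column is trivial:

```java
import java.util.ArrayList;
import java.util.List;

// Toy run-length encoder for a columnar "age" field.
// Storing all ages contiguously means runs of repeated values compress well.
public class RunLengthExample {
    // Each (value, count) pair represents `count` consecutive rows with the same age.
    record Run(int value, int count) {}

    static List<Run> encode(int[] column) {
        List<Run> runs = new ArrayList<>();
        int i = 0;
        while (i < column.length) {
            int j = i;
            while (j < column.length && column[j] == column[i]) {
                j++;
            }
            runs.add(new Run(column[i], j - i));
            i = j;
        }
        return runs;
    }

    public static void main(String[] args) {
        int[] ages = {34, 34, 34, 34, 29, 29, 41};
        // Prints [Run[value=34, count=4], Run[value=29, count=2], Run[value=41, count=1]]
        System.out.println(encode(ages));
    }
}
```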
There is also a Kudu RPC format which uses protocol buffers in places. But Kudu also sends some data over the wire that is not encoded as protocol buffers, to avoid the significant overhead of protocol buffer serialization / deserialization.
Apache Arrow is a separate project which started taking shape later. Essentially it was conceived as a way of allowing multiple applications to share columnar data that was in memory. I remember there being a big focus on sharing memory locally in a "zero-copy" fashion by using things like memory mapping. I never actually worked on the project, though. Developers of projects like Impala, Kudu, etc. thought this was a cool idea since it would speed up their projects. I don't know how far along the implementations are, though. Certainly Kudu did not have any integration with Arrow when I worked on it, although that may have changed.
Protocol buffers is simple and has a lot of language bindings, but its performance is poor on Java. The official PB Java libraries generate a lot of temporary objects, which is a big problem in projects like HDFS. It works better on C++, but it's still not super-efficient or anything. It's... better than ASN.1, I guess? It has optional fields, and XDR didn't...
If you want more detail: we were packing a Row class into a base64-encoded string using an ObjectOutputStream. That's fine for small-scale serialization, but it sucks at scale, for the reasons mentioned in the post.
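To illustrate the pattern (the Row here is a stand-in for illustration, not the actual class):

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Roughly the pattern described above: Java-serialize a Row, then base64 it.
public class LegacyRowSerializer {
    static class Row implements Serializable {
        String name;
        int age;
        Row(String name, int age) { this.name = name; this.age = age; }
    }

    static String toBase64(Row row) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(row); // writes class metadata plus field data
        }
        // Base64 then adds roughly another third on top of Java serialization's overhead.
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    public static void main(String[] args) throws Exception {
        System.out.println(toBase64(new Row("example", 42)));
    }
}
```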
Sorry we don't have code examples, but it's unclear how useful they'd be given that no one else uses our file format. If you want a bit more detail on how the format works: each metadata block contains a list of typed columns that defines the schema of a given part. Our map-reduce framework has a bunch of internal logic that tries to reconcile the written Row class with the one the Mapper class is asking for. This allows us to do things like ingest different versions of a row within the context of a single job.
I think questions of serialization at this scale are generally interesting, although ymmv. I know of one company using Avro, which doesn't let you cleanly update or track schemas. They've ended up storing every schema in an HBase table and reserving the first 8 bytes of each row to do a lookup into this table to find the row's schema.
Avro can store the schema inline or out of line; with inline schemas, it's at the start of the file (embedded JSON), and it describes the schema for all the rows in that file. If you're working with Hive, the schema you put in the Hive metastore is cross-checked with each Avro file read; if any given Avro file doesn't contain a particular column, it just turns up as null for that subset of rows. Spark and Impala work similarly.
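For anyone who hasn't used it, that resolution step looks roughly like this with Avro's Java API (the Person schemas here are made up for illustration):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;
import java.io.ByteArrayOutputStream;

// Sketch of Avro schema resolution: data written without an "email" column is
// read with a newer schema where "email" defaults to null.
public class AvroEvolutionExample {
    static final Schema WRITER = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[" +
        "{\"name\":\"name\",\"type\":\"string\"}]}");
    static final Schema READER = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[" +
        "{\"name\":\"name\",\"type\":\"string\"}," +
        "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    public static void main(String[] args) throws Exception {
        // Write a record with the old (writer) schema.
        GenericRecord record = new GenericData.Record(WRITER);
        record.put("name", "example");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER).write(record, encoder);
        encoder.flush();

        // Read it back with the new (reader) schema; "email" resolves to null.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord result =
            new GenericDatumReader<GenericRecord>(WRITER, READER).read(null, decoder);
        System.out.println(result.get("email")); // prints null
    }
}
```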
I agree serialization at scale is interesting. My particular interest right at this moment is in efficiently doing incremental updates of HDFS files (Parquet & Avro) from observing changes in MySQL tables - not completely trivial because some ETL with joins and unions is required to get data in the right shape.
What do you mean when you say Avro doesn't let you "cleanly update or track schema"?
From what I've read about Avro:
1. It can transform data between two compatible schemas.
2. It can serialize/load schemas off the wire, so you can send the schema in the header.
If schema serialization causes too much overhead, you can set things up so you only send the schema version identifier, as long as the receiver can use that to get access to the full schema.
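As a rough sketch of that version-identifier approach (the in-memory map is just a stand-in for whatever the schema store actually is, e.g. the HBase table mentioned above):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Prefix each encoded row with an 8-byte schema fingerprint and let the
// reader resolve the full schema from a registry.
public class FingerprintedPayload {
    static final Map<Long, Schema> REGISTRY = new HashMap<>();

    static byte[] prefixWithFingerprint(Schema schema, byte[] avroPayload) {
        long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
        REGISTRY.put(fingerprint, schema); // registration would normally happen out of band
        return ByteBuffer.allocate(8 + avroPayload.length)
                .putLong(fingerprint)
                .put(avroPayload)
                .array();
    }

    static Schema lookupWriterSchema(byte[] message) {
        long fingerprint = ByteBuffer.wrap(message, 0, 8).getLong();
        return REGISTRY.get(fingerprint); // then decode bytes 8..end with this writer schema
    }
}
```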
A standard database table isn't large enough to handle our large datasets. For example, the Hercules dataset was over 2 petabytes and even after optimization is almost 1 petabyte. Big data systems like Spark, Impala, Presto, etc. are designed to make the data look like a table, even though it is spread out across many files in a distributed filesystem. This is what we do. It's pretty common to reimplement some database features on top of these big data file formats. In our case we have very fast indexes that let us quickly fetch data, similar to an index on a PostgreSQL table.
Well, you understand your system and requirements better than I, obviously, but...
"A standard database table isn't large enough to handle our large datasets"
... isn't much of an answer as to why you're storing objects in your database.
As you already mentioned in your post, serialized objects are big - they contain all of their data, plus everything necessary to deserialize the object into something usable.
I imagine your objects contain the usual mix of strings, characters, numbers, booleans, etc... why not just store those in the database and select them back out when needed? Less data in the database, and faster retrieval since you skip serialization in both steps (storage and retrieval). Even if you have nested objects within nested objects, you can surely write out a "flat" version of the data to a couple of joined tables.
On the other hand, serializing the object is probably simpler to implement and use... but then you get the classic tradeoff of performance vs. convenience.
Start out with the idea that you have hundreds of machines in your cluster, with 1000s of TB of data. Suppose the current data efficiency is on the order of 80% - that is, 80% of the 1000s of TB is the actual bytes of the data fields. What database do you have in mind to store this data, still on the order of 1000s of TB?
You say: a couple of joined tables. So you have hundreds of machines, and the tables are not all going to fit on one machine; they're going to be scattered across hundreds of machines each. How do you efficiently do a join across two distributed tables?
It's no picnic.
If each row in one table only has a few related rows in the other table, it's much, much better to store the related data inline. Locality is key; you want data in memory right now, not somewhere on disk across the network.
I don't think protobuf was around for public use when we came up with this format, which began around 2005. We use Protobuf internally, and some of our columns are actually byte[]'s containing protobuf data. We now support Parquet and are doing more work with other big data tools, but we've had a hard time matching the performance of our custom stuff.
1) Can you provide any more details about how Rowfiles are structured and/or implemented? Specifically, how does it handle nested objects? Does it support `transient`? Do `writeObject` and/or `readObject` come into play?
2) Do you feel this is a generic enough solution that you would consider submitting it as a JSR?
It natively supports a limited set of Columns: basically boxed primitives, java.util.Date, joda.time.DateTime, and arrays and double arrays of both boxed and unboxed versions of the preceding. The list of Columns in use is what reads and writes the byte buffer. The byte buffer is almost entirely the fields' data, with one or two bytes describing how the subsequent field is encoded. Nested objects aren't handled out of the box, but there is the capability to define a UserRowField that allows serialization/deserialization to bytes of any Serializable class. This gets used a lot for our SQL map-reduce function. The downside is that you need to have the UserRowField implementation on your classpath in order to read the Row, which is not generally the case.
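Not our actual code, but the tag-then-data idea looks roughly like this (the tags, types, and layout here are invented for illustration):

```java
import java.nio.ByteBuffer;

// Toy illustration of the encoding described above: each field is written as a
// one-byte tag saying how the following bytes are encoded, then the field data.
public class TaggedRowEncoder {
    static final byte TAG_NULL = 0;
    static final byte TAG_INT = 1;
    static final byte TAG_LONG = 2;

    static void writeInt(ByteBuffer buf, Integer value) {
        if (value == null) {
            buf.put(TAG_NULL);
        } else {
            buf.put(TAG_INT).putInt(value);
        }
    }

    static void writeLong(ByteBuffer buf, Long value) {
        if (value == null) {
            buf.put(TAG_NULL);
        } else {
            buf.put(TAG_LONG).putLong(value);
        }
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writeInt(buf, 42);          // 1 tag byte + 4 data bytes
        writeLong(buf, null);       // 1 tag byte, no data
        writeLong(buf, 1234567L);   // 1 tag byte + 8 data bytes
        System.out.println("encoded bytes: " + buf.position()); // 15
    }
}
```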
We're a big data advertising and measurement company based in San Francisco. We run online display ad campaigns for marketers across realtime bidding (RTB) exchanges, such as those run by Google and AppNexus. We also provide a publisher product to give site owners insights into their audience. Stack Overflow's profile, for example, is at https://www.quantcast.com/stackoverflow.com.