harlow / kinesis-consumer
Golang library for consuming Kinesis stream data
# Golang Kinesis Consumer

Kinesis consumer applications written in Go. This library is intended to be a lightweight wrapper around the Kinesis API to read records, save checkpoints (with swappable backends), and gracefully recover from service timeouts/errors.

__Alternate serverless options:__

• Kinesis to Firehose can be used to archive data directly to S3, Redshift, or Elasticsearch without running a consumer application.
• Process Kinesis streams with Golang and AWS Lambda for serverless processing and checkpoint management.

## Installation

Get the package source:

$ go get github.com/harlow/kinesis-consumer

Note: this repo requires the AWS SDK V2 package. If you are still using AWS SDK V1, use: https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.5

## Overview

The consumer leverages a handler func that accepts a Kinesis record. It consumes all shards concurrently and calls the callback func as it receives records from the stream.

_Important 1: The func also polls the stream to check for new shards and automatically starts consuming shards added to the stream._

_Important 2: The default Log, Counter, and Checkpoint are no-ops, which means no logs, counts, or checkpoints are emitted when scanning the stream. See the options below to override these defaults._

## ScanFunc

ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library. If an error is returned, scanning stops. The sole exception is when the function returns the special value SkipCheckpoint.

## ScanBatch (experimental)

For interval/size-based batch processing, use ScanBatch. Checkpoint behavior in batch mode:

• the checkpoint advances only after a batch callback succeeds
• on callback error, the scan stops and that batch is not checkpointed

Use context cancelation to signal the scan to exit without error, for example to gracefully exit the scan on interrupt.
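The handler-based scan described above can be sketched as follows. This is a minimal sketch, assuming the package is imported under the alias `consumer` and that `consumer.New` and `Scan` have the shapes shown — verify against the current API before relying on it:

```go
package main

import (
	"context"
	"fmt"
	"log"

	consumer "github.com/harlow/kinesis-consumer" // assumed import alias
)

func main() {
	// Create a consumer for a stream; New takes the stream name plus options.
	c, err := consumer.New("myStreamName")
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// Scan invokes the func for each record on every shard, concurrently.
	// Returning nil continues the scan; returning an error stops it.
	err = c.Scan(context.TODO(), func(r *consumer.Record) error {
		fmt.Println(string(r.Data))
		return nil
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}
}
```

Because the default Log, Counter, and Checkpoint are no-ops, this sketch processes records but persists no state between runs.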
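The context-cancel pattern for a graceful exit needs only the standard library. In this sketch the `c.Scan(ctx, …)` call is left as a comment, since it requires a configured consumer, and the process sends itself an interrupt at the end to stand in for a user pressing Ctrl-C:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
)

func main() {
	// ctx is canceled when the process receives an interrupt (Ctrl-C).
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Pass ctx to the scan so cancellation ends it without error:
	// err := c.Scan(ctx, handler)

	// Simulate Ctrl-C so this sketch terminates on its own.
	p, _ := os.FindProcess(os.Getpid())
	p.Signal(os.Interrupt)

	<-ctx.Done()
	fmt.Println("scan stopped:", ctx.Err())
}
```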
## Consumer Groups (DynamoDB Leases, Opt-In)

By default, the consumer reads all shards in a single process. For multi-process shard coordination, use the opt-in consumer-group package.

> Note: Consumer-group support is experimental and may evolve.

If no worker ID is configured, the library generates a unique worker ID per process; an explicitly configured worker ID takes precedence. Workers coordinate through a DynamoDB lease table. For worker row cleanup, enable DynamoDB TTL on the heartbeat TTL attribute; worker heartbeat rows write this field automatically. Integration tests for this path are available and opt-in.

## Options

The consumer allows the following optional overrides.

### Store

To record the progress of the consumer in the stream (checkpointing), we use a storage layer to persist the last sequence number the consumer has read from a particular shard. This allows consumers to re-launch and pick up at the position in the stream where they left off. The boolean value ErrSkipCheckpoint of consumer.ScanError, which is returned by the record-processing callback, determines whether a checkpoint will be recorded. The unique identifier for a consumer is the combination of app name and stream name.

Note: the default storage is in-memory (no-op), which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is restarted.

The consumer accepts an option to set the storage layer. Checkpoint durability depends on the store implementation:

• some stores write checkpoints immediately;
• others buffer checkpoints in memory and flush them periodically;
• the consumer's shutdown paths flush buffered checkpoints automatically before they return;
• if you manage a buffered store outside the consumer lifecycle, call its flush method to persist pending checkpoints, or its shutdown method to flush and stop the store.
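Wiring a checkpoint store into the consumer might look like the following sketch. The import path `store/redis`, the constructor `store.New`, and the option name `consumer.WithStore` are assumptions about this library's API — check the package for the exact names and signatures:

```go
package main

import (
	"context"
	"log"

	consumer "github.com/harlow/kinesis-consumer"          // assumed alias
	store "github.com/harlow/kinesis-consumer/store/redis" // assumed path
)

func main() {
	// The app name + stream name pair identifies this consumer's checkpoints.
	db, err := store.New("myApp", "myStream") // assumed constructor
	if err != nil {
		log.Fatalf("store error: %v", err)
	}

	c, err := consumer.New("myStream", consumer.WithStore(db)) // assumed option
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}
	_ = c.Scan(context.TODO(), func(r *consumer.Record) error { return nil })
}
```

With a persistent store in place, restarting the process resumes from the last checkpointed sequence number instead of the beginning of the stream.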
To persist scan progress, choose one of the following storage layers:

#### Redis

The Redis checkpoint requires App Name and Stream Name.

#### DynamoDB

The DynamoDB checkpoint requires Table Name, App Name, and Stream Name. To leverage the DynamoDB checkpoint, we'll also need to create a table.

#### Postgres

The Postgres checkpoint requires Table Name, App Name, Stream Name, and ConnectionString. To leverage the Postgres checkpoint, we'll also need to create a table. The table name must match the one specified when creating the checkpoint. A primary key composed of namespace and shard_id is mandatory for the checkpoint to run without issues and to ensure data integrity.

#### MySQL

The MySQL checkpoint requires Table Name, App Name, Stream Name, and ConnectionString (just like the Postgres checkpoint). To leverage the MySQL checkpoint, we'll also need to create a table. The table name must match the one specified when creating the checkpoint. A primary key composed of namespace and shard_id is mandatory for the checkpoint to run without issues and to ensure data integrity.

### Kinesis Client

Override the Kinesis client if any special configuration is needed.

### Metrics

Add an optional counter for exposing counts of checkpoints and records processed. The expvar package will display the consumer counts.

### Consumer starting point

Kinesis allows consumers to specify where on the stream they'd like to start consuming. The default in this library is LATEST (start reading just after the most recent record in the shard). This can be adjusted with an option; see the AWS docs for the other shard iterator types.

### Logging

Logging supports the basic built-in logging library or a third-party one, so long as it implements the Logger interface. For example, to use the builtin logging package, we wrap it with a myLogger struct. The package defaults to a no-op logger, so all logs are swallowed. This can be customized with the preferred logging strategy. To use a more complicated logging library, e.g.
apex/log, wrap it in the same way.

## Examples

There are examples of a producer and a consumer in the examples directory. These should give end-to-end examples of setting up consumers with different checkpoint strategies. The examples run locally against Kinesalite:

$ kinesalite &

Produce data to the stream:

$ cat exam…
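The Postgres and MySQL checkpoint tables described under Store might be created like this sketch. Only the (namespace, shard_id) primary key is prescribed by the text above; the table name and the remaining column are assumptions:

```sql
-- Assumed layout; only the (namespace, shard_id) primary key is mandated.
CREATE TABLE kinesis_consumer (
    namespace       TEXT NOT NULL, -- app name + stream name identifier
    shard_id        TEXT NOT NULL,
    sequence_number NUMERIC NOT NULL, -- last checkpointed sequence number
    PRIMARY KEY (namespace, shard_id)
);
```

Remember to pass the same table name when constructing the checkpoint store.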
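The expvar counters mentioned under Metrics can be illustrated with the standard library alone. The counter names here are hypothetical, not necessarily the ones the package publishes:

```go
package main

import (
	"expvar"
	"fmt"
)

func main() {
	// Published counters appear on the /debug/vars HTTP endpoint when an
	// HTTP server is running; here we just read them back directly.
	records := expvar.NewInt("records")         // hypothetical name
	checkpoints := expvar.NewInt("checkpoints") // hypothetical name

	records.Add(3)
	checkpoints.Add(1)

	fmt.Printf("records=%s checkpoints=%s\n", records.String(), checkpoints.String())
}
```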
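The myLogger wrapper mentioned under Logging can be sketched with the standard library. The `Log(...interface{})` method shape and the `WithLogger` option name are assumptions about the Logger interface described above:

```go
package main

import (
	"log"
	"os"
)

// myLogger adapts the stdlib logger to a Log(...interface{}) method,
// the shape the Logger interface is assumed to require.
type myLogger struct {
	logger *log.Logger
}

func (l *myLogger) Log(args ...interface{}) {
	l.logger.Println(args...)
}

func main() {
	ml := &myLogger{
		logger: log.New(os.Stdout, "consumer: ", 0),
	}
	// Would be handed to the consumer via a logger option (name assumed):
	// consumer.New(stream, consumer.WithLogger(ml))
	ml.Log("starting scan")
}
```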