Snowpipe (Files)

Snowpipe loads data from files as soon as they are available in a stage. The data is loaded according to the COPY statement defined in a referenced pipe.

Common Workflow

To enable automatic data ingestion from a stage, Snowflake creates a managed event queue (e.g., an AWS SQS queue) when the pipe is set up with auto-ingest. You can retrieve the queue's ARN from the notification_channel column of the SHOW PIPES output. This ARN is then configured as the destination for object-creation events in the storage service (e.g., S3 event notifications). When a new file is uploaded to the storage location backing the external stage, an event is sent to the SQS queue, which triggers Snowpipe to run the pipe's COPY INTO statement.
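
A minimal sketch of the pipe definition behind this workflow (the database, table, and stage names here are illustrative):

-- Auto-ingest pipe: Snowflake provisions the SQS queue that backs it
CREATE OR REPLACE PIPE MY_DB.PUBLIC.MYPIPE
  AUTO_INGEST = TRUE
  AS
  COPY INTO MY_DB.PUBLIC.MYTABLE
  FROM @MY_DB.PUBLIC.MY_S3_STAGE
  FILE_FORMAT = (TYPE = 'CSV');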


Two mechanisms for triggering loads:

Automating Snowpipe using cloud messaging

  • Leverages event notifications from cloud storage to inform Snowpipe that new data files have arrived.
  • Snowpipe polls the event notifications from a queue (see the SHOW PIPES example below).
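
To find the queue ARN to wire into the storage event notification, check the notification_channel column of SHOW PIPES (pipe and schema names are illustrative):

SHOW PIPES LIKE 'MYPIPE' IN SCHEMA MY_DB.PUBLIC;
-- the notification_channel column holds the SQS queue ARN to use as the
-- target of the bucket's object-creation event notifications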

Calling Snowpipe REST endpoints

  • Your client calls a public REST endpoint with the name of a pipe object and a list of data file names.
  • POST: provide the file paths in the request body; the response reports the status of the ingest request
  • GET: retrieve the load history
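
For reference, the documented endpoint paths look like the following (key-pair JWT authentication is required and omitted here; account and pipe names are placeholders):

POST https://<account>.snowflakecomputing.com/v1/data/pipes/<pipe_name>/insertFiles
GET  https://<account>.snowflakecomputing.com/v1/data/pipes/<pipe_name>/insertReport
GET  https://<account>.snowflakecomputing.com/v1/data/pipes/<pipe_name>/loadHistoryScan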

Snowpipe Streaming (Rows)

  • Continuous, low-latency loading of streaming data directly into Snowflake tables. It enables near real-time ingestion and analysis, which is crucial for timely insights and immediate operational responses.

Snowpipe Streaming comes in two flavors: High-Performance (Preview) and Classic.

  • SDK: High-Performance uses the new snowpipe-streaming SDK; Classic uses the snowflake-ingest-java SDK (up to v4.x).
  • Data flow management: High-Performance uses a PIPE object to manage data flow and enable lightweight transformations at ingest time, with channels opened against the PIPE; Classic has no PIPE object, and channels are opened directly against target tables.
  • Ingestion method: High-Performance uses a REST API for direct, lightweight ingestion through the PIPE; Classic uses SDK-based ingestion directly into tables.
  • Schema validation: High-Performance validates server-side during ingestion, against the PIPE schema; Classic typically handles validation client-side or at query time.
  • Performance: High-Performance offers significantly higher throughput and improved query efficiency on ingested data; Classic is reliable but has lower throughput.
  • Pricing model: High-Performance uses transparent, throughput-based pricing (credits per uncompressed GB); Classic is billed on serverless compute usage plus the number of active client connections.
  • Use case: High-Performance is recommended for new streaming projects that need high throughput, predictable cost, and advanced capabilities; Classic remains a reliable, production-ready solution for established pipelines, though less performant.
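
In the high-performance flavor, ingestion flows through a PIPE object. A rough sketch of how such a pipe might be declared, assuming the preview syntax and illustrative names (the feature is in preview, so check the current docs for the exact form):

CREATE OR REPLACE PIPE MY_DB.PUBLIC.MY_STREAMING_PIPE
  AS
  COPY INTO MY_DB.PUBLIC.MYTABLE
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
  MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;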

offset token:

  • Like a bookmark / resume point for your data: it tracks how far you have ingested into Snowflake, so you know which rows have already been processed.
  • When you send data to Snowpipe Streaming (via insertRow/insertRows or appendRow/appendRows), you include an offset token with each row or batch. Snowflake stores this token after the data is successfully committed. Later, you can ask Snowflake for the latest committed offset token to see how far ingestion has progressed.

Checking

Check pipe status with SYSTEM$PIPE_STATUS

SYSTEM$PIPE_STATUS( '<pipe_name>' )
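
A usage sketch (pipe name illustrative); the function returns a JSON object with fields such as executionState and pendingFileCount:

SELECT SYSTEM$PIPE_STATUS('MY_DB.PUBLIC.MYPIPE');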

Retrieve details about the loading activity for a table using COPY_HISTORY

select *
from table(information_schema.copy_history(TABLE_NAME=>'MYTABLE', START_TIME=> DATEADD(hours, -1, CURRENT_TIMESTAMP())));

Retrieve errors from a past execution of the COPY INTO command using VALIDATE, which returns all the errors encountered during that load

SELECT * FROM TABLE(VALIDATE(t1, JOB_ID=>'5415fa1e-59c9-4dda-b652-533de02fdcf1'));

Retrieve errors for any loads through the mypipe pipe within the previous hour using VALIDATE_PIPE_LOAD:

select * from table(validate_pipe_load(
  pipe_name=>'MY_DB.PUBLIC.MYPIPE',
  start_time=>dateadd(hour, -1, current_timestamp())));