Datasources

Pulse supports multiple datasource types out of the box. Each datasource is configured with a YAML contract that defines connection settings, schema, and snapshot behavior.
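The contracts below reference secrets with `${VAR}` placeholders rather than inline values. As a rough sketch of how such placeholders can be resolved from the environment (illustrative only; Pulse's exact resolution rules are not shown here, and `resolve_env` is a hypothetical helper name):

```python
import os
import re

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z0-9_]+)\}")

def resolve_env(value: str) -> str:
    """Replace ${VAR} placeholders with environment variable values.

    Raises KeyError when a referenced variable is unset, so a missing
    secret fails loudly instead of silently producing an empty string.
    """
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"environment variable {name} is not set")
        return os.environ[name]

    return _PLACEHOLDER.sub(_sub, value)

os.environ["POSTGRES_HOST"] = "db.internal"
print(resolve_env("${POSTGRES_HOST}"))  # db.internal
```

Failing fast on unset variables is a deliberate choice here: a connection string with a blank host is harder to debug than an immediate error.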

Supported Datasources

PostgreSQL

type: postgresql

Connect to PostgreSQL databases with connection pooling, SSL, and incremental snapshot support.

Amazon S3

type: s3

Read from S3 buckets with support for Parquet, CSV, and JSON formats.

BigQuery

type: bigquery

Query Google BigQuery tables with automatic schema inference and cost controls.

Snowflake

type: snowflake

Connect to Snowflake data warehouses with role-based access and time travel.

HTTP API

type: http

Fetch data from REST APIs with authentication, pagination, and rate limiting support.

PostgreSQL

Connect to PostgreSQL databases with full support for connection pooling, SSL, and incremental snapshots.

datasource: transactions
type: postgresql

connection:
  host: ${POSTGRES_HOST}
  port: 5432
  database: production
  user: ${POSTGRES_USER}
  password: ${POSTGRES_PASSWORD}
  ssl:
    mode: require
    ca: /path/to/ca.pem
  pool:
    min: 2
    max: 10
    idle_timeout: 30s

schema:
  table: transactions
  columns:
    - name: id
      type: uuid
      primary: true
    - name: user_id
      type: uuid
      index: true
    - name: amount
      type: decimal
      precision: 10
      scale: 2
    - name: status
      type: string
      enum: [pending, completed, failed]
    - name: created_at
      type: timestamp
      index: true

query:
  filter: "status = 'completed'"
  order_by: created_at DESC
  limit: 1000000

snapshot:
  strategy: incremental
  column: created_at
  retention: 90d

Amazon S3

Read data from S3 buckets with support for multiple file formats and partitioned data.

datasource: training-data
type: s3

connection:
  bucket: ml-training-data
  region: us-east-1
  credentials:
    access_key_id: ${AWS_ACCESS_KEY_ID}
    secret_access_key: ${AWS_SECRET_ACCESS_KEY}
  # Or use IAM role
  # role_arn: arn:aws:iam::123456789:role/pulse-reader

path:
  prefix: datasets/v2/
  pattern: "**/*.parquet"

schema:
  format: parquet
  compression: snappy
  partition_by:
    - name: date
      type: date
      format: "YYYY-MM-DD"
    - name: region
      type: string
  columns:
    - name: user_id
      type: string
    - name: features
      type: array
      items:
        type: number
    - name: label
      type: string

snapshot:
  strategy: full
  output:
    bucket: ml-snapshots
    prefix: "snapshots/training-data/"
  retention: 365d
  compression: zstd
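The `partition_by` block declares partition columns whose values come from the object path rather than the file contents. Assuming Hive-style `name=value` path segments (a common S3 layout, though the contract above does not pin down the exact convention), extracting them looks roughly like this:

```python
def parse_partitions(key: str) -> dict[str, str]:
    """Extract partition values from Hive-style `name=value` path segments,
    e.g. datasets/v2/date=2024-01-01/region=us/part-0.parquet."""
    parts: dict[str, str] = {}
    for segment in key.split("/"):
        if "=" in segment:
            name, _, value = segment.partition("=")
            parts[name] = value
    return parts

print(parse_partitions("datasets/v2/date=2024-01-01/region=us/part-0.parquet"))
# {'date': '2024-01-01', 'region': 'us'}
```

Partition pruning on these values (e.g. reading only one `date`) avoids listing and downloading irrelevant objects entirely.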

BigQuery

Query Google BigQuery with automatic schema inference and cost controls.

datasource: analytics-events
type: bigquery

connection:
  project: my-gcp-project
  credentials:
    type: service_account
    path: ${GOOGLE_APPLICATION_CREDENTIALS}
  location: US

query:
  dataset: analytics
  table: events
  # Or use a custom query
  # sql: |
  #   SELECT *
  #   FROM analytics.events
  #   WHERE _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)

schema:
  columns:
    - name: event_id
      type: string
      primary: true
    - name: user_id
      type: string
    - name: event_name
      type: string
    - name: properties
      type: json
    - name: timestamp
      type: timestamp

cost_control:
  max_bytes_billed: 10GB
  use_query_cache: true

snapshot:
  strategy: incremental
  column: timestamp
  retention: 180d
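`max_bytes_billed` caps how much data a single query may scan, which corresponds to BigQuery's maximum-bytes-billed setting: queries that would exceed the cap fail instead of billing. As a hedged sketch of how a size string like `10GB` might be turned into a byte limit (decimal units assumed; `parse_size` is a hypothetical helper, not a documented Pulse function):

```python
# Decimal (SI) units, matching how BigQuery bills scanned bytes.
_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}

def parse_size(value: str) -> int:
    """Parse a human-readable size like '10GB' into a byte count."""
    value = value.strip().upper()
    # Check longer suffixes first so '10GB' matches 'GB', not 'B'.
    for suffix in sorted(_UNITS, key=len, reverse=True):
        if value.endswith(suffix):
            number = value[: -len(suffix)].strip()
            return int(float(number) * _UNITS[suffix])
    return int(value)  # bare number: treat as bytes

print(parse_size("10GB"))  # 10000000000
```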

Snowflake

Connect to Snowflake data warehouses with support for role-based access and time travel.

datasource: customer-data
type: snowflake

connection:
  account: xy12345.us-east-1
  user: ${SNOWFLAKE_USER}
  password: ${SNOWFLAKE_PASSWORD}
  warehouse: COMPUTE_WH
  database: PRODUCTION
  schema: PUBLIC
  role: DATA_READER

query:
  table: CUSTOMERS
  columns:
    - CUSTOMER_ID
    - EMAIL
    - CREATED_AT
    - LIFETIME_VALUE

schema:
  columns:
    - name: customer_id
      type: string
      primary: true
    - name: email
      type: string
    - name: created_at
      type: timestamp
    - name: lifetime_value
      type: decimal

snapshot:
  strategy: time_travel
  retention: 90d
  # Use Snowflake's time travel feature
  at: ${SNAPSHOT_TIMESTAMP}
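The `time_travel` strategy leans on Snowflake's `AT (TIMESTAMP => ...)` clause, which queries a table as of a past moment within the retention window. A sketch of the SELECT such a strategy implies (the `AT` clause is standard Snowflake syntax; the query-building function itself is illustrative, not Pulse's internal code):

```python
def time_travel_query(table: str, columns: list[str], at_timestamp: str) -> str:
    """Build a Snowflake time-travel SELECT pinned to a past timestamp."""
    cols = ", ".join(columns)
    return (
        f"SELECT {cols} FROM {table} "
        f"AT (TIMESTAMP => '{at_timestamp}'::timestamp_tz)"
    )

print(time_travel_query("CUSTOMERS", ["CUSTOMER_ID", "EMAIL"], "2024-06-01 00:00:00"))
```

Pinning every read to one timestamp makes the snapshot internally consistent even while the source table keeps receiving writes.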

HTTP API

Fetch data from REST APIs with authentication, pagination, and rate limiting.

datasource: external-api
type: http

connection:
  base_url: https://api.example.com/v1
  auth:
    type: bearer
    token: ${API_TOKEN}
  headers:
    Accept: application/json
    X-Custom-Header: value
  timeout: 30s
  retries: 3

request:
  method: GET
  path: /data/export
  query:
    format: json
    limit: 1000

pagination:
  type: cursor
  cursor_param: cursor
  cursor_path: meta.next_cursor
  has_more_path: meta.has_more

rate_limit:
  requests_per_second: 10
  burst: 20
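Cursor pagination works by echoing a cursor from each response (`meta.next_cursor`) back as a request parameter until `meta.has_more` turns false. The loop below sketches that flow against a stubbed fetch function standing in for the real HTTP endpoint; the helper names are illustrative, not Pulse's API:

```python
from typing import Any, Callable, Iterator

def get_path(payload: Any, path: str) -> Any:
    """Walk a dotted path like 'meta.next_cursor' through nested dicts."""
    for key in path.split("."):
        payload = payload.get(key) if isinstance(payload, dict) else None
        if payload is None:
            return None
    return payload

def paginate(
    fetch: Callable[[dict], dict],
    cursor_param: str = "cursor",
    cursor_path: str = "meta.next_cursor",
    has_more_path: str = "meta.has_more",
    response_path: str = "data",
) -> Iterator[dict]:
    """Follow cursor pagination until the API reports no more pages."""
    params: dict = {}
    while True:
        page = fetch(params)
        yield from get_path(page, response_path) or []
        if not get_path(page, has_more_path):
            return
        params = {cursor_param: get_path(page, cursor_path)}

# Stub of a two-page API response sequence.
pages = {
    None: {"data": [{"id": "a"}], "meta": {"has_more": True, "next_cursor": "c1"}},
    "c1": {"data": [{"id": "b"}], "meta": {"has_more": False, "next_cursor": None}},
}
rows = list(paginate(lambda p: pages[p.get("cursor")]))
print([r["id"] for r in rows])  # ['a', 'b']
```

In production the rate-limit settings above would throttle calls to `fetch`; that concern is kept out of this sketch.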

schema:
  response_path: data
  columns:
    - name: id
      type: string
      primary: true
    - name: name
      type: string
    - name: attributes
      type: object

snapshot:
  strategy: full
  retention: 30d

Custom Datasource

Create custom datasource adapters for proprietary systems:

datasource: custom-system
type: custom

adapter:
  # Path to custom adapter implementation
  module: ./adapters/custom_adapter.py
  class: CustomDataSourceAdapter

connection:
  # Custom connection parameters passed to adapter
  endpoint: https://custom.internal.com
  api_key: ${CUSTOM_API_KEY}

# Schema and snapshot config as usual
schema:
  columns:
    - name: id
      type: string
    - name: data
      type: object

snapshot:
  strategy: full
  retention: 90d
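The contract points `adapter.module` and `adapter.class` at your own implementation. The skeleton below suggests what such an adapter could look like; the method names (`connect`, `fetch`, `close`) are hypothetical, since the real interface comes with Pulse, and the only thing the contract above guarantees is that the `connection:` block reaches your class:

```python
from typing import Any, Iterator

class CustomDataSourceAdapter:
    """Illustrative adapter skeleton for a proprietary system.

    The method names here are assumptions, not Pulse's actual adapter
    contract; consult the shipped base class for the real interface.
    """

    def __init__(self, connection: dict[str, Any]):
        # Receives the contract's `connection:` block as-is.
        self.endpoint = connection["endpoint"]
        self.api_key = connection["api_key"]

    def connect(self) -> None:
        """Open whatever client or session the system needs."""

    def fetch(self) -> Iterator[dict[str, Any]]:
        """Yield one dict per row, keyed by the column names in `schema:`."""
        yield {"id": "example-1", "data": {"source": self.endpoint}}

    def close(self) -> None:
        """Release connections once the snapshot completes."""

adapter = CustomDataSourceAdapter(
    {"endpoint": "https://custom.internal.com", "api_key": "test-key"}
)
rows = list(adapter.fetch())
print(rows[0]["id"])  # example-1
```

Yielding rows lazily (an iterator rather than a list) keeps memory flat when the proprietary system returns large result sets.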