The IcebergStreamWriter writes data-frames to Iceberg tables in streaming mode.

The table property is the full name of the target Iceberg table.

  • The supported write options are as follows:
    • fanout-enabled - overrides the target table’s write.spark.fanout.enabled. Default: false. The fanout writer keeps a file open for each partition value and does not close these files until the write task finishes. It is recommended for streaming writes because it removes the sorting requirement for partitioned tables.
    • checkpointLocation - the location where streaming checkpoints are written.
  • The trigger mode must be one of the following values:
    • processingTime - starts a micro-batch at the interval given by the interval property.
    • once - runs the streaming process a single time.
  • The output mode must be one of the following values:
    • complete - all the rows in the streaming DataFrame/Dataset are written to the sink every time there is an update.
    • append - only the new rows in the streaming DataFrame/Dataset will be written to the sink.
  • The test.waittimeMS property is for testing purposes only; it specifies how long, in milliseconds, the streaming run lasts.
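
The allowed values listed above can be checked before a job is submitted. The following is a minimal sketch in Python and is not part of the framework: `validate_writer_properties` is a hypothetical helper, and it assumes the actor's properties have already been parsed into a dictionary. Treating `table` as mandatory and requiring an `interval` for `processingTime` are assumptions made for illustration.

```python
# Hypothetical helper: checks an iceberg-stream-writer "properties" dictionary
# against the allowed values listed above. Not part of the framework itself.

ALLOWED_TRIGGER_MODES = {"processingTime", "once"}
ALLOWED_OUTPUT_MODES = {"complete", "append"}


def validate_writer_properties(props):
    """Return a list of problems; an empty list means none were found."""
    problems = []

    # Assumption: the full table name is mandatory.
    if not props.get("table"):
        problems.append("table is missing")

    output_mode = props.get("outputMode")
    if output_mode not in ALLOWED_OUTPUT_MODES:
        problems.append(f"unsupported outputMode: {output_mode!r}")

    trigger = props.get("trigger") or {}
    mode = trigger.get("mode")
    if mode not in ALLOWED_TRIGGER_MODES:
        problems.append(f"unsupported trigger mode: {mode!r}")
    # Assumption: processingTime needs an interval such as "3 seconds".
    elif mode == "processingTime" and not trigger.get("interval"):
        problems.append("trigger.interval is required for processingTime")

    return problems


example = {
    "table": "events.db.features",
    "options": {
        "fanout-enabled": "true",
        "checkpointLocation": "/tmp/events/features",
    },
    "outputMode": "append",
    "trigger": {"mode": "processingTime", "interval": "3 seconds"},
}
print(validate_writer_properties(example))
```

Returning a list of messages rather than raising on the first failure lets a caller report every problem in a definition at once.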

Actor Class: com.qwshen.etl.sink.IcebergStreamWriter

The definition of the IcebergStreamWriter:

  • In YAML format
  actor:
    type: iceberg-stream-writer
    properties:
      table: events.db.features
      options:
        fanout-enabled: "true"
        checkpointLocation: "/tmp/events/features"
      outputMode: complete
      trigger:
        mode: processingTime
        interval: 3 seconds
      test.waittimeMS: 30000
      view: features
  • In JSON format
  {
    "actor": {
      "type": "iceberg-stream-writer",
      "properties": {
        "table": "events.db.features",
        "options": {
          "fanout-enabled": "true",
          "checkpointLocation": "/tmp/events/features"
        },
        "outputMode": "append",
        "trigger": {
          "mode": "processingTime",
          "interval": "3 seconds"
        },
        "test.waittimeMS": "30000",
        "view": "features"
      }
    }
  }
  • In XML format
  <actor type="iceberg-stream-writer">
    <properties>
      <table>events.db.features</table>
      <options>
        <fanout-enabled>true</fanout-enabled>
        <checkpointLocation>/tmp/events/features</checkpointLocation>
      </options>
      <trigger>
        <mode>once</mode>
      </trigger>
      <outputMode>append</outputMode>
      <test.waittimeMS>30000</test.waittimeMS>
      <view>features</view>
    </properties>
  </actor>
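
For orientation, the definitions above correspond roughly to a Spark Structured Streaming write into Iceberg. The sketch below is an assumption about that mapping, not the actor's actual implementation: `write_stream_to_iceberg` is a hypothetical function, `df` stands for the streaming DataFrame behind the `features` view, and `props` is the actor's properties parsed into a dictionary. It relies only on the standard `DataStreamWriter` methods `format`, `outputMode`, `trigger`, `option` and `toTable`.

```python
def write_stream_to_iceberg(df, props):
    """Start a streaming write of `df` driven by an iceberg-stream-writer style dict.

    Hypothetical sketch: `df` is expected to be a Spark streaming DataFrame.
    """
    writer = df.writeStream.format("iceberg").outputMode(props["outputMode"])

    # Map the trigger block onto the matching DataStreamWriter.trigger argument.
    trigger = props["trigger"]
    if trigger["mode"] == "processingTime":
        writer = writer.trigger(processingTime=trigger["interval"])
    else:
        writer = writer.trigger(once=True)

    # Pass every write option (fanout-enabled, checkpointLocation) through as-is.
    for key, value in props.get("options", {}).items():
        writer = writer.option(key, value)

    # toTable starts the query and returns a StreamingQuery handle.
    return writer.toTable(props["table"])
```

In a test run, the returned query could presumably be bounded with `query.awaitTermination(30)` for a `test.waittimeMS` of 30000, since PySpark expresses that timeout in seconds.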