Python SDK

Getting Started with Arkiv (Python)

Build decentralized applications with Python and Arkiv

Last updated: January 2025

Setup & Installation

Prerequisites

What you need before starting

Python 3.8+

Latest stable version recommended

pip

Python package manager

Ethereum Wallet

With Hoodi testnet ETH

Test ETH

From Arkiv faucet

Installation

# Create project directory
mkdir arkiv-python-practice
cd arkiv-python-practice

# Create virtual environment
python -m venv venv

# Activate virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies (run this with the virtual environment activated)
pip install arkiv-sdk python-dotenv

Environment Configuration

Create a .env file in your project directory

# Hex-encoded wallet private key (the 0x prefix is optional). Keep this file out of version control.
PRIVATE_KEY=0x...
# Chain ID passed to create_client
CHAIN_ID=60138453025
# HTTP RPC endpoint
RPC_URL=https://kaolin.hoodi.arkiv.network/rpc
# WebSocket RPC endpoint
WS_URL=wss://kaolin.hoodi.arkiv.network/rpc/ws

Connect to Arkiv

Basic Connection

Connect to Arkiv using your private key

import asyncio
import os
from uuid import uuid4

from arkiv_sdk import create_client, Tagged, Annotation
from arkiv_sdk.types import AccountData, ArkivCreate, ArkivUpdate
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()


def _load_private_key() -> bytes:
    """Return the PRIVATE_KEY environment variable decoded from hex.

    The key may be written with or without a leading ``0x`` prefix.

    Raises:
        ValueError: If PRIVATE_KEY is unset, empty, or not a hex string.
    """
    raw_key = os.getenv('PRIVATE_KEY', '').strip()
    hex_key = raw_key[2:] if raw_key.startswith(('0x', '0X')) else raw_key
    if not hex_key:
        # bytes.fromhex('') would silently produce an empty key, so fail early.
        raise ValueError("PRIVATE_KEY is not set; add it to your .env file")
    try:
        return bytes.fromhex(hex_key)
    except ValueError:
        raise ValueError(
            "PRIVATE_KEY must be a hex string (0x prefix optional)"
        ) from None


async def main():
    """Connect to Arkiv using the settings in .env and print the owner address."""
    # Configure connection from .env
    key: AccountData = Tagged("privatekey", _load_private_key())

    chain_id = int(os.getenv('CHAIN_ID', '60138453025'))
    rpc_url = os.getenv('RPC_URL', 'https://kaolin.hoodi.arkiv.network/rpc')
    ws_url = os.getenv('WS_URL', 'wss://kaolin.hoodi.arkiv.network/rpc/ws')

    # Create a client to interact with the Arkiv API
    client = await create_client(
        chain_id,
        key,
        rpc_url,
        ws_url,
    )

    print("Connected to Arkiv testnet!")

    # Get owner address
    owner_address = await client.get_owner_address()
    print(f"Connected with address: {owner_address}")


# Run the async main function
if __name__ == "__main__":
    asyncio.run(main())

Creating & Managing Entities

Create Entity

1# Create a new entity with annotations
2entity_id = str(uuid4())
3creates = [
4    ArkivCreate(
5        data=b"Test entity",
6        expires_in=300,  # 300 seconds = 5 minutes
7        string_annotations=[
8            Annotation("testTextAnnotation", "demo"),
9            Annotation("id", entity_id)
10        ],
11        numeric_annotations=[Annotation("version", 1)]
12    )
13]
14
15create_receipt = await client.create_entities(creates)
16print('Receipt', create_receipt)
17
18# create_entities takes a list of ArkivCreate objects with 4 fields:
19# - data: Payload in bytes
20# - expires_in: Time-to-live in seconds
21# - string_annotations: Text annotations for querying
22# - numeric_annotations: Numeric annotations for querying

Update and Delete Entity

1# Update the entity
2update_receipt = await client.update_entities([
3    ArkivUpdate(
4        entity_key=create_receipt[0].entity_key,
5        data=b"Updated entity",
6        expires_in=1200,  # 1200 seconds = 20 minutes
7        string_annotations=[Annotation("id", entity_id)],
8        numeric_annotations=[Annotation("version", 2)]
9    )
10])
11print('Update', update_receipt)
12
13# Updating an entity overrides all of its elements,
14# including the payload, annotations, and expiration.
15
16# Delete the entity
17delete_receipt = await client.delete_entities([entity_key])
18print('Delete', delete_receipt)

Query Entities

Search with Annotations

1# Meta data and storage
2if entity_key:
3    meta = await client.get_entity_metadata(entity_key)
4    print('Meta data:', meta)
5
6    data = await client.get_storage_value(entity_key)
7    print('Storage value:', data.decode())
8
9# 1. Simple equality query
10greetings = await client.query_entities('type = "greeting"')
11print(f'Found {len(greetings)} greeting entities')
12
13# 2. Processing query results
14for entity in greetings:
15    data = entity.storage_value.decode()
16    print(f'Entity {entity.entity_key}: {data}')
17
18# 3. Numeric comparison operators
19await print_entities('High priority', await client.query_entities('priority > 5'))
20await print_entities('Old versions', await client.query_entities('version < 3'))
21await print_entities('In range', await client.query_entities('score >= 80 && score <= 100'))
22
23# 4. Combining conditions with AND (&&)
24await print_entities('Specific', await client.query_entities('type = "greeting" && version = 1'))
25
26# 5. Using OR (||) for multiple options
27await print_entities('Messages', await client.query_entities('type = "message" || type = "other"'))
28
29# 6. Complex queries with mixed operators
30await print_entities('Filtered', await client.query_entities('(type = "task" && priority > 3) || status = "urgent"'))
31
32# Note: Query string must use double quotes for string values
33# Numbers don't need quotes: priority = 5
34# Strings need quotes: type = "message"
35
36
37async def print_entities(label: str, entities: list):
38    print(f'{label} - found {len(entities)} entities:')
39    for entity in entities:
40        data = entity.storage_value.decode()
41        print(f'{label} EntityKey: {entity.entity_key}, Data: {data}')

Real-time Events

Event Monitoring

Listen to real-time blockchain events

async def setup_event_monitoring(client):
    """Register printing callbacks for entity events via ``client.watch_logs``.

    Returns:
        Whatever ``client.watch_logs`` returns; per the original note this is
        a function that stops monitoring when called (confirm against the SDK).
    """

    def announce(prefix):
        """Build a callback that prints *prefix* followed by the event's entity key."""
        def callback(args):
            print(prefix, args.entity_key)
        return callback

    def report_failure(error):
        print("Watch error:", error)

    # Subscribe starting from block 0, one callback per event kind
    stop_watching = client.watch_logs(
        from_block=0,
        on_created=announce("Entity created:"),
        on_updated=announce("Entity updated:"),
        on_deleted=announce("Entity deleted:"),
        on_extended=announce("Entity extended:"),
        on_error=report_failure,
    )

    # Hand the handle back so the caller can stop monitoring later
    return stop_watching

Batch Operations

Create Multiple Entities

Efficiently create multiple entities at once

async def batch_operations(client):
    """Create ten entities in one call, then query them back by their shared batch id."""
    # One id shared by every entity in this batch so they can be queried together
    batch_id = str(uuid4())

    pending = [
        ArkivCreate(
            data=f"Message {position}".encode(),
            expires_in=100,
            string_annotations=[
                Annotation("type", "batch"),
                Annotation("batchId", batch_id),
                Annotation("index", str(position)),
            ],
            numeric_annotations=[],
        )
        for position in range(10)
    ]

    receipts = await client.create_entities(pending)
    print(f'Created {len(receipts)} entities in batch')

    # Look the batch up again through its batchId annotation
    matches = await client.query_entities(f'batchId = "{batch_id}"')
    print(f'Queried {len(matches)} entities in batch')

Expires In & Data Lifecycle

Managing Entity Lifetime

Control when your data expires with Expires In

async def manage_expires_in(client):
    """Create a short-lived entity, extend its lifetime, and print its expiration block."""

    # Short-lived entity (expires_in=50; the original note gives this as 50 seconds)
    temporary = ArkivCreate(
        data=b"Temporary data",
        expires_in=50,
        string_annotations=[Annotation("type", "temporary")],
        numeric_annotations=[],
    )

    created = await client.create_entities([temporary])
    receipt = created[0]
    print(f'Entity expires at block: {receipt.expiration_block}')

    # Extension request: add 150 more blocks to the entity's lifetime
    extension = {
        'entity_key': receipt.entity_key,
        'number_of_blocks': 150,
    }
    extend_receipts = await client.extend_entities([extension])
    first_extension = extend_receipts[0]

    print(f'Extended to block: {first_extension.new_expiration_block}')

    # Read the metadata back to see the current expiration block
    metadata = await client.get_entity_metadata(receipt.entity_key)
    print(f'Entity expires at block: {metadata.expires_at_block}')

Troubleshooting

Common Issues

Connection Failed

  • Check your RPC URL and WS URL in .env
  • Verify your private key format (with or without 0x prefix)
  • Ensure your wallet has test ETH from the faucet

Installation Issues

  • Update to Python 3.8+
  • Activate the virtual environment before installing packages
  • Try: pip install --upgrade arkiv-sdk

Query Issues

  • Use double quotes for string values in queries
  • Verify annotation names match exactly
  • Check that the entity still exists (not expired)

Complete Example

Full Application

A complete Python application demonstrating all Arkiv features

import asyncio
import json
import os
import time
from uuid import uuid4

from arkiv_sdk import create_client, Tagged, Annotation
from arkiv_sdk.types import ArkivCreate, ArkivUpdate
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()


def _load_private_key() -> bytes:
    """Return the PRIVATE_KEY environment variable decoded from hex.

    The key may be written with or without a leading ``0x`` prefix.

    Raises:
        ValueError: If PRIVATE_KEY is unset, empty, or not a hex string
            (for example the ``0x...`` placeholder was left in .env).
    """
    raw_key = os.getenv('PRIVATE_KEY', '').strip()
    hex_key = raw_key[2:] if raw_key.startswith(('0x', '0X')) else raw_key
    if not hex_key:
        raise ValueError("PRIVATE_KEY is not set; add it to your .env file")
    try:
        return bytes.fromhex(hex_key)
    except ValueError:
        raise ValueError(
            "PRIVATE_KEY must be a hex string (0x prefix optional)"
        ) from None


async def main():
    """Run the full demo: connect, watch events, then create, query, update,
    batch-create, extend, and delete entities.

    Connection settings come from the environment (.env) and fall back to the
    testnet values. The event watcher is always stopped, even when a step fails.
    """
    # 1. INITIALIZE CLIENT
    client = await create_client(
        int(os.getenv('CHAIN_ID', '60138453025')),
        Tagged("privatekey", _load_private_key()),
        os.getenv('RPC_URL', "https://kaolin.hoodi.arkiv.network/rpc"),
        os.getenv('WS_URL', "wss://kaolin.hoodi.arkiv.network/rpc/ws"),
    )

    print("Connected to Arkiv!")
    owner_address = await client.get_owner_address()
    print(f"Owner address: {owner_address}")

    # Get and check client account balance
    balance = await client.get_raw_client().http_client.get_balance(owner_address)
    balance_eth = float(balance) / 10**18
    print(f"Client account balance: {balance_eth} ETH")

    if balance_eth == 0:
        print("Warning: Account balance is 0 ETH. Please acquire test tokens from the faucet.")

    # Set up real-time event watching
    def on_created(args):
        print("WATCH-> Create:", args)

    def on_updated(args):
        print("WATCH-> Update:", args)

    def on_extended(args):
        print("WATCH-> Extend:", args)

    def on_deleted(args):
        print("WATCH-> Delete:", args)

    def on_error(error):
        print("WATCH-> Error:", error)

    # Start from the current block so only events from this run are reported
    block_number = await client.get_raw_client().http_client.get_block_number()
    unsubscribe = client.watch_logs(
        from_block=block_number,
        on_created=on_created,
        on_updated=on_updated,
        on_extended=on_extended,
        on_deleted=on_deleted,
        on_error=on_error,
        polling_interval=1000,
        transport="websocket"
    )

    try:
        # 2. CREATE - Single entity with annotations
        entity_id = str(uuid4())
        # Computed once so the payload and the annotation carry the same value
        created_at_ms = int(time.time() * 1000)
        entity = ArkivCreate(
            data=json.dumps({
                "message": "Hello from Arkiv!",
                "timestamp": created_at_ms,
                "author": "Developer"
            }).encode(),
            expires_in=300,  # 300 seconds = 5 minutes
            string_annotations=[
                Annotation("type", "message"),
                Annotation("event", "arkiv"),
                Annotation("id", entity_id)
            ],
            numeric_annotations=[
                Annotation("version", 1),
                Annotation("timestamp", created_at_ms)
            ]
        )

        create_receipts = await client.create_entities([entity])
        entity_key = create_receipts[0].entity_key
        print(f"Created entity: {entity_key}")

        # 3. QUERY - Find entity by annotations
        query_results = await client.query_entities(f'id = "{entity_id}" && version = 1')
        print(f"Found {len(query_results)} matching entities")

        for result in query_results:
            data = json.loads(result.storage_value.decode())
            print("Query result:", data)

        # 4. UPDATE - Modify existing entity
        update_data = ArkivUpdate(
            entity_key=entity_key,
            data=json.dumps({
                "message": "Updated message from Arkiv!",
                "updated": True,
                "update_time": int(time.time() * 1000)
            }).encode(),
            expires_in=600,  # 600 seconds = 10 minutes
            string_annotations=[
                Annotation("type", "message"),
                Annotation("id", entity_id),
                Annotation("status", "updated")
            ],
            numeric_annotations=[
                Annotation("version", 2)
            ]
        )

        update_receipts = await client.update_entities([update_data])
        print(f"Updated entity: {update_receipts[0].entity_key}")

        # 5. QUERY updated entity
        updated_results = await client.query_entities(f'id = "{entity_id}" && version = 2')
        print(f"Found {len(updated_results)} updated entities")

        # 6. BATCH OPERATIONS - Create multiple entities
        batch_entities = [
            ArkivCreate(
                data=f"Batch message {i}".encode(),
                expires_in=100,
                string_annotations=[
                    Annotation("type", "batch"),
                    Annotation("index", str(i))
                ],
                numeric_annotations=[
                    Annotation("sequence", i + 1)  # Start from 1, not 0 (SDK bug with value 0)
                ]
            )
            for i in range(5)
        ]

        batch_receipts = await client.create_entities(batch_entities)
        print(f"Created {len(batch_receipts)} entities in batch")

        # 7. EXPIRATION MANAGEMENT - Extend entity lifetime
        extend_receipts = await client.extend_entities([{
            'entity_key': entity_key,
            'number_of_blocks': 100
        }])
        print(f"Extended expiration to block: {extend_receipts[0].new_expiration_block}")

        # Check metadata to verify expiration
        metadata = await client.get_entity_metadata(entity_key)
        print(f"Entity expires at block: {metadata.expires_at_block}")

        # 8. DELETE - Remove entity
        delete_receipts = await client.delete_entities([entity_key])
        print(f"Deleted entity: {delete_receipts[0].entity_key}")

        # Clean up batch entities in a single call instead of one call per entity
        if batch_receipts:
            await client.delete_entities(
                [receipt.entity_key for receipt in batch_receipts]
            )
    finally:
        # Stop watching events even if one of the steps above raised
        unsubscribe()

    print("Complete!")


if __name__ == "__main__":
    asyncio.run(main())