Getting Started with Arkiv (Python)
Build decentralized applications with Python and Arkiv
Last updated: January 2025
Setup & Installation
Prerequisites
What you need before starting
✓
Python 3.8+
Latest stable version recommended
✓
pip
Python package manager
✓
Ethereum Wallet
With Hoodi testnet ETH
✓
Test ETH
From Arkiv faucet
Installation
# Create project directory
mkdir arkiv-python-practice
cd arkiv-python-practice

# Create virtual environment
python -m venv venv

# Activate virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install arkiv-sdk python-dotenv

# Environment Configuration
Create a .env file in your project directory
PRIVATE_KEY=0x...
CHAIN_ID=60138453025
RPC_URL=https://kaolin.hoodi.arkiv.network/rpc
WS_URL=wss://kaolin.hoodi.arkiv.network/rpc/ws
Connect to Arkiv
Basic Connection
Connect to Arkiv using your private key
import asyncio
import os
from uuid import uuid4

# Annotation, ArkivCreate, ArkivUpdate and uuid4 are not used in this snippet;
# they are imported here for the entity examples that follow.
from arkiv_sdk import create_client, Tagged, Annotation
from arkiv_sdk.types import AccountData, ArkivCreate, ArkivUpdate
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()


async def main():
    """Connect to Arkiv with the settings from .env and print the owner address.

    Raises:
        RuntimeError: if PRIVATE_KEY is missing or empty.
        ValueError: if PRIVATE_KEY is not valid hexadecimal or CHAIN_ID is
            not an integer.
    """
    # Configure connection from .env
    raw_key = os.getenv('PRIVATE_KEY', '')
    # Accept the key with or without a leading 0x
    hex_key = raw_key[2:] if raw_key.startswith('0x') else raw_key
    if not hex_key:
        # Fail early instead of building a client around an empty key
        raise RuntimeError("PRIVATE_KEY is not set; add it to your .env file")
    key: AccountData = Tagged("privatekey", bytes.fromhex(hex_key))

    chain_id = int(os.getenv('CHAIN_ID', '60138453025'))
    rpc_url = os.getenv('RPC_URL', 'https://kaolin.hoodi.arkiv.network/rpc')
    ws_url = os.getenv('WS_URL', 'wss://kaolin.hoodi.arkiv.network/rpc/ws')

    # Create a client to interact with the Arkiv API
    client = await create_client(
        chain_id,
        key,
        rpc_url,
        ws_url,
    )

    print("Connected to Arkiv testnet!")

    # Get owner address
    owner_address = await client.get_owner_address()
    print(f"Connected with address: {owner_address}")


# Run the async main function
if __name__ == "__main__":
    asyncio.run(main())

# Creating & Managing Entities
Create Entity
1# Create a new entity with annotations
2entity_id = str(uuid4())
3creates = [
4 ArkivCreate(
5 data=b"Test entity",
6 expires_in=300, # 300 seconds = 5 minutes
7 string_annotations=[
8 Annotation("testTextAnnotation", "demo"),
9 Annotation("id", entity_id)
10 ],
11 numeric_annotations=[Annotation("version", 1)]
12 )
13]
14
15create_receipt = await client.create_entities(creates)
16print('Receipt', create_receipt)
17
18# create_entities takes a list of ArkivCreate objects with 4 fields:
19# - data: Payload in bytes
20# - expires_in: Time-to-live in seconds
21# - string_annotations: Text annotations for querying
22# - numeric_annotations: Numeric annotations for queryingUpdate and Delete Entity
1# Update the entity
2update_receipt = await client.update_entities([
3 ArkivUpdate(
4 entity_key=create_receipt[0].entity_key,
5 data=b"Updated entity",
6 expires_in=1200, # 1200 seconds = 20 minutes
7 string_annotations=[Annotation("id", entity_id)],
8 numeric_annotations=[Annotation("version", 2)]
9 )
10])
11print('Update', update_receipt)
12
13# Updating an entity overrides all of its elements,
14# including the payload, annotations, and expiration.
15
16# Delete the entity
17delete_receipt = await client.delete_entities([entity_key])
18print('Delete', delete_receipt)Query Entities
Search with Annotations
1# Meta data and storage
2if entity_key:
3 meta = await client.get_entity_metadata(entity_key)
4 print('Meta data:', meta)
5
6 data = await client.get_storage_value(entity_key)
7 print('Storage value:', data.decode())
8
9# 1. Simple equality query
10greetings = await client.query_entities('type = "greeting"')
11print(f'Found {len(greetings)} greeting entities')
12
13# 2. Processing query results
14for entity in greetings:
15 data = entity.storage_value.decode()
16 print(f'Entity {entity.entity_key}: {data}')
17
18# 3. Numeric comparison operators
19await print_entities('High priority', await client.query_entities('priority > 5'))
20await print_entities('Old versions', await client.query_entities('version < 3'))
21await print_entities('In range', await client.query_entities('score >= 80 && score <= 100'))
22
23# 4. Combining conditions with AND (&&)
24await print_entities('Specific', await client.query_entities('type = "greeting" && version = 1'))
25
26# 5. Using OR (||) for multiple options
27await print_entities('Messages', await client.query_entities('type = "message" || type = "other"'))
28
29# 6. Complex queries with mixed operators
30await print_entities('Filtered', await client.query_entities('(type = "task" && priority > 3) || status = "urgent"'))
31
32# Note: Query string must use double quotes for string values
33# Numbers don't need quotes: priority = 5
34# Strings need quotes: type = "message"
35
36
async def print_entities(label: str, entities: list):
    """Print a count line for *entities*, then one line per entity.

    Args:
        label: Prefix printed on every output line.
        entities: Query results; each item must expose ``entity_key`` and a
            bytes ``storage_value``.
    """
    print(f'{label} - found {len(entities)} entities:')
    for entity in entities:
        data = entity.storage_value.decode()
        print(f'{label} EntityKey: {entity.entity_key}, Data: {data}')


# Real-time Events
Event Monitoring
Listen to real-time blockchain events
async def setup_event_monitoring(client, from_block=0):
    """Watch for entity events from the blockchain.

    Args:
        client: Connected Arkiv client.
        from_block: Block number passed to ``watch_logs`` as the starting
            point. Defaults to 0; pass the current block number to start
            from the present instead.

    Returns:
        Whatever ``client.watch_logs`` returns; the complete example calls
        that value to stop monitoring.
    """

    def on_created(args):
        print("Entity created:", args.entity_key)

    def on_updated(args):
        print("Entity updated:", args.entity_key)

    def on_deleted(args):
        print("Entity deleted:", args.entity_key)

    def on_extended(args):
        print("Entity extended:", args.entity_key)

    def on_error(error):
        print("Watch error:", error)

    # Watch for events
    unwatch = client.watch_logs(
        from_block=from_block,
        on_created=on_created,
        on_updated=on_updated,
        on_deleted=on_deleted,
        on_extended=on_extended,
        on_error=on_error,
    )

    # Return unwatch function to stop monitoring later
    return unwatch


# Batch Operations
Create Multiple Entities
Efficiently create multiple entities at once
async def batch_operations(client):
    """Create ten entities in a single call, then query them back.

    All ten entities share one freshly generated ``batchId`` annotation,
    which the follow-up query uses to find exactly this batch.

    Args:
        client: Connected Arkiv client.
    """
    batch_id = str(uuid4())

    entities = [
        ArkivCreate(
            data=f"Message {i}".encode(),
            expires_in=100,
            string_annotations=[
                Annotation("type", "batch"),
                Annotation("batchId", batch_id),
                Annotation("index", str(i)),
            ],
            numeric_annotations=[],
        )
        for i in range(10)
    ]

    # One create_entities call for the whole list
    receipts = await client.create_entities(entities)
    print(f'Created {len(receipts)} entities in batch')

    batch_results = await client.query_entities(f'batchId = "{batch_id}"')
    print(f'Queried {len(batch_results)} entities in batch')


# Expires In & Data Lifecycle
Managing Entity Lifetime
Control when your data expires with Expires In
async def manage_expires_in(client):
    """Create a short-lived entity, extend its lifetime, and print its expiry.

    Args:
        client: Connected Arkiv client.
    """
    # Create entity with specific Expires In
    entity = ArkivCreate(
        data=b"Temporary data",
        expires_in=50,  # 50 seconds
        string_annotations=[Annotation("type", "temporary")],
        numeric_annotations=[],
    )

    receipt = (await client.create_entities([entity]))[0]
    print(f'Entity expires at block: {receipt.expiration_block}')

    # Extend entity lifetime.
    # Note the units: creation takes seconds (expires_in), while the
    # extension is expressed in blocks (number_of_blocks).
    # NOTE(review): the extension is passed as a plain dict here; confirm
    # against the SDK whether a typed object is expected instead.
    extend_receipts = await client.extend_entities([{
        'entity_key': receipt.entity_key,
        'number_of_blocks': 150,  # Add 150 more blocks
    }])

    print(f'Extended to block: {extend_receipts[0].new_expiration_block}')

    # Read the expiration back from the entity metadata
    metadata = await client.get_entity_metadata(receipt.entity_key)
    print(f'Entity expires at block: {metadata.expires_at_block}')


# Troubleshooting
Common Issues
Connection Failed
- Check your RPC URL and WS URL in .env
- Verify your private key format (hex string, with or without the 0x prefix)
- Ensure your wallet has test ETH from the faucet
Installation Issues
- Update to Python 3.8+
- Activate the virtual environment before installing packages
- Try: pip install --upgrade arkiv-sdk
Query Issues
- Use double quotes for string values in queries
- Verify annotation names match exactly
- Check that the entity still exists (not expired)
Complete Example
Full Application
A complete Python application demonstrating all Arkiv features
import asyncio
import json
import os
import time
from uuid import uuid4

from arkiv_sdk import create_client, Tagged, Annotation
from arkiv_sdk.types import ArkivCreate, ArkivUpdate
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


async def main():
    """Run the end-to-end Arkiv demo.

    Connects with the settings from .env, starts an event watcher, then
    creates, queries, updates, batch-creates, extends and deletes entities.
    The watcher is stopped even if one of the steps fails.

    Raises:
        RuntimeError: if PRIVATE_KEY is missing or empty.
        ValueError: if PRIVATE_KEY is not valid hexadecimal or CHAIN_ID is
            not an integer.
    """
    # 1. INITIALIZE CLIENT
    raw_key = os.getenv('PRIVATE_KEY', '')
    # Accept the key with or without a leading 0x
    private_key_hex = raw_key[2:] if raw_key.startswith('0x') else raw_key
    if not private_key_hex:
        # Fail with a clear message instead of an opaque fromhex() error
        raise RuntimeError("PRIVATE_KEY is not set; add it to your .env file")
    private_key = bytes.fromhex(private_key_hex)

    # Honour the same .env settings as the basic connection example,
    # falling back to the testnet defaults
    client = await create_client(
        int(os.getenv('CHAIN_ID', '60138453025')),
        Tagged("privatekey", private_key),
        os.getenv('RPC_URL', "https://kaolin.hoodi.arkiv.network/rpc"),
        os.getenv('WS_URL', "wss://kaolin.hoodi.arkiv.network/rpc/ws"),
    )

    print("Connected to Arkiv!")
    owner_address = await client.get_owner_address()
    print(f"Owner address: {owner_address}")

    # Get and check client account balance
    balance = await client.get_raw_client().http_client.get_balance(owner_address)
    balance_eth = float(balance) / 10**18
    print(f"Client account balance: {balance_eth} ETH")

    if balance_eth == 0:
        print("Warning: Account balance is 0 ETH. Please acquire test tokens from the faucet.")

    # Set up real-time event watching
    def on_created(args):
        print("WATCH-> Create:", args)

    def on_updated(args):
        print("WATCH-> Update:", args)

    def on_extended(args):
        print("WATCH-> Extend:", args)

    def on_deleted(args):
        print("WATCH-> Delete:", args)

    def on_error(error):
        print("WATCH-> Error:", error)

    # Start watching at the current block
    block_number = await client.get_raw_client().http_client.get_block_number()
    unsubscribe = client.watch_logs(
        from_block=block_number,
        on_created=on_created,
        on_updated=on_updated,
        on_extended=on_extended,
        on_deleted=on_deleted,
        on_error=on_error,
        polling_interval=1000,
        transport="websocket",
    )

    try:
        # 2. CREATE - Single entity with annotations
        entity_id = str(uuid4())
        # One timestamp for both the payload and the annotation
        now_ms = int(time.time() * 1000)
        entity = ArkivCreate(
            data=json.dumps({
                "message": "Hello from Arkiv!",
                "timestamp": now_ms,
                "author": "Developer",
            }).encode(),
            expires_in=300,  # 300 seconds = 5 minutes
            string_annotations=[
                Annotation("type", "message"),
                Annotation("event", "arkiv"),
                Annotation("id", entity_id),
            ],
            numeric_annotations=[
                Annotation("version", 1),
                Annotation("timestamp", now_ms),
            ],
        )

        create_receipts = await client.create_entities([entity])
        entity_key = create_receipts[0].entity_key
        print(f"Created entity: {entity_key}")

        # 3. QUERY - Find entity by annotations
        query_results = await client.query_entities(f'id = "{entity_id}" && version = 1')
        print(f"Found {len(query_results)} matching entities")

        for result in query_results:
            data = json.loads(result.storage_value.decode())
            print("Query result:", data)

        # 4. UPDATE - Modify existing entity
        update_data = ArkivUpdate(
            entity_key=entity_key,
            data=json.dumps({
                "message": "Updated message from Arkiv!",
                "updated": True,
                "update_time": int(time.time() * 1000),
            }).encode(),
            expires_in=600,  # 600 seconds = 10 minutes
            string_annotations=[
                Annotation("type", "message"),
                Annotation("id", entity_id),
                Annotation("status", "updated"),
            ],
            numeric_annotations=[
                Annotation("version", 2),
            ],
        )

        update_receipts = await client.update_entities([update_data])
        print(f"Updated entity: {update_receipts[0].entity_key}")

        # 5. QUERY updated entity
        updated_results = await client.query_entities(f'id = "{entity_id}" && version = 2')
        print(f"Found {len(updated_results)} updated entities")

        # 6. BATCH OPERATIONS - Create multiple entities
        batch_entities = [
            ArkivCreate(
                data=f"Batch message {i}".encode(),
                expires_in=100,
                string_annotations=[
                    Annotation("type", "batch"),
                    Annotation("index", str(i)),
                ],
                numeric_annotations=[
                    Annotation("sequence", i + 1),  # Start from 1, not 0 (SDK bug with value 0)
                ],
            )
            for i in range(5)
        ]

        batch_receipts = await client.create_entities(batch_entities)
        print(f"Created {len(batch_receipts)} entities in batch")

        # 7. EXPIRATION MANAGEMENT - Extend entity lifetime
        extend_receipts = await client.extend_entities([{
            'entity_key': entity_key,
            'number_of_blocks': 100,
        }])
        print(f"Extended expiration to block: {extend_receipts[0].new_expiration_block}")

        # Check metadata to verify expiration
        metadata = await client.get_entity_metadata(entity_key)
        print(f"Entity expires at block: {metadata.expires_at_block}")

        # 8. DELETE - Remove entity
        delete_receipts = await client.delete_entities([entity_key])
        print(f"Deleted entity: {delete_receipts[0].entity_key}")

        # Clean up batch entities with a single call instead of one per entity
        await client.delete_entities(
            [receipt.entity_key for receipt in batch_receipts]
        )
    finally:
        # Stop watching events, even if a step above raised
        unsubscribe()

    print("Complete!")


if __name__ == "__main__":
    asyncio.run(main())