An introduction to Autonomous Economic Agents representing data

This guide introduces building a data-representation agent that can represent both static and dynamic data. The agent advertises this data on the Open Economic Framework (OEF). In this example, the agent represents weather data (real or fake) and makes it available for other agents and clients to query. The available data can be filtered by date.

This reference assumes you are running a Mac or Linux-based computer and that you are following along with a Raspberry Pi and a weather station. If you are not using a Raspberry Pi and weather station, everything can be run on one machine.

To follow this tutorial to completion you will need:

  • PostgreSQL database, version 11
  • Raspberry Pi 3 Model B+
  • OEF SDK
  • Fetch.AI Ledger

Additionally, if you would like to connect directly to hardware, you will need a USB wireless weather station supported by Pywws.

This tutorial uses Python 3.6.

Set Up your Environment

Download the Raspbian image zip from here. We recommend following the installation instructions that are detailed here. From here on the Raspberry Pi is referred to as the RPi.

Change to a working directory of your choice. The project folder, weather_station, is created in the steps below.

Install requirements automatically

Create a folder named weather_station, and change to that directory

mkdir weather_station
cd weather_station

As good practice, create a virtual environment and activate it:

pip3 install virtualenv
virtualenv  env
source env/bin/activate

Download the requirements file:

Then install the requirements for the project by running:

pip3 install -r requirements.txt

Install requirements manually

OEF SDK set up

The Open Economic Framework (OEF) package requires the Google Protocol Buffers compiler, version 2.0.0 or later.

You can install Google Protocol Buffers by using the package manager:

sudo apt-get install protobuf-compiler

Create a folder called src inside weather_station, so that your directory looks something like this: /weather_station/src

Then change into src.

Install Ledger

To install ledger-api-py (the Python Ledger API), follow these steps:

git clone https://github.com/fetchai/ledger-api-py.git
cd ledger-api-py
sudo python3 setup.py install

Install OEF

To install the OEF Python SDK, follow these steps:

git clone https://github.com/fetchai/oef-sdk-python.git --recursive && cd oef-sdk-python/
sudo python3 setup.py install

Install PostgreSQL

To install Postgres, open a terminal window and run the following command:

sudo apt install postgresql libpq-dev postgresql-client postgresql-client-common -y

To connect to your database from a Python script, you will need the Psycopg2 Python package.

You can install it with pip:

sudo pip3 install psycopg2

If you are connecting to hardware, you will also need to install Pywws.

Install Pywws

To install Pywws, the Python software for USB wireless weather stations, open a terminal window and run the following commands:

sudo pip3 install libusb1
sudo pip3 install pywws

With everything installed, you can start to create the project. It is easiest to start with data collection or creation. If you do not have the hardware for weather data collection, skip the next part and follow “Creating/Collecting fake data”.

We're not going to concentrate too much on the workings of the data collection, as they're likely to be expanded by us, and ultimately changed by you.

Collecting real Data

Create a new file named weatherParser.py and paste in the following code:

import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import pywws.weatherstation  # provides pywws.weatherstation.WeatherStation, used in main()
from pywws import livelog
import datetime
import pprint

#Checking if the database exists#

con = psycopg2.connect(dbname='postgres')
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute("SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = 'weather'")
not_exists_row = cur.fetchone()
not_exists = not_exists_row[0]
if not_exists:
    cur.execute('CREATE DATABASE weather')

cur.close()
con.commit()
con.close()

#################################

command = (''' CREATE TABLE IF NOT EXISTS data (
                                 abs_pressure REAL,
                                 delay REAL,
                                 hum_in REAL,
                                 hum_out REAL,
                                 idx TEXT,
                                 rain REAL,
                                 temp_in REAL,
                                 temp_out REAL,
                                 wind_ave REAL,
                                 wind_dir REAL,
                                 wind_gust REAL)''')


con = psycopg2.connect('dbname = weather')
con.autocommit = True
cur = con.cursor()
cur.execute(command)
cur.close()
con.commit()
if con is not None:
    con.close()



class Forecast():
    def addData(self,tagged_data) :
        con = psycopg2.connect('dbname = weather')
        con.autocommit = True
        cur = con.cursor()
        command = ('''INSERT INTO data(abs_pressure, 
                                       delay,
                                       hum_in,
                                       hum_out,
                                       idx,
                                       rain,
                                       temp_in,
                                       temp_out,
                                       wind_ave,
                                       wind_dir,
                                       wind_gust) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''')
        
        
        cur.execute(command,(tagged_data['abs_pressure'], 
                             tagged_data['delay'],
                             tagged_data['hum_in'], 
                             tagged_data['hum_out'],
                             datetime.datetime.now().strftime('%s'),
                             tagged_data['rain'],
                             tagged_data['temp_in'],
                             tagged_data['temp_out'],
                             tagged_data['wind_ave'],
                             tagged_data['wind_dir'],
                             tagged_data['wind_gust']))
        con.commit()
        cur.close()
        con.close()
        
        m_time = datetime.datetime.now().strftime('%s')
        print( m_time)
        print(time.ctime(int(m_time)))
        print(tagged_data['idx'])

        
    
    def main(self):

        ws = pywws.weatherstation.WeatherStation()
        for data, ptr ,logged in ws.live_data() :
            print(data)
            self.addData(data)
    
if __name__ == '__main__':
    a = Forecast()
    a.main()

addData(self, tagged_data): Establishes a connection with the database and inserts one reading from the weather station.

main(self): Creates a WeatherStation object and iterates over its live_data() stream, passing each reading to addData so that it is stored in the database.
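
The script stores the time of each reading in the idx column as a Unix timestamp rendered as text, using strftime('%s'). That directive is a platform extension rather than one of Python's documented format codes, so it may not behave the same everywhere. The following is a minimal sketch of a portable equivalent using only the standard library. The helper names epoch_text and epoch_to_utc are illustrative and are not part of the tutorial's code.

```python
import time
from datetime import datetime, timezone


def epoch_text(moment=None):
    """Return Unix seconds as text, the format stored in the idx column."""
    if moment is None:
        return str(int(time.time()))
    return str(int(moment.timestamp()))


def epoch_to_utc(text):
    """Turn a stored idx value back into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(int(text), tz=timezone.utc)


stamp = epoch_text(datetime(2019, 3, 20, 12, 0, tzinfo=timezone.utc))
print(stamp)                # 1553083200
print(epoch_to_utc(stamp))  # 2019-03-20 12:00:00+00:00
```

Calling epoch_text() with no argument gives the current time, which is what addData would store for a fresh reading.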

Open a new terminal window and run the script:

python3 weatherParser.py

Creating/Collecting fake data

If you do not own a weather station, paste the following code into weatherParser.py instead to emulate the data from a weather station:

import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import datetime
import random


#Checking if the database exists#
con = psycopg2.connect(dbname='postgres')
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute("SELECT COUNT(*) = 0 FROM pg_catalog.pg_database WHERE datname = 'weather_fake'")
not_exists_row = cur.fetchone()
not_exists = not_exists_row[0]
if not_exists:
    cur.execute('CREATE DATABASE weather_fake')

cur.close()
con.commit()
con.close()
###############################

command = (''' CREATE TABLE IF NOT EXISTS data (
                                 abs_pressure REAL,
                                 delay REAL,
                                 hum_in REAL,
                                 hum_out REAL,
                                 idx TEXT,
                                 rain REAL,
                                 temp_in REAL,
                                 temp_out REAL,
                                 wind_ave REAL,
                                 wind_dir REAL,
                                 wind_gust REAL)''')


con = psycopg2.connect('dbname = weather_fake')
con.autocommit = True
cur = con.cursor()
cur.execute(command)
cur.close()
con.commit()
if con is not None:
    con.close()



class Forecast():
    def addData(self,tagged_data) :
        con = psycopg2.connect('dbname = weather_fake')
        con.autocommit = True
        cur = con.cursor()
        command = ('''INSERT INTO data(abs_pressure, 
                                        delay,
                                        hum_in,
                                        hum_out,
                                        idx,
                                        rain,
                                        temp_in,
                                        temp_out,
                                        wind_ave,
                                        wind_dir,
                                        wind_gust) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''')
        
        
        cur.execute(command,(tagged_data['abs_pressure'], 
                             tagged_data['delay'],
                             tagged_data['hum_in'], 
                             tagged_data['hum_out'],
                             datetime.datetime.now().strftime('%s'),
                             tagged_data['rain'],
                             tagged_data['temp_in'],
                             tagged_data['temp_out'],
                             tagged_data['wind_ave'],
                             tagged_data['wind_dir'],
                             tagged_data['wind_gust']))
        con.commit()
        cur.close()
        con.close()
        
        m_time = datetime.datetime.now().strftime('%s')
        print( m_time)
        print(time.ctime(int(m_time)))
        print(tagged_data['idx'])

        
    
    def main(self):
        while True : 
            dict_of_data = {}
            # random.randrange expects integer bounds
            dict_of_data['abs_pressure'] = random.randrange(1022, 1025, 1)
            dict_of_data['delay'] = random.randint(2, 7)
            dict_of_data['hum_in'] = random.randrange(33, 40, 1)
            dict_of_data['hum_out'] = random.randrange(33, 80, 1)
            dict_of_data['idx'] = datetime.datetime.now()
            dict_of_data['rain'] = random.randrange(70, 74, 1)
            dict_of_data['temp_in'] = random.randrange(18, 28 , 1)
            dict_of_data['temp_out'] = random.randrange(2,20 , 1)
            dict_of_data['wind_ave'] = random.randrange(0, 10 , 1)
            dict_of_data['wind_dir'] = random.randrange(0 , 14 , 1)
            dict_of_data['wind_gust'] = random.randrange(1 , 7, 1 )
            print(dict_of_data)
            self.addData(dict_of_data)
            time.sleep(5)
    
if __name__ == '__main__':
    a = Forecast()
    a.main()

addData(self, tagged_data): Establishes a connection with the database and inserts the data passed to it from main().

main(): Every 5 seconds, creates fake data in the structure the database expects and adds it to the database.
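
The fake-data step can also be written as a small reusable function, which makes the value ranges easier to see and to test. This is a sketch under assumptions: the FIELD_RANGES table and the fake_reading helper are illustrative names, and the inclusive integer bounds are chosen to mirror the ranges used in main() (random.randint and random.randrange both expect integers).

```python
import random

# Inclusive integer bounds per field, mirroring the ranges used in main().
FIELD_RANGES = {
    "abs_pressure": (1022, 1024),
    "delay": (2, 7),
    "hum_in": (33, 39),
    "hum_out": (33, 79),
    "rain": (70, 73),
    "temp_in": (18, 27),
    "temp_out": (2, 19),
    "wind_ave": (0, 9),
    "wind_dir": (0, 13),
    "wind_gust": (1, 6),
}


def fake_reading(rng=random):
    """Build one fake reading with every numeric column the data table expects."""
    return {name: rng.randint(low, high)
            for name, (low, high) in FIELD_RANGES.items()}


# A seeded generator gives repeatable values, which is handy for testing.
reading = fake_reading(random.Random(42))
print(sorted(reading))
```

The idx value is left out here because addData replaces it with the current timestamp when it inserts the row.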

Open a terminal window and run the script:

python3 weatherParser.py

Create the Agent

Agents on the OEF (Open Economic Framework) require a DataModel to be defined. This can be done inline, in a separate file, or stored in a database, among other options. A DataModel gives agents a relatively fixed way to find one another and allows the OEF to match agents as they search. DataModels are not strict, though: in later releases, the OEF handles the ontology and semantics of this structured text, allowing different languages and similar words to match correctly so that agents are not limited by word choice.

In this example, we create the DataModel as a standalone Python file.

  1. Create a new file and name it weather_station_dataModel.py
  2. Paste in the following code:
from typing import List
import os, sys

from oef.query import Eq, Range, Constraint, Query, AttributeSchema
from oef.schema import DataModel, Description , Location


class WEATHER_STATION_DATAMODEL (DataModel):
    ATTRIBUTE_ID = AttributeSchema("id",str, True, "Identification")
    ATTRIBUTE_COUNTRY = AttributeSchema("country", str, True, "Country")
    ATTRIBUTE_CITY = AttributeSchema("city", str, True, "City")
    
    def __init__(self):
        super().__init__("weatherStation_datamodel",[self.ATTRIBUTE_ID,
                                                        self.ATTRIBUTE_COUNTRY,
                                                        self.ATTRIBUTE_CITY],
                                                        "A localizable weather agent")

As a simple example, this is all that is needed for the agent to register as a service on the OEF.

We import DataModel from oef.schema, the Python class that represents a DataModel. Our class inherits from it and calls super().__init__(), which initialises the DataModel and checks its validity. You can read more about this at: https://github.com/fetchai/oef-sdk-python/blob/master/oef/schema.py

DataModels are a very large topic; we'll be covering them in greater detail soon.
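
To make the idea of required attributes concrete, here is a plain-Python stand-in for the three attributes declared above. It does not use or reproduce the oef.schema API. The ATTRIBUTES table and the check_description helper are hypothetical and only illustrate what checking a description against a data model involves.

```python
# Plain-Python stand-in for the attributes declared in the data model.
# Each entry is (name, type, required); this is not the oef.schema API.
ATTRIBUTES = [
    ("id", str, True),
    ("country", str, True),
    ("city", str, True),
]


def check_description(values):
    """Return a list of problems found in a description dictionary."""
    problems = []
    known = {name for name, _, _ in ATTRIBUTES}
    for name, expected_type, required in ATTRIBUTES:
        if name not in values:
            if required:
                problems.append("missing required attribute: " + name)
        elif not isinstance(values[name], expected_type):
            problems.append("wrong type for attribute: " + name)
    for name in values:
        if name not in known:
            problems.append("unknown attribute: " + name)
    return problems


complete = {"id": "weather_station_12", "country": "UK", "city": "Cambridge"}
partial = {"country": "UK", "city": "Cambridge"}
print(check_description(complete))  # []
print(check_description(partial))   # ['missing required attribute: id']
```

Because the data model marks id as required, it may be worth making sure that the description an agent registers carries an id value alongside country and city.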

Communicating with the Database

Create a file and name it db_communication.py

Paste in the following code :

import psycopg2
import datetime
import time


class Db_communication():

	source = None
	def __init__(self, source):
		self.source = source

	def db_connection(self):
		con = None
		print (self.source)
		if self.source == "fake":
			con = psycopg2.connect('dbname =  weather_fake')
		else:
			con = psycopg2.connect('dbname =  weather')

		con.autocommit = True
		return con


	def specific_dates (self, start, end) :

		con = self.db_connection()
		cur = con.cursor()
		
		if type(start) is str :
			start = datetime.datetime.strptime(start,'%d/%m/%Y')
			start = start.strftime('%s')
		if type(end) is str :
			end = datetime.datetime.strptime(end,'%d/%m/%Y')
			end = end.strftime('%s')
		command = "SELECT * FROM data WHERE idx BETWEEN %s::Text AND %s::Text"
		cur.execute(command, (float(start),float(end),))
		data = cur.fetchall()

		cur.close()
		con.close()
		return data


if __name__ == '__main__':
	# Quick manual check against the fake database
	a = Db_communication("fake")
	date1 = "10/12/2018"
	date2 = "9/12/2018"
	test = a.specific_dates(date2, date1)
	print(test)

The agent uses these methods to search for weather data within specified start and end dates. This could be more complicated and specialised, but for the purpose of this reference we are keeping it simple.
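
The date-range lookup can be tried without a PostgreSQL server. The sketch below uses the standard library's sqlite3 module as a stand-in, with an in-memory table that has only two of the columns. Several things here are assumptions made for illustration: the helper names to_epoch and rows_between, the choice to interpret dates as UTC, and storing idx as an integer (the tutorial's table stores it as text).

```python
import sqlite3
from datetime import datetime, timezone


def to_epoch(date_str):
    """Convert a 'dd/mm/YYYY' string to Unix seconds, interpreting it as UTC."""
    parsed = datetime.strptime(date_str, "%d/%m/%Y")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


def rows_between(con, start, end):
    """Return the rows whose idx falls between two dates, inclusive."""
    cur = con.execute(
        "SELECT idx, temp_out FROM data WHERE idx BETWEEN ? AND ? ORDER BY idx",
        (to_epoch(start), to_epoch(end)),
    )
    return cur.fetchall()


# In-memory stand-in for the weather database.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE data (idx INTEGER, temp_out REAL)")
con.executemany(
    "INSERT INTO data VALUES (?, ?)",
    [
        (to_epoch("08/12/2018"), 3.0),         # before the range
        (to_epoch("09/12/2018") + 3600, 4.5),  # inside the range
        (to_epoch("11/12/2018"), 6.0),         # after the range
    ],
)

result = rows_between(con, "9/12/2018", "10/12/2018")
print(len(result))  # 1
con.close()
```

Comparing integers avoids the character-by-character ordering that applies when timestamps are compared as text, which only matches numeric order while all values have the same number of digits.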

The Agent

Create a file and name it weatherAgent.py

Paste in the following code:

import base64
import hashlib
import binascii


from fetchai.ledger.api import TokenApi, TransactionApi
from fetchai.ledger.crypto import Identity

from oef.agents import OEFAgent
from typing import List
import os, sys

from codecs import encode

import json
import time
import struct
from struct import * 

from oef.proxy import  OEFProxy, PROPOSE_TYPES
from oef.query import Eq, Range, Constraint, Query, AttributeSchema, Distance 
from oef.schema import DataModel, Description , Location
from oef.messages import CFP_TYPES


import weather_station_dataModel
from weather_station_dataModel import WEATHER_STATION_DATAMODEL

import db_communication
from db_communication import Db_communication

import json
import datetime
 
class WeatherAgent(OEFAgent):
    
    def __init__(self, public_key: str, oef_addr: str, db_source : str, oef_port: int = 3333):
        super().__init__(public_key, oef_addr, oef_port)

        self.scheme = {}
        self.scheme['country'] = None
        self.scheme['city'] = None
        self.description = None
        self.db = db_communication.Db_communication(db_source)
        self.identity = Identity()
        print("your private key: " + self.identity.private_key)
        self.tokens = TokenApi("ledger.economicagents.com", 80)
        self.balance = self.tokens.balance(self.identity.public_key)
        self.fetched_data = None
        self.price_per_row = 0.02
        self.totalPrice = 0

    def on_cfp(self, msg_id: int, dialogue_id: int, origin: str, target: int, query: CFP_TYPES):
        """Send a simple Propose to the sender of the CFP."""
        print("[{0}]: Received CFP from {1}".format(self.public_key, origin))

        print(query.constraints[0].constraint.values)

        self.fetched_data  = self.db.specific_dates(query.constraints[0].constraint.values[0], 
                                                    query.constraints[0].constraint.values[1])
        if len(self.fetched_data) >= 1 : 
            print(len(self.fetched_data))
            self.totalPrice = self.price_per_row * len(self.fetched_data)
            proposal = Description({"Rows" : len(self.fetched_data),
                                    "Price" : self.totalPrice })
            print("[{}]: Sending propose at price: {}".format(self.public_key, self.totalPrice))
            self.send_propose(msg_id + 1, dialogue_id, origin, target + 1, [proposal])
        else :
            #self.send_propose(msg_id + 1, dialogue_id, origin, target + 1, [])
            self.send_decline(msg_id + 1, dialogue_id, origin, target + 1)

    def on_accept(self, msg_id: int, dialogue_id: int, origin: str, target: int):
        """Once we received an Accept, send the requested data."""
        print("[{0}]: Received accept from {1}."
              .format(self.public_key, origin))

        command = {}
        command['Public_Key'] = binascii.hexlify(self.identity.public_key_bytes).decode()
        msg = json.dumps(command)
        self.send_message(0,dialogue_id, origin, msg.encode())

 
    def on_decline(self, msg_id: int, dialogue_id: int, origin: str, target: int):
        print("declined")
        

    def on_message(self, msg_id: int, dialogue_id: int, origin: str, content: bytes):
        data = json.loads(content.decode())

        if data['Command'] == "Executed" :
            correct_balance = self.balance + int(self.totalPrice)

            if correct_balance == self.tokens.balance(self.identity.public_key) :
                print("Success")
                self.balance = correct_balance
                print(self.balance)
                command = {}
                command['Command'] = "success"
                command['fetched_data'] = [] 
                
                for items in self.fetched_data :
                    dict_of_data = {}
                    dict_of_data['abs_pressure'] = items[0]
                    dict_of_data['delay'] = items[1]
                    dict_of_data['hum_in'] = items[2]
                    dict_of_data['hum_out'] = items[3]
                    dict_of_data['idx'] = time.ctime(int(items[4]))
                    dict_of_data['rain'] = items[5]
                    dict_of_data['temp_in'] = items[6]
                    dict_of_data['temp_out'] = items[7]
                    dict_of_data['wind_ave'] = items[8]
                    dict_of_data['wind_dir'] = items[9]
                    dict_of_data['wind_gust'] = items[10]
                    command['fetched_data'].append(dict_of_data)
                

                msg = json.dumps(command)
                print("Sending Data")
                self.send_message(0,dialogue_id,origin,msg.encode())
            else : 
                print("Fail")
                command = {}
                command['Command'] = "fail"
                msg = json.dumps(command)
                self.send_message(0,dialogue_id,origin,msg.encode())

 
if __name__ == '__main__':

    # sys arg fake or actual

    if len(sys.argv) < 3:
        sys.exit("required ID and db source")

    else:
 
        # create agent and connect it to OEF
        server_agent = WeatherAgent("weather_station_{}".format(sys.argv[1]), oef_addr="oef.economicagents.com", db_source=sys.argv[2], oef_port=3333)
        
        server_agent.scheme['country'] = "UK"
        server_agent.scheme['city'] = "Cambridge"

        server_agent.connect()
     
        # register a service on the OEF
        server_agent.description = Description(server_agent.scheme, weather_station_dataModel.WEATHER_STATION_DATAMODEL())
     
        server_agent.register_service(0,server_agent.description)
     
        # run the agent
        server_agent.run()

An agent's components are very important, so we're going to break this down a little now. You may have already read our breakdown of the agents on the OEF here.

The agent is run by passing its ID and whether you are connecting to real or fake data.

python3 weatherAgent.py 12 fake

Now, let us go over the methods and their use.

__init__: Initialises the variables for the specific agent and creates the reference to Db_communication.

self.identity: Stores the public and private keys of this agent. We need these keys in order to accept payments and send tokens.

on_cfp: Queries the database for the requested dates and sends a proposal to the client that asked, or a decline if no rows match.

on_accept: Sends the agent's public key to the client so that the client can make the payment.

on_decline: Prints a message on the terminal to inform the user.

on_message: Handles messages from the client. When the client reports that the payment was executed, the agent checks its balance and sends either the data or a fail message.
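
After an accept, the agent and the client exchange small JSON payloads: the agent sends its Public_Key, the client replies with Command set to "Executed", and the agent answers with Command set to "success" (plus fetched_data) or "fail". The sketch below shows that exchange with the standard library only. The encode, decode, and classify helpers and the step labels are illustrative names, not part of the OEF SDK.

```python
import json


def encode(message):
    """Serialise a message dictionary to the bytes passed to send_message."""
    return json.dumps(message).encode()


def decode(content):
    """Parse the bytes received in on_message back into a dictionary."""
    return json.loads(content.decode())


def classify(content):
    """Name the step of the exchange that a raw payload belongs to."""
    data = decode(content)
    if "Public_Key" in data:
        return "payment_request"
    command = data.get("Command")
    if command == "Executed":
        return "payment_confirmed"
    if command == "success":
        return "data_delivery"
    if command == "fail":
        return "payment_rejected"
    return "unknown"


steps = [
    encode({"Public_Key": "ab12"}),  # placeholder value, not a real key
    encode({"Command": "Executed"}),
    encode({"Command": "success", "fetched_data": [{"temp_out": 4.5}]}),
]
labels = [classify(step) for step in steps]
print(labels)
# ['payment_request', 'payment_confirmed', 'data_delivery']
```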

You can run the script by typing python3 weatherAgent.py agentId dbSource in a new terminal window.

For example: python3 weatherAgent.py 2 fake. The ID of the agent is weather_station_2, and it serves data from the weather_fake database. The price is set in the code by price_per_row.
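
In the agent code, on_cfp prices a request as price_per_row multiplied by the number of matching rows, and both the client's on_propose and the agent's balance check convert that price with int(). The sketch below shows what that conversion does to small totals. The quote helper is a hypothetical name used only for this illustration.

```python
PRICE_PER_ROW = 0.02  # same value as self.price_per_row in the agent


def quote(row_count, price_per_row=PRICE_PER_ROW):
    """Return (total_price, whole_tokens) for a request matching row_count rows."""
    total = price_per_row * row_count
    # int() drops the fractional part, as in the client's cost calculation.
    return total, int(total)


print(quote(120)[1])  # 2
print(quote(10)[1])   # 0
```

With a per-row price this small, a request that matches only a few rows truncates to a cost of 0, so you may want a larger price_per_row or whole-token prices when experimenting.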

Finally, let’s create a client in order to be able to communicate with this agent.

  1. Create a new file and name it weather_client.py
  2. Paste in the following code :
import base64
import hashlib
import binascii
from typing import List

from fetchai.ledger.api import TokenApi, TransactionApi
from fetchai.ledger.crypto import Identity
from oef.messages import PROPOSE_TYPES


from oef.agents import OEFAgent
from oef.schema import DataModel, AttributeSchema
from oef.query import Query, Constraint, Eq , NotEq ,Range,And


import weather_station_dataModel

import json
import datetime

import time


class ClientAgent(OEFAgent):
    """
    The class that defines the behaviour of the echo client agent.
    """
    def __init__(self, public_key: str, oef_addr: str, oef_port: int = 3333):
        super().__init__(public_key, oef_addr, oef_port)
        self.identity = Identity()
        self.txs = TransactionApi("ledger.economicagents.com" , 80)
        self.tokens = TokenApi("ledger.economicagents.com" , 80)
        print('Submitting wealth creation...')
        self.wait_for_tx(self.txs, self.tokens.wealth(self.identity.private_key_bytes, 1000))
        self.cost = 0
        self.pending_cfp = 0
        self.received_proposals = []
        self.received_declines = 0
    
    def wait_for_tx(self, txs: TransactionApi, tx: str):
        # Poll the ledger once a second until the transaction has executed
        while True:
            if txs.status(tx) == "Executed":
                break
            time.sleep(1)



    def on_message(self, msg_id: int, dialogue_id: int, origin: str, content: bytes):
        data = json.loads(content.decode())
       
        if "Public_Key" in data.keys():
            self.make_the_payment(data, origin, dialogue_id)
        if "Command" in data.keys() :
            if data['Command'] == "success" :
                for items in data['fetched_data'] :
                    #print(items) # Uncomment if you want to print out the data
                    pass
                self.stop()

            # The agent reports a failed payment check as Command == "fail"
            if data['Command'] == "fail" :
                self.stop()
                

    def make_the_payment(self, data, origin,dialogue_id) :
        print("sending the correct amount")
        self.wait_for_tx(self.txs, self.tokens.transfer(self.identity.private_key_bytes,
                                               binascii.unhexlify(data['Public_Key'].encode()),
                                               self.cost))

        print("Sending executed command")
                
        command = {}
        command['Command'] = "Executed"

        print(self.tokens.balance(self.identity.public_key))
        msg = json.dumps(command)
        self.send_message(0,dialogue_id,origin,msg.encode())
       

    def on_search_result(self, search_id: int, agents: List[str]):
        """For every agent returned in the service search, send a CFP to obtain resources from them."""
        if len(agents) == 0:
            print("[{}]: No agent found. Stopping...".format(self.public_key))
            self.stop()
            return

        print("[{0}]: Agent found: {1}".format(self.public_key, agents))

        for agent in agents:
            
            print("[{0}]: Sending to agent {1}".format(self.public_key, agent))
            query = Query([Constraint("Date", Range( ("20/3/2019","21/3/2019") ))])
            self.pending_cfp += 1
            self.send_cfp(1, 0, agent, 0, query)

    def on_propose(self, msg_id: int, dialogue_id: int, origin: str, target: int, proposals: PROPOSE_TYPES):
        """When we receive a Propose message, answer with an Accept."""
        print("[{0}]: Received propose from agent {1}".format(self.public_key, origin))
             
        for i,p in enumerate(proposals):
            self.received_proposals.append({"agent" : origin, 
                                            "proposal":p.values})
        received_cfp = len(self.received_proposals) + self.received_declines 
        print(received_cfp)
        print(received_cfp == self.pending_cfp)
        print(self.pending_cfp)


        if received_cfp == self.pending_cfp :
            print("I am here")
            if len( self.received_proposals) >= 1 :
                self.received_proposals = sorted(self.received_proposals, key= lambda i : int(i['proposal']['Price']))
            
                
                for i in range(1 , len(self.received_proposals)) :
                    print("Sending decline !")
                    self.send_decline(msg_id,dialogue_id, self.received_proposals[i]['agent'],msg_id + 1)
               
                self.cost = int(self.received_proposals[0]['proposal']['Price'])    
                self.send_accept(msg_id,dialogue_id,self.received_proposals[0]['agent'],msg_id + 1)
            else : 
                print("No proposals for the specific dates.")
                self.stop()

    def on_decline(self, msg_id: int, dialogue_id: int, origin: str, target: int) :
        print("Received a decline!")
        self.received_declines += 1

if __name__ == '__main__':

    # define an OEF Agent
    client_agent = ClientAgent("agent_client", oef_addr="oef.economicagents.com", oef_port=3333)

    # connect it to the OEF Node
    client_agent.connect()

    # query OEF for DataService providers
    echo_query = Query([Constraint("country", Eq("UK"))],
                        weather_station_dataModel.WEATHER_STATION_DATAMODEL())

    print("Make search to the OEF")
    client_agent.search_services(0, echo_query)

    # wait for events
    client_agent.run()

__init__: Initialises the variables for the agent, generates a wallet (private and public key), and connects to the test network.

wait_for_tx: Waits until the ledger reports that the transaction has been executed.

on_message: The main function that enables the client to “speak” with the service agent.

make_the_payment: Sends the agreed amount to the service agent and informs it that the transaction has been executed.

on_search_result: Receives the list of agents that match the search query and asks each of them for a proposal.

on_propose: Receives the proposals from the service agents and appends them to a list. When the number of replies (proposals plus declines) equals the number of agents contacted, it sorts the list by price, accepts the cheapest proposal, and declines the rest.
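
The selection step in on_propose can be sketched on its own, without the OEF SDK. The choose_cheapest helper and the sample proposals below are hypothetical; the dictionary layout follows what the client appends to received_proposals.

```python
def choose_cheapest(proposals):
    """Split proposals into (accepted, declined), cheapest first.

    Each proposal is a dict like {"agent": str, "proposal": {"Price": number}}.
    """
    if not proposals:
        return None, []
    ranked = sorted(proposals, key=lambda item: item["proposal"]["Price"])
    return ranked[0], ranked[1:]


received = [
    {"agent": "weather_station_1", "proposal": {"Rows": 40, "Price": 0.8}},
    {"agent": "weather_station_2", "proposal": {"Rows": 10, "Price": 0.2}},
    {"agent": "weather_station_3", "proposal": {"Rows": 25, "Price": 0.5}},
]
accepted, declined = choose_cheapest(received)
print(accepted["agent"])                     # weather_station_2
print([item["agent"] for item in declined])  # ['weather_station_3', 'weather_station_1']
```

This sketch sorts on the raw price rather than on int(price), so that proposals below one token are still ranked against each other; that is a design choice of the sketch, not something the client code does.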

You can find more details on how to develop agents here
