Создание прослушивателей темы eth_newFilter с помощью Python

Ethereum JSON RPC позволяет прослушивать события контракта через вызов eth_newFilter . Основываясь на неавторитетных источниках, eth_newFilter принимает параметр темы , который каким-то образом представляет собой хэш, вычисляемый из имени и подписи события.

Где объясняется, что такое параметр топиков и как его вычислять?

Как рассчитать параметр темы для известной подписи события Solidy в Python?

solc contract.sol --asmи проверьте значения опкодов события push

Ответы (1)

Вот пример слушателя:

"""Poll Ethereum blockchain, install log hooks to call contracts.

Using geth JSON RPC API: https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter

Copyright 2016 Mikko Ohtamaa - Licensed under MIT license.
"""


import logging
from typing import Callable, Iterable, List, Optional
import datetime


from .ethjsonrpc import EthJsonRpc

from .utils import sha3

#: Default logger
_logger = logging.getLogger(__name__)


#: Called when we spot a fired event. Callback is (contract_address, event_signature, api_data)
callback_type = Callable[[str, str, dict], None]


class ContractStatus:
    """Hold information about the processing status of a single contract."""

    def __init__(self, filter_id, last_updated_at):
        self.filter_id = filter_id
        self.last_updated_at = last_updated_at


def now() -> datetime.datetime:
    """Get the current time as timezone-aware UTC timestamp."""
    return datetime.datetime.now(datetime.timezone.utc)


def calculate_event_signature(decl: str) -> str:
    """Calculate bytecode signature of an event Solidy declaration.

    Example:

    .. code-block:

        assert calculate_event_signature("VerifyTokenSet(address,uint256)") == "3D2E225F28C7AAA8014B84B0DD267E297CB25A0B24CB02AB9C9FCF76F660F05F"

    To verify signature from the contract push opcodes:

    .. code-block:: console

        solc contract.sol --asm

    To debug transactions on Morden testnet

    * https://morden.ether.camp/transaction/9685191fece0dd0ef5a02210a305738be3fceb4089003924bc53de0cce0c0103

    http://solidity.readthedocs.io/en/latest/contracts.html#events

    https://github.com/ethereum/wiki/wiki/Solidity-Features#events
    """
    return "0x" + sha3(decl.encode("utf-8")).hex().lower()


class ContractListener:
    """Fetch updates on events Solidy contract posts to Ethereum blockchain.

    """

    def __init__(self, client: EthJsonRpc, events: Iterable[str], callback: callback_type, logger=_logger):
        """Create a contract listener.

        Callbacks look like:

        .. code-block:: python

            def cb(address, event, api_data)
                pass

        :param client: EthJsonRpc instance we use to connect to geth node
        :param events: List of Solidy event signatures we want to listne like like ``["Transfer(address,address,uint256)]``
        :param callback: Callable that's going to get called for every new event detected.
        :param logger: Optional
        """
        self.logger = _logger
        self.client = client
        self.events = events
        self.callback = callback
        self.event_signatures = {calculate_event_signature(e): e for e in events}

        #: Mapping contract address -> ContractStatus
        self.currently_monitored_contracts = {}

    def install_filter(self, contract_address: str):
        """Set up event filtering for a single contract using eth_newFilter.

        :param contract_address: hex string
        """

        installed_filter_id = self.client.eth_newFilter(from_block=0, address=contract_address)
        status = ContractStatus(filter_id=installed_filter_id, last_updated_at=None)
        self.currently_monitored_contracts[contract_address] = status

    def process_events(self, status: ContractStatus, changes: Optional[List[dict]]) -> int:

        updates = 0

        # Nothing changed
        if changes is None:
            return 0

        for change in changes:

            contract_address = change["address"]

            if contract_address not in self.currently_monitored_contracts:
                self.logger.warn("Received a change for non-monitored contract %s", contract_address)
                continue

            topics = change["topics"]
            if not topics:
                self.logger.warn("Did not get topics with change data %s", change)
                continue

            event_type = topics[0]
            if event_type not in self.event_signatures:
                self.logger.warn("Unknown event signature %s", change)
                continue

            try:
                self.callback(contract_address, self.event_signatures[event_type], change)
                updates += 1
            except Exception as e:
                # IF we have bad code for processing one contract, don't stop at that but keep pushing for others
                self.logger.error("Failed to update contract %s", contract_address)
                self.logger.exception(e)

        status.last_updated_at = now()

        return updates

    def fetch_all(self, contract: str) -> int:
        status = self.currently_monitored_contracts[contract]
        filter_id = status.filter_id

        # Signature different as for newFilter :(
        changes = self.client.eth_getLogs(dict(fromBlock=0, address=contract))
        return self.process_events(status, changes)

    def fetch_changes(self, contract) -> int:
        """Fetch latest events from geth.

        .. note ::

                The some transction might be posted twice due to ramp up and poll calls running differently.
                Always make sure callbacks handle this.

        :param contracts: List of contract addresses as hex string we are interested in

        :return: Number of callbacks made
        """
        status = self.currently_monitored_contracts[contract]
        filter_id = status.filter_id
        changes = self.client.eth_getFilterChanges(filter_id=filter_id)
        return self.process_events(status, changes)

    def monitor_contract(self, contract_address) -> int:
        """Start monitoring a contract and run callback for its all past events.

        If contract is already added do nothing.

        :param contract_address:
        :return: Number of triggered callbacks
        """
        assert type(contract_address) == str
        assert contract_address.startswith("0x")
        contract_address = contract_address.lower()

        if contract_address in self.currently_monitored_contracts:
            return 0

        self.install_filter(contract_address)

        return self.fetch_all(contract_address)

    def remove_contract(self, contract_address):
        del self.currently_monitored_contracts["contract_address"]

    def poll(self):
        """Fetch changes to all monitored contracts.

        Note that some events might be posted twice due to time elapse between ``monitor_contract`` and ``poll``.

        :return: Number of triggered callbacks
        """
        updates = 0
        for c in self.currently_monitored_contracts.keys():
            updates += self.fetch_changes(c)
        return updates

Вот пример использования:

import logging
import transaction

from sqlalchemy.orm import Session

from shareregistry.contractlistener import ContractListener
from shareregistry.models import Entity
from shareregistry.utils import bin_to_eth_address, eth_address_to_bin, txid_to_bin
from .ethjsonrpc import EthJsonRpc


logger = logging.getLogger(__name__)

class EntityUpdater:

    def __init__(self, client: EthJsonRpc, dbsession: Session):
        self.client = client
        self.dbsession = dbsession
        self.events = {
            "created": "Created(address,uint256,string,string)",
            "transfer": "Transfer(address,address,uint256)",
            "verify": "VerifyTokenSet(address,uint256)",
        }

        self.contract_listener = ContractListener(self.client, self.events.values(), self.on_blockchain_event)

    def on_blockchain_event(self, address: str, event: str, api_data: dict):
        """Called by ContractLister to tell us about the contract events.

        :param address: Contract address as a hex string
        :param event: One of event signatures
        :param api_data: eth_getFilterChanges() result https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
        :return:
        """
        logger.info("Received contract event %s for %s", event, address)

        bin_address = eth_address_to_bin(address)

        with transaction.manager:
            entity = self.dbsession.query(Entity).filter_by(contract_address=bin_address).one()

            if event == self.events["created"]:
                txid = txid_to_bin(api_data["transactionHash"])
                entity.initial_supply = 0
                entity.txid = txid
            elif event == self.events["transfer"]:
                from_address = api_data["topics"][1]
                to_address = api_data["topics"][2]
                value = int(api_data["data"][0])
                txid = api_data["transactionHash"]
            else:
                raise RuntimeError("Unknown event: {}".format(event))

    def update_all(self) -> int:
        """Poll geth and get updates for the contracts in our database."""
        updates = 0
        contracts = []

        # Get list of all known contracts in the database
        with transaction.manager:
            for e in self.dbsession.query(Entity).all():
                if not e.contract_address:
                    continue
                contracts.append(bin_to_eth_address(e.contract_address))

        # Each callback will run in its own db transaction context
        for c in contracts:
            updates += self.contract_listener.monitor_contract(c)

        updates += self.contract_listener.poll()

        return updates
Что, если в конечном итоге возникнет исключение «Не удалось обновить контракт», и некоторые изменения не будут записаны в вашу локальную БД? Разве это не означает, что вы не можете снова использовать eth_getFilterChanges , поскольку блокчейн будет думать, что вы уже получили эти изменения? Как вы справляетесь с этой ситуацией? Или я что-то упустил (в коде или в том, как работают фильтры)?
Я не поняла вашего вопроса. Пожалуйста, обратитесь к полному исходному коду github.com/websauna/websauna.wallet/blob/feat/ethereum-3/…
Я вижу, ты сделал это по-другому сейчас. Я просто хотел понять, как работает eth_getFilterChanges. AFAIU возвращает событие только один раз. Поэтому нужно убедиться, что его приложение не «потеряет» это событие. И если это произойдет, приложение должно будет получить это событие другим методом (например, как web3.get_logs).
@takeshi: я рекомендую не использовать состояние (новый фильтр, получить изменения фильтра), а управлять всем состоянием фильтра в коде приложения и использовать только getLogs.
@takeshi: у вас больше контроля, и вы сомневаетесь, что можете просто перечитать события и обновить базу данных приложения.
Да, это то, чем я сейчас занимаюсь. Спасибо, что развеяли мои сомнения!