Affiliate Program

Part 3

## Recap * Architectural Overview * Contracts * Dapps
## Today * Affiliate Monitoring Microservice
## Affiliate Monitoring Service * Written in Go * One of about 20 OpenRelay microservices * One of about 5 OpenRelay services to monitor for events
A Tour of Go

Block Monitor

Boilerplate


							package blocks

							import (
								"context"
								"encoding/json"
								"time"
								"errors"
								"math/big"
								"github.com/ethereum/go-ethereum"
								"github.com/ethereum/go-ethereum/ethclient"
								"github.com/ethereum/go-ethereum/common"
								"github.com/ethereum/go-ethereum/core/types"
								"github.com/notegio/openrelay/channels"
								"log"
							)
						

Interfaces


							//Interface for getting block headers by hash and number
							type HeaderGetter interface {
								HeaderByNumber(context.Context, *big.Int) (*types.Header, error)
								HeaderByHash(context.Context, common.Hash) (*types.Header, error)
							}

							// Interface for persisting block numbers in case of restart
							type BlockRecorder interface {
								Record(*big.Int) (error)
								Get() (*big.Int, error)
							}
						

Structs


							// JSON encodable version of the block with data required by
							// downstream services
							type MiniBlock struct {
								Hash   common.Hash  `json:"hash"`
								Number *big.Int     `json:"number"`
								Bloom  types.Bloom  `json:"bloom"`
							}
							// The items tracked by the core block monitor
							type BlockMonitor struct {
								brb           *blockRingBuffer   // Keeps a fixed number of blocks
								headerGetter  HeaderGetter       // Ethereum client
								publisher     channels.Publisher // Channel publisher
								queryInterval time.Duration      // Frequency to poll RPC
								blockRecorder BlockRecorder      // Persist data for recovery
								quit          chan bool          // Tells us when to quit
							}
						

Processing Loop


						func (bm *BlockMonitor) Process() error {
						  // Start Resumption Process
						  blockNumber, err := bm.blockRecorder.Get()
						  // If we get an error retrieving the last block, log it, but continue.
							// A nil blockNumber will retrieve the latest headers.
						  if err != nil {
						    log.Printf("Error getting block number: %v", err.Error())
						  }
						  header, err := bm.headerGetter.HeaderByNumber(
						    context.Background(),
						    blockNumber
						  )
						  if err != nil { return err }
						  log.Printf("Starting Block: Number: '%v' - Hash: '%#x'", header.Number, header.Hash())
						  // Track the block in the RingBuffer to handle chain re-orgs.
						  bm.brb.Add(&MiniBlock{
						    header.Hash(),
						    header.Number,
						    header.Bloom,
						  })
						  // Only publish the initial block if blocknumber == 0. For later blocks, we
						  // should have published the block in an earlier iteration, so we don't need
						  // to publish it now.
						  if header.Number.Int64() == 0 {
						    if err := bm.publish(bm.brb.Get(0)); err != nil {
						      log.Printf("Error publishing block")
						      return err
						    }
						  }
						  MAIN_PROCESSING:
						  for {
						    select {
						    case _ = <-bm.quit:
						      // Return on the quite signal.
						      return nil
						    default:
						    }
						    // Ask the headerGetter for the last known block + 1.
						    header, err = bm.headerGetter.HeaderByNumber(
						      context.Background(),
						      new(big.Int).Add(bm.brb.Get(0).Number, big.NewInt(1))
						    )
						    if err == ethereum.NotFound {
						      // If no block is available, sleep for a bit and try again.
						      time.Sleep(bm.queryInterval)
						      continue
						    } else if err != nil {
						      // If we got an unexpected error, return it.
						      log.Printf("error getting header for block %v", new(big.Int).Add(bm.brb.Get(0).Number, big.NewInt(1)))
						      return err
						    }
						    // In the event of a chain reorg, the current block's parent won't be
						    // present in our block ring buffer. We need to follow the block's parents
						    // backwards until we find a recognized ancestor, or until we've exhausted
						    // our ring buffer.
						    counter := 0
						    for bm.brb.HashIndex(header.ParentHash) == -1 && counter < bm.brb.size {
						      if header.Number.Int64() == 0 {
						        break
						      }
						      log.Printf("Getting parent for header: %v - %#x (Probable chain reorg)", header.Number.Int64(), header.ParentHash)
						      counter++
						      parentHash := header.ParentHash
						      header, err = bm.headerGetter.HeaderByHash(context.Background(), header.ParentHash)
						      if err == ethereum.NotFound {
						        log.Printf("HeaderGetter is missing parent hash: %#x", header.ParentHash)
						        time.Sleep(bm.queryInterval)
						        continue MAIN_PROCESSING
						      } else if err != nil {
						        log.Printf("error getting header for hash %#x", parentHash)
						        return err
						      }
						    }
						    if bm.brb.HashIndex(header.Hash()) != -1 {
						      log.Fatalf("No parents found, but current block already exists. It's likely that block.Hash() is not being computed properly somewhere.")
						    }
						    // At this point we either have the next header, we have wound back to the
						    // beginning of a reorg, or we've wound back as far as we can given our
						    // ring buffer size
						    bm.brb.Add(&MiniBlock{
						      header.Hash(),
						      header.Number,
						      header.Bloom,
						    })
						    log.Printf("Published Block %v - %#x", bm.brb.Get(0).Number, bm.brb.Get(0).Hash)
						    if err := bm.publish(bm.brb.Get(0)); err != nil {
						      return err
						    }
						  }
						}
						

Other Methods


							func (bm *BlockMonitor) publish(block *MiniBlock) error {
								data, err := json.Marshal(block)
								if err != nil {
									return err
								}
								result := bm.publisher.Publish(string(data))
								if result {
									return bm.blockRecorder.Record(block.Number)
								} else {
									return errors.New("Failed to publish block")
								}
							}
							func (bm *BlockMonitor) Stop() {
								bm.quit <- true
							}
						

Constructors


							func NewBlockMonitor(headerGetter HeaderGetter, publisher channels.Publisher, interval time.Duration, blockRecorder BlockRecorder, brbSize int) (*BlockMonitor) {
								return &BlockMonitor{
									newBlockRingBuffer(brbSize),
									headerGetter,
									publisher,
									interval,
									blockRecorder,
									make(chan bool),
								}
							}
							func NewRPCBlockMonitor(rpcURL string, publisher channels.Publisher, interval time.Duration, blockRecorder BlockRecorder, brbSize int) (*BlockMonitor, error) {
								client, err := ethclient.Dial(rpcURL)
								if err != nil {
									return nil, err
								}
								return NewBlockMonitor(client, publisher, interval, blockRecorder, brbSize), nil
							}
						
## Affiliate Monitoring Service

Boilerplate


							package affiliate

							import (
								"encoding/json"
								"math/big"
								"context"
								"github.com/ethereum/go-ethereum"
								"github.com/ethereum/go-ethereum/common"
								"github.com/ethereum/go-ethereum/ethclient"
								"github.com/notegio/openrelay/channels"
								"github.com/notegio/openrelay/types"
								"github.com/notegio/openrelay/affiliates"
								"github.com/notegio/openrelay/monitor/blocks"
								"gopkg.in/redis.v3"
								"log"
							)
						

Structs


							type affiliateBlockConsumer struct {
								affiliateSignupAddress   *big.Int
								affiliateService         affiliates.AffiliateService
								logFilter                ethereum.LogFilterer
							}
						

Consume


							func (consumer *affiliateBlockConsumer) Consume(delivery channels.Delivery) {
							  block := &blocks.MiniBlock{}
							  err := json.Unmarshal([]byte(delivery.Payload()), block)
							  if err != nil {
							    log.Printf("Error parsing payload: %v\n", err.Error())
							    delivery.Reject()
							    return
							  }
							  affiliateTopic := &big.Int{}
							  affiliateTopic.SetString("60dad0d232381238c031553102e3a2d779bda5a9507ec806820542b3da2801eb", 16)
							  if block.Bloom.Test(consumer.affiliateSignupAddress) && block.Bloom.Test(affiliateTopic) {
							    query := ethereum.FilterQuery{
							      FromBlock: block.Number,
							      ToBlock: block.Number,
							      Addresses: []common.Address{common.BigToAddress(consumer.affiliateSignupAddress)},
							      Topics: [][]common.Hash{
							        []common.Hash{common.BigToHash(affiliateTopic)},
							        nil,
							        nil,
							      },
							    }
							    logs, err := consumer.logFilter.FilterLogs(context.Background(), query)
							    if err != nil {
							      delivery.Return()
							      log.Fatalf("Failed to filter logs on block %v - aborting: %v", block.Number, err.Error())
							    }
							    for _, affiliateLog := range logs {
							      affiliate := affiliates.NewAffiliate(nil, 100)
							      affiliateAddress := &types.Address{}
							      copy(affiliateAddress[:], affiliateLog.Data[12:32])
							      if err := consumer.affiliateService.Set(affiliateAddress, affiliate); err != nil {
							        delivery.Return()
							        log.Fatalf("Error registering affiliate: %#x", affiliateAddress[:])
							      }
							      log.Printf("Added affiliate: %#x", affiliateAddress[:])
							    }
							  }
							  delivery.Ack()
							}
						

Command


							package main

							import (
							  "github.com/notegio/openrelay/monitor/affiliate"
							  "github.com/notegio/openrelay/channels"
							  "gopkg.in/redis.v3"
							  "os/signal"
							  "os"
							  "log"
							)

							func main() {
							  redisURL := os.Args[1]
							  rpcURL := os.Args[2]
							  src := os.Args[3]
							  affiliateSignupAddress := os.Args[4]
							  redisClient := redis.NewClient(&redis.Options{
							    Addr: redisURL,
							  })
							  consumerChannel, err := channels.ConsumerFromURI(src, redisClient)
							  if err != nil {  log.Fatalf("Error constructing consumer: %v", err.Error()) }
							  consumer, err := affiliate.NewRPCAffiliateBlockConsumer(rpcURL, affiliateSignupAddress, redisClient)
							  if err != nil { log.Fatalf("Error constructing affiliate monitor: %v", err.Error()) }
							  consumerChannel.AddConsumer(consumer)
							  consumerChannel.StartConsuming()
							  log.Printf("Started consuming blocks from channel %v for signUp %v", src, affiliateSignupAddress)
							  c := make(chan os.Signal, 1)
							  signal.Notify(c, os.Interrupt)
							  for _ = range c {
							    break
							  }
							  consumerChannel.StopConsuming()
							}

						
## Next time * LitElement Widgets