---
layout: tutorial
title: Creating a streaming JSON river for data import
cat: tutorials
author: Alexander Reelsen
tutorial_desc: Creating a river, which streams data in
---

Disclaimer I: You can get the "source at github":https://github.com/spinscale/elasticsearch-river-streaming-json

Disclaimer II: This article is not an introduction to writing rivers!

h2. Using rivers for automated data import into elasticsearch

One of the core questions of any elasticsearch integration is how to get the data into elasticsearch. In most cases there is an existing system from which the data gets imported into elasticsearch. A simple approach is to import the data from the database directly into elasticsearch (for example by using the "JDBC river":https://github.com/jprante/elasticsearch-river-jdbc by Jörg Prante). However, this only works if all the data is stored in the database and none of it is calculated dynamically - prices, for example, might be calculated on the fly and never stored in the database. Furthermore, it implies that you have full access to the database.

This article covers a possible integration scenario for the case where you cannot index directly from your database, but instead use a river which imports new data via a JSON export from the main system. The samples used here feature an ecommerce system backed by a SQL database, which uses elasticsearch as a product search engine. At least this is the setup in which a concrete implementation of this river is running at my current employer, "lusini.de":http://www.lusini.de.

h2. Functional requirements of any importing river

* Update often: As this is a product search engine, it needs to be updated on many occasions: a product running out of stock, price changes, product name or description changes, product deletions or product creations. As most ecommerce stores do not need realtime product information (though it is a nice-to-have feature), it is sufficient to query the ecommerce system every few seconds for changes.
* Incremental updates: This is very important. If you implement a river which queries the main ecommerce system every 30 seconds, you definitely do not want to do a full export every time and possibly import hundreds of thousands of products, but only export products which have changed since the last update.
* Handling deletes: A product must not be searchable anymore if it cannot be bought. For example it might have run out of stock or was simply deleted in the ecommerce system.
* Bulk updates: You never know how many products are updated in one run. If a new product import with thousands of products has happened, you need to import all of those; if only one product ran out of stock due to a checkout, you only need to update one product. As a conclusion you should always use bulk updates in a river.
* Streaming: As the amount of imported products is not known, the river should stream the data in. This means data is already being indexed while it has not yet been read completely from the ecommerce server. You always have to think about whether this is what you really want: if the import fails, you might have already imported some data. This is practical in a product import like this one, but might be very impractical in other cases.

h2. The request

The river has to query a certain endpoint in the ecommerce system which includes information about the last update, in order to only update the products as needed.
In order to support incremental updates, every request from the river to the ecommerce system has to include something like a timestamp, from which on all changes should be exported. This can be a unix timestamp or a concrete date like these:

bc. http://ecommerce.system/export?fromTimestamp=1348084755
http://ecommerce.system/export?fromTimestamp=20120920-165523

This timestamp has to be stored in the river configuration and updated after every data import. This allows the river to always know when the last import happened, even if you stop the elasticsearch server or the river import functionality. It even works when the ecommerce system is not reachable, as long as you take the timestamp from the data returned by the ecommerce system.

h2. Sample JSON data

The JSON used to import or delete products should look like this in order to be parsed by the river:

bc.. {
  "timestamp" : "2012-09-20 16:55:23",
  "products" : [
    {
      "action" : "index",
      "product" : {
        "id" : "5",
        "name" : "IPhone 4s",
        "price" : 400.00,
        "stock" : 123
      }
    },
    {
      "action" : "index",
      "product" : {
        "id" : "12",
        "name" : "IPhone 5",
        "price" : 700.00
      }
    },
    {
      "action" : "delete",
      "product" : {
        "id" : "16"
      }
    }
  ]
}

p. You see the timestamp, which can simply be the time when the river connected to the ecommerce system. Then there is a products array, which includes products to be indexed as well as those to be deleted. As you only need an ID for deletion, there is no need to submit the complete metadata of a product to be deleted - perhaps this data does not even exist anymore.

h2. The river implementation

In order to support streaming and importing new data into elasticsearch, the river needs two core features:

* It has to stream the data and parse it while the request has not finished yet and the data is still flowing in.
* It needs a data structure which the slurper (the thread reading the data from the ecommerce system) can put new data into, while the indexer (the thread doing the actual indexing) can read from it.

Fortunately elasticsearch comes with some of these structures built in. The one used in this example is a @TransferQueue@. It is declared in the @JsonRiver@ class like this:

bc. private final TransferQueue<RiverProduct> stream = new LinkedTransferQueue<RiverProduct>();

The slurper uses this queue to add new products. A @RiverProduct@ is a simple abstraction class, which includes an @id@ needed for indexing, the @action@ to be taken (index or delete) and the @product@ itself, a simple hashmap. In case a new product should be indexed, the slurper calls this code:

bc. queue.add(RiverProduct.index(id, product));

Whereas for a deletion only an id is needed:

bc. queue.add(RiverProduct.delete(id));

h2. Reading json data from a stream

The slurper has a helper class called @RiverImporter@ which includes the logic to add new products to the queue after having read them from the stream. Adding products has been described above. The product is a simple @Map@. The first thing needed is an input stream which reads from a URL. You would want to use some nice HTTP client library like "async-http-client":https://github.com/sonatype/async-http-client for this, but after reading "this post":http://www.cowtowncoder.com/blog/archives/2012/05/entry_475.html it turned out that it does not really stream the data out. So sticking with a normal @HttpURLConnection@ is sufficient in this case. Do not use @HttpURLConnection@ if you need to configure features like HTTP basic authentication or have to support SSL with self-signed certificates (it works, but is not nice to configure).
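To make the following code easier to follow: the @getConnectionInputstream()@ helper used in the import code below could be implemented along these lines with a plain @HttpURLConnection@ (a sketch only; the endpoint URL, parameter name and timeouts are assumptions, not the code from the repository):

bc. import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
// Sketch: open a streaming connection to the export endpoint of the ecommerce system.
private InputStream getConnectionInputstream(String lastIndexUpdate) throws IOException {
    String url = "http://ecommerce.system/export?fromTimestamp=" + lastIndexUpdate;
    HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    connection.setConnectTimeout(5000);  // fail fast if the ecommerce system is down
    connection.setReadTimeout(30000);    // allow slow responses, the data is streamed anyway
    return connection.getInputStream();
}

Anything beyond that, like basic authentication or self-signed certificates, is exactly the point where a real HTTP client library starts to pay off, as noted above.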
As elasticsearch comes with the powerful "jackson JSON parsing library":http://jackson.codehaus.org/ built in, you do not need any external libraries for streaming JSON in. This is the code used to add products while streaming JSON data in (from the @RiverImporter@ class):

bc.. public RiverProductImport executeImport(String lastIndexUpdate) {
    RiverProductImport result = new RiverProductImport();
    XContentParser parser = null;
    InputStream is = null;
    String timestamp = null;

    try {
        is = getConnectionInputstream(lastIndexUpdate);
        parser = JsonXContent.jsonXContent.createParser(is);

        String action = null;
        Map<String, Object> product = null;
        String id = null;

        @SuppressWarnings("unused")
        Token token = null;
        while ((token = parser.nextToken()) != null) {
            if ("timestamp".equals(parser.text())) {
                token = parser.nextToken();
                timestamp = parser.text();
            } else if ("action".equals(parser.text())) {
                token = parser.nextToken();
                action = parser.text();
            } else if ("id".equals(parser.text())) {
                token = parser.nextToken();
                id = parser.text();
            } else if ("product".equals(parser.text())) {
                token = parser.nextToken();
                product = parser.map();
            }

            if ("delete".equals(action) && id != null) {
                queue.add(RiverProduct.delete(id));
                action = null;
                id = null;
                result.exportedProductCount++;
            }

            if ("index".equals(action) && product != null && id != null) {
                queue.add(RiverProduct.index(id, product));
                action = null;
                product = null;
                id = null;
                result.exportedProductCount++;
            }
        }

        result.exportTimestamp = timestamp;
    } catch (IOException e) {
        logger.error("Could not get content with lastUpdatedTimestamp [{}]", e, lastIndexUpdate);
    } finally {
        Closeables.closeQuietly(is);
        if (parser != null) {
            parser.close();
        }
    }

    return result;
}

p. Lots of stuff is happening here, so let's go through it. After getting the input stream with the data in it, it is easy to create a JSON parser on top of this input stream. You might recognize the @action@, @product@ and @id@ variables as parts of the @RiverProduct@. By the definition of the JSON we saw above, it is ok to have a product with either a delete @action@ and an @id@ field, or with an @index@ action, an @id@ field and @product@ data, in order to create a valid entry in the queue.

The next step is to parse every (well, almost every) token the JSON parser returns. We need to check for every token whether it is one of @timestamp@, @action@, @id@ or @product@. In any of these cases we need to do something with the next token. If we receive

bc. "timestamp" : "2012-09-23 16:45:23"

and @parser.text()@ returns @timestamp@, we are not interested in this string, but rather in the next token, in order to store it in the river configuration after importing the whole stream. The parsing of the @action@ and @id@ fields works in the same manner. However the parsing of the product has a specialty: in case the parser detects a @product@ token, the logic in the while loop takes the whole content of the product and creates a map out of it, instead of reading it token by token.

As you can see, this method includes the logic to put products into the queue, even though the parser has not yet finished processing the whole HTTP request - exactly what you want, because the indexer can now start indexing while the slurper is still fetching data.
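For orientation, the @RiverProduct@ abstraction used throughout the snippets above could look roughly like this minimal sketch, with an id, an action and the product map as described earlier (the class in the repository may differ in detail):

bc.. import java.util.Map;

// A minimal sketch of the RiverProduct abstraction as described in the text.
// Field names and factory methods are inferred from the prose, not copied
// from the repository.
public class RiverProduct {

    public enum Action { INDEX, DELETE }

    public final String id;
    public final Action action;
    public final Map<String, Object> product;

    private RiverProduct(String id, Action action, Map<String, Object> product) {
        this.id = id;
        this.action = action;
        this.product = product;
    }

    public static RiverProduct index(String id, Map<String, Object> product) {
        return new RiverProduct(id, Action.INDEX, product);
    }

    public static RiverProduct delete(String id) {
        return new RiverProduct(id, Action.DELETE, null);
    }
}

p. The static factory methods match the @RiverProduct.index(id, product)@ and @RiverProduct.delete(id)@ calls used by the slurper and keep the construction logic in one place.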
h2. Creating a bulk request with products from the queue

The @Indexer@ is part of the @JsonRiver@ class and mainly consists of the @run()@ method, which looks like this:

bc.. public void run() {
    while (!closed) {
        logger.debug("Indexer run() started");
        sw = new StopWatch().start();
        deletedDocuments = 0;
        insertedDocuments = 0;

        try {
            RiverProduct product = stream.take();
            bulk = client.prepareBulk();
            do {
                addProductToBulkRequest(product);
            } while ((product = stream.poll(250, TimeUnit.MILLISECONDS)) != null
                    && deletedDocuments + insertedDocuments < RIVER_MAX_BULK_SIZE);
        } catch (InterruptedException e) {
            continue;
        } finally {
            bulk.execute().actionGet();
        }

        logStatistics();
    }
}

p. The starting point here is the code in the try-catch block, which starts with @stream.take()@. This method blocks until someone puts a @RiverProduct@ into the @stream@ TransferQueue, which is done by the slurper after it has successfully read a @RiverProduct@ from the input stream. After that a bulk request is prepared. The following @while@ loop provides a very nice additional feature by setting useful break conditions. The loop is stopped and the bulk request is executed under two circumstances:

* Either @RIVER_MAX_BULK_SIZE@ is reached (storing the products after having read 5000 of them). This has two advantages. First, the products you read in first become searchable faster. Second, you do not have to keep all products in memory, but only as many as your bulk size defines. This ensures you will not run out of memory during a large import.
* Or the TransferQueue is not filled with a new product for 250 milliseconds. There are several reasons for this behaviour. The normal one is that the import executed in the @RiverImporter@ class and triggered by the slurper is finished. Unusual ones might be that the system the river pulls the data from is under such high load that it cannot write out the data as fast, or a network outage happens right while the river is reading data. This ensures that no @RiverProduct@ entities are lurking around in the in-memory queue instead of being written to elasticsearch.

The @addProductToBulkRequest()@ method simply adds an @IndexRequest@ or a @DeleteRequest@ for the product, depending on the content of the @RiverProduct@ being processed. The execution of the bulk request is done in the @finally@ block to ensure it always gets written, even if elasticsearch or the river gets shut down.

That's basically it - this is the streaming JSON river. No magic, just some code. :-)

h2. Testing

There are three tests in the @JsonRiverTest@ class. I have used the "spark micro web framework":http://sparkjava.com/ for setting up a river endpoint. As the spark framework does not feature creating JSON data, the built-in elasticsearch JSON builder was used. The spark framework code in the @setup()@ method checks for the lastUpdate parameter. If it is not set (which means this is the first request from the river), it returns two products to add and one to delete. If the parameter is set, it simply appends "1" to the given parameter and returns it as timestamp without returning any products. This means the river has one shot to get the data imported right. Furthermore the @setup()@ method creates a new product with the id @TODELETE@, which is expected to be deleted, as exactly this product is marked as deleted in the response rendered by the spark server.
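For illustration only, such a fake export endpoint could be set up with spark roughly like this (a sketch using spark's anonymous @Route@ style and a hand-written JSON string; the actual test uses the built-in elasticsearch JSON builder, and path, parameter name and payload are assumptions):

bc.. import static spark.Spark.get;

import spark.Request;
import spark.Response;
import spark.Route;

// Sketch of a fake ecommerce export endpoint for a test - path, parameter
// name and payload are assumptions for illustration, not the real test code.
get(new Route("/data") {
    @Override
    public Object handle(Request request, Response response) {
        String lastUpdate = request.queryParams("lastUpdate");
        if (lastUpdate == null) {
            // first request from the river: return a product to index and one to delete
            return "{ \"timestamp\" : \"2012-09-20 16:55:23\", \"products\" : [ "
                + "{ \"action\" : \"index\", \"product\" : { \"id\" : \"1\", \"name\" : \"Sample\", \"price\" : 1.0 } }, "
                + "{ \"action\" : \"delete\", \"product\" : { \"id\" : \"TODELETE\" } } ] }";
        }
        // every following request: only a new timestamp, no products
        return "{ \"timestamp\" : \"" + lastUpdate + "1\", \"products\" : [] }";
    }
});

p. This mirrors the behaviour described above: products are only returned on the very first request, every later request just bumps the timestamp.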
In order to be sure that the product is in the index when the river slurper runs for the first time, the products index is flushed after the product has been added.

The test checks three important features of the river:

* Creation of entities is working: @testThatIndexingWorks()@
* Deletion of entities is working: @testThatDeletionWorks()@
* The lastUpdatedTimestamp is updated in order to prevent a full export on every execution: @testThatLastUpdatedTimestampIsWritten()@

Do yourself a favour and write tests with huge amounts of data in order to make sure you will not hit any memory boundaries in your live environment.

h2. Building, packaging and including in elasticsearch

In case you want to play around with the river or include it in your elasticsearch installation (not recommended before changing it, see the last paragraph):

bc. git clone https://github.com/spinscale/elasticsearch-river-streaming-json.git
mvn package
path/to/es/bin/plugin -install json-river -url target/releases/elasticsearch-river-json-0.0.1-SNAPSHOT.zip

You might want to execute @mvn eclipse:eclipse@ as well in case you want to use it with some IDE.

h2. Improvements

This river is intended to be a minimal example, so you can rip it off and make it suit your needs. I think it is not too useful to create a generic-hyper-can-import-everything-from-everywhere river, as every integration with other systems tends to differ. So here are some suggestions for improvements.

h3. Reading river configuration from river index

Every configuration value is hardcoded in the river, which is useless. You want a different configuration per river setup, which means the configuration has to be supplied when registering the river, like this:

bc.. curl -v -XPUT 'localhost:9200/_river/my_river/_meta' -d '{
  "type" : "json",
  "configuration" : {
    "index" : "products",
    "type" : "product",
    "max_bulk_size" : 5000,
    "endpoint" : "http://localhost:4567/data?",
    "parameter" : "lastUpdate",
    "refresh" : "30s",
    "active" : true
  }
}'

p. This setup would allow you to have an arbitrary amount of JSON rivers reading into arbitrary indexes. You would just need to create more rivers. Keep in mind that you would have to reread your configuration every time before or after your slurper and indexer threads run, in order to make sure the river picks up configuration changes not only on startup. It is also very useful to have an @active@ boolean in your river configuration, so you can stop the river if you either do not need it, or you are expecting some sort of outage or maintenance in your main ecommerce system.

Reading a configuration item from the river works like this:

bc. GetResponse response = client.prepareGet().setIndex("_river").setType("my_river").setId("_meta").execute().actionGet();
Map<String, Object> configuration = (Map<String, Object>) response.getSource().get("configuration");
String endpoint = configuration.get("endpoint").toString();
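Building on the snippet above, pulling typed values with sensible defaults out of the configuration map could look roughly like this (a sketch; the key names and default values are assumptions taken from the example registration, and @XContentMapValues@ is just one convenient helper shipped with elasticsearch):

bc. import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
// Sketch: extract typed settings with defaults so a missing key does not break the river.
String index = XContentMapValues.nodeStringValue(configuration.get("index"), "products");
String type = XContentMapValues.nodeStringValue(configuration.get("type"), "product");
int maxBulkSize = XContentMapValues.nodeIntegerValue(configuration.get("max_bulk_size"), 5000);
TimeValue refresh = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(configuration.get("refresh"), "30s"), TimeValue.timeValueSeconds(30));
boolean active = XContentMapValues.nodeBooleanValue(configuration.get("active"), true);

Falling back to defaults keeps the river usable even if the registration document omits a setting.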
h3. Use aliases

Another hint from the operational trenches is to use "aliases":http://www.elasticsearch.org/guide/reference/api/admin-indices-aliases.html when using rivers to index data. Aliases allow you to index into different indexes than those used to search. This is very useful in case you do a long-lasting full export with your river. An example setup looks like this:

* @products@ is an alias to the index @products1@, which is full of data and currently used for live searches.
* You need a full export, which lasts 20 minutes. You cannot stop live search for 20 minutes, or delete the index and wait for 20 minutes until all index data is back.
* You create an index @products2@, reconfigure your river to index into this index, delete the @lastUpdated@ timestamp for this river configuration, and wait until the export is done.
* You switch the @products@ alias from @products1@ to @products2@.
* You can delete the @products1@ index now.
* Drawback: You will not get product updates in your live search as long as the full export is running (you could work around this by setting up another river, but you would generate more load on the system where the data gets pulled from).

Thanks for reading and happy streaming. In case of questions you can reach me via "github":http://github.com/spinscale, "twitter":http://twitter.com/spinscale or "mail":mailto:alr@spinscale.de