Skip to content

Method: importFromFile(Path)

1: /*
2: * *********************************************************************************************************************
3: *
4: * blueMarine II: Semantic Media Centre
5: * http://tidalwave.it/projects/bluemarine2
6: *
7: * Copyright (C) 2015 - 2021 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/bluemarine2-src
23: * git clone https://github.com/tidalwave-it/bluemarine2-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.bluemarine2.persistence.impl;
28:
29: import javax.annotation.Nonnull;
30: import javax.inject.Inject;
31: import java.util.Optional;
32: import java.util.concurrent.CountDownLatch;
33: import java.io.IOException;
34: import java.io.Reader;
35: import java.io.Writer;
36: import java.nio.file.Files;
37: import java.nio.file.Path;
38: import java.nio.file.Paths;
39: import org.apache.commons.io.FileUtils;
40: import org.eclipse.rdf4j.repository.Repository;
41: import org.eclipse.rdf4j.repository.RepositoryConnection;
42: import org.eclipse.rdf4j.repository.RepositoryException;
43: import org.eclipse.rdf4j.repository.sail.SailRepository;
44: import org.eclipse.rdf4j.rio.RDFFormat;
45: import org.eclipse.rdf4j.rio.RDFHandler;
46: import org.eclipse.rdf4j.rio.RDFHandlerException;
47: import org.eclipse.rdf4j.rio.RDFParseException;
48: import org.eclipse.rdf4j.rio.n3.N3Writer;
49: import org.eclipse.rdf4j.sail.Sail;
50: import org.eclipse.rdf4j.sail.memory.MemoryStore;
51: import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
52: import it.tidalwave.util.TypeSafeMap;
53: import it.tidalwave.util.annotation.VisibleForTesting;
54: import it.tidalwave.messagebus.MessageBus;
55: import it.tidalwave.messagebus.annotation.ListensTo;
56: import it.tidalwave.messagebus.annotation.SimpleMessageSubscriber;
57: import it.tidalwave.bluemarine2.util.SortingRDFHandler;
58: import it.tidalwave.bluemarine2.message.PersistenceInitializedNotification;
59: import it.tidalwave.bluemarine2.message.PowerOffNotification;
60: import it.tidalwave.bluemarine2.message.PowerOnNotification;
61: import it.tidalwave.bluemarine2.persistence.Persistence;
62: import lombok.extern.slf4j.Slf4j;
63: import static java.util.concurrent.TimeUnit.SECONDS;
64: import static java.nio.charset.StandardCharsets.UTF_8;
65: import static it.tidalwave.bluemarine2.persistence.PersistencePropertyNames.*;
66:
67: /***********************************************************************************************************************
68: *
69: * @author Fabrizio Giudici
70: *
71: **********************************************************************************************************************/
72: @SimpleMessageSubscriber @Slf4j
73: public class DefaultPersistence implements Persistence
74: {
75: @Inject
76: private MessageBus messageBus;
77:
78: private final CountDownLatch initialized = new CountDownLatch(1);
79:
80: private Repository repository;
81:
82: @VisibleForTesting Sail sail;
83:
84: /*******************************************************************************************************************
85: *
86: * {@inheritDoc}
87: *
88: ******************************************************************************************************************/
89: @Override @Nonnull
90: public Repository getRepository()
91: {
92: waitForPowerOn();
93: return repository;
94: }
95:
96: /*******************************************************************************************************************
97: *
98: *
99: ******************************************************************************************************************/
100: @VisibleForTesting void onPowerOnNotification (@ListensTo @Nonnull final PowerOnNotification notification)
101: throws RepositoryException, IOException, RDFParseException
102: {
103: log.info("onPowerOnNotification({})", notification);
104: final TypeSafeMap properties = notification.getProperties();
105:
106: final Optional<Path> importFile = properties.getOptional(IMPORT_FILE);
107: final Optional<Path> storageFolder = properties.getOptional(STORAGE_FOLDER);
108:
109: if (storageFolder.isEmpty())
110: {
111: log.warn("No storage path: working in memory");
112: sail = new MemoryStore();
113: }
114: else
115: {
116: log.info("Disk storage at {}", storageFolder);
117:
118: if (importFile.isPresent() && Files.exists(importFile.get()))
119: {
120: log.warn("Scratching store ...");
121: FileUtils.deleteDirectory(storageFolder.get().toFile()); // FIXME: rename to backup folder with timestamp
122: }
123:
124: sail = new NativeStore(storageFolder.get().toFile());
125: }
126:
127: repository = new SailRepository(sail);
128: repository.initialize();
129:
130: if (importFile.isPresent() && Files.exists(importFile.get()))
131: {
132: importFromFile(importFile.get());
133:
134: if (properties.getOptional(RENAME_IMPORT_FILE).orElse(false))
135: {
136: Files.move(importFile.get(), Paths.get(importFile.get().toString() + "~"));
137: }
138: }
139:
140: initialized.countDown();
141: messageBus.publish(new PersistenceInitializedNotification());
142: }
143:
144: /*******************************************************************************************************************
145: *
146: *
147: ******************************************************************************************************************/
148: @VisibleForTesting void onPowerOffNotification (@ListensTo @Nonnull final PowerOffNotification notification)
149: throws RepositoryException, IOException, RDFParseException
150: {
151: log.info("onPowerOffNotification({})", notification);
152:
153: if (repository != null)
154: {
155: repository.shutDown();
156: }
157: }
158:
159: /*******************************************************************************************************************
160: *
161: * Exports the repository to the given file.
162: *
163: * @param path where to export the data to
164: * @throws RDFHandlerException
165: * @throws IOException
166: * @throws RepositoryException
167: *
168: ******************************************************************************************************************/
169: @Override
170: public void exportToFile (@Nonnull final Path path)
171: throws RDFHandlerException, IOException, RepositoryException
172: {
173: log.info("exportToFile({})", path);
174: Files.createDirectories(path.getParent());
175:
176: try (final Writer w = Files.newBufferedWriter(path, UTF_8);
177: final RepositoryConnection connection = repository.getConnection())
178: {
179: final RDFHandler writer = new SortingRDFHandler(new N3Writer(w));
180:
181: // FIXME: use Iterations - and sort
182: // for (final Namespace namespace : connection.getNamespaces().asList())
183: // {
184: // writer.handleNamespace(namespace.getPrefix(), namespace.getName());
185: // }
186:
187: writer.handleNamespace("bio", "http://purl.org/vocab/bio/0.1/");
188: writer.handleNamespace("bmmo", "http://bluemarine.tidalwave.it/2015/04/mo/");
189: writer.handleNamespace("dc", "http://purl.org/dc/elements/1.1/");
190: writer.handleNamespace("foaf", "http://xmlns.com/foaf/0.1/");
191: writer.handleNamespace("owl", "http://www.w3.org/2002/07/owl#");
192: writer.handleNamespace("mo", "http://purl.org/ontology/mo/");
193: writer.handleNamespace("rdfs", "http://www.w3.org/2000/01/rdf-schema#");
194: writer.handleNamespace("rel", "http://purl.org/vocab/relationship/");
195: writer.handleNamespace("vocab", "http://dbtune.org/musicbrainz/resource/vocab/");
196: writer.handleNamespace("xs", "http://www.w3.org/2001/XMLSchema#");
197:
198: connection.export(writer);
199: }
200: }
201:
202: /*******************************************************************************************************************
203: *
204: *
205: ******************************************************************************************************************/
206: @Override
207: public <E extends Exception> void runInTransaction (@Nonnull final TransactionalTask<E> task)
208: throws E, RepositoryException
209: {
210: log.info("runInTransaction({})", task);
211: waitForPowerOn();
212: final long baseTime = System.nanoTime();
213:
214: try (final RepositoryConnection connection = repository.getConnection()) // TODO: pool?
215: {
216: task.run(connection);
217: connection.commit();
218: }
219: catch (Exception e)
220: {
221: log.error("Transaction failed: {}", e.toString());
222: }
223:
224: if (log.isDebugEnabled())
225: {
226: log.debug(">>>> done in {} ms", (System.nanoTime() - baseTime) * 1E-6);
227: }
228: }
229:
230: /*******************************************************************************************************************
231: *
232: * Imports the repository from the given file.
233: *
234: * @param path where to import the data from
235: * @throws RDFHandlerException
236: * @throws IOException
237: * @throws RepositoryException
238: *
239: ******************************************************************************************************************/
240: private void importFromFile (@Nonnull final Path path)
241: throws IOException, RepositoryException, RDFParseException
242: {
243: try (final RepositoryConnection connection = repository.getConnection();
244: final Reader reader = Files.newBufferedReader(path, UTF_8))
245: {
246: log.info("Importing repository from {} ...", path);
247: connection.add(reader, path.toUri().toString(), RDFFormat.N3);
248: connection.commit();
249: }
250: }
251:
252: /*******************************************************************************************************************
253: *
254: *
255: ******************************************************************************************************************/
256: private void waitForPowerOn()
257: {
258: try
259: {
260: if (!initialized.await(10, SECONDS))
261: {
262: throw new IllegalStateException("Did not receive PowerOnNotification");
263: }
264: }
265: catch (InterruptedException ex)
266: {
267: throw new IllegalStateException("Interrupted while waiting for PowerOnNotification");
268: }
269: }
270: }